The Nitty Gritty of Advanced Analytics Using Apache Spark in Python Miklos Christine Solutions Architect mwc@databricks.com, @Miklos_C
About Me Miklos Christine, Solutions Architect @ Databricks - mwc@databricks.com - @Miklos_C on Twitter. Previously: Systems Engineer @ Cloudera (supported a few of the largest clusters in the world); Software Engineer @ Cisco; UC Berkeley graduate
We are Databricks, the company behind Spark ● Founded by the creators of Apache Spark in 2013 ● 75% share of Spark code contributed by Databricks in 2014 ● Created Databricks on top of Spark to make big data simple
Apache Spark Engine: Spark Core, Spark SQL, Spark Streaming, MLlib, GraphX ● Unified engine across diverse workloads & environments ● Scale out, fault tolerant ● Python, Java, Scala, and R APIs ● Standard libraries
2010: research paper ● 2012: started @ Berkeley ● 2013: Databricks started & donated to ASF ● 2014: Spark 1.0 & libraries (SQL, ML, GraphX) ● 2015: DataFrames, Tungsten, ML Pipelines ● 2016: Spark 2.0
Spark Community Growth • Spark Survey 2015 Highlights • End of Year Spark Highlights
2015: A Great Year for Spark ● Most active open source project in (big) data ● 1000+ code contributors ● New language: R ● Widespread industry support & adoption
HOW RESPONDENTS ARE RUNNING SPARK: 51% on a public cloud ● TOP ROLES USING SPARK: 41% of respondents identify themselves as Data Engineers; 22% identify themselves as Data Scientists
Spark User Highlights
NOTABLE USERS THAT PRESENTED AT SPARK SUMMIT 2015 SAN FRANCISCO Source: Slide 5 of Spark Community Update
Large-Scale Usage Largest cluster: 8000 Nodes (Tencent) Largest single job: 1 PB (Alibaba, Databricks) Top Streaming Intake: 1 TB/hour (HHMI Janelia Farm) 2014 On-Disk Sort Record Fastest Open Source Engine for sorting a PB
Spark API Performance
History of Spark APIs ● RDD (2011): distributed collection of JVM objects; functional operators (map, filter, etc.) ● DataFrame (2013): distributed collection of Row objects; expression-based operations and UDFs; logical plans and optimizer; fast/efficient internal representations ● DataSet (2015): internally rows, externally JVM objects; almost the "best of both worlds": type safe + fast, but slower than DataFrames and not as good for interactive analysis, especially in Python
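A minimal sketch (not from the slides) contrasting the two APIs, assuming a hypothetical DataFrame people with columns name and age:

from pyspark.sql import functions as F

# RDD API: functional operators over Python objects; the lambda is opaque
# to the engine, so it cannot push filters down or prune columns.
adults_rdd = (people.rdd
              .filter(lambda row: row.age >= 18)
              .map(lambda row: (row.name, row.age)))

# DataFrame API: expression-based operations that build a logical plan,
# which the Catalyst optimizer can rewrite before execution.
adults_df = (people
             .filter(F.col('age') >= 18)
             .select('name', 'age'))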
Benefit of Logical Plan: Performance Parity Across Languages (chart comparing DataFrame vs RDD performance across languages)
ETL with Spark
ETL: Extract, Transform, Load ● Key factor for big data platforms ● Provides Speed Improvements in All Workloads ● Typically Executed by Data Engineers
File Formats ● Text File Formats ○ CSV ○ JSON ● Avro Row Format ● Parquet Columnar Format
File Formats + Compression ● File Formats ○ JSON ○ CSV ○ Avro ○ Parquet ● Compression Codecs ○ No compression ○ Snappy ○ Gzip ○ LZO
Spark Parquet Properties ● Industry Standard File Format: Parquet ○ Write to Parquet:
df.write.format("parquet").save("namesAndAges.parquet")
df.write.format("parquet").saveAsTable("myTestTable")
○ For compression: spark.sql.parquet.compression.codec = (gzip, snappy)
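A minimal sketch of the properties above, assuming an existing sqlContext and DataFrame df (both placeholders here): set the codec once on the context, then write.

# Choose the Parquet compression codec before writing ("snappy" or "gzip")
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

df.write.format("parquet").save("/tmp/namesAndAges.parquet")
df.write.format("parquet").saveAsTable("myTestTable")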
Small Files Problem ● Small files problem still exists ● Metadata loading ● APIs: df.coalesce(N) df.repartition(N) Ref: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame
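A hedged sketch of the two APIs above for controlling output file counts, assuming a DataFrame df with many input partitions (paths are placeholders):

# coalesce(N) narrows to N partitions without a full shuffle:
# a cheap way to reduce the number of output files
df.coalesce(16).write.parquet("/tmp/output_coalesced/")

# repartition(N) performs a full shuffle and rebalances data evenly:
# more expensive, but avoids skewed output files
df.repartition(16).write.parquet("/tmp/output_repartitioned/")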
All About Partitions ● RDD / DataFrame Partitions: df.rdd.getNumPartitions() ● SparkSQL Shuffle Partitions: spark.sql.shuffle.partitions ● Table Level Partitions: df.write.partitionBy("year").save("data.parquet")
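A short sketch of the three partition "knobs" above, assuming an existing sqlContext and DataFrame df (placeholders):

# 1. Physical partitions of the DataFrame / underlying RDD
print(df.rdd.getNumPartitions())

# 2. Partitions used for shuffles (joins, aggregations);
#    the default of 200 is often too high for small datasets
sqlContext.setConf("spark.sql.shuffle.partitions", "64")

# 3. Directory-level partitioning of the output by a column
df.write.partitionBy("year").parquet("/tmp/data.parquet")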
PySpark ETL APIs - Text Formats
# CSV
df = (sqlContext.read
      .format('com.databricks.spark.csv')
      .options(header='true', inferSchema='true')
      .load('/path/to/data'))
# JSON
df = sqlContext.read.json("/tmp/test.json")
df.write.json("/tmp/test_output.json")
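One hedged addition: instead of inferSchema='true' (which costs an extra pass over the data), a schema can be supplied explicitly. The column names below are hypothetical.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Explicit schema avoids the inference pass over the input files
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)])

df = (sqlContext.read
      .format('com.databricks.spark.csv')
      .options(header='true')
      .schema(schema)
      .load('/path/to/data'))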
PySpark ETL APIs - Container Formats
# Binary Container Formats
# Avro
df = (sqlContext.read
      .format("com.databricks.spark.avro")
      .load("/path/to/files/"))
# Parquet
df = sqlContext.read.parquet("/path/to/files/")
df.write.parquet("/path/to/files/")
PySpark ETL APIs ● Manage Number of Files ○ These APIs manage the number of files per directory
df.repartition(80) \
  .write \
  .parquet("/path/to/parquet/")

df.repartition(80) \
  .write \
  .partitionBy("year") \
  .parquet("/path/to/parquet/")
Common ETL Problems ● Malformed JSON Records
sqlContext.sql("SELECT _corrupt_record FROM jsonTable WHERE _corrupt_record IS NOT NULL")
● Mismatched DataFrame Schema ○ Null Representation vs Schema DataType ● Many Small Files / No Partition Strategy ○ Parquet Files: ~128MB - 256MB Compressed
Ref: https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/dealing_with_bad_data.html
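A small sketch of the malformed-JSON check end to end, assuming a hypothetical /tmp/test.json input:

json_df = sqlContext.read.json("/tmp/test.json")
json_df.registerTempTable("jsonTable")

# Rows that failed to parse end up in the _corrupt_record column
bad_rows = sqlContext.sql(
    "SELECT _corrupt_record FROM jsonTable WHERE _corrupt_record IS NOT NULL")
bad_rows.show(truncate=False)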
Debugging Spark Spark Driver Error: Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 362.0 failed 4 times, most recent failure: Lost task 1.3 in stage 362.0 (TID 275202, ip-10-111-225-98.ec2.internal): java.nio.channels.ClosedChannelException Spark Executor Error: 16/04/13 20:02:16 ERROR DefaultWriterContainer: Aborting task. java.text.ParseException: Unparseable number: "N" at java.text.NumberFormat.parse(NumberFormat.java:385) at com.databricks.spark.csv.util.TypeCast$$anonfun$castTo$4.apply$mcD$sp(TypeCast.scala:58) at com.databricks.spark.csv.util.TypeCast$$anonfun$castTo$4.apply(TypeCast.scala:58) at com.databricks.spark.csv.util.TypeCast$$anonfun$castTo$4.apply(TypeCast.scala:58) at scala.util.Try.getOrElse(Try.scala:77) at com.databricks.spark.csv.util.TypeCast$.castTo(TypeCast.scala:58)
Debugging Spark
SQL with Spark
SparkSQL Best Practices ● DataFrames and SparkSQL share the same optimizer and execution engine ● Use built-in functions instead of custom UDFs (see the sketch below) ○ import pyspark.sql.functions ● Examples: ○ to_date() ○ get_json_object() ○ regexp_extract() ○ hour() / minute() Ref: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions
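A minimal sketch of the difference, assuming a DataFrame df with a timestamp-formatted string column ts (hypothetical): the Python UDF ships every row to a Python worker, while the built-in stays in the JVM and is visible to the optimizer.

from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Python UDF: each row round-trips between the JVM and a Python process
hour_udf = udf(lambda ts: int(ts[11:13]), IntegerType())
df.select(hour_udf(F.col('ts')).alias('hour'))

# Built-in function: same result, no Python round-trip
df.select(F.hour(F.col('ts')).alias('hour'))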
SparkSQL Best Practices ● Large Table Joins ○ Largest Table on LHS ○ Increase Spark Shuffle Partitions ○ Leverage the CLUSTER BY API included in Spark 1.6
sqlCtx.sql("select * from large_table_1 cluster by num1") \
      .registerTempTable("sorted_large_table_1")
sqlCtx.sql("cache table sorted_large_table_1")
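A hedged PySpark sketch of the join recipe above; any table or column names not on the slide (small_table_2, extra_col) are hypothetical. The larger table goes on the left-hand side of the join.

# Increase shuffle partitions before a large join
sqlCtx.setConf("spark.sql.shuffle.partitions", "400")

# Cluster the large table by the join key and cache it
sqlCtx.sql("select * from large_table_1 cluster by num1") \
      .registerTempTable("sorted_large_table_1")
sqlCtx.sql("cache table sorted_large_table_1")

# Join with the large, sorted table on the left-hand side
joined = sqlCtx.sql("""
    SELECT a.*, b.extra_col
    FROM sorted_large_table_1 a
    JOIN small_table_2 b ON a.num1 = b.num1""")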
PySpark API Best Practices ● User Defined Functions (UDFs)
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

add_n = udf(lambda x, y: x + y, IntegerType())

# Register a UDF that adds a column to the DataFrame; cast the id column to an integer type.
df = df.withColumn('id_offset', add_n(F.lit(1000), df.id.cast(IntegerType())))
PySpark API Best Practices ● Built-in Functions
corpus_df = df.select(
    F.lower(F.col('body')).alias('corpus'),
    F.monotonicallyIncreasingId().alias('id'))

dayofweek_df = df.select(
    F.date_format(
        F.from_utc_timestamp(
            F.from_unixtime(F.col('created_utc')), "PST"),
        'EEEE').alias('dayofweek'))
Ref: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions
PySpark API Best Practices ● User Defined Functions (UDFs)
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

def squared(s):
    return s * s

sqlContext.udf.register("squaredWithPython", squared, LongType())  # for SQL expressions
squared_udf = udf(squared, LongType())                             # for the DataFrame API
display(df.select("id", squared_udf("id").alias("id_squared")))
ML with Spark
Data Science Time
Why Spark ML Provide general purpose ML algorithms on top of Spark • Let Spark handle the distribution of data and queries; scalability • Leverage its improvements (e.g. DataFrames, Datasets, Tungsten) Advantages of MLlib’s Design: • Simplicity • Scalability • Streamlined end-to-end • Compatibility
High-level functionality in MLlib ● Learning tasks: Classification, Regression, Recommendation, Clustering, Frequent itemsets ● Workflow utilities: Model import/export, Pipelines, DataFrames, Cross validation ● Data utilities: Feature extraction & selection, Statistics, Linear algebra
Machine Learning: What and Why? ML uses data to identify patterns and make decisions. Core value of ML is automated decision making • Especially important when dealing with TB or PB of data Many Use Cases including: • Marketing and advertising optimization • Security monitoring / fraud detection • Operational optimizations
Algorithm coverage in MLlib
● Classification: Logistic regression w/ elastic net, Naive Bayes, Streaming logistic regression, Linear SVMs, Decision trees, Random forests, Gradient-boosted trees, Multilayer perceptron, One-vs-rest
● Regression: Least squares w/ elastic net, Isotonic regression, Decision trees, Random forests, Gradient-boosted trees, Streaming linear methods
● Recommendation: Alternating Least Squares
● Frequent itemsets: FP-growth, PrefixSpan
● Clustering: Gaussian mixture models, K-Means, Streaming K-Means, Latent Dirichlet Allocation, Power Iteration Clustering
● Statistics: Pearson correlation, Spearman correlation, Online summarization, Chi-squared test, Kernel density estimation
● Linear algebra: Local dense & sparse vectors & matrices; Distributed matrices (block-partitioned matrix, row matrix, indexed row matrix, coordinate matrix); Matrix decompositions
● Model import/export, Pipelines
● Feature extraction & selection: Binarizer, Bucketizer, Chi-Squared selection, CountVectorizer, Discrete cosine transform, ElementwiseProduct, Hashing term frequency, Inverse document frequency, MinMaxScaler, NGram, Normalizer, One-Hot Encoder, PCA, PolynomialExpansion, RFormula, SQLTransformer, Standard scaler, StopWordsRemover, StringIndexer, Tokenizer, VectorAssembler, VectorIndexer, VectorSlicer, Word2Vec
List based on Spark 1.5
Spark ML Best Practices ● Spark MLlib (RDD-based API) vs spark.ml (DataFrame-based API) ○ Understand the differences ● Don't Pipeline Too Many Stages ○ Check Results Between Stages (see the pipeline sketch below)
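A minimal spark.ml pipeline sketch (not from the slides), assuming a hypothetical DataFrame training with columns text (string) and label (double); it inspects an intermediate stage before chaining everything together.

from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashing_tf = HashingTF(inputCol="words", outputCol="features")
lr = LogisticRegression(maxIter=10)

# Check results between stages before pipelining everything together
tokenized = tokenizer.transform(training)
tokenized.select("text", "words").show(5)

pipeline = Pipeline(stages=[tokenizer, hashing_tf, lr])
model = pipeline.fit(training)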
PySpark ML API Best Practices
PySpark ML API Best Practices ● DataFrame to RDD Mapping
from nltk.tokenize import word_tokenize
# PUNCTUATION, STOPWORDS, and STEMMER are assumed to be defined elsewhere
# (e.g. string punctuation, an NLTK stopword list, and an NLTK stemmer).

def tokenize(text):
    tokens = word_tokenize(text)
    lowercased = [t.lower() for t in tokens]
    no_punctuation = []
    for word in lowercased:
        punct_removed = ''.join([letter for letter in word if letter not in PUNCTUATION])
        no_punctuation.append(punct_removed)
    no_stopwords = [w for w in no_punctuation if w not in STOPWORDS]
    stemmed = [STEMMER.stem(w) for w in no_stopwords]
    return [w for w in stemmed if w]

rdd = wordsDataFrame.map(lambda x: (x['id'], tokenize(x['corpus'])))
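A small follow-up sketch: converting the tokenized RDD back into a DataFrame so spark.ml feature transformers can consume it (column names are hypothetical).

# Infer a two-column DataFrame (id, words) from the tuple RDD
tokens_df = rdd.toDF(["id", "words"])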
Learning more about MLlib ● Guides & examples: Example workflow using ML Pipelines (Python); the above links are part of the Databricks Guide, which contains many more examples and references ● References: Apache Spark MLlib User Guide (contains code snippets for almost all algorithms, as well as links to API documentation); Meng et al. "MLlib: Machine Learning in Apache Spark." 2015. http://arxiv.org/abs/1505.06807 (academic paper)
Spark Demo
Thanks! Sign Up For Databricks Community Edition! http://go.databricks.com/databricks-community-edition-beta-waitlist