Apache Spark: The Next Generation Cluster Computing - Ivan Lozić, 04/25/2017
Ivan Lozić, software engineer & entrepreneur Scala & Spark, C#, Node.js, Swift Web page: www.deegloo.com E-Mail: ilozic@gmail.com LinkedIn: https://www.linkedin.com/in/ilozic/ Zagreb, Croatia
Contents ● Apache Spark and its relation to Hadoop MapReduce ● What makes Apache Spark run fast ● How to use Spark's rich API to build batch ETL jobs ● Streaming capabilities ● Structured streaming 3
Apache Hadoop 4
Apache Hadoop ● Open source framework for distributed storage and processing ● Origins are in the “Nutch” project back in 2002 (Cutting, Cafarella) ● In 2006 Yahoo! created Hadoop, based on Google's GFS and MapReduce papers ● Based on the MapReduce programming model ● Fundamental assumption - all modules are built to handle hardware failures automatically ● Clusters are built of commodity hardware 5
Apache Spark 7
Motivation ● Hardware - the CPU is increasingly the compute bottleneck ● Users - democratise access to data and improve usability ● Applications - the need to build near real-time big data applications 8
Apache Spark ● Open source, fast and expressive cluster computing framework designed for big data analytics ● Compatible with Apache Hadoop ● Developed at UC Berkeley's AMPLab in 2009 and donated to the Apache Software Foundation in 2013 ● Original author - Matei Zaharia ● Databricks Inc. - the company behind Apache Spark 9
Apache Spark ● General distributed computing engine which unifies: ○ SQL and DataFrames ○ Real-time streaming (Spark Streaming) ○ Machine learning (Spark ML/MLlib) ○ Graph processing (GraphX) 10
Apache Spark ● Runs everywhere - standalone, EC2, Hadoop YARN, Apache Mesos ● Reads and writes from/to: ○ File/Directory ○ HDFS/S3 ○ JDBC ○ JSON ○ CSV ○ Parquet ○ Cassandra, HBase, ... 11
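To make the unified I/O API concrete, here is a minimal sketch (paths are hypothetical, Spark 2.x DataFrame API) that reads the CSV file used later in this deck and writes it back out as Parquet:

import org.apache.spark.sql.SparkSession

object CsvToParquet {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("csv-to-parquet-example")
      .master("local[*]")
      .getOrCreate()

    // Read a CSV file with a header row, inferring column types
    val incidents = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("/data/SFPD_Incidents_2003.csv")   // hypothetical path

    // Write the same data out in the columnar Parquet format
    incidents.write
      .mode("overwrite")
      .parquet("/data/warehouse/incidents.parquet")

    spark.stop()
  }
}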
Apache Spark - architecture 12 source: Databricks
Word count - MapReduce vs Spark 13

Hadoop MapReduce (Java):

package org.myorg;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class WordCount {

  public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
      String line = value.toString();
      StringTokenizer tokenizer = new StringTokenizer(line);
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        output.collect(word, one);
      }
    }
  }

  public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
  }
}

Spark (Scala):

val file = spark.textFile("hdfs://...")
val counts = file.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")
Hadoop ecosystem 14
Who uses Apache Spark? 15
Core data abstractions 16
Resilient Distributed Dataset ● RDDs are partitioned collections of objects - building blocks of Spark ● Immutable and provide fault tolerant computation ● Two types of operations: 1. Transformations - map, reduce, sort, filter, groupBy, ... 2. Actions - collect, count, take, first, foreach, saveToCassandra, ... 17
RDD ● Operations are modeled after the Scala collection API ● Transformations are lazily evaluated - they only add nodes to a DAG (Directed Acyclic Graph) of computation ● Actions trigger execution of the DAG and the actual computation (see the sketch below) 18
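A minimal sketch of the lazy evaluation model (local master and file path are hypothetical): the transformations below only describe the DAG, and nothing runs until the action at the end.

import org.apache.spark.sql.SparkSession

object LazyRddExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("lazy-rdd-example")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    // Transformations: each one just adds a node to the DAG
    val lines    = sc.textFile("/data/SFPD_Incidents_2003.csv")   // hypothetical path
    val words    = lines.flatMap(_.split(" "))
    val nonEmpty = words.filter(_.nonEmpty)

    // Action: triggers scheduling of the DAG and the actual computation
    println(s"number of words: ${nonEmpty.count()}")

    spark.stop()
  }
}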
RDD 19
Data shuffling ● Sending data over the network ● Slow - should be minimized as much as possible! ● Typical example - groupByKey (slow) vs reduceByKey (faster) 20
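A sketch of the difference, assuming a pair RDD of (category, 1) tuples built beforehand: groupByKey ships every individual value across the network before summing, while reduceByKey pre-aggregates on each partition and shuffles only the partial sums.

import org.apache.spark.rdd.RDD

object ShuffleComparison {
  // Slower: all values for a key are moved over the network, then summed
  def countSlow(pairs: RDD[(String, Int)]): RDD[(String, Int)] =
    pairs.groupByKey().mapValues(_.sum)

  // Faster: values are combined per partition first, so far less data is shuffled
  def countFast(pairs: RDD[(String, Int)]): RDD[(String, Int)] =
    pairs.reduceByKey(_ + _)
}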
RDD - the problems ● They express how to compute rather than what to compute ● Operations and data types inside closures are a black box for Spark - Spark cannot apply optimizations 21

val category = spark.sparkContext.textFile("/data/SFPD_Incidents_2003.csv")
  .map(line => line.split(byCommaButNotUnderQuotes)(1))
  .filter(cat => cat != "Category")
Structure (Structured APIs) 22
SparkSQL 23 ● Originally named “Shark” - to enable HiveQL queries ● As of Spark 2.0 - SQL 2003 support category.toDF("categoryName").createOrReplaceTempView("category") spark.sql(""" SELECT categoryName, count(*) AS Count FROM category GROUP BY categoryName ORDER BY 2 DESC """).show(5)
DataFrame ● Higher-level abstraction (DSL) for manipulating data ● Distributed collection of rows organized into named columns ● Modeled after the Pandas DataFrame ● A DataFrame has a schema (something an RDD is missing) 24

val categoryDF = category.toDF("categoryName")

categoryDF
  .groupBy("categoryName")
  .count()
  .orderBy($"Count".desc)
  .show(5)
DataFrame 25
Structured APIs error-check comparison 26 source: Databricks
Dataset ● Extension to DataFrame ● Type-safe ● DataFrame = Dataset[Row] 27

import spark.implicits._   // needed for .as[Incident]

case class Incident(Category: String, DayOfWeek: String)

val incidents = spark
  .read
  .option("header", "true")
  .csv("/data/SFPD_Incidents_2003.csv")
  .select("Category", "DayOfWeek")
  .as[Incident]

val days = Array("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday")

val histogram = incidents.groupByKey(_.Category).mapGroups {
  case (category, daysOfWeek) =>
    val buckets = new Array[Int](7)
    daysOfWeek.map(_.DayOfWeek).foreach { dow =>
      buckets(days.indexOf(dow)) += 1
    }
    (category, buckets)
}
What makes Spark fast? 28
In-memory computation 29
Hadoop MapReduce: ● Fault tolerance is achieved through HDFS replication ● It is easy to spend 90% of the time on disk I/O alone (input → iter. 1 → HDFS write → HDFS read → iter. 2 → ...)
Spark: ● Fault tolerance is provided by building a lineage of transformations ● Data is kept in memory and is not replicated (input → iter. 1 → iter. 2 → ...)
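A sketch of an iterative job that keeps its working set in memory (input path hypothetical): the cached RDD is reused on every iteration, and a lost partition is recomputed from its lineage rather than read from an HDFS replica.

import org.apache.spark.sql.SparkSession

object IterativeExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("iterative-example")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    // Parse once and keep the deserialized partitions in executor memory
    val points = sc.textFile("/data/points.csv")   // hypothetical input
      .map(_.split(",").map(_.toDouble))
      .cache()

    // Each iteration reads from memory instead of going back to HDFS
    var result = 0.0
    for (_ <- 1 to 10) {
      result = points.map(_.sum).reduce(_ + _)
    }
    println(result)

    spark.stop()
  }
}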
Catalyst - query optimizer 30 source: Databricks ● Applies transformations to convert an unoptimized query plan into an optimized one
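One way to see Catalyst at work (column names taken from the incidents dataset, path hypothetical) is to ask a DataFrame query for its plans; explain(true) prints the parsed, analyzed and optimized logical plans as well as the physical plan.

import org.apache.spark.sql.SparkSession

object CatalystExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("catalyst-example")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val incidents = spark.read
      .option("header", "true")
      .csv("/data/SFPD_Incidents_2003.csv")   // hypothetical path

    incidents
      .filter($"Category" === "THEFT")
      .groupBy($"DayOfWeek")
      .count()
      .explain(true)   // shows how Catalyst rewrites the logical plan into the physical plan

    spark.stop()
  }
}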
Project Tungsten ● Improve Spark execution memory and CPU efficiency by: ○ Performing explicit memory management instead of relying on JVM objects (Dataset encoders) ○ Generating code on the fly to fuse multiple operators into one (Whole stage codegen) ○ Introducing cache-aware computation ○ In-memory columnar format ● Bringing Spark closer to the bare metal 31
Dataset encoders ● Encoders translate between domain objects and Spark's internal format 32 source: Databricks
Dataset encoders ● Encoders bridge objects with data sources 33 { "Category": "THEFT", "IncidntNum": "150060275", "DayOfWeek": "Saturday" } case class Incident(IncidntNum: Int, Category: String, DayOfWeek: String)
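Encoders are resolved implicitly for case classes; a small sketch using the schema from the slide above shows the encoder Spark would use to lay an Incident out in its internal binary format.

import org.apache.spark.sql.Encoders

object EncoderExample {
  case class Incident(IncidntNum: Int, Category: String, DayOfWeek: String)

  def main(args: Array[String]): Unit = {
    // The product encoder knows how to serialize an Incident into Tungsten's row format
    val encoder = Encoders.product[Incident]
    println(encoder.schema)
    // In a real job the encoder is picked up via `import spark.implicits._`, e.g.
    //   spark.read.json("/data/incidents.json").as[Incident]
  }
}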
Dataset benchmark Space efficiency 34 source: Databricks
Dataset benchmark Serialization/deserialization performance 35 source: Databricks
Whole stage codegen ● Fuse the operators together ● Generate code on the fly ● The idea: generate specialized code as if it had been written by hand to be fast ● Result: Spark 2.0 runs this kind of query about 10x faster than Spark 1.6 36
Whole stage codegen 37 SELECT COUNT(*) FROM store_sales WHERE ss_item_sk=1000
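A small sketch of how to check that whole-stage codegen kicked in for this query (spark is an existing SparkSession, and store_sales is assumed to be registered as a temporary view): operators prefixed with '*' in the physical plan have been fused into one generated function.

// store_sales is assumed to be available as a temporary view, e.g.
//   storeSalesDF.createOrReplaceTempView("store_sales")
spark.sql("SELECT COUNT(*) FROM store_sales WHERE ss_item_sk = 1000").explain()
// In Spark 2.x the printed physical plan contains stages such as *HashAggregate and *Filter,
// where '*' marks operators compiled together by whole-stage code generation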
Whole stage codegen Volcano iterator model 38
Whole stage codegen ● What if we asked an intern to write this in C#? 39

long count = 0;
foreach (var ss_item_sk in store_sales) {
    if (ss_item_sk == 1000)
        count++;
}
Volcano vs Intern 40 (benchmark chart: Volcano iterator model vs. the hand-written loop) source: Databricks
Volcano vs Intern 41
Developing ETL with Spark 42
Choose your favorite IDE 43
Define Spark job entry point 44

object IncidentsJob {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("Incidents processing job")
      .config("spark.sql.shuffle.partitions", "16")
      .master("local[4]")
      .getOrCreate()

    // spark transformations and actions...

    System.exit(0)
  }
}
Create build.sbt file 45

lazy val root = (project in file(".")).
  settings(
    organization := "com.mycompany",
    name := "spark.job.incidents",
    version := "1.0.0",
    scalaVersion := "2.11.8",
    mainClass in Compile := Some("com.mycompany.spark.job.incidents.main")
  )

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.1" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "2.0.1" % "provided",
  "com.microsoft.sqlserver" % "sqljdbc4" % "4.0"
)
Create application (fat) jar file $ sbt compile $ sbt test $ sbt assembly (sbt-assembly plugin) 46
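The assembly task is not part of sbt itself; it is typically enabled by adding the sbt-assembly plugin to project/plugins.sbt (the version below is only an example):

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")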
Submit job via spark-submit command 47

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]
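For the job defined earlier, a concrete invocation could look like the following (master URL, paths and the input argument are hypothetical; the jar name follows the default sbt-assembly naming pattern):

./bin/spark-submit \
  --class com.mycompany.spark.job.incidents.main \
  --master spark://master:7077 \
  --deploy-mode cluster \
  --conf spark.sql.shuffle.partitions=16 \
  /jobs/spark.job.incidents-assembly-1.0.0.jar \
  /data/SFPD_Incidents_2003.csv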
Example workflow 48
1. Pull the code
2. Take the build number (331)
3. Build & test - produce the job artifact (job331.jar)
4. Copy the jar to the cluster (+ notification)
5. Create/schedule the job job331 (HTTP)
6. spark-submit job331
Spark Streaming 49
Apache Spark streaming ● Scalable fault tolerant streaming system ● Receivers receive data streams and chop them into batches ● Spark processes batches and pushes out the result 50 ● Input: Files, Socket, Kafka, Flume, Kinesis...
Apache Spark streaming 51

def main(args: Array[String]) {
  val conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName("Incidents processing job - Stream")

  val ssc = new StreamingContext(conf, Seconds(1))

  val topics = Set(Topics.Incident /* , ... */)

  val directKafkaStream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
    ssc, kafkaParams, topics)

  // process batches
  directKafkaStream.map(_._2).flatMap(_.split(" "))...

  // Start the computation
  ssc.start()
  ssc.awaitTermination()

  System.exit(0)
}
Apache Spark streaming ● Integrates with the rest of the ecosystem ○ Combine batch and stream processing ○ Combine machine learning with streaming ○ Combine SQL with streaming 52
Structured streaming [Alpha version in Spark 2.1] 53
Structured streaming (continuous apps) ● High-level streaming API built on DataFrames ● Catalyst optimizer creates incremental execution plan ● Unifies streaming, interactive and batch queries ● Supports multiple sources and sinks ● E.g. aggregate data in a stream, then serve using JDBC 54
Structured streaming key idea The simplest way to perform streaming analytics is not having to reason about streaming. 55
Structured streaming 56
Structured streaming ● Reusing the same API 57

finite (batch):

val categories = spark
  .read
  .option("header", "true")
  .schema(schema)
  .csv("/data/source")
  .select("Category")

infinite (stream):

val categories = spark
  .readStream
  .option("header", "true")
  .schema(schema)
  .csv("/data/source")
  .select("Category")
Structured streaming ● Reusing the same API 58

finite (batch):

categories
  .write
  .format("parquet")
  .save("/data/warehouse/categories.parquet")

infinite (stream):

categories
  .writeStream
  .format("parquet")
  .start("/data/warehouse/categories.parquet")
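A sketch of the "aggregate a stream, then serve the result" idea from the earlier slide (source directory, schema and the console sink are assumptions made for illustration): the same groupBy/count that works on a finite DataFrame maintains an incrementally updated result over the unbounded input.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object StreamingCategoryCounts {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("streaming-category-counts")
      .master("local[*]")
      .getOrCreate()

    // Assumed schema of the incoming CSV files
    val schema = StructType(Seq(StructField("Category", StringType)))

    val categories = spark.readStream
      .option("header", "true")
      .schema(schema)
      .csv("/data/source")          // new files dropped here are picked up incrementally
      .select("Category")

    // The running aggregation is updated every time new data arrives
    val counts = categories.groupBy("Category").count()

    val query = counts.writeStream
      .outputMode("complete")       // emit the full, updated result table on each trigger
      .format("console")
      .start()

    query.awaitTermination()
  }
}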
Structured streaming 59
Useful resources ● Spark home page: https://spark.apache.org/ ● Spark summit page: https://spark-summit.org/ ● Apache Spark Docker image: https://github.com/dylanmei/docker-zeppelin ● SFPD Incidents: https://data.sfgov.org/Public-Safety/Police-Department-Incidents/tmnf-yvry 60
Thank you for your attention! 61
References 62 ● Michael Armbrust - Structuring Spark: DataFrames, Datasets and Streaming - https://spark-summit.org/2016/events/structuring-spark-dataframes-datasets-and-streaming/ ● Apache Parquet - https://parquet.apache.org/ ● Spark Performance: What's Next - https://spark-summit.org/east-2016/events/spark-performance-whats-next/ ● Avoid groupByKey - https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
