twitter: @rabbitonweb, email: paul.szulc@gmail.com Apache Spark Workshops by Paweł Szulc
twitter: @rabbitonweb, email: paul.szulc@gmail.com Before we start Make sure you’ve installed: JDK, Scala, SBT Clone project: https://github.com/rabbitonweb/spark-workshop Run `sbt compile` on it to fetch all dependencies
twitter: @rabbitonweb, email: paul.szulc@gmail.com What are we going to cover?
twitter: @rabbitonweb, email: paul.szulc@gmail.com What is Apache Spark?
twitter: @rabbitonweb, email: paul.szulc@gmail.com Apache Spark
twitter: @rabbitonweb, email: paul.szulc@gmail.com Apache Spark “Apache Spark™ is a fast and general engine for large-scale data processing.”
twitter: @rabbitonweb, email: paul.szulc@gmail.com Why?
twitter: @rabbitonweb, email: paul.szulc@gmail.com Why? buzzword: Big Data
twitter: @rabbitonweb, email: paul.szulc@gmail.com Big Data is like...
twitter: @rabbitonweb, email: paul.szulc@gmail.com Big Data is like... “Big Data is like teenage sex:
twitter: @rabbitonweb, email: paul.szulc@gmail.com Big Data is like... “Big Data is like teenage sex: everyone talks about it,
twitter: @rabbitonweb, email: paul.szulc@gmail.com Big Data is like... “Big Data is like teenage sex: everyone talks about it, nobody really knows how to do it,
twitter: @rabbitonweb, email: paul.szulc@gmail.com Big Data is like... “Big Data is like teenage sex: everyone talks about it, nobody really knows how to do it, everyone thinks everyone else is doing it,
twitter: @rabbitonweb, email: paul.szulc@gmail.com Big Data is like... “Big Data is like teenage sex: everyone talks about it, nobody really knows how to do it, everyone thinks everyone else is doing it, so everyone claims they are doing it”
twitter: @rabbitonweb, email: paul.szulc@gmail.com Big Data is all about...
twitter: @rabbitonweb, email: paul.szulc@gmail.com Big Data is all about... ● well, the data :)
twitter: @rabbitonweb, email: paul.szulc@gmail.com Big Data is all about... ● well, the data :) ● It is said that 2.5 exabytes (2.5×10^18) of data is being created around the world every single day
twitter: @rabbitonweb, email: paul.szulc@gmail.com Big Data is all about... “Every two days, we generate as much information as we did from the dawn of civilization until 2003” -- Eric Schmidt Former CEO Google
twitter: @rabbitonweb, email: paul.szulc@gmail.com source: http://papyrus.greenville.edu/2014/03/selfiesteem/
twitter: @rabbitonweb, email: paul.szulc@gmail.com Big Data is all about... ● well, the data :) ● It is said that 2.5 exabytes (2.5×10^18) of data is being created around the world every single day
twitter: @rabbitonweb, email: paul.szulc@gmail.com Big Data is all about... ● well, the data :) ● It is said that 2.5 exabytes (2.5×10^18) of data is being created around the world every single day ● It is a volume at which standard tools and methods of analysis no longer work
twitter: @rabbitonweb, email: paul.szulc@gmail.com Challenges of Big Data ● The gathering ● Processing and discovery ● Presenting it to the business ● Hardware and network failures
twitter: @rabbitonweb, email: paul.szulc@gmail.com What was before?
twitter: @rabbitonweb, email: paul.szulc@gmail.com To the rescue MAP REDUCE
twitter: @rabbitonweb, email: paul.szulc@gmail.com To the rescue MAP REDUCE “'MapReduce' is a framework for processing parallelizable problems across huge datasets using a cluster, taking into consideration scalability and fault-tolerance”
twitter: @rabbitonweb, email: paul.szulc@gmail.com MapReduce - phases (1) MapReduce is composed of a sequence of two phases:
twitter: @rabbitonweb, email: paul.szulc@gmail.com MapReduce - phases (1) MapReduce is composed of a sequence of two phases: 1. Map
twitter: @rabbitonweb, email: paul.szulc@gmail.com MapReduce - phases (1) MapReduce is composed of a sequence of two phases: 1. Map 2. Reduce
twitter: @rabbitonweb, email: paul.szulc@gmail.com MapReduce - phases (2) MapReduce is composed of a sequence of two phases: 1. Map 2. Reduce
twitter: @rabbitonweb, email: paul.szulc@gmail.com Word Count ● The “Hello World” of the Big Data world. ● For an initial input of multiple lines, extract all words with their number of occurrences To be or not to be Let it be Be me It must be Let it be be 6 to 2 let 2 or 1 not 1 must 1 me 1
twitter: @rabbitonweb, email: paul.szulc@gmail.com Input To be or not to be Let it be Be me It must be Let it be
twitter: @rabbitonweb, email: paul.szulc@gmail.com Input Splitting To be or not to be Let it be Be me It must be Let it be To be or not to be Let it be It must be Let it be Be me
twitter: @rabbitonweb, email: paul.szulc@gmail.com Input Splitting Mapping To be or not to be Let it be Be me It must be Let it be To be or not to be Let it be It must be Let it be Be me to 1 be 1 or 1 not 1 to 1 be 1 let 1 it 1 be 1 be 1 me 1 let 1 it 1 be 1 it 1 must 1 be 1
twitter: @rabbitonweb, email: paul.szulc@gmail.com Input Splitting Mapping Shuffling To be or not to be Let it be Be me It must be Let it be To be or not to be Let it be It must be Let it be Be me to 1 be 1 or 1 not 1 to 1 be 1 let 1 it 1 be 1 be 1 me 1 let 1 it 1 be 1 it 1 must 1 be 1 be 1 be 1 be 1 be 1 be 1 be 1 to 1 to 1 or 1 not 1 let 1 let 1 must 1 me 1
twitter: @rabbitonweb, email: paul.szulc@gmail.com Input Splitting Mapping Shuffling To be or not to be Let it be Be me It must be Let it be To be or not to be Let it be It must be Let it be Be me to 1 be 1 or 1 not 1 to 1 be 1 let 1 it 1 be 1 be 1 me 1 let 1 it 1 be 1 it 1 must 1 be 1 be 1 be 1 be 1 be 1 be 1 be 1 to 1 to 1 or 1 not 1 let 1 let 1 must 1 me 1 EXPENSIVE
twitter: @rabbitonweb, email: paul.szulc@gmail.com Input Splitting Mapping Shuffling To be or not to be Let it be Be me It must be Let it be To be or not to be Let it be It must be Let it be Be me to 1 be 1 or 1 not 1 to 1 be 1 let 1 it 1 be 1 be 1 me 1 let 1 it 1 be 1 it 1 must 1 be 1 be 1 be 1 be 1 be 1 be 1 be 1 to 1 to 1 or 1 not 1 let 1 let 1 must 1 me 1
twitter: @rabbitonweb, email: paul.szulc@gmail.com Input Splitting Mapping Shuffling Reducing To be or not to be Let it be Be me It must be Let it be To be or not to be Let it be It must be Let it be Be me to 1 be 1 or 1 not 1 to 1 be 1 let 1 it 1 be 1 be 1 me 1 let 1 it 1 be 1 it 1 must 1 be 1 be 1 be 1 be 1 be 1 be 1 be 1 to 1 to 1 or 1 not 1 let 1 let 1 must 1 me 1 be 6 to 2 or 1 not 1 let 2 must 1 me 1
twitter: @rabbitonweb, email: paul.szulc@gmail.com Input Splitting Mapping Shuffling Reducing Final result To be or not to be Let it be Be me It must be Let it be To be or not to be Let it be It must be Let it be Be me to 1 be 1 or 1 not 1 to 1 be 1 let 1 it 1 be 1 be 1 me 1 let 1 it 1 be 1 it 1 must 1 be 1 be 1 be 1 be 1 be 1 be 1 be 1 to 1 to 1 or 1 not 1 let 1 let 1 must 1 me 1 be 6 to 2 or 1 not 1 let 2 must 1 me 1 be 6 to 2 let 2 or 1 not 1 must 1 me 1
twitter: @rabbitonweb, email: paul.szulc@gmail.com Word count - pseudo-code function map(String name, String document): for each word w in document: emit (w, 1)
twitter: @rabbitonweb, email: paul.szulc@gmail.com Word count - pseudo-code function map(String name, String document): for each word w in document: emit (w, 1) function reduce(String word, Iterator partialCounts): sum = 0 for each pc in partialCounts: sum += ParseInt(pc) emit (word, sum)
twitter: @rabbitonweb, email: paul.szulc@gmail.com Map-Reduce-Map-Reduce-Map-Red..
twitter: @rabbitonweb, email: paul.szulc@gmail.com Why?
twitter: @rabbitonweb, email: paul.szulc@gmail.com Why Apache Spark? We have had an open-source MapReduce implementation (Hadoop) running successfully for the last 12 years. Why bother?
twitter: @rabbitonweb, email: paul.szulc@gmail.com Problems with Map Reduce 1. MapReduce provides a difficult programming model for developers
twitter: @rabbitonweb, email: paul.szulc@gmail.com Word count - revisited function map(String name, String document): for each word w in document: emit (w, 1) function reduce(String word, Iterator partialCounts): sum = 0 for each pc in partialCounts: sum += ParseInt(pc) emit (word, sum)
twitter: @rabbitonweb, email: paul.szulc@gmail.com Word count: Hadoop implementation public class WordCount { public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); context.write(word, one); } } } public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, "wordcount"); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setInputFormatClass(TextInputFormat.class);
twitter: @rabbitonweb, email: paul.szulc@gmail.com Problems with Map Reduce 1. MapReduce provides a difficult programming model for developers
twitter: @rabbitonweb, email: paul.szulc@gmail.com Problems with Map Reduce 1. MapReduce provides a difficult programming model for developers 2. It suffers from a number of performance issues
twitter: @rabbitonweb, email: paul.szulc@gmail.com Performance issues ● Every step must be expressed as a Map-Reduce pair
twitter: @rabbitonweb, email: paul.szulc@gmail.com Performance issues ● Every step must be expressed as a Map-Reduce pair ● Output of every phase is saved to disk
twitter: @rabbitonweb, email: paul.szulc@gmail.com Performance issues ● Every step must be expressed as a Map-Reduce pair ● Output of every phase is saved to disk ● Iterative algorithms go through the IO path again and again
twitter: @rabbitonweb, email: paul.szulc@gmail.com Performance issues ● Every step must be expressed as a Map-Reduce pair ● Output of every phase is saved to disk ● Iterative algorithms go through the IO path again and again ● Poor (key, value) API - even a basic join requires expensive code
twitter: @rabbitonweb, email: paul.szulc@gmail.com Problems with Map Reduce 1. MapReduce provides a difficult programming model for developers 2. It suffers from a number of performance issues
twitter: @rabbitonweb, email: paul.szulc@gmail.com Problems with Map Reduce 1. MapReduce provides a difficult programming model for developers 2. It suffers from a number of performance issues 3. While batch-mode analysis is still important, reacting to events as they arrive has become more important (MapReduce lacks support for “almost” real-time processing)
twitter: @rabbitonweb, email: paul.szulc@gmail.com Apache Spark to the rescue
twitter: @rabbitonweb, email: paul.szulc@gmail.com The Big Picture Cluster (Standalone, Yarn, Mesos)
twitter: @rabbitonweb, email: paul.szulc@gmail.com The Big Picture Driver Program Cluster (Standalone, Yarn, Mesos)
twitter: @rabbitonweb, email: paul.szulc@gmail.com The Big Picture Driver Program Cluster (Standalone, Yarn, Mesos) SPARK API: 1. Scala 2. Java 3. Python
twitter: @rabbitonweb, email: paul.szulc@gmail.com The Big Picture Driver Program Cluster (Standalone, Yarn, Mesos) SPARK API: 1. Scala 2. Java 3. Python Master
twitter: @rabbitonweb, email: paul.szulc@gmail.com The Big Picture Driver Program Cluster (Standalone, Yarn, Mesos) Master
twitter: @rabbitonweb, email: paul.szulc@gmail.com The Big Picture Driver Program Cluster (Standalone, Yarn, Mesos) Master val master = “spark://host:pt”
twitter: @rabbitonweb, email: paul.szulc@gmail.com The Big Picture Driver Program Cluster (Standalone, Yarn, Mesos) Master val master = “spark://host:pt” val conf = new SparkConf() .setMaster(master)
twitter: @rabbitonweb, email: paul.szulc@gmail.com The Big Picture Driver Program Cluster (Standalone, Yarn, Mesos) Master val master = “spark://host:pt” val conf = new SparkConf() .setMaster(master) val sc = new SparkContext (conf)
twitter: @rabbitonweb, email: paul.szulc@gmail.com The Big Picture Driver Program Cluster (Standalone, Yarn, Mesos) Master val master = “spark://host:pt” val conf = new SparkConf() .setMaster(master) val sc = new SparkContext (conf) Executor 1 Executor 2 Executor 3
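The build-up above, put together as one runnable driver program - a minimal sketch only; the local[*] master, the application name and the sample data are assumptions made for the example, not part of the slides:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object MinimalDriver {
  def main(args: Array[String]): Unit = {
    // "local[*]" runs Spark inside this JVM using all cores - convenient for the workshop;
    // against a real cluster you would point it at the master, e.g. "spark://host:port".
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("spark-workshop")   // an application name is required as well
    val sc = new SparkContext(conf)

    val numbers = sc.parallelize(1 to 100)
    println(numbers.count())          // count() is an action, so it actually runs a job

    sc.stop()
  }
}
```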
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - the definition
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - the definition RDD stands for resilient distributed dataset
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - the definition RDD stands for resilient distributed dataset Resilient - if data is lost, it can be recreated
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - the definition RDD stands for resilient distributed dataset Resilient - if data is lost, it can be recreated Distributed - stored on nodes across the cluster
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - the definition RDD stands for resilient distributed dataset Resilient - if data is lost, it can be recreated Distributed - stored on nodes across the cluster Dataset - the initial data comes from a file or can be created programmatically
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - example
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - example val logs = sc.textFile("hdfs://logs.txt")
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - example val logs = sc.textFile("hdfs://logs.txt") From Hadoop Distributed File System
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - example val logs = sc.textFile("hdfs://logs.txt") From Hadoop Distributed File System This is the RDD
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - example val logs = sc.textFile("/home/rabbit/logs.txt") From local file system (must be available on executors) This is the RDD
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - example val logs = sc.parallelize(List(1, 2, 3, 4)) Programmatically from a collection of elements This is the RDD
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - example val logs = sc.textFile("logs.txt")
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - example val logs = sc.textFile("logs.txt") val lcLogs = logs.map(_.toLowerCase)
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - example val logs = sc.textFile("logs.txt") val lcLogs = logs.map(_.toLowerCase) Creates a new RDD
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - example val logs = sc.textFile("logs.txt") val lcLogs = logs.map(_.toLowerCase) val errors = lcLogs.filter(_.contains(“error”))
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - example val logs = sc.textFile("logs.txt") val lcLogs = logs.map(_.toLowerCase) val errors = lcLogs.filter(_.contains(“error”)) And yet another RDD
twitter: @rabbitonweb, email: paul.szulc@gmail.com Getting started with Spark https://github.com/rabbitonweb/spark-workshop /src/main/scala/sw/ex1/ /src/main/resources/all-shakespeare.txt
twitter: @rabbitonweb, email: paul.szulc@gmail.com Getting started with Spark https://github.com/rabbitonweb/spark-workshop ● Make it a love story: Print out all lines that have both Juliet & Romeo in it http://spark.apache.org/docs/latest/api/scala/index.html#org.
twitter: @rabbitonweb, email: paul.szulc@gmail.com Getting started with Spark https://github.com/rabbitonweb/spark-workshop ● Make it a love story: Print out all lines that have both Juliet & Romeo in it ● Would be nice to have a REPL
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - example val logs = sc.textFile("logs.txt") val lcLogs = logs.map(_.toLowerCase) val errors = lcLogs.filter(_.contains(“error”)) And yet another RDD
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - example val logs = sc.textFile("logs.txt") val lcLogs = logs.map(_.toLowerCase) val errors = lcLogs.filter(_.contains(“error”)) And yet another RDD Performance Alert?!?!
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - Operations 1. Transformations a. Map b. Filter c. FlatMap d. Sample e. Union f. Intersect g. Distinct h. GroupByKey i. …. 2. Actions a. Reduce b. Collect c. Count d. First e. Take(n) f. TakeSample g. SaveAsTextFile h. ….
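The split matters because transformations are lazy while actions are eager. A small sketch, assuming an existing SparkContext sc and a logs.txt input file:

```scala
// Transformations only describe a new RDD; no data is touched yet.
val lines  = sc.textFile("logs.txt")
val errors = lines.filter(_.contains("error"))   // still nothing computed

// Actions trigger the actual computation and return a value to the driver.
println(errors.count())                          // runs a job
println(errors.take(5).mkString("\n"))           // runs another job
```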
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - example val logs = sc.textFile("logs.txt") val lcLogs = logs.map(_.toLowerCase) val errors = lcLogs.filter(_.contains(“error”))
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - example val logs = sc.textFile("logs.txt") val lcLogs = logs.map(_.toLowerCase) val errors = lcLogs.filter(_.contains(“error”)) val numberOfErrors = errors.count
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - example val logs = sc.textFile("logs.txt") val lcLogs = logs.map(_.toLowerCase) val errors = lcLogs.filter(_.contains(“error”)) val numberOfErrors = errors.count This will trigger the computation
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - example val logs = sc.textFile("logs.txt") val lcLogs = logs.map(_.toLowerCase) val errors = lcLogs.filter(_.contains(“error”)) val numberOfErrors = errors.count This will trigger the computation This will be the calculated value (a Long)
twitter: @rabbitonweb, email: paul.szulc@gmail.com Example 2 - other actions https://github.com/rabbitonweb/spark-workshop /src/main/scala/ex2/
twitter: @rabbitonweb, email: paul.szulc@gmail.com Exercise 2: ● Save results of your calculations as text file
twitter: @rabbitonweb, email: paul.szulc@gmail.com Exercise 2: ● Save results of your calculations as text file ● Hint: saveAsTextFile
twitter: @rabbitonweb, email: paul.szulc@gmail.com Exercise 2: ● Save results of your calculations as text file ● Hint: saveAsTextFile ● Why is the output so weird?
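A hint for that last question - a sketch assuming the errors RDD from the earlier example: saveAsTextFile writes a directory rather than a single file, with one part-NNNNN file per partition, because every partition is written in parallel by its own task.

```scala
errors.saveAsTextFile("out")   // creates out/ with part-00000, part-00001, ... (plus a _SUCCESS marker)

// If a single output file is really needed (small results only!), shrink to one partition first:
errors.coalesce(1).saveAsTextFile("out-single")
```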
twitter: @rabbitonweb, email: paul.szulc@gmail.com Partitions? A partition represents a subset of the data within your distributed collection.
twitter: @rabbitonweb, email: paul.szulc@gmail.com Partitions? A partition represents a subset of the data within your distributed collection. The number of partitions is tightly coupled with the level of parallelism.
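A few ways to inspect and influence partitioning - a sketch, assuming an existing SparkContext sc:

```scala
val logs = sc.textFile("logs.txt")        // partition count derived from the input splits
println(logs.partitions.length)           // how many partitions (and therefore tasks) there are

val logs8 = sc.textFile("logs.txt", 8)    // request a minimum of 8 partitions up front
val fewer = logs8.coalesce(2)             // fewer partitions, without a shuffle
val more  = logs8.repartition(16)         // more (or rebalanced) partitions, with a shuffle
```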
Partitions evaluation val counted = sc.textFile(..).count
Partitions evaluation val counted = sc.textFile(..).count node 1 node 2 node 3
twitter: @rabbitonweb, email: paul.szulc@gmail.com Task = partition + calculation Driver Program Cluster (Standalone, Yarn, Mesos) Master val master = “spark://host:pt” val conf = new SparkConf() .setMaster(master) val sc = new SparkContext (conf) Executor 1 Executor 2 Executor 3
twitter: @rabbitonweb, email: paul.szulc@gmail.com Task = partition + calculation Driver Program Cluster (Standalone, Yarn, Mesos) Master val master = “spark://host:pt” val conf = new SparkConf() .setMaster(master) val sc = new SparkContext (conf) val logs = sc.textFile(“logs. txt”) Executor 1 Executor 2 Executor 3 HDFS, GlusterFS, locality
twitter: @rabbitonweb, email: paul.szulc@gmail.com Task = partition + calculation Driver Program Cluster (Standalone, Yarn, Mesos) Master val master = “spark://host:pt” val conf = new SparkConf() .setMaster(master) val sc = new SparkContext (conf) val logs = sc.textFile(“logs. txt”) println(logs.count()) Executor 1 Executor 2 Executor 3 T1 T2 T3
twitter: @rabbitonweb, email: paul.szulc@gmail.com Task = partition + calculation Driver Program Cluster (Standalone, Yarn, Mesos) Master val master = “spark://host:pt” val conf = new SparkConf() .setMaster(master) val sc = new SparkContext (conf) val logs = sc.textFile(“logs. txt”) println(logs.count()) Executor 1 Executor 2 Executor 3 T1T2T3
twitter: @rabbitonweb, email: paul.szulc@gmail.com Task = partition + calculation Driver Program Cluster (Standalone, Yarn, Mesos) Master val master = “spark://host:pt” val conf = new SparkConf() .setMaster(master) val sc = new SparkContext (conf) val logs = sc.textFile(“logs. txt”) println(logs.count()) Executor 1 Executor 2 Executor 3 T1 T2T3
twitter: @rabbitonweb, email: paul.szulc@gmail.com Task = partition + calculation Driver Program Cluster (Standalone, Yarn, Mesos) Master val master = “spark://host:pt” val conf = new SparkConf() .setMaster(master) val sc = new SparkContext (conf) val logs = sc.textFile(“logs. txt”) println(logs.count()) Executor 1 Executor 2 Executor 3 T1 T2 T3
twitter: @rabbitonweb, email: paul.szulc@gmail.com Example 3 - working with key-value https://github.com/rabbitonweb/spark-workshop /src/main/scala/ex3/
twitter: @rabbitonweb, email: paul.szulc@gmail.com Exercise 3 - working with key-value ● Change sw.ex3.Startings to sort the result by key ● Write a program that answers the following: which character do lines start with most often in all-shakespeare.txt
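One possible shape of a solution - only a sketch, assuming that the first character of each non-empty line is what counts, not the official exercise answer:

```scala
val allShakespeare = sc.textFile("src/main/resources/all-shakespeare.txt")

val byFirstChar = allShakespeare
  .filter(_.trim.nonEmpty)
  .map(line => (line.charAt(0), 1))
  .reduceByKey(_ + _)

byFirstChar.sortByKey().collect().foreach(println)            // result sorted by key
println(byFirstChar.sortBy(_._2, ascending = false).first())  // most frequent starting character
```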
twitter: @rabbitonweb, email: paul.szulc@gmail.com Pipeline
twitter: @rabbitonweb, email: paul.szulc@gmail.com Pipeline map
twitter: @rabbitonweb, email: paul.szulc@gmail.com Pipeline map count
twitter: @rabbitonweb, email: paul.szulc@gmail.com Pipeline map count task
twitter: @rabbitonweb, email: paul.szulc@gmail.com But what if... val startings = allShakespeare .filter(_.trim != "") .groupBy(_.charAt(0)) .mapValues(_.size) .reduceByKey { case (acc, length) => acc + length }
twitter: @rabbitonweb, email: paul.szulc@gmail.com But what if... filter val startings = allShakespeare .filter(_.trim != "") .groupBy(_.charAt(0)) .mapValues(_.size) .reduceByKey { case (acc, length) => acc + length }
twitter: @rabbitonweb, email: paul.szulc@gmail.com And now what? filter val startings = allShakespeare .filter(_.trim != "") .groupBy(_.charAt(0)) .mapValues(_.size) .reduceByKey { case (acc, length) => acc + length }
twitter: @rabbitonweb, email: paul.szulc@gmail.com And now what? filter mapValues val startings = allShakespeare .filter(_.trim != "") .groupBy(_.charAt(0)) .mapValues(_.size) .reduceByKey { case (acc, length) => acc + length }
twitter: @rabbitonweb, email: paul.szulc@gmail.com And now what? filter val startings = allShakespeare .filter(_.trim != "") .groupBy(_.charAt(0)) .mapValues(_.size) .reduceByKey { case (acc, length) => acc + length }
twitter: @rabbitonweb, email: paul.szulc@gmail.com Shuffling filter groupBy val startings = allShakespeare .filter(_.trim != "") .groupBy(_.charAt(0)) .mapValues(_.size) .reduceByKey { case (acc, length) => acc + length }
twitter: @rabbitonweb, email: paul.szulc@gmail.com Shuffling filter groupBy mapValues val startings = allShakespeare .filter(_.trim != "") .groupBy(_.charAt(0)) .mapValues(_.size) .reduceByKey { case (acc, length) => acc + length }
twitter: @rabbitonweb, email: paul.szulc@gmail.com Shuffling filter groupBy mapValues reduceByKey val startings = allShakespeare .filter(_.trim != "") .groupBy(_.charAt(0)) .mapValues(_.size) .reduceByKey { case (acc, length) => acc + length }
twitter: @rabbitonweb, email: paul.szulc@gmail.com Shuffling filter groupBy mapValues reduceByKey
twitter: @rabbitonweb, email: paul.szulc@gmail.com Shuffling filter groupBy mapValues reduceByKey task
twitter: @rabbitonweb, email: paul.szulc@gmail.com Shuffling filter groupBy mapValues reduceByKey task Wait for calculations on all partitions before moving on
twitter: @rabbitonweb, email: paul.szulc@gmail.com Shuffling filter groupBy mapValues reduceByKey task
twitter: @rabbitonweb, email: paul.szulc@gmail.com Shuffling filter groupBy mapValues reduceByKey task Data flying around the cluster
twitter: @rabbitonweb, email: paul.szulc@gmail.com Shuffling filter groupBy mapValues reduceByKey task
twitter: @rabbitonweb, email: paul.szulc@gmail.com Shuffling filter groupBy mapValues reduceByKey task task
twitter: @rabbitonweb, email: paul.szulc@gmail.com Shuffling filter groupBy mapValues reduceByKey
twitter: @rabbitonweb, email: paul.szulc@gmail.com Stage filter groupBy mapValues reduceByKey stage1
twitter: @rabbitonweb, email: paul.szulc@gmail.com Stage filter groupBy mapValues reduceByKey stage1 stage2
twitter: @rabbitonweb, email: paul.szulc@gmail.com Directed Acyclic Graph
twitter: @rabbitonweb, email: paul.szulc@gmail.com Exercise 4 - DAG that ● Open sw.ex3.Ex4.scala ● You will find three programs: ○ StagesStagesA ○ StagesStagesB ○ StagesStagesC ● Can you tell what the DAG will look like for all three?
twitter: @rabbitonweb, email: paul.szulc@gmail.com Directed Acyclic Graph val startings = allShakespeare .filter(_.trim != "") .map(line => (line.charAt(0), line)) .mapValues(_.size) .reduceByKey { case (acc, length) => acc + length } println(startings.toDebugString)
twitter: @rabbitonweb, email: paul.szulc@gmail.com Directed Acyclic Graph val startings = allShakespeare .filter(_.trim != "") .map(line => (line.charAt(0), line)) .mapValues(_.size) .reduceByKey { case (acc, length) => acc + length } println(startings.toDebugString) (2) ShuffledRDD[5] at reduceByKey at Ex3.scala:18 [] +-(2) MapPartitionsRDD[4] at mapValues at Ex3.scala:17 [] | MapPartitionsRDD[3] at map at Ex3.scala:16 [] | MapPartitionsRDD[2] at filter at Ex3.scala:15 [] | src/main/resources/all-shakespeare.txt MapPartitionsRDD[1] | src/main/resources/all-shakespeare.txt HadoopRDD[0] at textFile
twitter: @rabbitonweb, email: paul.szulc@gmail.com Directed Acyclic Graph val startings = allShakespeare .filter(_.trim != "") .groupBy(_.charAt(0)) .mapValues(_.size) .reduceByKey { case (acc, length) => acc + length } println(startings.toDebugString) (2) MapPartitionsRDD[6] at reduceByKey at Ex3.scala:42 | MapPartitionsRDD[5] at mapValues at Ex3.scala:41 | ShuffledRDD[4] at groupBy at Ex3.scala:40 +-(2) MapPartitionsRDD[3] at groupBy at Ex3.scala:40 | MapPartitionsRDD[2] at filter at Ex3.scala:39 | src/main/resources/all-shakespeare.txt MapPartitionsRDD[1] | src/main/resources/all-shakespeare.txt HadoopRDD[0]
twitter: @rabbitonweb, email: paul.szulc@gmail.com RDD - the definition RDD stands for resilient distributed dataset Resilient - if data is lost, it can be recreated Distributed - stored on nodes across the cluster Dataset - the initial data comes from a file or can be created programmatically
twitter: @rabbitonweb, email: paul.szulc@gmail.com What about Resilience? RDD stands for resilient distributed dataset Resilient - if data is lost, it can be recreated Distributed - stored on nodes across the cluster Dataset - the initial data comes from a file or can be created programmatically
twitter: @rabbitonweb, email: paul.szulc@gmail.com Resilience Driver Program Cluster (Standalone, Yarn, Mesos) Master val master = “spark://host:pt” val conf = new SparkConf() .setMaster(master) val sc = new SparkContext (conf) val logs = sc.textFile(“logs. txt”) println(logs.count()) Executor 1 Executor 2 Executor 3 T1 T2 T3
twitter: @rabbitonweb, email: paul.szulc@gmail.com Resilience Driver Program Cluster (Standalone, Yarn, Mesos) Master val master = “spark://host:pt” val conf = new SparkConf() .setMaster(master) val sc = new SparkContext (conf) val logs = sc.textFile(“logs. txt”) println(logs.count()) Executor 1 Executor 2 Executor 3 T1T2T3
twitter: @rabbitonweb, email: paul.szulc@gmail.com Resilience Driver Program Cluster (Standalone, Yarn, Mesos) Master val master = “spark://host:pt” val conf = new SparkConf() .setMaster(master) val sc = new SparkContext (conf) val logs = sc.textFile(“logs. txt”) println(logs.count()) Executor 1 Executor 2 Executor 3 T1 T2T3
twitter: @rabbitonweb, email: paul.szulc@gmail.com Resilience Driver Program Cluster (Standalone, Yarn, Mesos) Master val master = “spark://host:pt” val conf = new SparkConf() .setMaster(master) val sc = new SparkContext (conf) val logs = sc.textFile(“logs. txt”) println(logs.count()) Executor 1 Executor 2 Executor 3 T1 T2 T3
twitter: @rabbitonweb, email: paul.szulc@gmail.com Resilience Driver Program Cluster (Standalone, Yarn, Mesos) Master val master = “spark://host:pt” val conf = new SparkConf() .setMaster(master) val sc = new SparkContext (conf) val logs = sc.textFile(“logs. txt”) println(logs.count()) Executor 1 Executor 2 Executor 3 T1T2T3
twitter: @rabbitonweb, email: paul.szulc@gmail.com Resilience Driver Program Cluster (Standalone, Yarn, Mesos) Master val master = “spark://host:pt” val conf = new SparkConf() .setMaster(master) val sc = new SparkContext (conf) val logs = sc.textFile(“logs. txt”) println(logs.count()) Executor 1 Executor 2 Executor 3 T1 T2T3
twitter: @rabbitonweb, email: paul.szulc@gmail.com Resilience Driver Program Cluster (Standalone, Yarn, Mesos) Master val master = “spark://host:pt” val conf = new SparkConf() .setMaster(master) val sc = new SparkContext (conf) val logs = sc.textFile(“logs. txt”) println(logs.count()) Executor 1 Executor 2 Executor 3 T1 T2 T3
twitter: @rabbitonweb, email: paul.szulc@gmail.com Resilience Driver Program Cluster (Standalone, Yarn, Mesos) Master val master = “spark://host:pt” val conf = new SparkConf() .setMaster(master) val sc = new SparkContext (conf) val logs = sc.textFile(“logs. txt”) println(logs.count()) Executor 1 Executor 2 Executor 3 (DEAD) T1 T2
twitter: @rabbitonweb, email: paul.szulc@gmail.com Resilience Driver Program Cluster (Standalone, Yarn, Mesos) Master val master = “spark://host:pt” val conf = new SparkConf() .setMaster(master) val sc = new SparkContext (conf) val logs = sc.textFile(“logs. txt”) println(logs.count()) Executor 1 Executor 2 T1 T2
twitter: @rabbitonweb, email: paul.szulc@gmail.com Resilience Driver Program Cluster (Standalone, Yarn, Mesos) Master val master = “spark://host:pt” val conf = new SparkConf() .setMaster(master) val sc = new SparkContext (conf) val logs = sc.textFile(“logs. txt”) println(logs.count()) Executor 1 Executor 2 T1 T2 T3
twitter: @rabbitonweb, email: paul.szulc@gmail.com Exercise 5 - The Big Data problem ● Write a Word Count program using Spark ● Use all-shakespeare.txt as input To be or not to be Let it be Be me It must be Let it be be 6 to 2 let 2 or 1 not 1 must 1 me 1
twitter: @rabbitonweb, email: paul.szulc@gmail.com Word count once again
twitter: @rabbitonweb, email: paul.szulc@gmail.com Word count once again val wc = scala.io.Source.fromFile(args(0)).getLines Scala solution
twitter: @rabbitonweb, email: paul.szulc@gmail.com Word count once again val wc = scala.io.Source.fromFile(args(0)).getLines .map(line => line.toLowerCase) Scala solution
twitter: @rabbitonweb, email: paul.szulc@gmail.com Word count once again val wc = scala.io.Source.fromFile(args(0)).getLines .map(line => line.toLowerCase) .flatMap(line => line.split("""\W+""")).toSeq Scala solution
twitter: @rabbitonweb, email: paul.szulc@gmail.com Word count once again val wc = scala.io.Source.fromFile(args(0)).getLines .map(line => line.toLowerCase) .flatMap(line => line.split("""\W+""")).toSeq .groupBy(identity) Scala solution
twitter: @rabbitonweb, email: paul.szulc@gmail.com Word count once again val wc = scala.io.Source.fromFile(args(0)).getLines .map(line => line.toLowerCase) .flatMap(line => line.split("""\W+""")).toSeq .groupBy(identity) .map { case (word, group) => (word, group.size) } Scala solution
twitter: @rabbitonweb, email: paul.szulc@gmail.com Word count once again val wc = scala.io.Source.fromFile(args(0)).getLines .map(line => line.toLowerCase) .flatMap(line => line.split("""\W+""")).toSeq .groupBy(identity) .map { case (word, group) => (word, group.size) } val wc = new SparkContext(“local”, “Word Count”).textFile(args(0)) Scala solution Spark solution (in Scala language)
twitter: @rabbitonweb, email: paul.szulc@gmail.com Word count once again val wc = scala.io.Source.fromFile(args(0)).getLines .map(line => line.toLowerCase) .flatMap(line => line.split("""\W+""")).toSeq .groupBy(identity) .map { case (word, group) => (word, group.size) } val wc = new SparkContext(“local”, “Word Count”).textFile(args(0)) .map(line => line.toLowerCase) Scala solution Spark solution (in Scala language)
twitter: @rabbitonweb, email: paul.szulc@gmail.com Word count once again val wc = scala.io.Source.fromFile(args(0)).getLines .map(line => line.toLowerCase) .flatMap(line => line.split("""\W+""")).toSeq .groupBy(identity) .map { case (word, group) => (word, group.size) } val wc = new SparkContext(“local”, “Word Count”).textFile(args(0)) .map(line => line.toLowerCase) .flatMap(line => line.split("""\W+""")) Scala solution Spark solution (in Scala language)
twitter: @rabbitonweb, email: paul.szulc@gmail.com Word count once again val wc = scala.io.Source.fromFile(args(0)).getLines .map(line => line.toLowerCase) .flatMap(line => line.split("""\W+""")).toSeq .groupBy(identity) .map { case (word, group) => (word, group.size) } val wc = new SparkContext(“local”, “Word Count”).textFile(args(0)) .map(line => line.toLowerCase) .flatMap(line => line.split("""\W+""")) .groupBy(identity) Scala solution Spark solution (in Scala language)
twitter: @rabbitonweb, email: paul.szulc@gmail.com Word count once again val wc = scala.io.Source.fromFile(args(0)).getLines .map(line => line.toLowerCase) .flatMap(line => line.split("""\W+""")).toSeq .groupBy(identity) .map { case (word, group) => (word, group.size) } val wc = new SparkContext(“local”, “Word Count”).textFile(args(0)) .map(line => line.toLowerCase) .flatMap(line => line.split("""\W+""")) .groupBy(identity) .map { case (word, group) => (word, group.size) } Scala solution Spark solution (in Scala language)
twitter: @rabbitonweb, email: paul.szulc@gmail.com But that solution has a major flaw
twitter: @rabbitonweb, email: paul.szulc@gmail.com But that solution has a major flaw ● Flaw: groupBy
twitter: @rabbitonweb, email: paul.szulc@gmail.com But that solution has a major flaw ● Flaw: groupBy ● But before we understand it, we have to: ○ instantiate a Standalone cluster ○ understand how the cluster works ○ talk about serialization (& uber jar!) ○ see the Spark UI ○ talk about Spark configuration
twitter: @rabbitonweb, email: paul.szulc@gmail.com But that solution has a major flaw ● Flaw: groupBy ● But before we understand it, we have to: ○ instantiate a Standalone cluster ○ understand how the cluster works ○ talk about serialization ○ see the Spark UI ○ talk about Spark configuration ● http://spark.apache.org/docs/latest/configuration.html
twitter: @rabbitonweb, email: paul.szulc@gmail.com But that solution has a major flaw ● What can we do about it? ● Something spooky: let’s see Spark code!
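For word count specifically, the usual way around the flaw is to avoid shipping every single (word, 1) pair across the network: reduceByKey combines counts locally on each partition before the shuffle. A sketch only, assuming the same input argument and local SparkContext as in the solution above:

```scala
val wc = new SparkContext("local", "Word Count").textFile(args(0))
  .map(_.toLowerCase)
  .flatMap(_.split("""\W+"""))
  .map(word => (word, 1))
  .reduceByKey(_ + _)   // partial sums per partition, then a much smaller shuffle
```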
twitter: @rabbitonweb, email: paul.szulc@gmail.com Mid-term exam ● Given all-shakespeare.txt ● Given name popularity in male-names.txt & female-names.txt ● Show how popular a given name is nowadays & how many times it occurs in Shakespeare ● Result: key-value pair (key: name, value: pair) E.g. Romeo is mentioned 340 times in Shakespeare and is nowadays the 688th most popular name, so the result will be: (romeo,(688,340))
What is a RDD?
What is a RDD? Resilient Distributed Dataset
... 10 10/05/2015 10:14:01 UserInitialized Ania Nowak 10 10/05/2015 10:14:55 FirstNameChanged Anna 12 10/05/2015 10:17:03 UserLoggedIn 12 10/05/2015 10:21:31 UserLoggedOut … 198 13/05/2015 21:10:11 UserInitialized Jan Kowalski What is a RDD?
node 1 ... 10 10/05/2015 10:14:01 UserInitialized Ania Nowak 10 10/05/2015 10:14:55 FirstNameChanged Anna 12 10/05/2015 10:17:03 UserLoggedIn 12 10/05/2015 10:21:31 UserLoggedOut … 198 13/05/2015 21:10:11 UserInitialized Jan Kowalski node 2 node 3 What is a RDD?
node 1 ... 10 10/05/2015 10:14:01 UserInitialized Ania Nowak 10 10/05/2015 10:14:55 FirstNameChanged Anna 12 10/05/2015 10:17:03 UserLoggedIn 12 10/05/2015 10:21:31 UserLoggedOut … 198 13/05/2015 21:10:11 UserInitialized Jan Kowalski ... 10 10/05/2015 10:14:01 UserInitialized Ania Nowak 10 10/05/2015 10:14:55 FirstNameChanged Anna 12 10/05/2015 10:17:03 UserLoggedIn 12 10/05/2015 10:21:31 UserLoggedOut … 198 13/05/2015 21:10:11 UserInitialized Jan Kowalski node 2 node 3 ... 10 10/05/2015 10:14:01 UserInitialized Ania Nowak 10 10/05/2015 10:14:55 FirstNameChanged Anna 12 10/05/2015 10:17:03 UserLoggedIn 12 10/05/2015 10:21:31 UserLoggedOut … 198 13/05/2015 21:10:11 UserInitialized Jan Kowalski ... 10 10/05/2015 10:14:01 UserInitialized Ania Nowak 10 10/05/2015 10:14:55 FirstNameChanged Anna 12 10/05/2015 10:17:03 UserLoggedIn 12 10/05/2015 10:21:31 UserLoggedOut … 198 13/05/2015 21:10:11 UserInitialized Jan Kowalski What is a RDD?
What is a RDD?
What is a RDD? RDD needs to hold 3 chunks of information in order to do its work:
What is a RDD? RDD needs to hold 3 chunks of information in order to do its work: 1. pointer to its parent
What is a RDD? RDD needs to hold 3 chunks of information in order to do its work: 1. pointer to its parent 2. how its internal data is partitioned
What is a RDD? RDD needs to hold 3 chunks of information in order to do its work: 1. pointer to its parent 2. how its internal data is partitioned 3. how to evaluate its internal data
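Those three pieces of information map almost one-to-one onto the core of the RDD class in Spark's sources - a simplified paraphrase of its shape, not the real class:

```scala
import org.apache.spark.{Dependency, Partition, TaskContext}

abstract class SimplifiedRDD[T] {
  // 1. pointer to its parent(s)
  protected def getDependencies: Seq[Dependency[_]]
  // 2. how its internal data is partitioned
  protected def getPartitions: Array[Partition]
  // 3. how to evaluate its internal data, one partition at a time
  def compute(split: Partition, context: TaskContext): Iterator[T]
}
```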
What is a partition? A partition represents a subset of the data within your distributed collection.
What is a partition? A partition represents a subset of the data within your distributed collection. override def getPartitions: Array[Partition] = ???
What is a partition? A partition represents a subset of the data within your distributed collection. override def getPartitions: Array[Partition] = ??? How this subset is defined depends on the type of the RDD
example: HadoopRDD val journal = sc.textFile(“hdfs://journal/*”)
example: HadoopRDD val journal = sc.textFile(“hdfs://journal/*”) How is HadoopRDD partitioned?
example: HadoopRDD val journal = sc.textFile(“hdfs://journal/*”) How is HadoopRDD partitioned? In HadoopRDD a partition corresponds exactly to a file chunk (block) in HDFS
example: HadoopRDD 10 10/05/2015 10:14:01 UserInit 3 10/05/2015 10:14:55 FirstNa 12 10/05/2015 10:17:03 UserLo 4 10/05/2015 10:21:31 UserLo 5 13/05/2015 21:10:11 UserIni 16 10/05/2015 10:14:01 UserInit 20 10/05/2015 10:14:55 FirstNa 42 10/05/2015 10:17:03 UserLo 67 10/05/2015 10:21:31 UserLo 12 13/05/2015 21:10:11 UserIni 10 10/05/2015 10:14:01 UserInit 10 10/05/2015 10:14:55 FirstNa 12 10/05/2015 10:17:03 UserLo 12 10/05/2015 10:21:31 UserLo 198 13/05/2015 21:10:11 UserIni 5 10/05/2015 10:14:01 UserInit 4 10/05/2015 10:14:55 FirstNa 12 10/05/2015 10:17:03 UserLo 142 10/05/2015 10:21:31 UserLo 158 13/05/2015 21:10:11 UserIni
example: HadoopRDD node 1 10 10/05/2015 10:14:01 UserInit 3 10/05/2015 10:14:55 FirstNa 12 10/05/2015 10:17:03 UserLo 4 10/05/2015 10:21:31 UserLo 5 13/05/2015 21:10:11 UserIni node 2 node 3 16 10/05/2015 10:14:01 UserInit 20 10/05/2015 10:14:55 FirstNa 42 10/05/2015 10:17:03 UserLo 67 10/05/2015 10:21:31 UserLo 12 13/05/2015 21:10:11 UserIni 10 10/05/2015 10:14:01 UserInit 10 10/05/2015 10:14:55 FirstNa 12 10/05/2015 10:17:03 UserLo 12 10/05/2015 10:21:31 UserLo 198 13/05/2015 21:10:11 UserIni 5 10/05/2015 10:14:01 UserInit 4 10/05/2015 10:14:55 FirstNa 12 10/05/2015 10:17:03 UserLo 142 10/05/2015 10:21:31 UserLo 158 13/05/2015 21:10:11 UserIni
example: HadoopRDD class HadoopRDD[K, V](...) extends RDD[(K, V)](sc, Nil) with Logging { ... override def getPartitions: Array[Partition] = { val jobConf = getJobConf() SparkHadoopUtil.get.addCredentials(jobConf) val inputFormat = getInputFormat(jobConf) if (inputFormat.isInstanceOf[Configurable]) { inputFormat.asInstanceOf[Configurable].setConf(jobConf) } val inputSplits = inputFormat.getSplits(jobConf, minPartitions) val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) } array }
example: MapPartitionsRDD val journal = sc.textFile(“hdfs://journal/*”) val fromMarch = journal.filter { case (date, size) => LocalDate.parse(date) isAfter LocalDate.of(2015,3,1) }
example: MapPartitionsRDD val journal = sc.textFile(“hdfs://journal/*”) val fromMarch = journal.filter { case (date, size) => LocalDate.parse(date) isAfter LocalDate.of(2015,3,1) } How is MapPartitionsRDD partitioned?
example: MapPartitionsRDD val journal = sc.textFile(“hdfs://journal/*”) val fromMarch = journal.filter { case (date, size) => LocalDate.parse(date) isAfter LocalDate.of(2015,3,1) } How is MapPartitionsRDD partitioned? MapPartitionsRDD inherits partition information from its parent RDD
example: MapPartitionsRDD class MapPartitionsRDD[U: ClassTag, T: ClassTag](...) extends RDD[U](prev) { ... override def getPartitions: Array[Partition] = firstParent[T].partitions
What is a RDD? RDD needs to hold 3 chunks of information in order to do its work: 1. pointer to its parent 2. how its internal data is partitioned 3. how to evaluate its internal data
RDD parent sc.textFile(“hdfs://journal/*”) .groupBy(extractDate _) .map { case (date, events) => (date, events.size) } .filter { case (date, size) => LocalDate.parse(date) isAfter LocalDate.of(2015,3,1) } .take(300) .foreach(println)
RDD parent sc.textFile() .groupBy() .map { } .filter { } .take() .foreach()
Directed acyclic graph sc.textFile() .groupBy() .map { } .filter { } .take() .foreach()
Directed acyclic graph HadoopRDD sc.textFile() .groupBy() .map { } .filter { } .take() .foreach()
Directed acyclic graph HadoopRDD ShuffledRDD sc.textFile() .groupBy() .map { } .filter { } .take() .foreach()
Directed acyclic graph HadoopRDD ShuffledRDD MapPartRDD sc.textFile() .groupBy() .map { } .filter { } .take() .foreach()
Directed acyclic graph HadoopRDD ShuffledRDD MapPartRDD MapPartRDD sc.textFile() .groupBy() .map { } .filter { } .take() .foreach()
Directed acyclic graph HadoopRDD ShuffledRDD MapPartRDD MapPartRDD sc.textFile() .groupBy() .map { } .filter { } .take() .foreach() Two types of parent dependencies: 1. narrow dependency 2. wide dependency
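The kind of dependency an RDD has on its parent can be inspected at runtime - a small sketch, assuming an existing SparkContext sc:

```scala
import org.apache.spark.{NarrowDependency, ShuffleDependency}

val nums    = sc.parallelize(1 to 100, 4)
val doubled = nums.map(_ * 2)          // narrow: each partition depends on exactly one parent partition
val grouped = doubled.groupBy(_ % 10)  // wide: a partition may need data from all parent partitions

println(doubled.dependencies.head.isInstanceOf[NarrowDependency[_]])         // true
println(grouped.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]])  // true
```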
Directed acyclic graph HadoopRDD ShuffeledRDD MapPartRDD MapPartRDD sc.textFile() .groupBy() .map { } .filter { } .take() .foreach()
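The dependency type of each RDD in the chain can be inspected directly. A small sketch, assuming a SparkContext `sc` is available (the key-extraction lambda is just an illustration):

// Narrow dependencies (e.g. OneToOneDependency) can be pipelined inside a single stage;
// a ShuffleDependency (wide) forces a stage boundary and a shuffle.
val mapped  = sc.textFile("hdfs://journal/*").map(line => (line.take(10), line))
val grouped = mapped.groupByKey()

println(mapped.dependencies.head)    // OneToOneDependency -> narrow
println(grouped.dependencies.head)   // ShuffleDependency  -> wide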
Directed acyclic graph HadoopRDD ShuffledRDD MapPartRDD MapPartRDD sc.textFile() .groupBy() .map { } .filter { } .take() .foreach() Tasks
Stage 1 Stage 2 Directed acyclic graph sc.textFile() .groupBy() .map { } .filter { } .take() .foreach()
What is an RDD? An RDD needs to hold 3 chunks of information in order to do its work: 1. a pointer to its parent 2. how its internal data is partitioned 3. how to evaluate its internal data
Stage 1 Stage 2 Running Job aka materializing DAG sc.textFile() .groupBy() .map { } .filter { }
Stage 1 Stage 2 Running Job aka materializing DAG sc.textFile() .groupBy() .map { } .filter { } .collect()
Stage 1 Stage 2 Running Job aka materializing DAG sc.textFile() .groupBy() .map { } .filter { } .collect() action
Stage 1 Stage 2 Running Job aka materializing DAG sc.textFile() .groupBy() .map { } .filter { } .collect() action Actions are implemented using sc.runJob method
Running Job aka materializing DAG

/**
 * Run a function on a given set of partitions in an RDD and return the results as an array.
 */
def runJob[T, U](
    rdd: RDD[T],
    partitions: Seq[Int],
    func: Iterator[T] => U,
  ): Array[U]
Running Job aka materializing DAG

/**
 * Return an array that contains all of the elements in this RDD.
 */
def collect(): Array[T] = {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

/**
 * Return the number of elements in the RDD.
 */
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
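Any custom action can be built the same way: ship a function over each partition's iterator with runJob and combine the per-partition results on the driver. A hedged sketch (the helper name maxLineLength is made up for illustration):

// Returns the length of the longest line in the RDD: one partial result per partition,
// combined on the driver.
def maxLineLength(rdd: org.apache.spark.rdd.RDD[String]): Int = {
  val perPartition: Array[Int] =
    rdd.sparkContext.runJob(rdd, (it: Iterator[String]) => it.foldLeft(0)((m, line) => m max line.length))
  if (perPartition.isEmpty) 0 else perPartition.max
}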
Multiple jobs for single action

/**
 * Take the first num elements of the RDD. It works by first scanning one partition, and use the
 * results from that partition to estimate the number of additional partitions needed to satisfy
 * the limit.
 */
def take(num: Int): Array[T] = {
  (….)
  val left = num - buf.size
  val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)
  (….)
  res.foreach(buf ++= _.take(num - buf.size))
  partsScanned += numPartsToTry
  (….)
  buf.toArray
}
Let's test what we've learned
Towards efficiency

val events = sc.textFile("hdfs://journal/*")
  .groupBy(extractDate _)
  .map { case (date, events) => (date, events.size) }
  .filter { case (date, size) =>
    LocalDate.parse(date) isAfter LocalDate.of(2015,3,1) }

scala> events.toDebugString
(4) MapPartitionsRDD[22] at filter at <console>:50 []
 |  MapPartitionsRDD[21] at map at <console>:49 []
 |  ShuffledRDD[20] at groupBy at <console>:48 []
 +-(6) HadoopRDD[17] at textFile at <console>:47 []

events.count
Stage 1

val events = sc.textFile("hdfs://journal/*")
  .groupBy(extractDate _)
  .map { case (date, events) => (date, events.size) }
  .filter { case (date, size) =>
    LocalDate.parse(date) isAfter LocalDate.of(2015,3,1) }

[diagram: Stage 1 tasks running in parallel on node 1, node 2 and node 3]
Everyday I’m Shuffling
Everyday I'm Shuffling

val events = sc.textFile("hdfs://journal/*")
  .groupBy(extractDate _)
  .map { case (date, events) => (date, events.size) }
  .filter { case (date, size) =>
    LocalDate.parse(date) isAfter LocalDate.of(2015,3,1) }

[diagram: groupBy shuffles intermediate data between node 1, node 2 and node 3]
Stage 2

val events = sc.textFile("hdfs://journal/*")
  .groupBy(extractDate _)
  .map { case (date, events) => (date, events.size) }
  .filter { case (date, size) =>
    LocalDate.parse(date) isAfter LocalDate.of(2015,3,1) }

[diagram: Stage 2 tasks (map and filter over the shuffled groups) running on node 1, node 2 and node 3]
Let's refactor

val events = sc.textFile("hdfs://journal/*")
  .groupBy(extractDate _)
  .map { case (date, events) => (date, events.size) }
  .filter { case (date, size) =>
    LocalDate.parse(date) isAfter LocalDate.of(2015,3,1) }
Let's refactor

val events = sc.textFile("hdfs://journal/*")
  .filter { e =>
    LocalDate.parse(extractDate(e)) isAfter LocalDate.of(2015,3,1) }
  .groupBy(extractDate _)
  .map { case (date, events) => (date, events.size) }
Let's refactor

val events = sc.textFile("hdfs://journal/*")
  .filter { e =>
    LocalDate.parse(extractDate(e)) isAfter LocalDate.of(2015,3,1) }
  .groupBy(extractDate _)
  .map { case (date, events) => (date, events.size) }

[diagram: filtering happens before the shuffle, so less data moves between node 1, node 2 and node 3]
Let's refactor

val events = sc.textFile("hdfs://journal/*")
  .filter { e =>
    LocalDate.parse(extractDate(e)) isAfter LocalDate.of(2015,3,1) }
  .groupBy(extractDate _, 6)
  .map { case (date, events) => (date, events.size) }

[diagram: groupBy with an explicit number of partitions (6) spread across node 1, node 2 and node 3]
Let's refactor

val events = sc.textFile("hdfs://journal/*")
  .filter { e =>
    LocalDate.parse(extractDate(e)) isAfter LocalDate.of(2015,3,1) }
  .map( e => (extractDate(e), e))
  .combineByKey(
    e => 1,
    (counter: Int, e: String) => counter + 1,
    (c1: Int, c2: Int) => c1 + c2)
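To make the three arguments of combineByKey concrete, here is a small self-contained sketch, assuming only a running SparkContext `sc` (e.g. from spark-shell); the in-memory sample stands in for the journal lines:

// Counting events per date with combineByKey -- no grouped collections are ever materialised.
val sample = sc.parallelize(Seq(
  ("2015-03-02", "UserInit"),
  ("2015-03-02", "UserLogin"),
  ("2015-03-05", "FirstName")))

val countsPerDate = sample.combineByKey(
  (event: String) => 1,                        // createCombiner: first event of a date in a partition
  (count: Int, event: String) => count + 1,    // mergeValue: next event of that date, same partition
  (c1: Int, c2: Int) => c1 + c2)               // mergeCombiners: merge counts across partitions

countsPerDate.collect().foreach(println)       // e.g. (2015-03-02,2) and (2015-03-05,1)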
A bit more about partitions

val events = sc.textFile("hdfs://journal/*") // here small number of partitions, let's say 4
  .repartition(256) // note, this will cause a shuffle
  .map( e => (extractDate(e), e))
A bit more about partitions

val events = sc.textFile("hdfs://journal/*") // here a lot of partitions, let's say 1024
  .filter { e =>
    LocalDate.parse(extractDate(e)) isAfter LocalDate.of(2015,3,1) }
  .coalesce(64) // this will NOT shuffle
  .map( e => (extractDate(e), e))
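The effect is easy to verify from the shell. A minimal sketch, assuming a SparkContext `sc`; the partition counts in the comments are illustrative:

val raw = sc.textFile("hdfs://journal/*")
println(raw.partitions.length)            // whatever the input splits gave us

val wider    = raw.repartition(256)       // full shuffle: data redistributed into 256 partitions
val narrower = raw.coalesce(64)           // no shuffle: existing partitions merged locally

println(wider.partitions.length)          // 256
println(narrower.partitions.length)       // 64 (assuming raw had at least 64 partitions)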
What is an RDD? An RDD needs to hold 3 chunks of information in order to do its work: 1. a pointer to its parent 2. how its internal data is partitioned 3. how to evaluate its internal data
What is an RDD? An RDD needs to hold 3 + 2 chunks of information in order to do its work: 1. a pointer to its parent 2. how its internal data is partitioned 3. how to evaluate its internal data 4. data locality 5. partitioner
Data Locality: HDFS example [diagram: journal log lines stored as HDFS blocks spread across node 1, node 2 and node 3]
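Spark exposes this locality information through preferredLocations: for a HadoopRDD these are the hosts that physically hold each HDFS block, and the scheduler tries to run the corresponding task there. A quick sketch, assuming `sc` is available:

val journal = sc.textFile("hdfs://journal/*")
journal.partitions.foreach { p =>
  // for each partition, print the hosts that already store its data
  println(s"partition ${p.index} -> ${journal.preferredLocations(p).mkString(", ")}")
}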
What is an RDD? An RDD needs to hold 3 + 2 chunks of information in order to do its work: 1. a pointer to its parent 2. how its internal data is partitioned 3. how to evaluate its internal data 4. data locality 5. partitioner
Spark performance - shuffle optimization map groupBy
Spark performance - shuffle optimization map groupBy join
Spark performance - shuffle optimization map groupBy join Optimization: shuffle avoided if data is already partitioned
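In practice this means pre-partitioning (and caching) a data set with an explicit partitioner so that later joins and aggregations can reuse that partitioning. A hedged sketch, assuming `sc`, a hypothetical second data set under hdfs://labels/*, and that the first 10 characters of a line are its date key:

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(64)

val eventsByDate = sc.textFile("hdfs://journal/*")
  .map(line => (line.take(10), line))
  .partitionBy(partitioner)     // one shuffle now...
  .cache()

val labelsByDate = sc.textFile("hdfs://labels/*")
  .map(line => (line.take(10), line))
  .partitionBy(partitioner)

// ...and none for eventsByDate here: both sides are already hash-partitioned the same way,
// so the join can reuse the existing partitioning instead of shuffling again.
val joined = eventsByDate.join(labelsByDate)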
twitter: @rabbitonweb, email: paul.szulc@gmail.com Example 6 - Using partitioner ● sw/ex6/Ex6.scala
Spark performance - shuffle optimization map groupBy map
Spark performance - shuffle optimization map groupBy map join
Spark performance - shuffle optimization map groupBy mapValues
Spark performance - shuffle optimization map groupBy mapValues join
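The difference between map and mapValues can be checked directly on the partitioner field: map may change the key, so Spark drops the partitioner; mapValues cannot, so it is preserved and a following join or groupBy can skip the shuffle. A small sketch, assuming `sc` and an illustrative date-prefix key:

import org.apache.spark.HashPartitioner

val byDate = sc.textFile("hdfs://journal/*")
  .map(line => (line.take(10), line))
  .partitionBy(new HashPartitioner(64))

println(byDate.partitioner)                                      // Some(HashPartitioner@...)
println(byDate.map { case (k, v) => (k, v.length) }.partitioner) // None -- partitioner lost
println(byDate.mapValues(_.length).partitioner)                  // Some(HashPartitioner@...) -- preserved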
twitter: @rabbitonweb, email: paul.szulc@gmail.com Example 6 - Using partitioner ● sw/ex6/Ex6.scala ● How can I know which transformations preserve partitioner?
twitter: @rabbitonweb, email: paul.szulc@gmail.com Exercise 6 - Can I be better? ● Open sw/ex6/Ex6.scala ● Program ‘Join’ is not performing well ○ Can you tell why? ○ What should be done to fix it?
twitter: @rabbitonweb, email: paul.szulc@gmail.com Caching map groupBy
twitter: @rabbitonweb, email: paul.szulc@gmail.com Caching map groupBy filter
twitter: @rabbitonweb, email: paul.szulc@gmail.com Caching ● .persist() & .cache() methods map groupBy persist
twitter: @rabbitonweb, email: paul.szulc@gmail.com Caching ● .persist() & .cache() methods map groupBy persist filter
twitter: @rabbitonweb, email: paul.szulc@gmail.com Caching - where is it cached? ● How cache is stored depends on storage level
twitter: @rabbitonweb, email: paul.szulc@gmail.com Caching - where is it cached? ● How cache is stored depends on storage level ● Levels: ○ MEMORY_ONLY ○ MEMORY_AND_DISK ○ MEMORY_ONLY_SER ○ MEMORY_AND_DISK_SER ○ DISK_ONLY ○ OFF_HEAP
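cache() is simply persist() with the default MEMORY_ONLY level; any other level is chosen explicitly through persist(StorageLevel...). A hedged sketch of caching the grouped journal with serialised, spill-to-disk storage, assuming `sc` and the extractDate helper used in these slides:

import org.apache.spark.storage.StorageLevel

val grouped = sc.textFile("hdfs://journal/*")
  .groupBy(extractDate _)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)   // keep serialised blocks in memory, spill to disk if needed

grouped.count()   // first action materialises and caches the groups
grouped.count()   // later actions reuse the cache instead of re-reading HDFS and re-shuffling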
twitter: @rabbitonweb, email: paul.szulc@gmail.com Caching ● .persist() & .cache() methods ● caching is fault-tolerant! map groupBy persist filter
twitter: @rabbitonweb, email: paul.szulc@gmail.com Caching - when it can be useful? http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf
twitter: @rabbitonweb, email: paul.szulc@gmail.com Spark performance - caching
twitter: @rabbitonweb, email: paul.szulc@gmail.com Spark performance - vs Hadoop (3) “(...) we decided to participate in the Sort Benchmark (...), an industry benchmark on how fast a system can sort 100 TB of data (1 trillion records). The previous world record was 72 minutes, set by (...) Hadoop (...) cluster of 2100 nodes. Using Spark on 206 nodes, we completed the benchmark in 23 minutes. This means that Spark sorted the same data 3X faster using 10X fewer machines. All (...) without using Spark’s in-memory cache.”
twitter: @rabbitonweb, email: paul.szulc@gmail.com Example 7 - Save me maybe ● /sw/ex7/Ex7.scala
twitter: @rabbitonweb, email: paul.szulc@gmail.com Checkpointing ● .checkpoint() map groupBy filter
twitter: @rabbitonweb, email: paul.szulc@gmail.com Checkpointing ● .checkpoint() checkpoint filter
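Checkpointing writes an RDD's data to reliable storage and truncates its lineage, so recovery no longer has to replay the whole DAG from the source. A minimal sketch, assuming `sc`, the extractDate helper and a writable HDFS directory (the path is made up):

sc.setCheckpointDir("hdfs://checkpoints/spark-workshop")   // must be a reliable, writable location

val grouped = sc.textFile("hdfs://journal/*")
  .groupBy(extractDate _)
  .cache()             // usually cached as well, so checkpointing does not recompute the groups

grouped.checkpoint()   // only marks the RDD; the data is written on the next action
grouped.count()        // materialises both the cache and the checkpoint files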
twitter: @rabbitonweb, email: paul.szulc@gmail.com And that is all folks! Pawel Szulc Email: paul.szulc@gmail.com Twitter: @rabbitonweb Blog: http://rabbitonweb.com

Apache Spark workshop