Tuning and Debugging in Apache Spark
  Patrick Wendell (@pwendell)
  February 20, 2015
About Me
  Apache Spark committer and PMC member, release manager
  Worked on Spark at UC Berkeley when the project started
  Today, managing Spark efforts at Databricks
About Databricks
  Founded by the creators of Spark in 2013
  Donated Spark to the ASF and remains the largest contributor
  End-to-end hosted service: Databricks Cloud
Today’s Talk
  Help you understand and debug Spark programs
  Related talk this afternoon:
  Assumes you know Spark core API concepts; focused on internals
Spark’s Execution Model
The key to tuning Spark apps is a sound grasp of Spark’s internal mechanisms.
Key Question
  How does a user program get translated into units of physical execution (jobs, stages, and tasks)?
RDD API Refresher
  RDDs are a distributed collection of records
    rdd = spark.parallelize(range(10000), 10)
  Transformations create new RDDs from existing ones
    errors = lines.filter(lambda line: "ERROR" in line)  # lines: an RDD of log-line strings
  Actions materialize a value in the user program
    size = errors.count()
RDD API Example
    // Read input file
    val input = sc.textFile("input.txt")
    val tokenized = input
      .map(line => line.split(" "))
      .filter(words => words.size > 0)   // remove empty lines
    val counts = tokenized               // frequency of log levels
      .map(words => (words(0), 1))
      .reduceByKey((a, b) => a + b, 2)   // 2 = number of output partitions
  input.txt:
    INFO Server started
    INFO Bound to port 8080
    WARN Cannot find srv.conf
Transformations
    sc.textFile().map().filter().map().reduceByKey()
DAG View of RDDs
  [Figure: textFile() -> map() -> filter() -> map() -> reduceByKey(), drawn as Hadoop RDD (3 partitions, input) -> Mapped RDD (3 partitions) -> Filtered RDD (3 partitions, tokenized) -> Mapped RDD (3 partitions) -> Shuffle RDD (2 partitions, counts)]
Transformations build up a DAG, but don’t “do anything”
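To make the laziness concrete, here is a minimal sketch in plain Python. It is not Spark code: `ToyRDD`, `lineage()` and `tokenize` are hypothetical names invented for this illustration, and the toy ignores partitions entirely. It only shows that transformations record a chain of steps, and that user functions run once an action asks for a result.

```python
class ToyRDD:
    """Hypothetical stand-in for an RDD: it only remembers its parent and one step."""

    def __init__(self, source=None, parent=None, kind="source", fn=None):
        self.source = source    # input records (only set on the root)
        self.parent = parent    # the RDD this one was derived from
        self.kind = kind        # "source", "map" or "filter"
        self.fn = fn            # user function for map/filter

    # Transformations: return a new node in the DAG, run nothing.
    def map(self, fn):
        return ToyRDD(parent=self, kind="map", fn=fn)

    def filter(self, fn):
        return ToyRDD(parent=self, kind="filter", fn=fn)

    def lineage(self):
        """Names of the steps from the root to this RDD."""
        node, steps = self, []
        while node is not None:
            steps.append(node.kind)
            node = node.parent
        return steps[::-1]

    # Action: only now is the recorded chain actually evaluated.
    def collect(self):
        if self.parent is None:
            return list(self.source)
        records = self.parent.collect()
        if self.kind == "map":
            return [self.fn(r) for r in records]
        return [r for r in records if self.fn(r)]


calls = []  # records every invocation of the user function


def tokenize(line):
    calls.append(line)
    return line.split(" ")


lines = ToyRDD(source=["INFO Server started", "WARN Cannot find srv.conf"])
levels = (lines.map(tokenize)
               .filter(lambda words: len(words) > 0)
               .map(lambda words: words[0]))

calls_before_action = len(calls)   # nothing has run yet
result = levels.collect()          # the action triggers evaluation
calls_after_action = len(calls)    # tokenize ran once per input line
```

Building `levels` leaves `calls` empty; only `collect()` invokes `tokenize`.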
Evaluation of the DAG
  We mentioned “actions” a few slides ago. Let’s forget them for a minute.
  DAGs are materialized through a method, sc.runJob (signature simplified):
    def runJob[T, U](
      rdd: RDD[T],               // 1. RDD to compute
      partitions: Seq[Int],      // 2. Which partitions
      func: Iterator[T] => U     // 3. Fn to produce results
    ): Array[U]
How runJob Works
  runJob needs to compute the target RDD’s parents, their parents, and so on, all the way back to an RDD with no dependencies (e.g. HadoopRDD).
  [Figure: runJob(counts) walks back from the Shuffle RDD (counts, 2 partitions) through the Mapped RDD, Filtered RDD (tokenized) and Mapped RDD to the Hadoop RDD (input), 3 partitions each]
Physical Optimizations
  1. Certain types of transformations can be pipelined.
  2. If dependent RDDs have already been cached (or persisted in a shuffle), the graph can be truncated.
  Once pipelining and truncation occur, Spark produces a set of stages; each stage is composed of tasks.
Stage Graph
  Stage 1 (Task 1, Task 2, Task 3): input -> shuffle write. Each task will:
    1. Read Hadoop input
    2. Perform maps and filters
    3. Write partial sums
  Stage 2 (Task 1, Task 2): shuffle read. Each task will:
    1. Read partial sums
    2. Invoke user function passed to runJob
Units of Physical Execution
  Jobs: Work required to compute the RDD in runJob.
  Stages: A wave of work within a job, corresponding to one or more pipelined RDDs.
  Tasks: A unit of work within a stage, corresponding to one RDD partition.
  Shuffle: The transfer of data between stages.
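The definitions above can be sketched for the running example. This is a plain-Python toy, not Spark's scheduler: `Step` and `plan_stages` are hypothetical names, the lineage is assumed to be a simple linear chain, and caching or truncation is ignored. It starts a new stage at each shuffle and gives each stage one task per partition.

```python
from dataclasses import dataclass


@dataclass
class Step:
    name: str              # RDD type, as shown in the DAG
    partitions: int        # number of partitions of that RDD
    shuffle: bool = False  # True if this RDD reads its parent through a shuffle


def plan_stages(lineage):
    """Group pipelined RDDs into stages; start a new stage at every shuffle."""
    stages = []
    for step in lineage:
        if step.shuffle or not stages:
            stages.append([])
        stages[-1].append(step)
    # One task per partition of the last RDD in each stage.
    return [
        {"rdds": [s.name for s in stage], "tasks": stage[-1].partitions}
        for stage in stages
    ]


# The lineage of `counts` from the RDD API example.
lineage = [
    Step("HadoopRDD", 3),
    Step("MappedRDD", 3),
    Step("FilteredRDD", 3),
    Step("MappedRDD", 3),
    Step("ShuffledRDD", 2, shuffle=True),
]

stages = plan_stages(lineage)
for number, stage in enumerate(stages, start=1):
    print(f"Stage {number}: {stage['tasks']} tasks, pipelines {' -> '.join(stage['rdds'])}")
```

For this lineage the toy yields two stages, with three tasks and two tasks respectively, matching the stage graph in the talk.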
Seeing this on your own
    scala> counts.toDebugString
    res84: String =
    (2) ShuffledRDD[296] at reduceByKey at <console>:17
     +-(3) MappedRDD[295] at map at <console>:17
        |  FilteredRDD[294] at filter at <console>:15
        |  MappedRDD[293] at map at <console>:15
        |  input.text MappedRDD[292] at textFile at <console>:13
        |  input.text HadoopRDD[291] at textFile at <console>:13
  (indentations indicate a shuffle boundary)
Example: count() action (simplified)
    class RDD[T] {
      def count(): Long = {
        val results = sc.runJob(
          this,                                  // 1. RDD = self
          0 until partitions.size,               // 2. Partitions = all partitions
          (it: Iterator[T]) => it.size.toLong    // 3. Function = size of the partition
        )
        results.sum
      }
    }
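Example: take(N) action (simplified)
    class RDD[T] {
      def take(n: Int): Seq[T] = {
        val results = new ArrayBuffer[T]
        var partition = 0
        // Run one job per partition until enough records have been collected
        while (results.size < n && partition < partitions.size) {
          results ++= sc.runJob(this, Seq(partition), (it: Iterator[T]) => it.toArray).head
          partition = partition + 1
        }
        results.take(n)
      }
    }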
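The two actions can be mimicked in plain Python to see how much work each one schedules. This is a toy model under stated assumptions, not Spark code: an "RDD" here is just a list of partitions, and `run_job`, `count`, `take` and `jobs_run` are hypothetical names for this sketch.

```python
jobs_run = []  # partition indices touched by every job, to compare the two actions


def run_job(partitions_data, partition_ids, func):
    """Toy runJob: apply func to an iterator over each requested partition."""
    jobs_run.append(list(partition_ids))
    return [func(iter(partitions_data[i])) for i in partition_ids]


def count(partitions_data):
    # One job over all partitions; each task returns the size of its partition.
    sizes = run_job(partitions_data, range(len(partitions_data)),
                    lambda it: sum(1 for _ in it))
    return sum(sizes)


def take(partitions_data, n):
    # One job per partition, stopping as soon as n records are collected.
    results, partition = [], 0
    while len(results) < n and partition < len(partitions_data):
        results.extend(run_job(partitions_data, [partition], list)[0])
        partition += 1
    return results[:n]


data = [["a", "b"], ["c", "d"], ["e", "f"]]   # an "RDD" with 3 partitions

total = count(data)
jobs_for_count = list(jobs_run)

jobs_run.clear()
first_three = take(data, 3)
jobs_for_take = list(jobs_run)
```

In this toy, `count` issues a single job over all three partitions, while `take(3)` issues two single-partition jobs and never touches the third partition.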
Putting it All Together
  [Figure with two annotations: “Named after action calling runJob” and “Named after last RDD in pipeline”]
Determinants of Performance in Spark
Quantity of Data Shuffled
  In general, avoiding shuffle will make your program run faster.
  1. Use the built-in aggregateByKey() operator instead of writing your own aggregations.
  2. Filter input earlier in the program rather than later.
  3. Go to this afternoon’s talk!
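One reason built-in aggregation operators tend to shuffle less is that they can pre-aggregate within each partition before the shuffle. The sketch below is plain Python rather than Spark, and the sample pairs and function names are invented for illustration. It counts how many records would cross the shuffle with and without that per-partition combine step.

```python
from collections import Counter

# Two input partitions of (key, 1) pairs, e.g. log levels (made-up sample data).
partitions = [
    [("INFO", 1), ("INFO", 1), ("WARN", 1), ("INFO", 1)],
    [("WARN", 1), ("INFO", 1), ("INFO", 1), ("ERROR", 1)],
]


def shuffle_without_combine(parts):
    """Every record crosses the shuffle; aggregation happens only afterwards."""
    shuffled = [pair for part in parts for pair in part]
    totals = Counter()
    for key, value in shuffled:
        totals[key] += value
    return len(shuffled), dict(totals)


def shuffle_with_combine(parts):
    """Each partition pre-aggregates, so it ships at most one record per key."""
    shuffled = []
    for part in parts:
        partial = Counter()
        for key, value in part:
            partial[key] += value
        shuffled.extend(partial.items())
    totals = Counter()
    for key, value in shuffled:
        totals[key] += value
    return len(shuffled), dict(totals)


naive_records, naive_totals = shuffle_without_combine(partitions)
combined_records, combined_totals = shuffle_with_combine(partitions)
```

Both variants produce the same totals; the combined one moves 5 records instead of 8 in this toy input. The gap grows with the number of repeated keys per partition.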
Degree of Parallelism
    >>> input = sc.textFile("s3n://log-files/2014/*.log.gz")  # matches thousands of files
    >>> input.getNumPartitions()
    35154
    >>> lines = input.filter(lambda line: line.startswith("2014-10-17 08:"))  # selective
    >>> lines.getNumPartitions()
    35154
    >>> lines = lines.coalesce(5).cache()  # We coalesce the lines RDD before caching
    >>> lines.getNumPartitions()
    5
    >>> lines.count()  # occurs on coalesced RDD
Degree of Parallelism
  If you have a huge number of mostly idle tasks (e.g. tens of thousands), then it’s often good to coalesce.
  If you are not using all slots in your cluster, repartition can increase parallelism.
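A rough picture of what coalescing does, as a plain-Python toy. This is an assumption-laden simplification, not Spark's algorithm: the real operator also considers data locality, and the `coalesce` function and sample data here are hypothetical. Neighbouring partitions are merged into fewer, fuller ones without re-keying any record.

```python
def coalesce(partitions, num_partitions):
    """Merge neighbouring partitions into num_partitions groups; records are not re-keyed."""
    n = len(partitions)
    if num_partitions >= n:
        # Merging cannot increase the number of partitions.
        return [list(p) for p in partitions]
    merged = []
    for i in range(num_partitions):
        start = i * n // num_partitions
        end = (i + 1) * n // num_partitions
        merged.append([rec for part in partitions[start:end] for rec in part])
    return merged


# 12 mostly empty partitions, as a selective filter might leave behind.
sparse = [[], ["a"], [], [], ["b"], [], [], [], ["c"], [], [], ["d"]]
dense = coalesce(sparse, 3)
```

Here 12 near-empty partitions become 3, so a later stage runs 3 tasks instead of 12. Increasing the partition count is a different operation: repartition redistributes records through a shuffle.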
Choice of Serializer
  Serialization is sometimes a bottleneck when shuffling and caching data.
  Using the Kryo serializer is often faster.
    import org.apache.spark.SparkConf
    val conf = new SparkConf()
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Be strict about class registration
    conf.set("spark.kryo.registrationRequired", "true")
    conf.registerKryoClasses(Array(classOf[MyClass], classOf[MyOtherClass]))
Cache Format
  By default, Spark will cache() data using the MEMORY_ONLY level (deserialized JVM objects)
  MEMORY_ONLY_SER can help cut down on GC
  MEMORY_AND_DISK can avoid expensive recomputations
Hardware
  Spark scales horizontally, so more is better
  Disk/Memory/Network balance depends on workload: CPU-intensive ML jobs vs IO-intensive ETL jobs
  Good to keep executor heap size to 64GB or less (you can run multiple executors on each node)
Other Performance Tweaks
  Switching to LZF compression can improve shuffle performance (sacrifices some robustness for massive shuffles):
    conf.set("spark.io.compression.codec", "lzf")
  Turn on speculative execution to help prevent stragglers:
    conf.set("spark.speculation", "true")
Other Performance Tweaks
  Make sure to give Spark as many disks as possible to allow striping shuffle output
    SPARK_LOCAL_DIRS in Mesos/Standalone
    In YARN mode, inherits YARN’s local directories
One Weird Trick for Great Performance
Use Higher-Level APIs!
  DataFrame APIs for core processing (work across Scala, Java, Python and R)
  Spark ML for machine learning
  Spark SQL for structured query processing
See also Chapter 8: Tuning and Debugging Spark.
Come to Spark Summit 2015!
  June 15-17 in San Francisco
Other Spark Happenings Today
  Spark team “Ask Us Anything” at 2:20 in 211 B
  Tips for writing better Spark programs at 4:00 in 230C
  I’ll be around the Databricks booth after this
Thank you. Any questions?
Extra Slides
Internals of the RDD Interface
  1) List of partitions
  2) Set of dependencies on parent RDDs
  3) Function to compute a partition, given parents
  4) Optional partitioning info for k/v RDDs (Partitioner)
  [Figure: an RDD with Partition 1, Partition 2, Partition 3]
Example: Hadoop RDD
  Partitions = 1 per HDFS block
  Dependencies = None
  compute(partition) = read corresponding HDFS block
  Partitioner = None
    > rdd = spark.hadoopFile("hdfs://click_logs/")
Example: Filtered RDD
  Partitions = parent partitions
  Dependencies = a single parent
  compute(partition) = call parent.compute(partition) and filter
  Partitioner = parent partitioner
    > filtered = rdd.filter(lambda x: "ERROR" in x)
Example: Joined RDD
  Partitions = number chosen by user or heuristics
  Dependencies = ShuffleDependency on two or more parents
  compute(partition) = read and join data from all parents
  Partitioner = HashPartitioner(# partitions)
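The four-part interface can be sketched with two toy classes in plain Python. The class names mirror the slides, but everything here is a hypothetical illustration, not Spark's implementation: input "blocks" are in-memory lists, and `compute` returns a lazy iterator, which is roughly why a chain of such RDDs can be pipelined inside one task.

```python
class SourceRDD:
    """Toy root RDD: no dependencies, one partition per input chunk."""

    def __init__(self, chunks):
        self.chunks = chunks
        self.dependencies = []      # 2) no parents
        self.partitioner = None     # 4) no partitioning info

    def partitions(self):           # 1) list of partitions
        return list(range(len(self.chunks)))

    def compute(self, partition):   # 3) read the corresponding chunk
        return iter(self.chunks[partition])


class FilteredRDD:
    """Toy filtered RDD: same partitions as its single parent."""

    def __init__(self, parent, predicate):
        self.parent = parent
        self.predicate = predicate
        self.dependencies = [parent]            # 2) a single parent
        self.partitioner = parent.partitioner   # 4) inherited from the parent

    def partitions(self):                       # 1) parent partitions
        return self.parent.partitions()

    def compute(self, partition):               # 3) parent.compute, then filter
        return (rec for rec in self.parent.compute(partition)
                if self.predicate(rec))


# Made-up sample chunks standing in for HDFS blocks.
logs = SourceRDD([["INFO ok", "ERROR disk"], ["ERROR net"], ["INFO done"]])
errors = FilteredRDD(logs, lambda line: "ERROR" in line)
per_partition = [list(errors.compute(p)) for p in errors.partitions()]
```

Each partition of `errors` is computed only from the matching parent partition, so no data moves between partitions.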
A More Complex DAG
  [Figure: DAG with a Hadoop RDD, Mapped RDD, Filtered RDD and JDBC RDD (2 partitions each) feeding a Joined RDD (3 partitions) and a Filtered RDD (3 partitions); .count() is called on the result]
A More Complex DAG
  [Figure: stage graph with Stage 1 (Task 1, Task 2), Stage 2 (Task 1, Task 2) and Stage 3 (Task 1, Task 2, Task 3), connected by a shuffle write and a shuffle read]
Narrow and Wide Transformations
  [Figure, left: FilteredRDD, an RDD with 3 partitions and a single parent with 3 partitions]
  [Figure, right: JoinedRDD, an RDD with 3 partitions and two parents (Parent 1 and Parent 2) with 2 partitions each]
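The difference between the two kinds of dependency can be sketched in plain Python. This is a toy, not Spark's partitioner: `hash_partition` uses CRC32 as a stand-in hash, and the function names and sample data are invented for this illustration. A narrow transformation builds child partition i from parent partition i only, while a wide one routes each record by its key, so any child partition may receive records from any parent partition.

```python
import zlib


def hash_partition(key, num_partitions):
    # Stable stand-in for a hash partitioner (Python's hash() is salted for str).
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def narrow_filter(parent_partitions, predicate):
    """Narrow: child partition i reads only parent partition i."""
    return [[rec for rec in part if predicate(rec)] for part in parent_partitions]


def wide_group_by_key(parent_partitions, num_partitions):
    """Wide: records are routed by key, so every parent may feed every child."""
    children = [dict() for _ in range(num_partitions)]
    for part in parent_partitions:
        for key, value in part:
            target = hash_partition(key, num_partitions)
            children[target].setdefault(key, []).append(value)
    return children


parent = [[("INFO", 1), ("WARN", 1)], [("INFO", 1), ("ERROR", 1)], [("WARN", 1)]]

kept = narrow_filter(parent, lambda kv: kv[0] != "INFO")
grouped = wide_group_by_key(parent, 2)
```

After the wide step, all values for a given key sit in exactly one child partition, which is what a shuffle has to guarantee.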
