Introduction to Apache Spark Brendan Dillon Javier Arrieta
The Stack • Your applications sit on top of the component libraries: Spark SQL, MLlib, GraphX and Spark Streaming • All of these build on Spark Core • Spark runs on Mesos, YARN or its standalone cluster manager • Example (Spark SQL): sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19").map(t => "Name: " + t(0)).collect().foreach(println)
Distributed Execution • The Driver program holds the SparkContext • The SparkContext schedules work onto Executors running on the Worker Nodes • Each Executor runs multiple Tasks
Resilient Distributed Datasets (RDD) • Immutable: never modified – just transformed to new RDDs • Distributed: split into multiple partitions and spread across multiple servers in a cluster • Resilient: can be re-computed if they get destroyed • Created by: – Loading external data – Distributing a collection of objects in the driver program
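A minimal sketch of the two creation paths listed above; the application name, file path and sample data are placeholders, and sc is reused in the later examples:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("rdd-intro")
val sc = new SparkContext(conf)

// Distributing a collection of objects from the driver program
val numbers = sc.parallelize(Seq(1, 2, 3, 4, 5))

// Loading external data (one element per line of the file)
val lines = sc.textFile("hdfs:///data/sample.txt")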
RDD Implementation • Array of partitions • List of dependencies on parent RDDs • Function to compute a partition given its parents – Returns Iterator over a partition • Preferred locations: list of strings for each partition (Nil by default) • Partitioner (None by default)
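A simplified sketch of that interface, loosely following the members of Spark's internal RDD class; the signatures are abbreviated here and not authoritative:

import org.apache.spark.{Dependency, Partition, Partitioner, TaskContext}

// Conceptual outline only; the real org.apache.spark.rdd.RDD class has more members.
abstract class SimpleRDD[T] {
  def getPartitions: Array[Partition]                                // array of partitions
  def getDependencies: Seq[Dependency[_]]                            // dependencies on parent RDDs
  def compute(split: Partition, context: TaskContext): Iterator[T]   // compute one partition given its parents
  def getPreferredLocations(split: Partition): Seq[String] = Nil     // preferred locations (Nil by default)
  val partitioner: Option[Partitioner] = None                        // partitioner (None by default)
}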
Persistence / Caching • By default RDDs (and all of their dependencies) are recomputed every time an action is called on them! • Need to explicitly tell Spark when to persist • Options: – Default: stored in heap as unserialized objects (pickled objects for Python) – Memory only: serialized or not – Memory and disk: spills to disk, option to serialize in memory – Disk only • Tachyon: off-heap distributed caching – Aims to make Spark more resilient – Avoid GC overheads
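A short sketch of explicit persistence, assuming the sc from the earlier example; storage level names come from Spark's StorageLevel object:

import org.apache.spark.storage.StorageLevel

val words = sc.textFile("hdfs:///data/sample.txt").flatMap(_.split(" "))

// Other options include MEMORY_ONLY_SER, MEMORY_AND_DISK_SER and DISK_ONLY;
// an RDD's storage level can only be set once.
words.persist(StorageLevel.MEMORY_AND_DISK)

words.count()       // first action computes and caches the RDD
words.count()       // subsequent actions reuse the cached data
words.unpersist()   // drop the cached data when it is no longer needed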
Dependency Types: Narrow • Each partition of the parent is used by at most one partition of the child • E.g. map, filter, union, join with co-partitioned input
Dependency Types: Wide • Each partition of the parent is used by more than one partition of the child • E.g. groupByKey, join with inputs that are not co-partitioned
Transformations • Return a new RDD • Lazy evaluation • Single RDD transformations: map, flatMap, filter, distinct • Pair RDDs: keyBy, reduceByKey, groupByKey, combineByKey, mapValues, flatMapValues, sortByKey • Two RDD transformations: union, intersection, subtract, cartesian • Two pair RDDs: join, rightOuterJoin, leftOuterJoin, cogroup
Actions • Force evaluation of the transformations and return a value to the driver program or write to external storage • Actions on RDDs: – reduce, fold, aggregate – foreach(func), collect – count, countByValue – top(num) – take(num), takeOrdered(num)(ordering) • Actions on pair RDDs: – countByKey – collectAsMap – lookup(key)
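A small example forcing evaluation with a few of the actions listed above; the sample data is made up:

val nums = sc.parallelize(Seq(3, 1, 4, 1, 5, 9, 2, 6))

val total = nums.reduce(_ + _)        // 31
val count = nums.count()              // 8
val largest = nums.top(3)             // Array(9, 6, 5)
val histogram = nums.countByValue()   // Map(1 -> 2, 3 -> 1, ...)

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val byKey = pairs.countByKey()        // Map(a -> 2, b -> 1)
val asMap = pairs.collectAsMap()      // one value kept per key
val aValues = pairs.lookup("a")       // Seq(1, 3)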
Single RDD Transformations
map and flatMap • map takes a function that transforms each element of a collection: map(f: T => U) • RDD[T] => RDD[U] • flatMap takes a function that transforms a single element of a collection into a sequence of elements: flatMap(f: T => Seq[U]) • Flattens out the output into a single sequence • RDD[T] => RDD[U]
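For example, with a small hard-coded input:

val lines = sc.parallelize(Seq("to be", "or not to be"))

val lengths = lines.map(_.length)         // RDD[Int]: 5, 12
val words = lines.flatMap(_.split(" "))   // RDD[String]: "to", "be", "or", "not", "to", "be"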
filter, distinct • filter takes a (predicate) function that returns true if an element should be in the output collection: filter(f: T => Boolean) • distinct removes duplicates from the RDD • Both filter and distinct transform from RDD[T] => RDD[T]
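Continuing the word example with made-up data:

val words = sc.parallelize(Seq("to", "be", "or", "not", "to", "be"))

val longWords = words.filter(_.length > 2)   // "not"
val unique = words.distinct()                // "to", "be", "or", "not" (in no particular order)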
Actions
reduce, fold & aggregate • reduce takes a function that combines the elements of a collection pairwise: reduce(f: (T, T) => T) • fold is like reduce except it also takes a zero value: fold(zero: T)(f: (T, T) => T) • reduce and fold: RDD[T] => T • aggregate is the most general form • aggregate(zero: U)(seqOp: (U, T) => U, combOp: (U, U) => U) • aggregate: RDD[T] => U
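A sketch that computes an average with aggregate by accumulating a (sum, count) pair of a different type than the elements:

val nums = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0))

val sum = nums.reduce(_ + _)              // 10.0
val sumWithZero = nums.fold(0.0)(_ + _)   // 10.0

val (total, n) = nums.aggregate((0.0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),   // seqOp: fold one element into the accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2)    // combOp: merge accumulators from different partitions
)
val mean = total / n                      // 2.5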
Pair RDD Transformations
keyBy, reduceByKey • keyBy creates tuples of the elements in an RDD by applying a function: keyBy(f: T => K) • RDD[ T ] => RDD[ (K, T) ] • reduceByKey takes a function that takes two values and returns a single value: reduceByKey(f: (V, V) => V) • RDD[ (K, V) ] => RDD[ (K, V) ]
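For example, pairing elements with keyBy and the classic word count with reduceByKey; the data is made up:

val words = sc.parallelize(Seq("spark", "scala", "spark"))

val byFirstLetter = words.keyBy(_.head)            // ('s', "spark"), ('s', "scala"), ('s', "spark")
val counts = words.map((_, 1)).reduceByKey(_ + _)  // ("spark", 2), ("scala", 1)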
groupByKey • Takes a collection of key-value pairs and no parameters • Returns a sequence of values associated with each key • RDD[ ( K, V ) ] => RDD[ ( K, Iterable[V] ) ] • All the values for a given key must fit in memory • Can be slow – use aggregateByKey or reduceByKey where possible • Ordering of values not guaranteed and can vary on every evaluation
combineByKey • def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null) • RDD[ (K, V) ] => RDD[ (K, C) ] • createCombiner called per partition when a new key is found • mergeValue combines a new value with an existing accumulator • mergeCombiners merges results from different partitions • Sometimes map-side combine is not useful e.g. groupByKey • groupByKey, aggregateByKey and reduceByKey are all implemented using combineByKey
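A sketch of a per-key average with combineByKey, accumulating a (sum, count) pair per key; the scores are illustrative:

val scores = sc.parallelize(Seq(("a", 10.0), ("a", 20.0), ("b", 5.0)))

val sumCounts = scores.combineByKey(
  (v: Double) => (v, 1),                                                // createCombiner: first value seen for a key in a partition
  (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),          // mergeValue: fold another value into the accumulator
  (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2)    // mergeCombiners: merge accumulators across partitions
)
val averages = sumCounts.mapValues { case (sum, n) => sum / n }         // ("a", 15.0), ("b", 5.0)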
map vs mapValues • map takes a function that transforms each element of a collection: map(f: T => U) • RDD[T] => RDD[U] • When T is a tuple we may want to only act on the values – not the keys • mapValues takes a function that maps the values in the inputs to the values in the output: mapValues(f: V => W) • Where RDD[ (K, V) ] => RDD[ (K, W) ] • NB: use mapValues when you can: avoids reshuffle when data is partitioned by key
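For instance, with a hypothetical pair RDD:

val prices = sc.parallelize(Seq(("book", 10.0), ("pen", 2.0)))

// Keys are untouched, so any existing partitioning by key is preserved
val withVat = prices.mapValues(_ * 1.23)   // ("book", 12.3), ("pen", 2.46)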
Two RDD Transformations
Pseudo-set: union, intersection, subtract, cartesian • rdd.union(otherRdd): RDD containing elements from both • rdd.intersection(otherRdd): RDD containing only elements found in both • rdd.subtract(otherRdd): remove the content of one from the other e.g. removing training data • rdd.cartesian(otherRdd): Cartesian product of two RDDs e.g. similarity of pairs: RDD[T], RDD[U] => RDD[ (T, U) ]
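A quick sketch with two small RDDs of illustrative data:

val a = sc.parallelize(Seq(1, 2, 3))
val b = sc.parallelize(Seq(3, 4))

a.union(b)          // 1, 2, 3, 3, 4 (duplicates are kept)
a.intersection(b)   // 3
a.subtract(b)       // 1, 2
a.cartesian(b)      // (1,3), (1,4), (2,3), (2,4), (3,3), (3,4)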
Two Pair RDD Transformations
join, rightOuterJoin, leftOuterJoin, cogroup • Join: RDD[ (K, V) ] and RDD[ (K, W) ] => RDD[ ( K, (V, W) ) ] • Cogroup: RDD[ (K, V) ] and RDD[ (K, W) ] => RDD[ ( K, ( Seq[V], Seq[W] ) ) ] • rightOuterJoin and leftOuterJoin keep every key present in the right / left RDD respectively, wrapping values that may be missing from the other side in Option
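For example, with two small pair RDDs of illustrative data:

val ages = sc.parallelize(Seq(("alice", 30), ("bob", 25)))
val cities = sc.parallelize(Seq(("alice", "Dublin"), ("carol", "Madrid")))

ages.join(cities)            // ("alice", (30, "Dublin"))
ages.leftOuterJoin(cities)   // ("alice", (30, Some("Dublin"))), ("bob", (25, None))
ages.rightOuterJoin(cities)  // ("alice", (Some(30), "Dublin")), ("carol", (None, "Madrid"))
ages.cogroup(cities)         // ("alice", ([30], [Dublin])), ("bob", ([25], [])), ("carol", ([], [Madrid]))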
Partition-specific Transformations and Actions
mapPartitions, mapPartitionsWithIndex, and foreachPartition • Same as map and foreach except they operate on a per-partition basis • Useful when you have setup code (DB connection, RNG etc.) that you don’t want to run for every element • You can set preservesPartitioning when you are not altering the keys used for partitioning, to avoid unnecessary shuffling – As with mapValues in the last slide
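A sketch of per-partition setup; here the "expensive" resource is just a Random seeded once per partition, standing in for something like a DB connection:

import scala.util.Random

val nums = sc.parallelize(1 to 10, numSlices = 2)

val jittered = nums.mapPartitionsWithIndex { (index, iter) =>
  // Setup runs once per partition, not once per element
  val rng = new Random(index)
  iter.map(n => n + rng.nextDouble())
}

jittered.foreachPartition { iter =>
  // e.g. open one connection per partition here, write out iter, then close it
  iter.foreach(println)
}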
Data Frames
Data Frames & Catalyst Optimizer
DataFrame creation and operations
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create the DataFrame
val df = sqlContext.jsonFile("examples/src/main/resources/people.json")
// Show the content of the DataFrame
df.show()
// Print the schema in a tree format
df.printSchema()
// Select only the "name" column
df.select("name")
// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1)
// Select people older than 21
df.filter(df("age") > 21)
// Count people by age
df.groupBy("age").count()
Spark Streaming
Introduction
Alternatives: Spark Streaming compared with Apache Storm, Trident and other streaming frameworks
• Programming model: micro-batch | one at a time | micro-batch
• Stream primitive: DStream | Stream, Tuple | Tuple, Batch, Partition | Distributed Stream | Dataflow Stream
• Stream source: ReceiverInputDStream | Container | Spouts, Trident Spouts | Data Stream
• Computation: maps/windows/operations on DStreams | StreamTask, Window, join | filters, functions, aggregations, joins | maps/windows/operations on Data Streams
• Resource management: YARN/Mesos | YARN | YARN/Mesos | YARN
• Resilience: requires WAL to DFS (HDFS/S3), checkpointing (Kafka) | Nimbus reassigns and failed batches are replayed | lightweight distributed snapshots
Why Spark Streaming • Scala collections programming model: map, flatMap, window, reduce (fold) • Share code between batch and streaming: both share the same programming model (although with different semantics) • Micro-batches allow aggregation on the batches, improving throughput at a latency cost
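A minimal sketch of that shared programming model: the same collection-style map/reduce code runs on a DStream of lines; the socket source, host and port are placeholders, and sc is the existing SparkContext:

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(5))        // 5-second micro-batches
val lines = ssc.socketTextStream("localhost", 9999)   // placeholder source

// Same operations as on an RDD, applied per micro-batch
val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.print()

ssc.start()
ssc.awaitTermination()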
Spark Streaming Execution • A Streaming Producer feeds data into the cluster • The Driver runs the SparkContext and schedules work • Worker Nodes run Executors, each executing multiple Tasks
Example overview
Code
// Decode each incoming message into document metadata
val metaStream = stream.map { case (k, v) =>
  (k, DocumentMetadata.fromMutable(recordDecoder.decode(v).asInstanceOf[GenericRecord]))
}

// Keep only PDF documents and fetch the file referenced by each message
private val pdfFiles = metaStream.filter(_._2.contentType == "application/pdf")
  .map { case (k, meta) => (meta, fetchFileFromMessage(k, meta)) }

// Parse each PDF and extract its text
val pdfDocs = pdfFiles.map { case (meta, file) => (meta, TextExtractor.parseFile(file)) }
val texts = pdfDocs.map { case (meta, doc) => (meta, TextExtractor.extractText(doc)) }.cache()

// Tokenise into lower-case words, stripping trailing punctuation and single characters
val wordStream = texts.map { case (meta, text) =>
  (meta, text.split("""[ \n\r\t\u00a0]+""").toList
    .map(_.replaceAll("""[,;.]$""", "").trim.toLowerCase())
    .filter(_.length > 1))
}

// Index the extracted text as a side effect
texts.foreachRDD(rdd => rdd.foreach { case (meta, text) => indexText(meta.id, text) })

// Word counts across the stream
val wordCountStream = wordStream.flatMap(_._2).map(word => (word, 1)).reduceByKey(_ + _)
val totalWordCountStream = wordStream.map(_._2.size)
val totalWords = totalWordCountStream.reduce(_ + _)

// Sort by count and print the top results
val sortedWordCount = wordCountStream.transform(rdd => rdd.sortBy(_._2, ascending = false))
sortedWordCount.foreachRDD(rdd => println(rdd.toDebugString))
sortedWordCount.print(30)
totalWords.print()
Q & A
