Structuring Spark: SQL, DataFrames, Datasets, and Streaming. Michael Armbrust (@michaelarmbrust), Spark Summit East 2016
Background: What is in an RDD?
• Dependencies
• Partitions (with optional locality info)
• Compute function: Partition => Iterator[T]
To Spark, the compute function is an opaque computation and the records it produces are opaque data: the engine cannot look inside either one to optimize.
Struc·ture [ˈstrək(t)SHər] verb 1. construct or arrange according to a plan; give a pattern or organization to.
Why structure?
• By definition, structure will limit what can be expressed.
• In practice, we can accommodate the vast majority of computations.
Limiting the space of what can be expressed enables optimizations.
Structured APIs in Spark

                    SQL        DataFrames      Datasets
  Syntax errors     Runtime    Compile time    Compile time
  Analysis errors   Runtime    Runtime         Compile time

Analysis errors are reported before a distributed job starts.
Datasets API: type-safe, operate on domain objects with compiled lambda functions

val df = ctx.read.json("people.json")

// Convert data to domain objects.
case class Person(name: String, age: Int)
val ds: Dataset[Person] = df.as[Person]
ds.filter(_.age > 30)

// Compute a histogram of age by name.
val hist = ds.groupBy(_.name).mapGroups {
  case (name, people: Iterator[Person]) =>
    val buckets = new Array[Int](10)
    people.map(_.age).foreach { a => buckets(a / 10) += 1 }
    (name, buckets)
}
DataFrame = Dataset[Row]
• Spark 2.0 will unify these APIs
• Stringly-typed methods will downcast to generic Row objects
• Ask Spark SQL to enforce types on generic rows using df.as[MyClass]
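A minimal sketch of that round trip, assuming a Spark 2.0 SparkSession named spark (with its implicits imported) and the Person case class from the previous slide:

import org.apache.spark.sql.{DataFrame, Dataset}
import spark.implicits._

val df: DataFrame = spark.read.json("people.json")   // a DataFrame is just Dataset[Row]
val ds: Dataset[Person] = df.as[Person]              // ask Spark SQL to enforce the Person schema
val back: DataFrame = ds.toDF()                      // drop back down to generic Row objects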
What about Python? Some of the goals of the Dataset API have always been available!
  Python: df.map(lambda x: x.name)
  Scala:  df.map(x => x(0).asInstanceOf[String])
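A small Scala sketch of the same contrast, reusing df and the typed ds from above (spark.implicits._ in scope for the String encoder): the untyped form only fails at runtime if the column name or cast is wrong, while the typed form is checked by the compiler.

// Untyped: look up by name and cast; mistakes surface at runtime.
df.map(row => row.getAs[String]("name"))

// Typed: plain field access on the domain object, checked at compile time.
ds.map(_.name)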
Shared Optimization & Execution

SQL AST / DataFrame / Dataset
  → Unresolved Logical Plan
  → (Analysis, using the Catalog) → Logical Plan
  → (Logical Optimization) → Optimized Logical Plan
  → (Physical Planning) → Physical Plans → (Cost Model) → Selected Physical Plan
  → (Code Generation) → RDDs

DataFrames, Datasets, and SQL share the same optimization/execution pipeline.
Structuring Computation
Columns col("x") === 1 df("x") === 1 expr("x = 1") sql("SELECT … WHERE x = 1") 13 New value, computed based on input values. DSL SQL Parser
Complex Columns With Functions
• 100+ native functions with optimized codegen implementations
  – String manipulation – concat, format_string, lower, lpad
  – Date/Time – current_timestamp, date_format, date_add, …
  – Math – sqrt, randn, …
  – Other – monotonicallyIncreasingId, sparkPartitionId, …

Python:
from pyspark.sql.functions import *
yesterday = date_sub(current_date(), 1)
df2 = df.filter(df.created_at > yesterday)

Scala:
import org.apache.spark.sql.functions._
val yesterday = date_sub(current_date(), 1)
val df2 = df.filter(df("created_at") > yesterday)
Functions vs. Columns

                 Functions                      Columns
  You type       (x: Int) => x == 1             col("x") === 1
  Spark sees     class $anonfun$1 {             EqualTo(x, Lit(1))
                   def apply(Int): Boolean
                 }
Columns: Predicate Pushdown

You write:
sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "people")
  .load()
  .where($"name" === "michael")

Spark translates, for Postgres:
SELECT * FROM people WHERE name = 'michael'
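One way to confirm the pushdown is to inspect the physical plan; a sketch reusing the DataFrame above (the exact explain output varies between Spark versions):

import sqlContext.implicits._   // for the $"..." column syntax

val people = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "people")
  .load()

// The JDBC scan node in the plan typically lists the pushed predicate,
// e.g. PushedFilters: [EqualTo(name,michael)].
people.where($"name" === "michael").explain()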
Columns: Efficient Joins

df1.join(df2, col("x") == col("y"))
  → SortMergeJoin over df1 and df2: equal values sort to the same place, roughly O(n log n).

myUDF = udf(lambda x, y: x == y)
df1.join(df2, myUDF(col("x"), col("y")))
  → Cartesian product of df1 and df2 followed by a filter, O(n²): the equality is hidden inside the UDF, so Spark cannot plan an equi-join.
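A Scala sketch of the same contrast (df1 and df2 are assumed to have integer columns x and y); comparing the two physical plans with explain() shows why keeping the comparison in column expressions matters:

import org.apache.spark.sql.functions.{col, udf}

// Equality expressed on columns: Catalyst recognizes an equi-join
// and can plan a SortMergeJoin, roughly O(n log n).
df1.join(df2, col("x") === col("y")).explain()

// The same equality hidden inside a UDF: Catalyst sees only an opaque
// predicate, so it must compare every pair of rows, a Cartesian-style
// join plus filter that is O(n^2).
val myEq = udf((x: Int, y: Int) => x == y)
df1.join(df2, myEq(col("x"), col("y"))).explain()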
Structuring Data
Spark's Structured Data Model
• Primitives: Byte, Short, Integer, Long, Float, Double, Decimal, String, Binary, Boolean, Timestamp, Date
• Array[Type]: variable-length collection
• Struct: fixed # of nested columns with fixed types
• Map[Type, Type]: variable-length association
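A sketch of how this model is spelled as an explicit schema (the field names here are made up for illustration):

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", LongType),                            // primitive
  StructField("name", StringType),                        // primitive
  StructField("created_at", TimestampType),               // primitive
  StructField("languages", ArrayType(StringType)),        // variable-length collection
  StructField("address", StructType(Seq(                  // fixed # of nested, typed columns
    StructField("zip", IntegerType),
    StructField("city", StringType)))),
  StructField("scores", MapType(StringType, DoubleType))  // variable-length association
))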
Tungsten’s Compact Encoding

The tuple (123, “data”, “bricks”) is laid out as a flat binary row:
  null bitmap (0x0) | 123 | 32L | 48L | 4 “data” | 6 “bricks”
The fixed-width slots hold the integer 123 inline and, for the strings, the offsets to the variable-length section (32, 48), which stores each field length and its bytes.
Encoders

  JVM object:               MyClass(123, “data”, “bricks”)
  Internal representation:  0x0 | 123 | 32L | 48L | 4 “data” | 6 “bricks”

Encoders translate between domain objects and Spark's internal format.
Bridge Objects with Data Sources

{ JSON }
{
  "name": "Michael",
  "zip": "94709",
  "languages": ["scala"]
}

case class Person(
  name: String,
  languages: Seq[String],
  zip: Int)

Encoders map columns to fields by name (JSON, JDBC, …).
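A sketch of that name-based mapping in code, assuming a SparkSession named spark and a people.json file whose records match the fields of Person:

import spark.implicits._

case class Person(name: String, languages: Seq[String], zip: Int)

// Columns are matched to case-class fields by name, not by position;
// analysis fails up front if a required field cannot be resolved.
val people = spark.read.json("people.json").as[Person]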
Space Efficiency
Serialization performance
Operate Directly On Serialized Data

DataFrame code / SQL:   df.where(df("year") > 2015)
Catalyst expression:    GreaterThan(year#234, Literal(2015))
Generated low-level code:

bool filter(Object baseObject) {
  int offset = baseOffset + bitSetWidthInBytes + 3*8L;
  // JVM intrinsic, JIT-ed to pointer arithmetic:
  int value = Platform.getInt(baseObject, offset);
  return value > 2015;
}
Structured Streaming
The simplest way to perform streaming analytics is not having to reason about streaming.
Spark 1.3: static DataFrames → Spark 2.0: continuous DataFrames. A single API for both!
Structured Streaming
• High-level streaming API built on the Spark SQL engine
• Runs the same queries on DataFrames
• Event time, windowing, sessions, sources & sinks
• Unifies streaming, interactive, and batch queries
• Aggregate data in a stream, then serve using JDBC
• Change queries at runtime
• Build and apply ML models
logs = ctx.read.format("json").open("s3://logs") logs.groupBy(logs.user_id).agg(sum(logs.time)) .write.format("jdbc") .save("jdbc:mysql//...") Example: Batch Aggregation
logs = ctx.read.format("json").stream("s3://logs") logs.groupBy(logs.user_id).agg(sum(logs.time)) .write.format("jdbc") .stream("jdbc:mysql//...") Example: Continuous Aggregation
Logically: DataFrame operations on static data (i.e. as easy to understand as batch).
Physically: Spark automatically runs the query in streaming fashion (i.e. incrementally and continuously).

DataFrame → Logical Plan → Catalyst optimizer → continuous, incremental execution
Incrementalized By Spark

  Batch:       Scan Files → Aggregate → Write to MySQL
  Continuous:  Scan New Files → Stateful Aggregate → Update MySQL

This transformation requires information about the structure of the computation.
What's Coming?
• Spark 2.0
  – Unification of the APIs
  – Basic streaming API
  – Event-time aggregations
• Spark 2.1+
  – Other streaming sources / sinks
  – Machine learning
  – Structure in other libraries: MLlib, GraphFrames
Questions? @michaelarmbrust
