Writing Continuous Applications with Structured Streaming in PySpark Jules S. Damji Spark + AI Summit , SF April 24, 2019
I have used Apache Spark 2.x Before…
Apache Spark Community & DeveloperAdvocate@ Databricks DeveloperAdvocate@ Hortonworks Software engineering @Sun Microsystems, Netscape, @Home, VeriSign, Scalix, Centrify, LoudCloud/Opsware, ProQuest Program Chair Spark + AI Summit https://www.linkedin.com/in/dmatrix @2twitme
Accelerate innovation by unifying data science, engineering and business • Original creators of • 2000+ global companies use our platform across big data & machine learning lifecycle VISION WHO WE ARE Unified Analytics PlatformSOLUTION
Agenda for Today’s Talk • Why Apache Spark • Why Streaming Applications are Difficult • What’s Structured Streaming • Anatomy of a Continunous Application • Tutorials • Q & A
How to think about data in 2019 - 2020 “Data is the new currency" 10101010. . . 10101010. . .
Why Apache Spark?
What is Apache Spark? • General cluster computing engine that extends MapReduce • Rich set of APIs and libraries • Unified Engine • Large community: 1000+ orgs, clusters up to 8000 nodes • Supports DL Frameworks Apache Spark, Spark and Apache are trademarks of the Apache Software Foundation SQLStreaming ML Graph … DL
Unique Thing about Spark • Unification: same engine and same API for diverse use cases • Streaming, batch, or interactive • ETL, SQL, machine learning, or graph • Deep Learning Frameworks w/Horovod – TensorFlow – Keras – PyTorch
Faster, Easier to Use, Unified 10 First	Distributed Processing	Engine Specialized	Data Processing	Engines Unified	Data Processing	Engine
Benefits of Unification 1. Simpler to use and operate 2. Code reuse: e.g. only write monitoring, FT, etc once 3. New apps that span processing types: e.g. interactive queries on a stream, online machine learning
An Analogy Specialized devices Unified device New applications
Why Streaming Applications are Inherently Difficult?
building robust stream processing apps is hard
Complexities in stream processing COMPLEX DATA Diverse data formats (json, avro, txt, csv, binary, …) Data can be dirty, And tardy (out-of-order) COMPLEX SYSTEMS Diverse storage systems (Kafka, S3, Kinesis, RDBMS, …) System failures COMPLEX WORKLOADS Combining streaming with interactive queries Machine learning
Structured Streaming stream processing on Spark SQL engine fast, scalable, fault-tolerant rich, unified, high level APIs deal with complex data and complex workloads rich ecosystem of data sources integrate with many storage systems
you should not have to reason about streaming
Treat Streams as Unbounded Tables data stream unbounded inputtable newdata in the data stream = newrows appended to a unboundedtable
you should write queries & Apache Spark should continuously update the answer
DataFrames, Datasets, SQL input = spark.readStream .format("kafka") .option("subscribe", "topic") .load() result = input .select("device", "signal") .where("signal > 15") result.writeStream .format("parquet") .start("dest-path") Logical Plan Read from Kafka Project device, signal Filter signal > 15 Writeto Parquet Apache Spark automatically streamifies! Spark SQL converts batch-like query to a series of incremental execution plans operating on new batches of data Series of Incremental Execution Plans Kafka Source Optimized Operator codegen, off- heap, etc. Parquet Sink Optimized Physical Plan process newdata t = 1 t = 2 t = 3 process newdata process newdata
Structured Streaming – Processing Modes 21
Structured Streaming Processing Modes 22
Anatomy of a Continunous Application
Streaming word count Anatomy of a Streaming Query Simple Streaming ETL
Anatomy of a Streaming Query: Step 1 spark.readStream .format("kafka") .option("subscribe", "input") .load() . Source • Specify one or more locations to read data from • Built in support for Files/Kafka/Socket, pluggable.
Anatomy of a Streaming Query: Step 2 spark.readStream .format("kafka") .option("subscribe", "input") .load() .groupBy(“value.cast("string") as key”) .agg(count("*") as “value”) Transformation • Using DataFrames,Datasets and/or SQL. • Internal processingalways exactly- once.
Anatomy of a Streaming Query: Step 3 spark.readStream .format("kafka") .option("subscribe", "input") .load() .groupBy(“value.cast("string") as key”) .agg(count("*") as “value”) .writeStream() .format("kafka") .option("topic", "output") .trigger("1 minute") .outputMode(OutputMode.Complete()) .option("checkpointLocation", "…") .start() Sink • Accepts the output of each batch. • When supported sinks are transactional and exactly once (Files).
Anatomy of a Streaming Query: Output Modes from pyspark.sql import Trigger spark.readStream .format("kafka") .option("subscribe", "input") .load() .groupBy(“value.cast("string") as key”) .agg(count("*") as 'value’) .writeStream() .format("kafka") .option("topic", "output") .trigger("1 minute") .outputMode("update") .option("checkpointLocation", "…") .start() Output mode – What's output • Complete – Output the whole answer every time • Update – Output changed rows • Append– Output new rowsonly Trigger – When to output • Specifiedas a time, eventually supportsdata size • No trigger means as fast as possible
Streaming Query: Output Modes Output mode – What's output • Complete – Output the whole answer every time • Update – Output changed rows • Append– Output new rows only
Anatomy of a Streaming Query: Checkpoint from pyspark.sql import Trigger spark.readStream .format("kafka") .option("subscribe", "input") .load() .groupBy(“value.cast("string") as key”) .agg(count("*") as 'value) .writeStream() .format("kafka") .option("topic", "output") .trigger("1 minute") .outputMode("update") .option("checkpointLocation", "…") .withWatermark(“timestamp” “2 minutes”) .start() Checkpoint & Watermark • Tracks the progress of a query in persistent storage • Can be used to restart the query if there is a failure. • trigger( Trigger. Continunous(“ 1 second”)) Set checkpoint location & watermark to drop very late events
Fault-tolerance with Checkpointing Checkpointing – tracks progress (offsets) of consuming data from the source and intermediate state. Offsets and metadata saved as JSON Can resume after changing your streaming transformations end-to-end exactly-once guarantees process newdata t = 1 t = 2 t = 3 process newdata process newdata write ahead log
Complex Streaming ETL
Traditional ETL • Raw, dirty, un/semi-structured is data dumped as files • Periodic jobs run every few hours to convert raw data to structured data ready for further analytics • Problem: • Hours of delay beforetaking decisions on latest data • Unacceptablewhen timeis ofessence – [intrusion , anomaly or fraud detection, monitoring IoTdevices, etc.] file dump seconds hours table SQL Web ML10101010...
1. Streaming ETL w/ Structured Streaming Structured Streaming Changes the Equation: • eliminates latencies • adds immediacy • transforms data continuously seconds table 10101010... SQL Web ML
2. Streaming ETL w/ Structured Streaming & Delta Lake seconds 10101010... Transactional Log Parquet	Files
Transactional Log Parquet	Files Delta Lake ensures data reliability Streaming ● ACID Transactions ● Schema Enforcement ● Unified Batch & Streaming ● Time Travel/Data Snapshots Key Features High Quality & Reliable Data always ready for analytics Batch Updates/Delete s
Streaming ETL w/ Structured Streaming Example 1. Json data being received in Kafka 2. Parse nested json and flatten it 3. Store in structured Parquet table 4. Get end-to-end failure guarantees from pyspark.sql import Trigger rawData = spark.readStream .format("kafka") .option("kafka.boostrap.servers",...) .option("subscribe", "topic") .load() parsedData = rawData .selectExpr("cast (value as string) as json")) .select(from_json("json", schema).as("data")) .select("data.*") # do your ETL/Transformation query = parsedData.writeStream .option("checkpointLocation", "/checkpoint") .partitionBy("date") .format("parquet") .format(”delta") .trigger( Trigger. Continunous(“5 second”)) .start("/parquetTable") .start("/deltaTable”)
Reading from Kafka raw_data_df = spark.readStream .format("kafka") .option("kafka.boostrap.servers",...) .option("subscribe", "topic") .load() rawData dataframe has the following columns key value topic partition offset timestamp [binary] [binary] "topicA" 0 345 1486087873 [binary] [binary] "topicB" 3 2890 1486086721
Transforming Data Cast binary value to string Name it column json Parse json string and expand into nested columns, name it data parsedData = rawData .selectExpr("cast (value as string) as json") .select(from_json("json", schema).as("data")) .select("data.*") json { "timestamp": 1486087873, "device": "devA", …} { "timestamp": 1486082418, "device": "devX", …} data (nested) timestamp device … 1486087873 devA … 1486086721 devX … from_json("json") as "data"
Transforming Data Cast binary value to string Name it column json Parse json string and expand into nested columns, name it data Flatten the nested columns parsedData = rawData .selectExpr("cast (value as string) as json") .select(from_json("json", schema).as("data")) .select("data.*") powerful built-in Python APIs to perform complex data transformations from_json, to_json, explode,... 100s offunctions (see our blogpost & tutorial)
Writing to Save parsed data as Parquet table or Delta table in the given path Partition files by date so that future queries on time slices of data is fast e.g. query on last 48 hours of data queryP = parsedData.writeStream .option("checkpointLocation", ...) .partitionBy("date") .format("parquet") .start("/parquetTable") #pathname queryD = parsedData.writeStream .option("checkpointLocation", ...) .partitionBy("date") .format("delta") .start("/deltaTable") #pathname
Tutorials
https://dbricks.co/sais_pyspark_sf 43 Enter your cluster name Use DBR 5.3 and Apache Spark 2.4, Scala 2.11
Summary • Apache Spark best suited for unified analytics & processing at scale • Structured Streaming APIs Enables Continunous Applications • Populate in Parquet tables or Delta Lake • Demonstrated Continunous Application
Resources • Getting Started Guide with Apache Spark on Databricks • docs.databricks.com • Spark Programming Guide • Structured Streaming Programming Guide • Anthology of Technical Assets for Structured Streaming • Databricks Engineering Blogs • https://databricks.com/training/instructor-led-training • https://delta.io
Thank You J jules@databricks.com @2twitme https://www.linkedin.com/in/dmatrix/

Writing Continuous Applications with Structured Streaming PySpark API

  • 1.
    Writing Continuous Applications withStructured Streaming in PySpark Jules S. Damji Spark + AI Summit , SF April 24, 2019
  • 2.
    I have usedApache Spark 2.x Before…
  • 3.
    Apache Spark Community& DeveloperAdvocate@ Databricks DeveloperAdvocate@ Hortonworks Software engineering @Sun Microsystems, Netscape, @Home, VeriSign, Scalix, Centrify, LoudCloud/Opsware, ProQuest Program Chair Spark + AI Summit https://www.linkedin.com/in/dmatrix @2twitme
  • 4.
    Accelerate innovation byunifying data science, engineering and business • Original creators of • 2000+ global companies use our platform across big data & machine learning lifecycle VISION WHO WE ARE Unified Analytics PlatformSOLUTION
  • 5.
    Agenda for Today’sTalk • Why Apache Spark • Why Streaming Applications are Difficult • What’s Structured Streaming • Anatomy of a Continunous Application • Tutorials • Q & A
  • 6.
    How to thinkabout data in 2019 - 2020 “Data is the new currency" 10101010. . . 10101010. . .
  • 7.
  • 8.
    What is ApacheSpark? • General cluster computing engine that extends MapReduce • Rich set of APIs and libraries • Unified Engine • Large community: 1000+ orgs, clusters up to 8000 nodes • Supports DL Frameworks Apache Spark, Spark and Apache are trademarks of the Apache Software Foundation SQLStreaming ML Graph … DL
  • 9.
    Unique Thing aboutSpark • Unification: same engine and same API for diverse use cases • Streaming, batch, or interactive • ETL, SQL, machine learning, or graph • Deep Learning Frameworks w/Horovod – TensorFlow – Keras – PyTorch
  • 10.
    Faster, Easier toUse, Unified 10 First Distributed Processing Engine Specialized Data Processing Engines Unified Data Processing Engine
  • 11.
    Benefits of Unification 1.Simpler to use and operate 2. Code reuse: e.g. only write monitoring, FT, etc once 3. New apps that span processing types: e.g. interactive queries on a stream, online machine learning
  • 12.
    An Analogy Specialized devicesUnified device New applications
  • 13.
    Why Streaming Applications areInherently Difficult?
  • 14.
  • 15.
    Complexities in streamprocessing COMPLEX DATA Diverse data formats (json, avro, txt, csv, binary, …) Data can be dirty, And tardy (out-of-order) COMPLEX SYSTEMS Diverse storage systems (Kafka, S3, Kinesis, RDBMS, …) System failures COMPLEX WORKLOADS Combining streaming with interactive queries Machine learning
  • 16.
    Structured Streaming stream processingon Spark SQL engine fast, scalable, fault-tolerant rich, unified, high level APIs deal with complex data and complex workloads rich ecosystem of data sources integrate with many storage systems
  • 17.
    you should not haveto reason about streaming
  • 18.
    Treat Streams asUnbounded Tables data stream unbounded inputtable newdata in the data stream = newrows appended to a unboundedtable
  • 19.
    you should write queries & ApacheSpark should continuously update the answer
  • 20.
    DataFrames, Datasets, SQL input =spark.readStream .format("kafka") .option("subscribe", "topic") .load() result = input .select("device", "signal") .where("signal > 15") result.writeStream .format("parquet") .start("dest-path") Logical Plan Read from Kafka Project device, signal Filter signal > 15 Writeto Parquet Apache Spark automatically streamifies! Spark SQL converts batch-like query to a series of incremental execution plans operating on new batches of data Series of Incremental Execution Plans Kafka Source Optimized Operator codegen, off- heap, etc. Parquet Sink Optimized Physical Plan process newdata t = 1 t = 2 t = 3 process newdata process newdata
  • 21.
    Structured Streaming –Processing Modes 21
  • 22.
  • 23.
    Anatomy of aContinunous Application
  • 24.
    Streaming word count Anatomyof a Streaming Query Simple Streaming ETL
  • 25.
    Anatomy of aStreaming Query: Step 1 spark.readStream .format("kafka") .option("subscribe", "input") .load() . Source • Specify one or more locations to read data from • Built in support for Files/Kafka/Socket, pluggable.
  • 26.
    Anatomy of aStreaming Query: Step 2 spark.readStream .format("kafka") .option("subscribe", "input") .load() .groupBy(“value.cast("string") as key”) .agg(count("*") as “value”) Transformation • Using DataFrames,Datasets and/or SQL. • Internal processingalways exactly- once.
  • 27.
    Anatomy of aStreaming Query: Step 3 spark.readStream .format("kafka") .option("subscribe", "input") .load() .groupBy(“value.cast("string") as key”) .agg(count("*") as “value”) .writeStream() .format("kafka") .option("topic", "output") .trigger("1 minute") .outputMode(OutputMode.Complete()) .option("checkpointLocation", "…") .start() Sink • Accepts the output of each batch. • When supported sinks are transactional and exactly once (Files).
  • 28.
    Anatomy of aStreaming Query: Output Modes from pyspark.sql import Trigger spark.readStream .format("kafka") .option("subscribe", "input") .load() .groupBy(“value.cast("string") as key”) .agg(count("*") as 'value’) .writeStream() .format("kafka") .option("topic", "output") .trigger("1 minute") .outputMode("update") .option("checkpointLocation", "…") .start() Output mode – What's output • Complete – Output the whole answer every time • Update – Output changed rows • Append– Output new rowsonly Trigger – When to output • Specifiedas a time, eventually supportsdata size • No trigger means as fast as possible
  • 29.
    Streaming Query: OutputModes Output mode – What's output • Complete – Output the whole answer every time • Update – Output changed rows • Append– Output new rows only
  • 30.
    Anatomy of aStreaming Query: Checkpoint from pyspark.sql import Trigger spark.readStream .format("kafka") .option("subscribe", "input") .load() .groupBy(“value.cast("string") as key”) .agg(count("*") as 'value) .writeStream() .format("kafka") .option("topic", "output") .trigger("1 minute") .outputMode("update") .option("checkpointLocation", "…") .withWatermark(“timestamp” “2 minutes”) .start() Checkpoint & Watermark • Tracks the progress of a query in persistent storage • Can be used to restart the query if there is a failure. • trigger( Trigger. Continunous(“ 1 second”)) Set checkpoint location & watermark to drop very late events
  • 31.
    Fault-tolerance with Checkpointing Checkpointing– tracks progress (offsets) of consuming data from the source and intermediate state. Offsets and metadata saved as JSON Can resume after changing your streaming transformations end-to-end exactly-once guarantees process newdata t = 1 t = 2 t = 3 process newdata process newdata write ahead log
  • 32.
  • 33.
    Traditional ETL • Raw,dirty, un/semi-structured is data dumped as files • Periodic jobs run every few hours to convert raw data to structured data ready for further analytics • Problem: • Hours of delay beforetaking decisions on latest data • Unacceptablewhen timeis ofessence – [intrusion , anomaly or fraud detection, monitoring IoTdevices, etc.] file dump seconds hours table SQL Web ML10101010...
  • 34.
    1. Streaming ETLw/ Structured Streaming Structured Streaming Changes the Equation: • eliminates latencies • adds immediacy • transforms data continuously seconds table 10101010... SQL Web ML
  • 35.
    2. Streaming ETLw/ Structured Streaming & Delta Lake seconds 10101010... Transactional Log Parquet Files
  • 36.
    Transactional Log Parquet Files Delta Lake ensuresdata reliability Streaming ● ACID Transactions ● Schema Enforcement ● Unified Batch & Streaming ● Time Travel/Data Snapshots Key Features High Quality & Reliable Data always ready for analytics Batch Updates/Delete s
  • 37.
    Streaming ETL w/Structured Streaming Example 1. Json data being received in Kafka 2. Parse nested json and flatten it 3. Store in structured Parquet table 4. Get end-to-end failure guarantees from pyspark.sql import Trigger rawData = spark.readStream .format("kafka") .option("kafka.boostrap.servers",...) .option("subscribe", "topic") .load() parsedData = rawData .selectExpr("cast (value as string) as json")) .select(from_json("json", schema).as("data")) .select("data.*") # do your ETL/Transformation query = parsedData.writeStream .option("checkpointLocation", "/checkpoint") .partitionBy("date") .format("parquet") .format(”delta") .trigger( Trigger. Continunous(“5 second”)) .start("/parquetTable") .start("/deltaTable”)
  • 38.
    Reading from Kafka raw_data_df= spark.readStream .format("kafka") .option("kafka.boostrap.servers",...) .option("subscribe", "topic") .load() rawData dataframe has the following columns key value topic partition offset timestamp [binary] [binary] "topicA" 0 345 1486087873 [binary] [binary] "topicB" 3 2890 1486086721
  • 39.
    Transforming Data Cast binaryvalue to string Name it column json Parse json string and expand into nested columns, name it data parsedData = rawData .selectExpr("cast (value as string) as json") .select(from_json("json", schema).as("data")) .select("data.*") json { "timestamp": 1486087873, "device": "devA", …} { "timestamp": 1486082418, "device": "devX", …} data (nested) timestamp device … 1486087873 devA … 1486086721 devX … from_json("json") as "data"
  • 40.
    Transforming Data Cast binaryvalue to string Name it column json Parse json string and expand into nested columns, name it data Flatten the nested columns parsedData = rawData .selectExpr("cast (value as string) as json") .select(from_json("json", schema).as("data")) .select("data.*") powerful built-in Python APIs to perform complex data transformations from_json, to_json, explode,... 100s offunctions (see our blogpost & tutorial)
  • 41.
    Writing to Save parseddata as Parquet table or Delta table in the given path Partition files by date so that future queries on time slices of data is fast e.g. query on last 48 hours of data queryP = parsedData.writeStream .option("checkpointLocation", ...) .partitionBy("date") .format("parquet") .start("/parquetTable") #pathname queryD = parsedData.writeStream .option("checkpointLocation", ...) .partitionBy("date") .format("delta") .start("/deltaTable") #pathname
  • 42.
  • 43.
  • 44.
    Summary • Apache Sparkbest suited for unified analytics & processing at scale • Structured Streaming APIs Enables Continunous Applications • Populate in Parquet tables or Delta Lake • Demonstrated Continunous Application
  • 45.
    Resources • Getting StartedGuide with Apache Spark on Databricks • docs.databricks.com • Spark Programming Guide • Structured Streaming Programming Guide • Anthology of Technical Assets for Structured Streaming • Databricks Engineering Blogs • https://databricks.com/training/instructor-led-training • https://delta.io
  • 46.