©2024 Databricks Inc. — All rights reserved Avoiding common pitfalls: Spark Structured Streaming with Kafka Shasidhar Eranti - Sr Specialist Solutions Architect Vikas Reddy Aravabhumi - Staff Backline Engineer
©2024 Databricks Inc. — All rights reserved Agenda ● Streaming in Databricks ● Streaming Reference Architecture ● Kafka-Spark Usage Patterns ● Kafka-Spark Streaming Common Pitfalls ○ Optimizing Read Performance ○ Accommodating Varying Data Loads ○ Changing Rate Limits ○ Choosing the Right Trigger Mode ○ Achieving Exactly-Once Semantics ● Key Takeaways 2
©2024 Databricks Inc. — All rights reserved Shasidhar Eranti Sr Specialist Solutions Architect Databricks (Amsterdam) https://www.linkedin.com/in/shasidhares/ Speakers Vikas Reddy Aravabhumi Staff Backline Engineer Databricks (Amsterdam) https://www.linkedin.com/in/shasidhares/
©2024 Databricks Inc. — All rights reserved 12,000+ global customers $1.6B+ in revenue $4B in investment Inventor of the lakehouse & Pioneer of generative AI Gartner-recognized Leader in Database Management Systems and in Data Science and Machine Learning Platforms The data and AI company Creator of Mosaic MPT
©2024 Databricks Inc. — All rights reserved Lakehouse Platform Data Warehouse Data Engineering Data Science and ML Data Streaming All structured and unstructured data Cloud Data Lake Unity Catalog Unified governance for all data assets Delta Lake Data reliability and performance Enable all your data teams Data engineers, data scientists, and analysts can easily build streaming data pipelines with the languages and tools they already know. Simplify development and operations Reduce complexity by automating many of the production aspects associated with building and maintaining real-time data workflows. One platform for streaming and batch data Eliminate data silos, centralize security and governance models, and provide complete support for all your real-time use cases. 5 Data streaming made simple Real-time analytics, machine learning and applications on one platform
©2024 Databricks Inc. — All rights reserved Data streaming common use cases 6 Real-Time Applications Embed automatic and real-time actions into business applications. Real-Time Analytics Analyze streaming data for instant insights and faster decisions. Real-Time Machine Learning Train models on the freshest data. Score in real-time.
©2024 Databricks Inc. — All rights reserved Reference architectures for streaming use cases
©2024 Databricks Inc. — All rights reserved 8 Data Sources Mobile & IoT Data Application Events SaaS Applications Machine & Application Logs On-premises Systems Data Warehouses Cloud Storage Streaming ETL with Delta Live Tables Message Buses Unity Catalog for data governance and sharing Delta Lake for open and reliable data storage Photon for lightning-fast data processing Workflows for end-to-end orchestration Lakehouse Platform Real-Time BI Apps Real-Time AI Apps Predictive Maintenance Personalized Offers Patient Diagnostics Real-Time Operational Apps Alerts Fraud Detection Dynamic Pricing Real-Time Applications with Spark Structured Streaming Real-Time Analytics with Databricks SQL Real-Time Machine Learning with Databricks ML Integrated products for all your real-time use cases
©2024 Databricks Inc. — All rights reserved Analytics Machine Learning Business Insights Automatic Operations Orchestration Observability Data Quality CI / CD Operational Apps Photon Build Production ETL Pipelines with DLT Cloud Storage Message Queues Bronze Layer Silver Layer Gold Layer orders (Streaming Table) customers (Streaming Table) customer_orders (Materialized View) daily_orders (Materialized View) Databricks Lakehouse Platform UNITY CATALOG
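As an illustration of the pipeline above, a minimal Delta Live Tables sketch might look as follows. This is a hedged example only: the dlt decorators are the Databricks DLT Python API, while the source path and the order_date column are placeholder assumptions, not values from the slides.

import dlt

@dlt.table  # bronze streaming table ingesting raw files (placeholder path)
def orders():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/path/to/raw/orders"))

@dlt.table  # gold materialized view aggregating the streaming table
def daily_orders():
    return (dlt.read("orders")
        .groupBy("order_date")   # assumed column
        .count())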
©2024 Databricks Inc. — All rights reserved 10 Available Resources databricks.com/demos ● Streaming Data with Workflows and Delta Live Tables ● Delta Live Tables Product Tour ● 28 Databricks Tutorials with a pip installer
©2024 Databricks Inc. — All rights reserved 11 Apache Kafka & Apache Spark Structured Streaming (Usage Patterns)
©2024 Databricks Inc. — All rights reserved Apache Kafka and Apache Spark Structured Streaming Apache Kafka Connector for Spark Structured Streaming Structured Streaming End-to-end Open Source Pipeline 12
©2024 Databricks Inc. — All rights reserved Apache Kafka Connector for Spark Structured Streaming Structured Streaming Apache Kafka Connector for Spark Structured Streaming Confluent Kafka - Databricks
©2024 Databricks Inc. — All rights reserved Auto Loader Cloud Storage (S3, ADLS, GCS) Structured Streaming Apache Kafka and Databricks Auto Loader Confluent Kafka - AutoLoader - Databricks
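A minimal sketch of the Auto Loader leg of this pattern, assuming the Kafka sink connector has already landed files in cloud storage; the paths and file format below are placeholder assumptions, not values from the slides.

# Auto Loader incrementally picks up new files from the landing location.
df = (spark.readStream
    .format("cloudFiles")                                    # Auto Loader source
    .option("cloudFiles.format", "json")                     # assumed file format
    .option("cloudFiles.schemaLocation", "/path/to/schema")  # where the inferred schema is tracked
    .load("/path/to/landing"))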
©2024 Databricks Inc. — All rights reserved Cloud Storage (S3, ADLS, GCS) Delta Lake Sink Connector Confluent Cloud - Databricks Delta Lake Sink Connector Confluent Kafka - Delta - Databricks
©2024 Databricks Inc. — All rights reserved 16 Spark Streaming Basics
©2024 Databricks Inc. — All rights reserved Sample Spark Structured Streaming query spark .readStream .format("kafka") ... .writeStream .format("delta") .option("checkpointLocation", "…") .toTable("delta_stream") Checkpoint Time Structured Streaming Fault Tolerance Delivery Guarantees
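Expanded into a runnable sketch, the same Kafka-to-Delta query might look as follows; the broker address, topic, and checkpoint path are placeholder assumptions.

# Read from Kafka...
df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1")   # placeholder broker
    .option("subscribe", "topic1")                       # placeholder topic
    .load())

# ...and write to a Delta table; the checkpoint is what gives the query
# fault tolerance and exactly-once delivery into the Delta sink.
query = (df.writeStream
    .format("delta")
    .option("checkpointLocation", "/path/to/checkpoint")  # placeholder path
    .toTable("delta_stream"))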
©2024 Databricks Inc. — All rights reserved 18 Kafka Spark Streams Common Pitfalls
©2024 Databricks Inc. — All rights reserved Apache Kafka and Apache Spark Structured Streaming Apache Kafka Connector for Spark Structured Streaming Structured Streaming Sample Stream for illustration 19
©2024 Databricks Inc. — All rights reserved 1. Ingestion Performance Objective • How to get the best read throughput from Kafka Scenario: What will users try? • Increase the cluster size • Choose VMs with better I/O performance
©2024 Databricks Inc. — All rights reserved 1. Ingestion Performance Issue: Structured Streaming reads each Kafka partition with a single task. ● #num Kafka partitions == #num read tasks ● 1 Kafka partition – 1 task (core) ● Idle CPUs - underutilized cluster Topic: topiclive Partition0 Partition1 Spark cluster Core 0 Core 2 Core 1 Idle
©2024 Databricks Inc. — All rights reserved Recommendation: 22 1. Ingestion Performance With fan-out (optimal cluster utilization) Topic: topiclive Partition0 Partition1 Spark cluster Core 0 Core 2 Core 1 • Use minPartitions in the reader when cluster executor cores exceed Kafka partitions • Kafka partitions are split virtually into more Spark partitions • More parallelism • Better performance When to avoid: • Multiple streams in one application: • Balance core allocation for optimal stream performance • Let the framework make the best decisions df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1") .option("subscribe", "topic1") .option("minPartitions", sc.defaultParallelism) .load()
©2024 Databricks Inc. — All rights reserved How to efficiently handle the scenarios below? 2. Ingestion - Rate Limitation Apache Kafka Connector for Spark Structured Streaming Structured Streaming Objective 23 Historical Data Apache Kafka Connector for Spark Structured Streaming Structured Streaming Volatile Data Scenario: A Scenario: B
©2024 Databricks Inc. — All rights reserved What will users try? • Cluster sizing: finding the right cluster size 24 2. Ingestion - Rate Limitation Issue: • One cluster size doesn’t fit all loads • By default, Spark does not rate limit while fetching from Kafka
©2024 Databricks Inc. — All rights reserved Scenario: A Topic: topiclive Partition 0 Core 0 Recommendation: • The rate limit only takes effect when the available records exceed it • Kafka source: • maxOffsetsPerTrigger • Caps the number of offsets processed per micro-batch • The limit is split across topic partitions • Gives full control over cluster load 2. Ingestion - Rate Limitation 0 1 2 3 4 5 6 7 Rate limit=2 Fetch 2 records {Offset:0,1} Scenario: B Topic: topiclive Partition 0 Core 0 0 1 2 3 4 5 6 7 Rate limit=10 Fetch 8 records {Offset:0,7} df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1") .option("subscribe", "topic1") .option("maxOffsetsPerTrigger", 1000) .load()
©2024 Databricks Inc. — All rights reserved 3. Ingestion - Rate Limitation Scenario: Objective: Change rate limits for existing streams What will users try? • Stop the stream • Add the config below to readStream .option("maxOffsetsPerTrigger", 5000) • Restart the stream Issue: • The stream doesn’t use the new rate limit "id" : "173ddaaa-dc9c-4e50-a2cd-3ffe8b96efd5", "runId" : "90f119f6-f753-4550-b642-3c44c75b6d2b", "name" : null, "timestamp" : "2024-02-26T11:26:49.461Z", "batchId" : 4, "batchDuration" : 25608, "numInputRows" : 24387, • The latest offset of the last active batch is stored in the Write-Ahead Log (WAL) • After a restart, Structured Streaming reuses the existing offsets from the WAL for that batch, so the new limit is not applied to it.
©2024 Databricks Inc. — All rights reserved 3. Ingestion - Rate Limitation Checkpoint Internals for Restarting a Stream • Step 1 • List the offsets folder • Find the latest offset file / Write-Ahead Log • Step 2 • List the commits folder • Find the latest commit file • Check if there is a mismatch • Latest offset file > Latest commit file checkpoint_folder/offsets/4 > checkpoint_folder/commits/3 • Fetch the latest offset file for processing • maxOffsetsPerTrigger not considered Latest Offsets: /checkpoint/offsets/4 Latest Commits: /checkpoint/commits/3 Step 1 Step 2
©2024 Databricks Inc. — All rights reserved Method 1 The rate limit applies once the latest offset equals the latest commit Step 1: Delete the latest (uncommitted) offset file Step 2: Restart the stream from the same checkpoint directory (see the sketch below) 3. Ingestion - Rate Limitation Recommendation Latest Offset: /checkpoint/offsets/4 Latest commit: /checkpoint/commits/3 After Before Method 2 Restart the stream with a bigger cluster to process the large micro-batch that was planned but not yet committed
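A hedged sketch of Method 1 on Databricks; it assumes the dbutils file utilities and a placeholder checkpoint path, and the stream must be stopped before the checkpoint is touched.

checkpoint = "/path/to/checkpoint"   # placeholder path

# Latest planned batch (offsets / WAL) vs. latest completed batch (commits).
latest_offset = max(int(f.name) for f in dbutils.fs.ls(checkpoint + "/offsets") if f.name.isdigit())
latest_commit = max(int(f.name) for f in dbutils.fs.ls(checkpoint + "/commits") if f.name.isdigit())

# If a batch was planned but never committed, remove its offset file so the
# restarted stream re-plans that batch using the new maxOffsetsPerTrigger value.
if latest_offset > latest_commit:
    dbutils.fs.rm(checkpoint + "/offsets/" + str(latest_offset))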
©2024 Databricks Inc. — All rights reserved 4. Picking Right Trigger mode Objective: ● Use the right trigger type for ad-hoc streaming jobs What will users try? • Start the stream with Trigger.Once() mode Issue: • Rate limits are not respected; all available data is processed in a single micro-batch
©2024 Databricks Inc. — All rights reserved 4. Picking Right Trigger mode Recommendation: Trigger.AvailableNow • Processes all the available data in multiple micro-batches • Rate limits are respected • Kafka source: maxOffsetsPerTrigger Note: Available from Spark 3.3.0 onwards # Available-now trigger query = df .writeStream .format("delta") .trigger(availableNow=True) .start()
©2024 Databricks Inc. — All rights reserved 5. Achieve Exactly Once Semantics Handling Stream Failures Structured Streaming Failure Type 1: Runtime errors - should be able to fix quickly - can lead to data duplication Failure Type 2: Application errors / data quality issues - need a code revisit, can take time - can lead to data loss
©2024 Databricks Inc. — All rights reserved 5.1 Data duplication Handling Stream Failures Structured Streaming Failure Type 1: Runtime errors - should be able to fix quickly - can lead to data duplication
©2024 Databricks Inc. — All rights reserved 5.1 Data duplication Scenario: Objective: ● Avoid data re-ingestion What will users try? • After a stream failure, restart the stream with a deleted/new checkpoint and startingOffsets=earliest Issue: • The new checkpoint re-ingests data from the source • Duplicate data lands in the sink df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1") .option("subscribe", "topic1") .option("startingOffsets", "earliest") .load()
©2024 Databricks Inc. — All rights reserved 5.1 Data duplication Recommendation: • Identify the root cause and mitigate the issue • Last option: only delete the checkpoint if you can handle the resulting duplicates, e.g.: • Delegate deduplication to the application (MERGE, dropDuplicates, etc.) - see the sketch below • Handle deduplication in downstream layers
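One way to delegate deduplication to the application is an idempotent MERGE per micro-batch via foreachBatch. The sketch below is illustrative only; the target table name (delta_stream), the key column (event_id), and the checkpoint path are assumptions.

from delta.tables import DeltaTable

def upsert_to_delta(microbatch_df, batch_id):
    # Drop duplicates inside the micro-batch, then MERGE on a business key so
    # replayed records update existing rows instead of inserting duplicates.
    deduped = microbatch_df.dropDuplicates(["event_id"])
    target = DeltaTable.forName(spark, "delta_stream")        # assumed target table
    (target.alias("t")
        .merge(deduped.alias("s"), "t.event_id = s.event_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(df.writeStream
    .foreachBatch(upsert_to_delta)
    .option("checkpointLocation", "/path/to/checkpoint")       # placeholder path
    .start())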
©2024 Databricks Inc. — All rights reserved 5.2. Data loss Handling Stream Failures Structured Streaming Failure Type 2: Application errors / data quality issues - need a code revisit, can take time - can lead to data loss
©2024 Databricks Inc. — All rights reserved 5.2. Data loss Scenario: Objective ● Avoid/detect data loss What will users try? ● Run the stream with DEV/UAT configs Issue: • Data in the Kafka topic has been deleted because log retention expired • Small log.retention.hours • With failOnDataLoss=false, the framework silently ignores the data loss and prints the log below Logs: KafkaDataConsumer: Cannot fetch offset XXX (GroupId: <GroupId details>, TopicPartition: <Topic-details>). Some data may have been lost because they are not available in Kafka any more; either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. If you want your streaming query to fail on such cases, set the source option 'failOnDataLoss' to 'true'. df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1") .option("subscribe", "topic1") .option("failOnDataLoss", "false") .load()
©2024 Databricks Inc. — All rights reserved 5.2. Data loss Recommendation: • Keep failOnDataLoss=true (the default) • The query then terminates on potential data loss • Examples include deleted topics or out-of-range offsets • Increase the Kafka topic log retention duration • log.retention.hours • log.retention.minutes • log.retention.ms df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1") .option("subscribe", "topic1") .option("failOnDataLoss", "true") # true by default .load()
©2024 Databricks Inc. — All rights reserved Key Takeaways Optimizing Read Performance - Use minPartitions Changing Rate Limits - Understand latest offset vs. latest commit OR use a bigger cluster Accommodating Varying Data Loads - Tune maxOffsetsPerTrigger Choosing the Right Trigger Mode - Use Trigger.AvailableNow for ad-hoc runs Achieving Exactly-Once Semantics - Don’t delete the checkpoint - Always keep failOnDataLoss=true
©2024 Databricks Inc. — All rights reserved
