Streaming Data Lakes Using Kafka Connect + Apache Hudi
Balaji Varadarajan, Vinoth Chandar
Speakers

Vinoth Chandar
PMC Chair / Creator of Hudi
Sr. Staff Eng @ Uber (Data Infra/Platforms, Networking)
Principal Eng @ Confluent (ksqlDB, Kafka/Streams)
Staff Eng @ LinkedIn (Voldemort, DDS)
Sr. Eng @ Oracle (CDC/GoldenGate/XStream)

Balaji Varadarajan
PMC Member, Apache Hudi
Sr. Staff Eng @ Robinhood, Data Infra
Tech Lead @ Uber, Data Platform
Staff Engineer @ LinkedIn, Databus CDC
Agenda 1) Background 2) Hudi 101 3) Hudi’s Spark Writers (existing) 4) Kafka Connect Sink (new) 5) Onwards
Background Event Streams, Data Lakes
Data Lakes are now essential
Architectural pattern for analytical data
❏ Data Lake != Spark, Flink
❏ Data Lake != Files on S3
❏ Raw data (OLTP schema)
❏ Derived data (OLAP/BI, ML schema)
Open Storage + Scalable Compute
❏ Avoid data lock-in; open formats (data + metadata)
❏ Efficient, universal (Analytics, Data Science)
Lots of exciting progress
❏ Lakehouse = Lake + Warehouse
❏ Data meshes on lakes => need for streams
Source: https://martinfowler.com/bliki/images/dataLake/context.png
Event Streams are the new norm
Events come in many flavors
Database change events
❏ High fidelity, high value, updates/deletes
❏ E.g: Debezium changelogs into Kafka
Application/Service business events
❏ High volume, immutable or deltas
❏ E.g: Uber app events, changes emitted from IoT sensors
SaaS data sources
❏ Lower volume, mutable
❏ E.g: polling the GitHub events API
Extracting Event Streams
[Diagram: Kafka Connect sources extract event streams from databases, apps/services (event firehose), and external sources into the Kafka cluster]
Why not just Connect File Sinks?
[Diagram: Kafka Connect sinks (S3/HDFS) write raw files from the Kafka cluster onto DFS/cloud storage, and queries hit those files directly. Data Lake??]
Challenges
Working at the file abstraction level is painful
❏ Transactions, concurrency control
❏ Updating subsets of data, indexing for faster access
Scalability, operational overhead
❏ Writing columnar files is resource intensive
❏ Partitioned data increases memory overhead
Lack of management
❏ Controlling file sizes, deletes for GDPR/compliance
❏ Re-aligning storage for better query performance
Apache Hudi
Transactional writes, MVCC/OCC
❏ Work with tables and records
❏ Automatic compaction, clustering, sizing
First-class support for updates, deletes
❏ Record-level updates/deletes inspired by stream processors
CDC streams from lake storage
❏ Storage layout optimized for incremental fetches
❏ Hudi's unique contribution in the space
Hudi 101 Components, APIs, Architecture
Stream processing + Batch data
The Incremental Stack
+ Intelligent, Incremental
+ Fast, Efficient
+ Scans, Columnar formats
+ Scalable Compute
https://www.oreilly.com/content/ubers-case-for-incremental-processing-on-hadoop/ (2016)
The Hudi Stack
❏ Complete "data" lake platform
❏ Tightly integrated, self-managing
❏ Write using Spark, Flink
❏ Query using Spark, Flink, Hive, Presto, Trino, Impala, AWS Athena/Redshift, Aliyun DLA, etc.
❏ Out-of-box tools/services for data ops
http://hudi.apache.org/blog/2021/07/21/streaming-data-lake-platform
Storage Layout
The Community
❏ Powers arguably the largest transactional data lake on the planet @ Uber
❏ (Database CDC) Robinhood's near-realtime data lake
❏ (ML Feature stores) @ Logical Clocks
❏ (Event Deletions/De-Duping) @ Moveworks
❏ Many more companies; pre-installed by 5 major cloud providers
1000+ Slack members | 150+ Contributors | 1000+ GH Engagers | ~10-20 PRs/week | 20+ Committers | 10+ PMCs
Hudi DeltaStreamer Efficient, Micro-batched
DeltaStreamer Utility, Spark Streaming
[Diagram: event streams are pulled from Kafka using Spark, de-duped and indexed, then applied as transactions to tables on DFS/cloud storage; table services (cluster/optimize, compact, clean) run against the same tables]
Current Kafka to Hudi Options
- Ingest streaming data into the Data Lake (raw tables)
- Current solutions, through Spark:
  - Hudi DeltaStreamer
  - Spark Structured Streaming
[Diagram: both options read from the Kafka cluster and apply records to tables on DFS/cloud storage]
Structured Streaming Sink

// Read data from stream
Dataset<Row> streamingInput = spark.readStream()...

// Write to Hudi in a streaming fashion
DataStreamWriter<Row> writer = streamingInput.writeStream()
    .format("org.apache.hudi")
    .option(DataSourceWriteOptions.TABLE_TYPE.key(), tableType)
    .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "_row_key")
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "partition")
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "timestamp")
    .option(HoodieWriteConfig.TABLE_NAME.key(), tableName)
    .option("checkpointLocation", checkpointLocation)
    .outputMode(OutputMode.Append());

String tablePath = "s3://….";

// Schedule the job
StreamingQuery query = writer.trigger(Trigger.ProcessingTime(500)).start(tablePath);
query.awaitTermination(streamingDurationInMs);
DeltaStreamer Utility
❏ Fully managed ingestion and ETL service
❏ Integration with various streaming and batch sources
❏ Table state & checkpoints are transactionally consistent
❏ Pluggable transformations for ETL use cases (see the sketch below)
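For illustration, a custom transformation plugged into DeltaStreamer might look roughly like the sketch below. It assumes the Transformer interface shipped in hudi-utilities; exact package paths and the method signature should be verified against the Hudi version in use, and the class name and dropped column are purely hypothetical.

// Hedged sketch of a pluggable DeltaStreamer transformer (verify the interface and
// package paths against your Hudi version; class and column names are made up).
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MaskPiiTransformer implements Transformer {
  @Override
  public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession,
                            Dataset<Row> rowDataset, TypedProperties properties) {
    // Drop a sensitive column before the records land in the raw table.
    return rowDataset.drop("ssn");
  }
}

Such a class would be handed to DeltaStreamer via --transformer-class, as in the example on the next slide.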
DeltaStreamer Example

spark-submit --master yarn \
  --packages org.apache.hudi:hudi-utilities-bundle_2.12:0.8.0 \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --conf spark.scheduler.mode=FAIR \
  --conf spark.task.maxFailures=5 \
  ... \
  --enable-sync \
  --hoodie-conf auto.offset.reset=latest \
  --hoodie-conf hoodie.avro.schema.validate=true \
  …. \
  --table-type MERGE_ON_READ \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --props /path/job.properties \
  --transformer-class com.some.someTransformer \
  --continuous   # ← enables async compaction, clustering & cleaning along with streaming writes

Streaming Data Lake without writing any code!
Case Study: Robinhood Data Lake
[Diagram: the master RDS replicates to a replica RDS, whose change stream lands in per-table topics; DeltaStreamer (Bootstrap) updates schema and partitions, while DeltaStreamer (Live) writes incremental data and checkpoints offsets into the data lake (s3://xxx/…)]
Case Study: Robinhood Data Lake
❏ 1000s of CDC-based streaming ingest pipelines supported by Apache Hudi DeltaStreamer
❏ Data lake freshness latency down from hours to 5-15 mins
❏ Powers critical dashboards and use cases
End-to-End Streaming Data Lake
❏ The Data Lake has both raw tables and derived tables built through ETLs
❏ A streaming data lake needs streaming semantics for both kinds of tables
❏ The missing primitive: derived tables need a changelog view of the upstream dataset -> Apache Hudi Incremental Reads to the rescue (see the sketch below)
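As a rough sketch of that incremental-read primitive, a derived-table job could pull only the records committed after its last checkpoint via the Hudi Spark datasource. The option keys below are the commonly documented ones but should be double-checked against the Hudi version in use; beginInstant and rawTablePath are placeholders.

// Hedged sketch: incremental read of a raw Hudi table to drive a derived-table ETL.
Dataset<Row> changes = spark.read()
    .format("org.apache.hudi")
    .option("hoodie.datasource.query.type", "incremental")
    .option("hoodie.datasource.read.begin.instanttime", beginInstant) // last processed commit
    .load(rawTablePath);
// `changes` holds only records written after beginInstant, so the derived table
// can be upserted from this changelog instead of re-reading the full raw table.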
The Big Picture
[Diagram: event streams from databases (CDC), apps/services, and external sources are pushed/pulled into the streaming data lake; DeltaStreamer / Spark Streaming jobs write the raw tables, and Hudi changelogs feed further DeltaStreamer / Spark Streaming jobs that build the derived tables]
Connect Hudi Sink Kafkaesque, Commit protocol, Transactional
Motivations
Integration with Kafka Connect
❏ Separation of concerns (writing vs optimization/management)
❏ Streamline operations: just one framework for ingesting
❏ Less need for Spark expertise
Faster data
❏ Amortize startup costs (containers, queue delays)
❏ Commit frequently, i.e. every 1 minute (every N secs in the near future)
❏ E.g. Avro records from the Kafka log go straight into Hudi's log format
Putting it all together
[Diagram: the Hudi Connect Sink (writing) pulls event streams from Kafka, de-dupes, indexes, and commits transactionally to tables on DFS/cloud storage; Hudi's table services (optimization, management) handle compaction, clustering, cleaning, and deletes]
Design Challenges
Determining transaction boundaries
❏ No coordination via a driver process, unlike Spark/Flink
❏ Workers doing their own commits => horrible concurrency bottlenecks
Connect APIs cannot express DAGs
❏ Meant to be simple `put()`/`preCommit()` calls (see the skeleton below)
❏ Indexing, de-duplication, storage optimization all shuffle data
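For context, this is roughly the surface area a Kafka Connect sink task gets to work with: a bare skeleton of the standard SinkTask API, not the actual Hudi sink. Each task only sees batches of records for its own partitions plus a pre-commit hook, which is why shuffles and global coordination have to happen elsewhere.

// Illustrative skeleton of the Kafka Connect SinkTask API surface.
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class IllustrativeSinkTask extends SinkTask {
  @Override
  public String version() { return "0.0.1"; }

  @Override
  public void start(Map<String, String> props) {
    // Open writers, read config. No global view of the other tasks here.
  }

  @Override
  public void put(Collection<SinkRecord> records) {
    // Called with records for *this task's* partitions only;
    // any indexing/de-duping that needs a shuffle cannot happen here.
  }

  @Override
  public Map<TopicPartition, OffsetAndMetadata> preCommit(
      Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    // Return the offsets that are actually safe to commit back to Kafka.
    return currentOffsets;
  }

  @Override
  public void stop() { }
}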
Design Overview
Central transaction co-ordination
❏ Use Kafka to elect the co-ordinator
❏ Runs in one of the workers
Kafka as the control channel
❏ Consume from the latest control topic offsets
https://cwiki.apache.org/confluence/display/HUDI/RFC-32+Kafka+Connect+Sink+for+Hudi
Design Overview
Transaction Co-ordinator
❏ Daemon thread on the owner of partition 0
❏ Sends commands to participants
Embedded Hudi Java Writer
❏ Lands data into a set of file groups, mapped to a partition
❏ Hudi's commit fencing guards against failures/partial writes
Co-ordinator State Machine
Paxos-like two-phase commit (see the toy sketch below)
❏ Co-ordinator process starts and ends commits
❏ Safety > liveness; abort after timeout
Participants "pause" at each commit boundary
❏ Return latest write offsets to the co-ordinator
❏ Resume again on start of the next commit
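The toy sketch below mimics the commit round described above, with plain objects standing in for Connect tasks and direct method calls standing in for the Kafka control topic. It only illustrates the shape of the protocol and is not the RFC-32 implementation.

// Toy, self-contained sketch of the two-phase commit round (illustrative only).
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class TwoPhaseCommitSketch {

  // A participant "pauses" at the commit boundary and reports its latest write offset.
  static class Participant {
    private long latestWrittenOffset = 0;
    void write(long offset) { latestWrittenOffset = offset; }     // normal writing phase
    long pauseAndReportOffset() { return latestWrittenOffset; }   // phase 1: pause + report
    void resume(String instantTime) { /* start buffering for the next commit */ }
  }

  static class Coordinator {
    private final List<Participant> participants;
    Coordinator(List<Participant> participants) { this.participants = participants; }

    // One commit round: start commit, collect offsets, then finalize and resume.
    Optional<List<Long>> runCommitRound(String instantTime) {
      List<Long> offsets = new ArrayList<>();
      for (Participant p : participants) {
        // In the real sink this is a control-topic message with a timeout;
        // if a participant fails to respond, the round is aborted (safety > liveness).
        offsets.add(p.pauseAndReportOffset());
      }
      // Phase 2: here the coordinator would write the Hudi commit metadata (stubbed out),
      // then let every participant resume consuming.
      participants.forEach(p -> p.resume(instantTime));
      return Optional.of(offsets);
    }
  }

  public static void main(String[] args) {
    Participant a = new Participant(); a.write(42);
    Participant b = new Participant(); b.write(17);
    Coordinator c = new Coordinator(List.of(a, b));
    System.out.println("committed offsets: " + c.runCommitRound("20211001120000"));
  }
}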
Example Sink Configuration

# hudi table properties
target.base.path
target.table.name
target.database.name
schemaprovider.class
partition.field.name
hoodie.table.base.file.format

# controller properties
control.topic.name
coordinator.writestatus.timeout
write.retry.timeout

(Pre-release, subject to change. Refer to the official Hudi docs for actual config names.)
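Purely as an illustration, a filled-in sink config combining those keys with the standard Connect connector properties might look like the snippet below. All names and values are hypothetical, including the connector class, since the sink was pre-release at the time; take the real keys from the official Hudi docs.

# Hypothetical hudi-sink.properties (illustrative only; verify every key and the
# connector class against the official Hudi docs)
name=hudi-sink
# assumed class name, not confirmed by this deck
connector.class=org.apache.hudi.connect.HoodieSinkConnector
tasks.max=4
topics=stock_ticks
target.base.path=s3://bucket/hudi/stock_ticks
target.table.name=stock_ticks
target.database.name=default
partition.field.name=date
control.topic.name=hudi-control-topic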
Choosing Right
DeltaStreamer:
❏ Provides the full set of Hudi features
❏ Offers better elasticity for merging/writing columnar data, i.e. copy-on-write tables
❏ Data freshness of several minutes, if not running in continuous mode
❏ Needs experience with Spark/Flink
Connect Sink:
❏ Insert only for now; indexes/updates coming as enhancements
❏ Great impedance match with Kafka for landing avro/row-oriented data, i.e. merge-on-read tables
❏ Approaches ~1 min freshness
❏ Operate all data ingestion in a single framework
What’s to come Onwards
Kafka + Hudi
Support for mutable, keyed updates/deletes
❏ Need to implement a new index, à la the Flink writer
❏ preCombine, buffering/batching
What if: back Kafka's tiered storage with Hudi?
❏ Map offsets to Hudi commit_seq_no
❏ Columnar reads for historical/catch-up reads
Engage With Our Community
User Docs: https://hudi.apache.org
Technical Wiki: https://cwiki.apache.org/confluence/display/HUDI
GitHub: https://github.com/apache/hudi/
Twitter: https://twitter.com/apachehudi
Mailing list(s): dev-subscribe@hudi.apache.org (send an empty email to subscribe)
                 dev@hudi.apache.org (actual mailing list)
Slack: https://join.slack.com/t/apache-hudi/signup
Questions? Thanks!
