Stream Processing in Python with Apache Samza & Beam
Hai Lu, Software Engineer @ Stream Infra
Agenda
1. Introduction & Background
2. Deep Dive: Samza Portable Runner for Beam
3. Use Cases of Stream Processing in Python
4. Future Work
Motivation
Make stream processing accessible beyond Java!
• Support machine learning use cases, where Python is dominant
• Many projects at LinkedIn are built in Python without full-fledged stream processing support
• Potentially support languages beyond Java and Python in the future (for example, Go)
Background
Apache Samza
• Apache Samza is a distributed stream processing framework for building stateful applications that process data at scale in real time.
Apache Beam
• Apache Beam is an advanced unified programming model designed to provide efficient and portable data processing pipelines.
Background: Samza & Beam Integration
Beam brings the streaming API & model:
• State-of-the-art data processing API & model (e.g. event-time support)
• Portability framework to support multiple languages
• Libraries for ML use cases
Samza brings the streaming engine:
• Large-scale distributed stream processing, battle tested at LinkedIn scale
• Scalable and durable local state
• Fault tolerance and fast recovery
Agenda
1. Introduction & Background
2. Deep Dive: Samza Portable Runner for Beam
3. Use Cases of Stream Processing in Python
4. Future Work
Beam Portability Framework: High Level
(Python SDK / Java SDK / Go SDK → Beam Pipeline → Samza Runner)
• Users develop streaming applications with the Beam SDKs
• Applications are translated into a language-independent Beam pipeline (protobuf)
• Pipelines are executed in the Samza runner
Beam Pipeline to Samza: Runtime (A Tale of Two Languages)
(Diagram: Beam pipeline → job server via gRPC (Job API) → translated to Samza High Level APIs (Runner API); Samza tasks in a Java process talk to the SDK worker in a Python process via gRPC (Fn API))
• The pipeline is submitted to the job server and translated into Samza High Level APIs
• Each Samza container starts two processes: one Java, one Python
• The Python worker executes UDFs (user-defined functions)
• Samza in Java handles everything else (IO, state, shuffle, join, etc.)
Beam Pipeline to Samza: Runtime (continued)
• Job API: job submission and pipeline management
• Runner API: representation of a Beam pipeline and how the pipeline should be executed in a runner
• Fn API: execution control and data transfer between the runner and the SDK harness, including data channels, state channels, control channels, etc.
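To make the Job API endpoint concrete, here is a minimal sketch of the options a portable Beam pipeline typically passes to reach a job server; the endpoint address is an assumed placeholder and the helper name is illustrative, not from the talk:

```python
# Illustrative sketch: argv-style options for submitting a Beam pipeline to a
# portable runner's job server. The "localhost:8099" endpoint is an assumed
# placeholder; the flag names are standard Beam pipeline options.
def portable_runner_args(job_endpoint, environment_type="EXTERNAL"):
    """Build the pipeline options for a portable-runner submission."""
    return [
        "--runner=PortableRunner",                 # use the portability framework
        f"--job_endpoint={job_endpoint}",          # gRPC Job API endpoint of the job server
        f"--environment_type={environment_type}",  # how the SDK harness (worker) is launched
    ]

args = portable_runner_args("localhost:8099")
```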
Beam Pipeline to Samza: Translation (Samza DAG)
• IO.Read → SystemConsumer/SystemProducer: consume events and generate watermarks
• ParDo → DoFnOp: ExecutableStageDoFnRunner; inject Samza watermark, state, and timers
• Window.Into → WindowAssignOp: invoke the window assignment function
• GroupByKey/Combine → GroupByKeyOp: GroupAlsoByWindowViaWindowSetNewDoFn; Samza merge, partitionBy, watermark, state, and timers
Kafka IO in Samza Python
• From the Python side, users see Python only

pipeline = Pipeline(options=get_pipeline_option())
(pipeline
 | 'read' >> ReadFromKafka(cluster='SrcCluster', topic='InputTopic')
 | 'transform' >> beam.Map(lambda event: process_event(event))
 | 'window' >> beam.WindowInto(FixedWindows(15))
 | 'count' >> beam.CombinePerKey(CountCombineFn())
 | 'write' >> WriteToKafka(cluster='DstCluster', topic='OutputTopic'))
pipeline.run().wait_until_finish()

(ReadFromKafka and WriteToKafka execute on the Java side; the transforms in between run in Python.)
Kafka IO in Samza Python
• Under the hood, "ReadFromKafka" is translated into a protobuf transform and passed to the Samza runner on the Java side
• The topic name, cluster name, and configs are enough information for the Java side to construct the Kafka consumers

syntax = "proto3";
package com.linkedin.beam.proto;

message KafkaTransformPayload {
  string topicName = 1;
  string systemName = 2;
  repeated ConfigPair configs = 3;
}

message ConfigPair {
  string key = 1;
  string value = 2;
}
Beam Pipeline to Samza: Translation For portable pipelines, translate IOs to Java Read/Write. Pros: • Convergence on using the same IO clients (e.g. Kafka clients) • Leverage existing Java components • More efficient to do IOs on Java side Cons: • not leveraging the Beam IO from open source
Beam Pipeline to Samza: Translation
Samza Table API: an extension of IO
• Local table (RocksDB)
• Remote table (e.g. Couchbase, other KV stores)
• Easy stream-table join support
• Out-of-the-box support for rate limiting, caching, etc.
Stream-Table Join in Samza Python
(Diagram: Kafka input (K1, V1) → StreamTableJoinOp, joined against a remote/local table {(K1, Entry1), (K2, Entry2), ...} → PTransform output (K1, [V1, Entry1]))
• Table reads are provided as a stream-table join
• Useful for enriching the events to be processed
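The join semantics sketched in the diagram above can be illustrated with plain Python; here a dict stands in for the remote/local table, and all names are illustrative (the real lookup goes through the Samza Table API):

```python
# Minimal sketch of stream-table join semantics: for each (key, value) event
# from the stream, look up the key in the table and emit (key, [value, entry]).
def stream_table_join(stream, table):
    for key, value in stream:
        entry = table.get(key)
        if entry is not None:              # drop events with no matching table row
            yield (key, [value, entry])

table = {"K1": "Entry1", "K2": "Entry2"}
out = list(stream_table_join([("K1", "V1")], table))
# out == [("K1", ["V1", "Entry1"])]
```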
Table API in Samza Python
• Similar to Kafka IO, tables are also translated to be executed on the Java side
• Consistent APIs across Python and Java

pipeline = Pipeline(options=get_pipeline_option())
(pipeline
 | 'read' >> ReadFromKafka(cluster='SrcCluster', topic='InputTopic')
 | 'join' >> JoinWithCouchbase(bucket='SampleTable')
 | 'transform' >> beam.Map(lambda event: process_event(event))
 | 'write' >> WriteToKafka(cluster='DstCluster', topic='OutputTopic'))
pipeline.run().wait_until_finish()
Samza Deployment: Standalone Cluster
(Diagram: Samza processors (Java) with partition assignments (p0, p2), (p1, p3), (p4, p5), (p6, p7); each creates an SDK worker (Python); jobs submitted via ZooKeeper coordination)
• A Python process starts Samza Java and manages its lifecycle
• Samza is used as a library
• Coordination (leader election, partition assignment) is done by ZooKeeper
Samza Deployment: YARN Cluster
(Diagram: an Application Master plus Samza containers in a YARN cluster)
• Uses YARN for resource management
• Isolation, multi-tenancy, fault tolerance
• Work in progress to support YARN mode in Samza Python
Samza Python Performance
Deploying Samza Python to test performance...
• Initial result: ~85 QPS per container
• With saturated single-core CPU usage
Performance: Without Bundling
(Diagram: each input event crosses from Java to Python and back as a processed event, one at a time)
• A round trip per single message drags down performance due to the overhead of handling each gRPC message on the Python side
• Single message round trip drags down the performance • Bundling (batching) is the key to improve throughputPython Java Input event Processed event ... ... ... ... ... ... Performance: with bundling
Performance: Samza Runloop
• In the Samza runloop (Java), events are buffered/bundled for up to N messages or up to T seconds before being sent to Python
• The buffer is backed by local state (RocksDB) for persistence and is evicted at the end of the bundle
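The "up to N messages or up to T seconds" triggering described above can be sketched in a few lines of Python. This is a simplified in-memory illustration with invented names; the real runloop is Java and persists the buffer in RocksDB-backed local state:

```python
import time

# Simplified sketch of bundle triggering: flush after max_messages events or
# after max_seconds have elapsed since the bundle started, whichever is first.
class BundleBuffer:
    def __init__(self, max_messages, max_seconds, flush):
        self.max_messages = max_messages
        self.max_seconds = max_seconds
        self.flush = flush              # callback that ships a bundle to Python
        self.buffer = []
        self.started = None

    def add(self, event, now=None):
        now = time.monotonic() if now is None else now
        if not self.buffer:
            self.started = now          # a new bundle starts with its first event
        self.buffer.append(event)
        if (len(self.buffer) >= self.max_messages
                or now - self.started >= self.max_seconds):
            self.flush(self.buffer)
            self.buffer = []            # evict the buffer at the end of the bundle

bundles = []
buf = BundleBuffer(max_messages=3, max_seconds=1.0, flush=bundles.append)
for e in range(5):
    buf.add(e, now=0.0)
# bundles == [[0, 1, 2]]; events 3 and 4 are still buffered
```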
Performance Benchmark

bundle size | QPS (k)
          1 |  0.085
          5 |  0.34
         20 |  1.1
         50 |  1.7
        100 |  2.3
        200 |  2.9
        500 |  3.7
       3000 |  4.1
       5000 |  5.6
      10000 |  7.8
      50000 |  8.3
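Working the endpoints of the benchmark above through: going from a bundle size of 1 (~85 QPS) to 50,000 (~8,300 QPS) is roughly a hundredfold throughput gain:

```python
# Throughput gain implied by the benchmark: ~85 QPS at bundle size 1 versus
# ~8,300 QPS (8.3k) at bundle size 50,000.
qps_unbundled = 85
qps_bundled = 8300
speedup = qps_bundled / qps_unbundled
# speedup is roughly 98x
```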
Agenda
1. Introduction & Background
2. Deep Dive: Samza Portable Runner for Beam
3. Use Cases of Stream Processing in Python
4. Future Work
Near Real Time Image OCR with Kafka IO
• Image data in Kafka → Python OCR* with a portable TensorFlow model → (image URL, text) written back to Kafka
• An internal service provides the image URL; images are downloaded from the URL and loaded into memory
*OCR: Optical Character Recognition
Near Real Time Model Training with Samza Python
• Feature data in Kafka → Window → GroupByKey → training algorithm, which reads and updates coefficients in a coefficients store
• Offline training (via HDFS ETL) pushes coefficients to the same store less frequently
Near real time activity analysis/monitoring Activity logs in Kafka Fixed window Count.perKey (combine) Process Extract key & timestamp Monitoring/Alerting System Capture (abnormal) activities and send to monitoring/alerting system
Agenda
1. Introduction & Background
2. Deep Dive: Samza Portable Runner for Beam
3. Use Cases of Stream Processing in Python
4. Future Work
Future Samza Portable Runner
• YARN support
• Jupyter Notebook support
• Open source
• Stateful ParDo
• Timers in ParDo
Thank you!
We're hiring!
