Stateful Stream Processing with Kafka and Samza George Li georgexiaoxiaoli@acm.org
What is Apache Kafka? ● A distributed log system developed by LinkedIn ● Logs are stored as files on disk ● Logs are organized into topics (think of tables in a database) ● Topics are sharded into partitions
What is Apache Kafka? ● Each message is associated with an offset number ● Reads are almost always sequential: from a starting offset to the end of the log ● Don’t use Kafka for random reads
https://github.com/abhioncbr/Kafka-Message-Server/wiki/Apache-of-Kafka- Architecture-(As-per-Apache-Kafka-0.8.0-Dcoumentation)
What is Apache Samza? ● A stream processing framework developed by LinkedIn ● Most commonly used in conjunction with Kafka and Yarn ● Often compared with Spark Streaming and Storm
What is Yarn? ● Resource and node management for Hadoop ● You can plug in different resource managers for Samza, e.g., Mesos ● We use Yarn as the resource manager in our solution
Why not Spark Streaming? ● Our processing logic is mostly stateful ● Spark Streaming’s updateStateByKey is not flexible enough for us ● We want to keep state across mini-batches and update individual k-v pairs
Why not Apache Storm? ● In a Samza solution, the topology can be decomposed into standalone nodes (Yarn jobs) and their in/out edges (Kafka topics) ● We were advised that Storm is relatively hard to configure and tune properly
How does Samza work? ● Our Samza code runs as a Yarn job ● You implement the StreamTask interface, which defines a process() call. ● StreamTask runs inside a task instance, which itself is inside a Yarn container.
How does Samza work? ● A task instance is single-threaded ● Each task instance reads exactly one partition of each Kafka topic it subscribes to ● process() is called for each message the task instance receives
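To make the API concrete, here is a minimal StreamTask sketch in Scala. The class name, the output system/stream names ("kafka", "demo-output"), and the assumption of a string serde are illustrative, not from the talk:

```scala
import org.apache.samza.system.{IncomingMessageEnvelope, OutgoingMessageEnvelope, SystemStream}
import org.apache.samza.task.{MessageCollector, StreamTask, TaskCoordinator}

// Hypothetical system/topic names; assumes a string serde is configured for the streams.
class UppercaseTask extends StreamTask {
  private val output = new SystemStream("kafka", "demo-output")

  // process() runs single-threaded inside the task instance, once per incoming message.
  override def process(envelope: IncomingMessageEnvelope,
                       collector: MessageCollector,
                       coordinator: TaskCoordinator): Unit = {
    val msg = envelope.getMessage.asInstanceOf[String]
    // Emit the transformed message back to Kafka via the collector.
    collector.send(new OutgoingMessageEnvelope(output, msg.toUpperCase))
  }
}
```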
Samza’s local state ● Local state is stored in a local k-v store such as RocksDB ● The content of the local k-v store is checkpointed into a separate Kafka topic ● Why not use Cassandra for quick random k-v reads?
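A minimal sketch of how a task reaches its local store, using the older Samza-style InitableTask/TaskContext.getStore API. The store name "page-view-counts" and the serdes are assumptions; the store itself has to be declared in the job config with RocksDB backing and a Kafka changelog topic:

```scala
import org.apache.samza.config.Config
import org.apache.samza.storage.kv.KeyValueStore
import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.task.{InitableTask, MessageCollector, StreamTask, TaskContext, TaskCoordinator}

// "page-view-counts" is a hypothetical store name; it must match a store
// configured for the job (RocksDB-backed, changelogged to a Kafka topic).
class CountingTask extends StreamTask with InitableTask {
  private var counts: KeyValueStore[String, java.lang.Integer] = _

  override def init(config: Config, context: TaskContext): Unit =
    counts = context.getStore("page-view-counts")
      .asInstanceOf[KeyValueStore[String, java.lang.Integer]]

  override def process(envelope: IncomingMessageEnvelope,
                       collector: MessageCollector,
                       coordinator: TaskCoordinator): Unit = {
    val key     = envelope.getKey.asInstanceOf[String]
    val current = Option(counts.get(key)).map(_.intValue).getOrElse(0)
    counts.put(key, Int.box(current + 1))  // the put is also appended to the changelog topic
  }
}
```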
Avoid remote calls inside process() ● The task instance is single-threaded, i.e., remote calls are expensive ● Because process() is called for each message, small performance hits add up ● In our use case, Kafka is Samza’s only input source and output destination
Ask the log for the ultimate truth ● To update the state of the processing logic, we send an “update state” message to the Kafka log ● The task instance knows how to handle such messages ● In effect, our StreamTask becomes an interpreter that evaluates incoming DSL messages
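A sketch of the interpreter idea: the task pattern-matches on the incoming message and treats "update state" commands differently from ordinary data. The Command encoding (a sealed trait produced by a custom serde) and the "rules" store name are assumptions for illustration:

```scala
import org.apache.samza.config.Config
import org.apache.samza.storage.kv.KeyValueStore
import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.task.{InitableTask, MessageCollector, StreamTask, TaskContext, TaskCoordinator}

// Assumed message model: a custom serde deserializes every Kafka message into a Command.
sealed trait Command
case class DataEvent(key: String, payload: String) extends Command
case class UpdateState(key: String, newRule: String) extends Command

class InterpreterTask extends StreamTask with InitableTask {
  private var rules: KeyValueStore[String, String] = _

  override def init(config: Config, context: TaskContext): Unit =
    rules = context.getStore("rules").asInstanceOf[KeyValueStore[String, String]]

  override def process(envelope: IncomingMessageEnvelope,
                       collector: MessageCollector,
                       coordinator: TaskCoordinator): Unit =
    envelope.getMessage.asInstanceOf[Command] match {
      case UpdateState(k, rule) =>
        rules.put(k, rule)                 // control message: change the processing logic
      case DataEvent(k, payload) =>
        val rule = rules.get(k)            // normal data path, driven by the current state
        // ... apply `rule` to `payload` and emit results via the collector ...
    }
}
```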
Fault tolerance ● Process some messages, send results to output topics, and update local state ● The task instance restarts; now we need to restart from the last input checkpoint ● Output messages may already have been received ● Local state changes may already have been checkpointed
Any part could fail. Need to simulate these cases
Fault tolerance with local state ● When a task instance restarts, local state is repopulated by reading its own Kafka log ● Yes, reading and repopulating can take a few minutes ● Ideally, a message’s effect on state should be idempotent ● What if my scenario is not ideal?
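One workaround for the non-ideal case, sketched under assumed store names: keep a second store with the highest input offset already applied per key, and skip replays at or below it, so a redelivered message cannot double-count:

```scala
import org.apache.samza.config.Config
import org.apache.samza.storage.kv.KeyValueStore
import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.task.{InitableTask, MessageCollector, StreamTask, TaskContext, TaskCoordinator}

// Store names and types are assumptions; the technique is the point.
class DedupingCounterTask extends StreamTask with InitableTask {
  private var counts:  KeyValueStore[String, java.lang.Long] = _
  private var applied: KeyValueStore[String, java.lang.Long] = _

  override def init(config: Config, context: TaskContext): Unit = {
    counts  = context.getStore("counts").asInstanceOf[KeyValueStore[String, java.lang.Long]]
    applied = context.getStore("applied-offsets").asInstanceOf[KeyValueStore[String, java.lang.Long]]
  }

  override def process(envelope: IncomingMessageEnvelope,
                       collector: MessageCollector,
                       coordinator: TaskCoordinator): Unit = {
    val key    = envelope.getKey.asInstanceOf[String]
    val offset = envelope.getOffset.toLong   // Kafka offsets arrive as numeric strings
    val last   = Option(applied.get(key)).map(_.longValue).getOrElse(-1L)
    if (offset > last) {                     // a redelivered message is simply skipped
      val current = Option(counts.get(key)).map(_.longValue).getOrElse(0L)
      counts.put(key, Long.box(current + 1))
      applied.put(key, Long.box(offset))
    }
  }
}
```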
Testing fault tolerance ● A stackable trait that randomly redelivers messages ● Another stackable trait that randomly injects failures into different functions ● Chaos Monkey on the cluster
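A sketch of the stackable-trait idea in Scala; the trait names and probabilities are made up. Mixed over a concrete task in tests, they simulate duplicate delivery and random crashes without touching the production code path:

```scala
import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.task.{MessageCollector, StreamTask, TaskCoordinator}
import scala.util.Random

// Stackable traits: each wraps the underlying task's process() via super calls.
trait RandomRedelivery extends StreamTask {
  abstract override def process(envelope: IncomingMessageEnvelope,
                                collector: MessageCollector,
                                coordinator: TaskCoordinator): Unit = {
    super.process(envelope, collector, coordinator)
    if (Random.nextDouble() < 0.05)          // ~5% of messages are delivered twice
      super.process(envelope, collector, coordinator)
  }
}

trait RandomFailure extends StreamTask {
  abstract override def process(envelope: IncomingMessageEnvelope,
                                collector: MessageCollector,
                                coordinator: TaskCoordinator): Unit = {
    if (Random.nextDouble() < 0.01)          // ~1% of calls blow up
      throw new RuntimeException("injected failure")
    super.process(envelope, collector, coordinator)
  }
}

// Usage in tests, assuming a concrete task such as the CountingTask sketched earlier:
// class ChaosTask extends CountingTask with RandomRedelivery with RandomFailure
```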
Reprocessing ● I want to deploy a new version of the code, but I don’t want to kill my current stream job ● This is an important problem; design for it from the beginning
Reprocessing Options 1. Pull the Kafka log into a data sink and run batch jobs against the sink, i.e., the lambda architecture 2. Since Kafka holds the ultimate truth, just replay the log with the new code, as long as it can be done fast enough
Is my replay fast enough? ● 5 input partitions, each holding 10 million messages (50 million total) ● Each partition gets its own task instance, so replay runs 5-way in parallel ● If each task instance processes 5k messages/sec (not a high rate), that is 10M ÷ 5k ≈ 2,000 sec per partition, i.e., less than 40 minutes to reprocess all 50 million messages
More problems with Reprocessing ● When can you kill the old Yarn job? ● How do the old and new results affect the user experience? ● You need to write your own tooling to manage the new topics and data sinks
Multi-tenancy Options ● Each tenant gets its own topics/partitions ● Each topic partition holds data from multiple tenants ● In our case, the second option gives better resource utilization
Multi-tenancy with local state ● The local k-v store holds data from different tenants ● User code should not deal with tenant info directly - we need an abstraction ● The solution is similar to how we handle multi-tenancy in Redis
Multi-tenant solution ● Each message has to carry tenant info ● Use a stackable trait to extract the tenant info ● The trait also controls access to the local k-v store ● Code inside process() has no knowledge of tenant info at all
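A sketch of the tenant abstraction, with assumed names and an assumed key format ("tenantId:realKey"). A wrapper namespaces every key by tenant, much like key prefixing in Redis, and a trait hands tenant-scoped access to the task so process()-level code never sees tenant info (shown here as a template-method style trait rather than the exact stackable trait from the talk):

```scala
import org.apache.samza.config.Config
import org.apache.samza.storage.kv.KeyValueStore
import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.task.{InitableTask, MessageCollector, StreamTask, TaskContext, TaskCoordinator}

// Wrapper that silently prefixes every key with the tenant id.
class TenantStore(underlying: KeyValueStore[String, String], tenant: String) {
  private def ns(key: String)               = s"$tenant/$key"
  def get(key: String): String              = underlying.get(ns(key))
  def put(key: String, value: String): Unit = underlying.put(ns(key), value)
  def delete(key: String): Unit             = underlying.delete(ns(key))
}

trait TenantAwareTask extends StreamTask with InitableTask {
  private var raw: KeyValueStore[String, String] = _

  override def init(config: Config, context: TaskContext): Unit =
    raw = context.getStore("tenant-data").asInstanceOf[KeyValueStore[String, String]]

  // Concrete tasks implement this; they never see the tenant id or the raw store.
  def processForTenant(store: TenantStore, payload: String, collector: MessageCollector): Unit

  final override def process(envelope: IncomingMessageEnvelope,
                             collector: MessageCollector,
                             coordinator: TaskCoordinator): Unit = {
    // Assumed convention: the producer puts the tenant id in the message key as "tenantId:realKey".
    val key    = envelope.getKey.asInstanceOf[String]
    val tenant = key.takeWhile(_ != ':')
    processForTenant(new TenantStore(raw, tenant), envelope.getMessage.asInstanceOf[String], collector)
  }
}
```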
Testing with local state ● Unit testing is slightly tricky due to the API ● Use kafka.utils.{TestUtils, TestZKUtils} to spawn a test ZooKeeper/Kafka cluster in-process ● See org.apache.samza.test.integration.TestStatefulTask for an example
Performance troubleshooting ● Samza can write its metrics to a Kafka log; these metrics often reveal obvious bottlenecks ● Most performance problems come from the process() logic
Performance troubleshooting ● Often the Samza metrics do not lead to the root cause ● We rely mostly on jstack and tcpdump ● Tools do not replace problem-solving
Common performance problems ● Excessive JSON parsing ● Inefficient serialization/deserialization ● Accidental blocking calls ● Eager evaluation of logging parameters; Samza’s own logging trait can help
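On the last point, a tiny illustration of why logging-parameter evaluation matters: with a by-name parameter the message string is only built when the level is enabled. Samza ships its own Scala logging trait based on the same idea; the version below is only a sketch, not Samza's actual trait:

```scala
import org.slf4j.LoggerFactory

// By-name parameters defer evaluation of the log message until the level check passes.
trait LazyLogging {
  private val log = LoggerFactory.getLogger(getClass)
  def debug(message: => String): Unit = if (log.isDebugEnabled) log.debug(message)
  def info(message: => String): Unit  = if (log.isInfoEnabled) log.info(message)
}

class NoisyTask extends LazyLogging {
  def handle(payload: Array[Byte]): Unit =
    // new String(...) is never constructed unless DEBUG is actually enabled
    debug(s"received ${new String(payload, "UTF-8")}")
}
```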
Questions? Feel free to send me emails/messages