Deep dive into Apache Kafka consumption
Goals • Better understanding of Apache Kafka architecture and possible delivery guarantees • The happy coding path towards fault-tolerant Kafka consumption using Kafka Java client and Akka Stream
Apache Kafka? “Apache Kafka is publish-subscribe messaging rethought as a distributed commit log.”
Consumer polls messages from the brokers; ordering is guaranteed per partition
Consumer commits its progress to commit storage (ZK, Kafka, …), e.g. commit: (Partition 0, Offset 5), (Partition 1, Offset 3), (Partition 2, Offset 10), and keeps polling messages
On restart, the consumer reads back its committed offsets, e.g. get: (Partition 0, Offset 5), (Partition 1, Offset 3), (Partition 2, Offset 10), and resumes polling from there
Same consumer-group (balance): partitions are split across instances; Consumer 1 commits (Partition 0, Offset 5) and (Partition 1, Offset 3), Consumer 2 commits (Partition 2, Offset 10)
Different consumer-groups (broadcast): each group receives all messages and tracks its own offsets; one group commits (Partition 0, Offset 5), (Partition 1, Offset 3), (Partition 2, Offset 10) while the other commits (Partition 0, Offset 2), (Partition 1, Offset 1), (Partition 2, Offset 3)
Delivery guarantees: commit before
 loop: 1. Get message 2. Commit offset 3. Begin message processing 4. End message processing
 Node failure / redeployment / processing failure after the commit but before the end of processing: message lost!
 At-most-once guarantee
Delivery guarantees: commit after
 loop: 1. Get message 2. Begin message processing 3. End message processing 4. Commit offset
 Node failure / redeployment / processing failure after processing but before the commit: message processed twice!
 At-least-once guarantee
Delivery guarantees: auto-commit
 loop: 1. Get message 2. Begin message processing 3. End message processing (offsets are committed automatically on a timer)
 Node failure / redeployment / processing failure: message lost OR processed twice, depending on when the auto-commit fired!
 No guarantee
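The commit-before and commit-after strategies can be contrasted with a small simulation. This is only a sketch with an in-memory log and committed offset (all names here are invented for illustration); it shows that a crash mid-processing loses the message under commit-before and replays it under commit-after:

```java
import java.util.*;

public class DeliveryGuarantees {
    static final List<String> LOG = List.of("m0", "m1", "m2");

    // Resume a normal loop from the committed offset after a restart (no crash).
    static void resume(List<String> processed, int committed) {
        for (int i = committed; i < LOG.size(); i++) processed.add(LOG.get(i));
    }

    // "Commit before": the crash hits while processing m1, whose offset is already committed.
    static List<String> commitBefore() {
        List<String> processed = new ArrayList<>();
        int committed = 1; processed.add(LOG.get(0)); // m0: commit, then process, ok
        committed = 2;                                // m1: commit... then CRASH mid-processing
        resume(processed, committed);                 // restart skips m1: it is lost
        return processed;
    }

    // "Commit after": the crash hits after processing m1 but before committing its offset.
    static List<String> commitAfter() {
        List<String> processed = new ArrayList<>();
        processed.add(LOG.get(0)); int committed = 1; // m0: process, then commit, ok
        processed.add(LOG.get(1));                    // m1: processed... then CRASH before commit
        resume(processed, committed);                 // restart replays m1: processed twice
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("commit-before: " + commitBefore()); // [m0, m2]         at-most-once
        System.out.println("commit-after:  " + commitAfter());  // [m0, m1, m1, m2] at-least-once
    }
}
```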
Delivery guarantees: exactly-once? • At-least-once + idempotent message processing • ex: update a key-value DB that stores the last state of a device • At-least-once + atomic message processing and storage of offset • ex: store offset + message in a SQL DB in a transaction, and use this DB as the main offset storage
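The first exactly-once recipe (at-least-once + idempotent processing) works because replaying a message leaves the store unchanged. A minimal sketch of the key-value example above, with made-up device identifiers:

```java
import java.util.*;

public class IdempotentConsumer {
    // Key-value store holding the LAST state per device: overwriting with the
    // same (deviceId, state) pair is idempotent, so duplicate deliveries are harmless.
    static final Map<String, String> store = new HashMap<>();

    static void process(String deviceId, String state) {
        store.put(deviceId, state); // upsert, not append
    }

    public static void main(String[] args) {
        process("device-42", "ON");
        process("device-42", "ON");  // replayed duplicate (at-least-once delivery)
        System.out.println(store);   // same final state as processing the message once
    }
}
```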
How can I apply these concepts in my code?
Kafka Java client: at-least-once
Async non-blocking? • In a Reactive/Scala world, message processing is usually asynchronous (non-blocking IO call to a DB, ask Akka actor, …):
 
 def processMsg(message: String): Future[Result] • How to process your Kafka messages while staying reactive (i.e. not blocking threads)?
Kafka Java client: async non-blocking?
Kafka Java client: async non-blocking? • Out-of-order processing! • No guarantee anymore! (offset N can be committed before N-1, “shadowing” N-1) • Unbounded number of messages in memory: if the Kafka message rate > processing speed, this can lead to an OutOfMemoryError
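The out-of-order hazard is easy to reproduce: if each polled message is handed to an async call and the offset is "committed" in the completion callback, a fast offset 2 can commit before a slow offset 1. A sketch using CompletableFuture, where the delays and method names are invented for illustration:

```java
import java.util.*;
import java.util.concurrent.*;

public class OutOfOrderCommit {
    // "Processes" each offset asynchronously with the given delay; the commit
    // order follows COMPLETION order, not offset order.
    static List<Integer> commitOrder(Map<Integer, Integer> delaysMsPerOffset) {
        List<Integer> commits = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(2); // both offsets in flight at once
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (var e : delaysMsPerOffset.entrySet()) {
            futures.add(CompletableFuture.runAsync(() -> {
                try { Thread.sleep(e.getValue()); } catch (InterruptedException ignored) {}
                commits.add(e.getKey()); // "commit" in the completion callback
            }, pool));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        pool.shutdown();
        return commits;
    }

    public static void main(String[] args) {
        // Offset 1 is slow (300 ms), offset 2 is fast (10 ms).
        System.out.println(commitOrder(Map.of(1, 300, 2, 10)));
        // Offset 2 commits first; a crash between the two commits loses offset 1's message.
    }
}
```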
What do we need? Ordered asynchronous stream processing with back pressure ENTER REACTIVE STREAMS
Reactive Streams • “Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.” • Backed by Netflix, Pivotal, Red Hat, Twitter, Lightbend (Typesafe), … • Implementations: RxJava, Akka Stream, Reactor, …
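The Reactive Streams interfaces also ship in the JDK since Java 9 as java.util.concurrent.Flow, which makes the back-pressure contract easy to demonstrate without any library: the publisher may only push what the subscriber has requested. A small sketch (the one-at-a-time demand policy is just for illustration):

```java
import java.util.*;
import java.util.concurrent.*;

public class BackPressureDemo {
    // A subscriber that signals demand one element at a time; SubmissionPublisher
    // honours that demand, which is exactly non-blocking back pressure.
    static List<Integer> consumeOneByOne(List<Integer> items) {
        List<Integer> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                Flow.Subscription subscription;
                public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
                public void onNext(Integer item) { received.add(item); subscription.request(1); }
                public void onError(Throwable t) { done.countDown(); }
                public void onComplete() { done.countDown(); }
            });
            items.forEach(publisher::submit); // submit blocks when the buffer is saturated
        } // close() signals onComplete to the subscriber
        try { done.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(consumeOneByOne(List.of(1, 2, 3, 4, 5))); // delivered in order
    }
}
```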
Akka Stream • Stream processing abstraction on top of Akka Actors • Types! Types are back! • Source[A] ~> Flow[A, B] ~> Sink[B] • Automatic back pressure
Reactive Kafka • Akka Stream client for Kafka • On top of Kafka Java client 0.9+ • https://github.com/akka/reactive-kafka
Reactive Kafka
Reactive Kafka • At-least-once semantics in case of node failure / redeployment • Asynchronous processing without blocking any thread • Back pressure • Ordered processing • But what if the processMsg function fails?
The difference between Error and Failure • Error: something went wrong, and this is deterministic (it will happen again if you do the same call)
 ex: HTTP 4xx, Deserialisation exception, Duplicate key DB error • Failure: something went wrong, and this is not deterministic (it may not happen again if you do the same call):
 ex: HTTP 5xx, network exception
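This split drives the retry decision: retry failures (non-deterministic, may succeed next time), never retry errors (deterministic, will fail again). A minimal classifier sketch using the HTTP status codes from the examples above (the method name is invented):

```java
public class RetryPolicy {
    // Errors (4xx) are deterministic: retrying the same call gives the same result.
    // Failures (5xx, network issues) are not: a retry may succeed.
    static boolean shouldRetry(int httpStatus) {
        return httpStatus >= 500; // failure: transient, worth retrying
    }

    public static void main(String[] args) {
        System.out.println(shouldRetry(404)); // error: a retry cannot help
        System.out.println(shouldRetry(503)); // failure: retry later
    }
}
```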
Error and Failure in Scala code, using Scalactic: Future[Result Or Every[Error]]. The outer Future can contain a Failure; the Every[Error] can contain one or more Errors.
Error and Failure in Scala code (non-async): Try[Result Or Every[Error]]. The outer Try can contain a Failure; the Every[Error] can contain one or more Errors.
Fault-tolerant consumption with Reactive Kafka
Keeping message ordering even in failure cases • Retrying message processing upon failures will block the processing of subsequent messages, but that’s ok if message processing is homogeneous • ex: if processMsg of msg N results in a network failure calling a DB (say ELS), there is a high probability that processMsg of msg N+1 will encounter the same failure, so blocking is ok and even better to avoid losing messages due to transient failures • If message processing is heterogeneous (calling different external systems according to the msg), it is better to use different consumer-groups and/or different topics
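The blocking-retry policy can be sketched as: retry the current message until it succeeds (with a backoff), and only then move on to message N+1, which preserves ordering through transient failures. All names here are invented for illustration:

```java
import java.util.*;
import java.util.function.Predicate;

public class OrderedRetry {
    // Processes messages strictly in order; a failing message is retried
    // (up to maxAttempts) before the next message is even looked at.
    static List<String> processAll(List<String> messages, Predicate<String> attempt, int maxAttempts) {
        List<String> processed = new ArrayList<>();
        for (String msg : messages) {
            int attempts = 0;
            while (!attempt.test(msg)) {          // retry until the attempt succeeds
                if (++attempts >= maxAttempts)
                    throw new IllegalStateException("giving up on " + msg);
                // real code would sleep here with an exponential backoff
            }
            processed.add(msg);
        }
        return processed;
    }

    public static void main(String[] args) {
        // m1 fails twice (transient failure), then succeeds.
        Map<String, Integer> remainingFailures = new HashMap<>(Map.of("m1", 2));
        Predicate<String> flaky = msg -> {
            int left = remainingFailures.getOrDefault(msg, 0);
            if (left > 0) { remainingFailures.put(msg, left - 1); return false; }
            return true;
        };
        System.out.println(processAll(List.of("m0", "m1", "m2"), flaky, 5));
        // Order is preserved despite m1's transient failures.
    }
}
```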
Reminder: Kafka guarantees ordering only per partition. If #(consumer instances) < #(Kafka partitions), at least one consumer instance will process two or more partitions.
Parallel processing between partitions while keeping ordering per partition
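"Parallel between partitions, ordered within a partition" can be sketched by keeping one future "tail" per partition: work for a partition is chained onto its tail (so it runs sequentially), while different partitions run on different threads. This is only an illustrative model, not the Reactive Kafka implementation; all names are invented:

```java
import java.util.*;
import java.util.concurrent.*;

public class PerPartitionParallelism {
    // records: (partition, message) pairs in poll order.
    // Returns the messages processed per partition, in processing order.
    static Map<Integer, List<String>> process(List<Map.Entry<Integer, String>> records) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Map<Integer, List<String>> processedPerPartition = new ConcurrentHashMap<>();
        Map<Integer, CompletableFuture<Void>> tails = new HashMap<>();
        for (var record : records) {
            int partition = record.getKey();
            String message = record.getValue();
            // Chain onto this partition's tail: sequential within a partition,
            // parallel across partitions.
            CompletableFuture<Void> tail =
                tails.getOrDefault(partition, CompletableFuture.completedFuture(null));
            tails.put(partition, tail.thenRunAsync(() ->
                processedPerPartition
                    .computeIfAbsent(partition, p -> Collections.synchronizedList(new ArrayList<>()))
                    .add(message), pool));
        }
        CompletableFuture.allOf(tails.values().toArray(new CompletableFuture[0])).join();
        pool.shutdown();
        return processedPerPartition;
    }

    public static void main(String[] args) {
        var records = List.of(Map.entry(0, "a0"), Map.entry(1, "b0"),
                              Map.entry(0, "a1"), Map.entry(1, "b1"));
        System.out.println(process(records)); // each partition's list keeps its order
    }
}
```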
Bonus: auto-adaptive micro-batching windows per partition based on back pressure signal Dynamic trade-off between latency and throughput!
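The latency/throughput trade-off behind this can be illustrated with a toy tick-based model (all parameters invented): a consumer that drains everything buffered since its last cycle naturally produces batches of size 1 when it keeps up (low latency) and larger batches when it lags behind the arrival rate (high throughput), with no tuning knob:

```java
import java.util.*;

public class AdaptiveBatching {
    // One message arrives per tick; each processing cycle drains the whole
    // buffer as a single batch and takes cycleTicks ticks regardless of size.
    static List<Integer> batchSizes(int messages, int cycleTicks) {
        List<Integer> sizes = new ArrayList<>();
        int buffered = 0, tick = 0;
        while (tick < messages) {
            buffered += Math.min(cycleTicks, messages - tick); // arrivals during the cycle
            tick += cycleTicks;
            sizes.add(buffered); // drain everything buffered: one micro-batch
            buffered = 0;
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(12, 1)); // fast consumer: batches of 1 (low latency)
        System.out.println(batchSizes(12, 3)); // slow consumer: batches of 3 (high throughput)
    }
}
```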
Conclusion • Apache Kafka as a system is scalable and fault-tolerant, but fault-tolerant consumption can be tricky • With the right concepts and the right tools, we can make Kafka consumption fault-tolerant very easily (i.e. with a few lines of extra code)
Thank you! Questions?
