Building a Replicated Logging System with Apache Kafka
Guozhang Wang, Joel Koshy, Sriram Subramanian, Kartik Paramasivam, Mammad Zadeh, Neha Narkhede, Jun Rao, Jay Kreps, Joe Stein
We All Love Logs!
Apache Kafka • A distributed messaging system… that stores messages as a log!
Example: LinkedIn back in 2010. Point-to-point pipelines. What we want: a centralized data pipeline.
Log-centric Data Flow • Logical Ordering • Persistent Buffering • “Source-of-Truth”
Store Messages as a Log [figure: a partition is an ordered sequence of messages with offsets 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, ...; the producer writes to the tail; Consumer1 reads at offset 7; Consumer2 reads at offset 10]
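To make the offset-based consumption above concrete, here is a minimal sketch using the Java consumer client; the broker address and the topic name "topic1" are placeholders. Each consumer simply seeks to its own offset in the partition and reads forward.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class OffsetReadExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("group.id", "consumer-1");                 // hypothetical group
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Each consumer tracks its own position in the log independently.
            TopicPartition tp = new TopicPartition("topic1", 0);  // hypothetical topic/partition
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 7);  // start reading at offset 7, like Consumer1 in the figure

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}
```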
Partition the Log across Machines [figure: Topic 1 and Topic 2 are split into partitions that are distributed across brokers; producers write to and consumers read from the brokers]
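As a small illustration of partitioned writes (the broker address, topic, and keys are made up for the example), the Java producer hashes each record's key to pick a partition, so records with the same key stay in order on one broker while load spreads across the cluster.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class PartitionedProduceExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Records with the same key hash to the same partition, so ordering
            // is preserved per key while partitions are spread over brokers.
            for (int i = 0; i < 10; i++) {
                String key = "user-" + (i % 3);  // hypothetical keys
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("topic1", key, "event-" + i);
                RecordMetadata md = producer.send(record).get();
                System.out.printf("key=%s -> partition=%d offset=%d%n",
                                  key, md.partition(), md.offset());
            }
        }
    }
}
```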
Apache Kafka Example: Kafka at LinkedIn
“Source-of-Truth” should not be lost even when..
Replicas and Layout [figure: topic1-part1, topic1-part2, and topic1-part3 are each replicated on Broker-1, Broker-2, and Broker-3, with the replicas laid out in a different order on each broker so that load is spread evenly]
Consensus for Log Replication [figure: a write to Broker-1's log is replicated to Broker-2 and Broker-3 via a consensus protocol]
Key Idea Separate membership configuration from data replication
Primary-backup Replication [figure: all writes go to the primary (Broker-1, marked *), which propagates them to the backups on Broker-2 and Broker-3]
Conventional Quorum Commits [figure: a write to the leader (Broker-1, marked *) is committed once a quorum of the replicas on Broker-2 and Broker-3 has acknowledged it]
Configurable ISR Commits • Leader maintains the in-sync replicas (ISR) • Failed / slow follower => dropped from ISR • Caught-up follower => re-joins ISR • Producer specifies the required ACK based on ISR
Example: ACK with all ISRs [figure: the leader (Broker-1, marked *) forwards the write to Broker-2 and Broker-3; with ack="all", the producer is acknowledged only after every member of the ISR {1, 2, 3} has the message]
Example: ACK with Leader-only [figure: with ack="leader", the producer is acknowledged as soon as the leader (Broker-1, marked *) has appended the write; Broker-2 and Broker-3 replicate it afterwards; ISR {1, 2, 3}]
Example: Slow Follower [figure: with ack="all", a slow Broker-3 holds up the acknowledgement, so the leader drops it from the ISR; the ISR shrinks from {1, 2, 3} to {1, 2} and the write is then acknowledged]
Configurable ISR Commits
ACK mode  | Latency                | On failures
"no"      | no network delay       | some data loss
"leader"  | 1 network roundtrip    | a few messages may be lost
"all"     | ~2 network roundtrips  | no data loss
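In today's Java producer the same trade-off is exposed through the `acks` setting, spelled 0, 1, and all rather than "no", "leader", and "all". A minimal sketch, with a placeholder broker address and topic:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AckModeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");

        // acks=0   : fire-and-forget, no broker acknowledgement (may lose data)
        // acks=1   : acknowledged once the leader has written the record
        // acks=all : acknowledged only after every in-sync replica has the record
        props.put("acks", "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("topic1", "key", "value"));
            producer.flush();  // block until outstanding sends are acknowledged
        }
    }
}
```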
Membership Management • Use an embedded controller • Detect broker failures via ZooKeeper • Leader failure => elect a new leader from the ISR • Leader and ISR persisted in ZooKeeper for controller fail-over
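The sketch below is only an illustrative toy, not the actual controller code: it captures the rule that a new leader must come from the ISR, by picking the first in-sync replica that is still alive.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Illustrative only: a toy version of "elect new leader from ISR". */
public class LeaderElectionSketch {

    /**
     * Returns the first in-sync replica that is still alive. Because every
     * ISR member has all committed messages, choosing from the ISR means
     * leadership changes do not lose committed data.
     */
    static Optional<Integer> electLeader(List<Integer> isr, Set<Integer> liveBrokers) {
        return isr.stream().filter(liveBrokers::contains).findFirst();
    }

    public static void main(String[] args) {
        List<Integer> isr = List.of(1, 2);  // ISR persisted in ZooKeeper
        Set<Integer> live = Set.of(2, 3);   // broker 1 just failed
        System.out.println("new leader: " + electLeader(isr, live).orElse(null));  // -> 2
    }
}
```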
Example: Broker Failure [figure: the leader Broker-1 (marked *) fails while the ISR is {1, 2}; the controller elects Broker-2 as the new leader and the ISR shrinks to {2}; once Broker-3 catches up it re-joins, leaving Broker-2 as leader with ISR {2, 3}]
Agenda • Overview: Logs and Kafka • Log Replication in Kafka • Kafka Usage at LinkedIn • Conclusion
Change Log Replication
Apache Kafka Example: Kafka at LinkedIn
Example: Espresso • A distributed document store • Primary online data-serving platform at LinkedIn • Member profile, homepage, InMail, etc. [SIGMOD 2013]
Old Espresso Replication [figure: in each data center, storage nodes replicate between their MySQL instances via MySQL replication; Databus feeds downstream consumers such as the search index and Hadoop, and a cross-DC replicator copies data between data centers]
Problems with MySQL Replication [figure: a master storage node holding partitions P1-P6 ships its binary log to a slave storage node holding the same partitions P1-P6]
Replicate Logs with Kafka [figure: each storage node runs a Kafka producer that publishes the change logs of its partitions (P1-P6) to Kafka, and a Kafka consumer that applies its peer's change logs, replacing direct binary-log shipping]
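Espresso's actual integration is internal to LinkedIn; as a rough sketch under assumed topic and group names, the consuming side of a storage node could look roughly like this, reading its peer's change log from Kafka and applying each change locally.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReplicaApplySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.put("group.id", "storage-node-2");            // hypothetical consumer group
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("espresso-changes-P1"));  // assumed topic
            while (true) {
                for (ConsumerRecord<String, String> change :
                         consumer.poll(Duration.ofSeconds(1))) {
                    // In a real system this would apply the change to the local
                    // MySQL instance; here we only print it.
                    applyToLocalStore(change.key(), change.value());
                }
            }
        }
    }

    static void applyToLocalStore(String primaryKey, String rowImage) {
        System.out.printf("apply %s = %s%n", primaryKey, rowImage);
    }
}
```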
Key-based Log Compaction [figure: a partition's older segments (Segment-3, Segment-4) hold many values per key, e.g. a:1, b:2, c:3, d:4, a:5, a:6, d:3, f:8, b:0, c:null, a:5, f:9; compaction copies them into a new segment that keeps only the latest value per key (d:3, b:0, a:5, f:9) and drops keys whose latest value is null (c)]
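A small illustration of the compaction semantics shown above, with a placeholder topic name: the topic is assumed to be created with cleanup.policy=compact, so Kafka retains only the newest record per key, and a record with a null value acts as a delete marker (as with key c above).

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CompactionExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");

        // Assumes "compacted-topic" was created with cleanup.policy=compact,
        // so only the newest record per key survives compaction.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("compacted-topic", "a", "1"));
            producer.send(new ProducerRecord<>("compacted-topic", "a", "5"));   // survives: latest "a"
            producer.send(new ProducerRecord<>("compacted-topic", "c", "3"));
            producer.send(new ProducerRecord<>("compacted-topic", "c", null));  // tombstone: key "c" is removed
        }
    }
}
```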
New Espresso Replication (* in progress) [figure: in each data center (Data Center-1 through Data Center-n), the storage nodes' MySQL instances replicate through Kafka logs; Kafka MirrorMaker replicates between data centers, and downstream consumers such as the search index and Hadoop read from the logs]
Stream Processing
Apache Kafka Example: Kafka at LinkedIn
Example: Samza [CIDR 2015] • Data flow streaming on Kafka and YARN • Stateful processing • Re-processing • Failure recovery
Samza Processing [figure: Samza tasks consume input streams from Kafka and produce results back to Kafka; each task keeps local state whose updates are logged to a Kafka changelog topic, so a failed task can be restarted on another machine and restore its state from the changelog]
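As a rough sketch of such a stateful Samza job, using Samza's classic low-level API (the store, stream, and class names are placeholders that would have to match the job configuration):

```java
import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

/**
 * Counts events per key. The local "counts" store is configured with a Kafka
 * changelog topic, so every update is also logged to Kafka; a restarted task
 * rebuilds the store by replaying that changelog.
 */
public class PageViewCounterTask implements StreamTask, InitableTask {
    private static final SystemStream OUTPUT = new SystemStream("kafka", "page-view-counts");

    private KeyValueStore<String, Integer> counts;

    @Override
    @SuppressWarnings("unchecked")
    public void init(Config config, TaskContext context) {
        // "counts" must be declared in the job config with a changelog topic.
        counts = (KeyValueStore<String, Integer>) context.getStore("counts");
    }

    @Override
    public void process(IncomingMessageEnvelope envelope,
                        MessageCollector collector,
                        TaskCoordinator coordinator) {
        String key = (String) envelope.getKey();
        Integer current = counts.get(key);
        int updated = (current == null ? 0 : current) + 1;
        counts.put(key, updated);  // the update is captured in the changelog topic
        collector.send(new OutgoingMessageEnvelope(OUTPUT, key, updated));
    }
}
```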
Take-aways • Log-centric data flow helps scale your systems • Kafka: replicated log streams for real-time platforms
We are Hiring
THANKS!


Editor's Notes

  • #2 Thank you, and good morning. Today I am going to talk about Kafka, and how it can be used as a general replicated log stream for a wide range of scalable systems. This is joint work with the Apache Kafka community.
  • #3 First of all, being in this room, I think it is safe to say “we all love logs”. Logs have been around almost as long as this research community.
  • #4 No-overwrite in POSTGRES. ARIES: write-ahead logging in the '80s. Today, reading the 50-page ARIES paper is a must-do for every database graduate student, including myself.
  • #5 Similarly, Log-Structured storage architecture.
  • #6 Replicated state machines. In all these examples, the log is used as the source-of-truth data change log to scale the system while providing durability and consistency.
  • #7 So that is all good stuff about logs, but where is Kafka in this big picture? Well, Kafka is an Apache open-source distributed messaging system that stores messages as a commit log.
  • #8 As a data-serving website, LinkedIn has a lot of data. We have this variety of data and we need to build all these products around it. Messaging: ActiveMQ. User activity: in-house log aggregation. Logging: Splunk. Metrics: JMX => Zenoss. Database data: Databus, custom ETL.
  • #9 This idea of using logs for data flow has been floating around LinkedIn: in a log-centric fashion, take all the organization's data and put it into a central log for real-time subscription. Data integration, replication, real-time stream processing.
  • #10 Disks are fast when used sequentially. File system caching.
  • #11 Topic = message stream. A topic has partitions; partitions are distributed to brokers.
  • #13 higher availability and durability
  • #14 evenly distributed
  • #15 replicated log => replicated state machine
  • #17 One of the replicas is the leader; leaders are spread evenly across brokers. All writes go to the leader. The leader propagates writes to followers in order. The leader decides when to commit a message.
  • #22 The size of the ISR is decoupled from the size of the replica set, hence the number of replicas and the number of acknowledgements are independent.
  • #23 ack=3
  • #29 Only committed messages are delivered to consumers; when messages are committed is independent of the ack mode chosen by the producer.
  • #30 ack=1
  • #31 ack=1
  • #36 ack=3, follower slow
  • #42 Under-replicated partitions.
  • #45 ack=3, broker failure
  • #61 load balancing cluster expansion
  • #62 load balancing cluster expansion
  • #69 This is a major initiative and will put Kafka on the critical path for site-latency-sensitive data paths, which also require much higher message delivery guarantees.
  • #71 Data standardization, site monitoring
  • #72 Data flow graph. Flow rate may overwhelm the query processor: batch processing, sampling, synopses, etc. In-memory storage constraints: single-pass algorithms, no stream backtracking.
  • #74 WAL
  • #76 Streaming on Message Pipes