@tyler_treat Building a Distributed Message Log from Scratch Tyler Treat · SCALE 16x · 3/11/18
@tyler_treat Tyler Treat: Managing Partner @ Real Kinetic; messaging & distributed systems; former nats.io core contributor; bravenewgeek.com
@tyler_treat Outline
 - The Log
 -> What?
 -> Why?
 - Implementation
 -> Storage mechanics
 -> Data-replication techniques
 -> Scaling message delivery
 -> Trade-offs and lessons learned
@tyler_treat The Log
@tyler_treat The Log A totally-ordered, append-only data structure.
@tyler_treat The Log 0 1 2 3 4 5 (oldest record on the left, newest record on the right)
@tyler_treat Logs record what happened and when.
@tyler_treat [diagram: writes flow through the log and out to caches, databases, and indexes]
@tyler_treat https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying
@tyler_treat Examples in the wild:
 -> Apache Kafka
 -> Amazon Kinesis
 -> NATS Streaming
 -> Apache Pulsar
@tyler_treat Key Goals:
 -> Performance
 -> High Availability
 -> Scalability
@tyler_treat The purpose of this talk is to learn…
 -> a bit about the internals of a log abstraction.
 -> how it can achieve these goals.
 -> some applied distributed systems theory.
@tyler_treat You will probably never need to build something like this yourself, but it helps to know how it works.
@tyler_treat Implementation
@tyler_treat Implementation: Don’t try this at home.
@tyler_treat Storage Mechanics
@tyler_treat Some first principles…
 • The log is an ordered, immutable sequence of messages
 • Messages are atomic (meaning they can’t be broken up)
 • The log has a notion of message retention based on some policies (time, number of messages, bytes, etc.)
 • The log can be played back from any arbitrary position
 • The log is stored on disk
 • Sequential disk access is fast*
 • OS page cache means sequential access often avoids disk
@tyler_treat http://queue.acm.org/detail.cfm?id=1563874
@tyler_treat iostat
avg-cpu:  %user  %nice  %system  %iowait  %steal  %idle
          13.53   0.00    11.28     0.00    0.00   75.19
Device:   tps    Blk_read/s  Blk_wrtn/s  Blk_read  Blk_wrtn
xvda      0.00         0.00        0.00         0         0
@tyler_treat Storage Mechanics: a single log file holding records 0 1 2 3 4 5 …
@tyler_treat Storage Mechanics: the log is split into segment files, e.g. log segment 0 file (records 0-2) and log segment 3 file (records 3-5).
@tyler_treat Storage Mechanics: each log segment file is paired with an index segment file (index segment 0 file, index segment 3 file) that locates records within the segment.
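A minimal sketch in Go of what a segment-plus-index pair might look like; the names (segment, indexEntry, append) are hypothetical, not from the talk or from Kafka's code.

package log

import (
	"encoding/binary"
	"os"
)

// indexEntry maps a logical record offset to the byte position of that
// record inside the segment's log file.
type indexEntry struct {
	Offset   int64
	Position int64
}

// segment is one chunk of the log: an append-only data file plus an index file.
type segment struct {
	baseOffset int64    // first offset stored in this segment
	log        *os.File // record data, length-prefixed
	index      *os.File // indexEntry records
	size       int64    // current size of the log file in bytes
}

// append writes one record at the given offset and records where it lives.
func (s *segment) append(offset int64, record []byte) error {
	pos := s.size
	// Length-prefix the record so a reader can frame it later.
	if err := binary.Write(s.log, binary.BigEndian, uint32(len(record))); err != nil {
		return err
	}
	if _, err := s.log.Write(record); err != nil {
		return err
	}
	s.size += 4 + int64(len(record))
	return binary.Write(s.index, binary.BigEndian, indexEntry{Offset: offset, Position: pos})
}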
@tyler_treat Zero-Copy Reads: a normal read copies data from disk into the page cache (kernel space), then into the application (user space), then back into a socket buffer before it reaches the NIC (read + send).
@tyler_treat Zero-Copy Reads: with sendfile, data moves from the page cache straight to the NIC without ever entering user space.
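A minimal sketch of a zero-copy read in Go, assuming Linux: when the destination is a *net.TCPConn and the source is an *os.File, io.CopyN is satisfied with sendfile(2) under the hood, so segment bytes move page cache -> NIC without a round trip through user space.

package log

import (
	"io"
	"net"
	"os"
)

// serveSegment streams length bytes of a segment file to a consumer,
// starting at the given byte position. On Linux the copy is performed
// with sendfile(2) because conn is a *net.TCPConn and f is an *os.File.
func serveSegment(conn *net.TCPConn, path string, position, length int64) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()
	if _, err := f.Seek(position, io.SeekStart); err != nil {
		return err
	}
	_, err = io.CopyN(conn, f, length)
	return err
}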
@tyler_treat Left as an exercise for the listener…
 -> Batching
 -> Compression
@tyler_treat Data-Replication Techniques
@tyler_treat caches databases indexes writes
@tyler_treat caches databases indexes writes
@tyler_treat caches databases indexes writes
@tyler_treat How do we achieve high availability and fault tolerance?
@tyler_treat Questions:
 -> How do we ensure continuity of reads/writes?
 -> How do we replicate data?
 -> How do we ensure replicas are consistent?
 -> How do we keep things fast?
 -> How do we ensure data is durable?
@tyler_treat Data-Replication Techniques
 1. Gossip/multicast protocols: epidemic broadcast trees, bimodal multicast, SWIM, HyParView, NeEM
 2. Consensus protocols: 2PC/3PC, Paxos, Raft, Zab, chain replication
@tyler_treat Replication in Kafka
 1. Select a leader
 2. Maintain an in-sync replica set (ISR) (initially every replica)
 3. Leader writes messages to a write-ahead log (WAL)
 4. Leader commits messages when all replicas in the ISR ack
 5. Leader maintains a high-water mark (HW) of the last committed message
 6. HW is piggybacked on replica fetch responses, and replicas periodically checkpoint it to disk
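A minimal sketch of the commit rule from steps 4-5, with hypothetical types (this is not Kafka's code): the high-water mark is the lowest offset that every replica in the ISR has acknowledged.

package log

// leaderState tracks, for one partition, how far each ISR member has fetched.
type leaderState struct {
	isr       map[string]int64 // replica ID -> highest acked offset
	highWater int64            // offset of the last committed message
}

// ackFrom records a follower's fetch position and advances the high-water
// mark to the minimum acked offset across the ISR. The new HW is piggybacked
// on subsequent fetch responses to the followers.
func (l *leaderState) ackFrom(replica string, offset int64) {
	l.isr[replica] = offset
	hw := int64(-1)
	for _, o := range l.isr {
		if hw == -1 || o < hw {
			hw = o
		}
	}
	if hw > l.highWater {
		l.highWater = hw
	}
}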
@tyler_treat Replication in Kafka (diagram): writes go to b1 (leader), which has records 0-5 and HW: 3; b2 (follower) has 0-4, HW: 3; b3 (follower) has 0-3, HW: 3; ISR: {b1, b2, b3}.
@tyler_treat Failure Modes 1. Leader fails
@tyler_treat Leader fails (diagram): b1 (leader, records 0-5, HW: 3) goes down while b2 has 0-4 and b3 has 0-3, both at HW: 3.
@tyler_treat Leader fails (diagram): b2 is elected leader and b3 follows, each with records 0-3 and HW: 3; ISR: {b2, b3}. Uncommitted records 4 and 5 are gone, but everything up to the HW survives.
@tyler_treat Failure Modes 1. Leader fails 2. Follower fails
@tyler_treat Follower fails (diagram): starting state is b1 (leader) with records 0-5 and HW: 3, b2 with 0-4, b3 with 0-3, ISR: {b1, b2, b3}; b3 stops fetching.
@tyler_treat Follower fails (diagram): after replica.lag.time.max.ms the leader removes b3 from the ISR: {b1, b2}.
@tyler_treat Follower fails (diagram): with the smaller ISR the leader keeps committing; b1 and b2 reach record 5 with HW: 5 while b3 is still at 0-3, HW: 3.
@tyler_treat Follower fails (diagram): b3 comes back, fetches records 4 and 5, catches up to HW: 5, and is added back to the ISR: {b1, b2, b3}.
@tyler_treat Replication in NATS Streaming 1. Raft replicates client state, messages, and subscriptions
 2. Conceptually, two logs: Raft log and message log
 3. Parallels the work on implementing Raft in RabbitMQ
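A minimal sketch of what "Raft replicates messages" can look like with the hashicorp/raft library; the FSM below is a simplification, not NATS Streaming's actual code (Snapshot and Restore are stubbed).

package log

import (
	"errors"
	"io"
	"sync"
	"time"

	"github.com/hashicorp/raft"
)

// messageFSM appends committed entries to an in-memory message log.
type messageFSM struct {
	mu       sync.Mutex
	messages [][]byte
}

// Apply is invoked by Raft once an entry is committed on a quorum.
func (f *messageFSM) Apply(entry *raft.Log) interface{} {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.messages = append(f.messages, entry.Data)
	return nil
}

// Snapshot and Restore are required by raft.FSM but omitted in this sketch.
func (f *messageFSM) Snapshot() (raft.FSMSnapshot, error) { return nil, errors.New("not implemented") }
func (f *messageFSM) Restore(io.ReadCloser) error         { return errors.New("not implemented") }

// publish proposes a message to the Raft log; it is committed (and therefore
// durable on a quorum) once the returned future resolves without error.
func publish(r *raft.Raft, msg []byte) error {
	return r.Apply(msg, 5*time.Second).Error()
}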
@tyler_treat http://thesecretlivesofdata.com/raft
@tyler_treat Replication in NATS Streaming
 • Initially used a Raft group per topic and a separate metadata group
 • A couple of issues with this:
 -> Topic scalability
 -> Increased complexity due to lack of ordering between Raft groups
@tyler_treat Challenges 1. Scaling topics
@tyler_treat Scaling Raft With a single topic, one node is elected leader and it heartbeats messages to followers
@tyler_treat Scaling Raft As the number of topics increases, so does the number of Raft groups.
@tyler_treat Scaling Raft Technique 1: run a fixed number of Raft groups and use a consistent hash to map a topic to a group.
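A minimal sketch of Technique 1, with a hypothetical helper: hash the topic name to pick one of N pre-created Raft groups (a simple modulo stands in for a full consistent-hash ring).

package log

import "hash/fnv"

// raftGroupFor deterministically maps a topic to one of numGroups Raft
// groups, so every broker agrees on which group replicates which topic.
func raftGroupFor(topic string, numGroups uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(topic))
	return h.Sum32() % numGroups
}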
@tyler_treat Scaling Raft Technique 2: run an entire node’s worth of topics as a single group using a layer on top of Raft. https://www.cockroachlabs.com/blog/scaling-raft
@tyler_treat Scaling Raft Technique 3: use a single Raft group for all topics and metadata.
@tyler_treat Challenges 1. Scaling topics 2. Dual writes
@tyler_treat Dual Writes (diagram): entries land in the Raft log in commit order: msg 1, msg 2, sub, msg 3, add peer, msg 4.
@tyler_treat Dual Writes (diagram): committed messages are then written a second time to the message store: msg 1, msg 2, msg 3, msg 4.
@tyler_treat Dual Writes (diagram): the Raft log has physical offsets 0-5 (including non-message entries like sub and add peer), while the store has logical offsets 0-3.
@tyler_treat Dual Writes (diagram): instead of a second store, keep an index that maps logical offsets to physical offsets in the Raft log.
@tyler_treat Treat the Raft log as our message write-ahead log.
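A minimal sketch (hypothetical structure) of what that implies: Raft indexes act as physical offsets and include non-message entries (subscriptions, membership changes), so a small index maps each contiguous logical message offset to its Raft index.

package log

// offsetIndex maps logical message offsets to physical Raft log indexes.
type offsetIndex struct {
	physical []uint64 // physical[i] = Raft index holding logical offset i
}

// onApply is called for every committed Raft entry; only message entries
// consume a logical offset.
func (idx *offsetIndex) onApply(raftIndex uint64, isMessage bool) {
	if isMessage {
		idx.physical = append(idx.physical, raftIndex)
	}
}

// lookup translates a consumer's logical offset into the Raft index to read.
func (idx *offsetIndex) lookup(logical uint64) (raftIndex uint64, ok bool) {
	if logical >= uint64(len(idx.physical)) {
		return 0, false
	}
	return idx.physical[logical], true
}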
@tyler_treat Performance
 1. Publisher acks (see the producer config sketch after this list)
 -> broker acks on commit (slow but safe)
 -> broker acks on local log append (fast but unsafe)
 -> publisher doesn’t wait for ack (fast but unsafe)
 2. Don’t fsync, rely on replication for durability
 3. Keep disk access sequential and maximize zero-copy reads
 4. Batch aggressively
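A minimal sketch of the publisher-ack spectrum from item 1, assuming the Shopify/sarama Kafka client (the choice of client is an assumption; the three levels map onto its RequiredAcks setting).

package log

import (
	"time"

	"github.com/Shopify/sarama"
)

// producerConfig shows the three ack levels plus aggressive batching.
func producerConfig() *sarama.Config {
	config := sarama.NewConfig()
	// Ack on commit (slow but safe); the faster, less safe options are below.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// config.Producer.RequiredAcks = sarama.WaitForLocal // ack on leader's local append (fast but unsafe)
	// config.Producer.RequiredAcks = sarama.NoResponse   // don't wait for an ack (fast but unsafe)

	// Batch aggressively: flush every 1000 messages or every 10ms.
	config.Producer.Flush.Messages = 1000
	config.Producer.Flush.Frequency = 10 * time.Millisecond
	return config
}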
@tyler_treat Durability
 1. Quorum guarantees durability
 -> Comes for free with Raft
 -> In Kafka, need to configure min.insync.replicas and acks, e.g. a topic with replication factor 3, min.insync.replicas=2, and acks=all (see the sketch after this list)
 2. Disable unclean leader elections
 3. At odds with availability, i.e. no quorum == no reads/writes
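A minimal sketch (hypothetical check, not broker code) of the rule behind min.insync.replicas: with acks=all, a produce can only be acknowledged while the ISR is at least min.insync.replicas in size; once it shrinks below that, writes are rejected, which is exactly the availability trade-off in item 3.

package log

// canAckWrite reports whether a write produced with acks=all may be
// acknowledged, given the current ISR size and the topic's
// min.insync.replicas setting.
func canAckWrite(isrSize, minInsyncReplicas int, acksAll bool) bool {
	if !acksAll {
		// Weaker ack settings don't wait for a quorum at all.
		return true
	}
	// Durable only if enough in-sync replicas remain to form the quorum.
	return isrSize >= minInsyncReplicas
}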
@tyler_treat Scaling Message Delivery
@tyler_treat Scaling Message Delivery 1. Partitioning
@tyler_treat Partitioning is how we scale linearly.
@tyler_treat [diagram: the same pipeline, but the writes feeding the log have grown into HELLA WRITES]
@tyler_treat [diagram: writes split across Topic: purchases and Topic: inventory, each further partitioned, e.g. Accounts A-M / Accounts N-Z and SKUs A-M / SKUs N-Z, all feeding caches, databases, and indexes]
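A minimal sketch of the partitioning in the diagram, with a hypothetical helper: route each record to a partition by its key (here the A-M / N-Z split shown on the slide) so per-key order is preserved while writes scale across partitions.

package log

import "unicode"

// partitionForKey mirrors the slide's split: keys starting with A-M go to
// partition 0, everything else to partition 1. A real system would more
// likely hash the key across N partitions.
func partitionForKey(key string) int {
	if key == "" {
		return 0
	}
	c := unicode.ToUpper(rune(key[0]))
	if c >= 'A' && c <= 'M' {
		return 0
	}
	return 1
}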
@tyler_treat Scaling Message Delivery 1. Partitioning 2. High fan-out
@tyler_treat Kinesis Fan-Out (diagram): writes are spread across shard-1, shard-2, and shard-3, each with its own set of consumers.
@tyler_treat Replication in Kafka and NATS Streaming is purely a means of HA.
@tyler_treat High Fan-Out
 1. Observation: with an immutable log, there are no stale/phantom reads
 2. This should make it “easy” (in theory) to scale to a large number of consumers
 3. With Raft, we can use “non-voters” to act as read replicas and load balance consumers (see the sketch after this list)
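A minimal sketch of item 3, assuming the hashicorp/raft library: a non-voting member receives the replicated log (so consumers can read from it) without enlarging the quorum.

package log

import "github.com/hashicorp/raft"

// addReadReplica adds a broker as a non-voter so it can serve reads and
// absorb consumer fan-out without affecting commit latency or quorum size.
func addReadReplica(r *raft.Raft, id raft.ServerID, addr raft.ServerAddress) error {
	// prevIndex 0 means no precondition; timeout 0 uses the library default.
	return r.AddNonvoter(id, addr, 0, 0).Error()
}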
@tyler_treat Scaling Message Delivery 1. Partitioning 2. High fan-out 3. Push vs. pull
@tyler_treat Push vs. Pull
 • In Kafka, consumers pull data from brokers
 • In NATS Streaming, brokers push data to consumers
 • Design implications:
 -> Fan-out
 -> Flow control
 -> Optimizing for latency vs. throughput
 -> Client complexity
@tyler_treat Trade-Offs and Lessons Learned
@tyler_treat Trade-Offs and Lessons Learned 1. Competing goals
@tyler_treat Competing Goals
 1. Performance
 -> Easy to make something fast that’s not fault-tolerant or scalable
 -> Simplicity of mechanism makes this easier
 -> Simplicity of “UX” makes this harder
 2. Scalability and fault-tolerance
 -> At odds with simplicity
 -> Cannot be an afterthought
 3. Simplicity
 -> Simplicity of mechanism shifts complexity elsewhere (e.g. client)
 -> Easy to let the server handle complexity; hard when that needs to be distributed, consistent, and fast
@tyler_treat Trade-Offs and Lessons Learned 1. Competing goals 2. Aim for simplicity
@tyler_treat Distributed systems are complex enough. Simple is usually better (and faster).
@tyler_treat “A complex system that works is invariably found to have evolved from a simple system that works.”
@tyler_treat Trade-Offs and Lessons Learned 1. Competing goals 2. Aim for simplicity 3. You can’t effectively bolt on fault-tolerance
@tyler_treat “A complex system designed from scratch never works and cannot be patched up to make it work. You have to start over, beginning with a working simple system.”
@tyler_treat Trade-Offs and Lessons Learned 1. Competing goals 2. Aim for simplicity 3. You can’t effectively bolt on fault-tolerance 4. Lean on existing work
@tyler_treat Don’t roll your own coordination protocol; use Raft, ZooKeeper, etc.
@tyler_treat Trade-Offs and Lessons Learned 1. Competing goals 2. Aim for simplicity 3. You can’t effectively bolt on fault-tolerance 4. Lean on existing work 5. There are probably edge cases for which you haven’t written tests
@tyler_treat There are many failure modes, and you can only write so many tests. Formal methods and property-based/generative testing can help.
@tyler_treat Trade-Offs and Lessons Learned 1. Competing goals 2. Aim for simplicity 3. You can’t effectively bolt on fault-tolerance 4. Lean on existing work 5. There are probably edge cases for which you haven’t written tests 6. Be honest with your users
@tyler_treat Don’t try to be everything to everyone. Be explicit about design decisions, trade-offs, guarantees, defaults, etc.
@tyler_treat https://bravenewgeek.com/tag/building-a-distributed-log-from-scratch/
@tyler_treat Thanks! bravenewgeek.com realkinetic.com
