The Power of Snapshots: Stateful Stream Processing with Apache Flink
Stephan Ewen
QCon San Francisco, 2017
Watch the video with slide synchronization on InfoQ.com: https://www.infoq.com/presentations/distributed-stream-processing-flink
Presented at QCon San Francisco (www.qconsf.com)
Original creators of Apache Flink®
dA Platform 2: open-source Apache Flink + dA Application Manager
Stream Processing
What changes faster? Data or Query?
Batch processing use case: data changes slowly compared to fast-changing queries (ad-hoc queries, data exploration, ML training and (hyper)parameter tuning).
Stream processing use case: data changes fast and the application logic is long-lived (continuous applications, data pipelines, standing queries, anomaly detection, ML evaluation, …).
Batch Processing
Stream Processing
Stateful Stream Processing
Moving State into the Processors
(Figure: a stateless stream processor keeps the application state in an external database; a stateful stream processor holds the application state itself.)
Apache Flink
Apache Flink in a Nutshell
Stateful computations over streams, real-time and historic: fast, scalable, fault-tolerant, in-memory, event time, large state, exactly-once.
(Figure: Flink between sources and consumers such as queries, applications, devices, databases, streams, and file/object storage, for both real-time streams and historic data.)
The Core Building Blocks
• Event streams: real-time and hindsight
• State: complex business logic
• (Event) time: consistency with out-of-order data and late data
• Snapshots: forking / versioning / time-travel
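Stateful Event & Stream Processing
```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
import org.apache.flink.streaming.connectors.fs.RollingSink

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Source: raw lines from Kafka
val lines: DataStream[String] = env.addSource(new FlinkKafkaConsumer09(…))
// Transformation: parse each line into an event
val events: DataStream[Event] = lines.map((line) => parse(line))
// Transformation: per-sensor 5-second windows (state read/write)
val stats: DataStream[Statistic] = events
  .keyBy("sensor")
  .timeWindow(Time.seconds(5))
  .aggregate(new MyAggregationFunction())
// Sink
stats.addSink(new RollingSink(path))
```
Streaming dataflow: Source → Transform → Window (state read/write) → Sink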
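The dataflow above can be mimicked without a cluster. The following plain-Scala sketch computes what a keyed 5-second tumbling window sum produces: events are grouped by sensor and by the start of the window that contains their timestamp. `SensorEvent` and `tumblingWindowSums` are hypothetical names for this illustration, not Flink API, and the sketch looks at the whole input at once, whereas Flink keeps each window's partial result in keyed state and emits it when the window fires.

```scala
// Hypothetical event type for this sketch; not part of the Flink API.
final case class SensorEvent(sensor: String, timestampMs: Long, value: Double)

// Group events by (sensor, start of the tumbling window containing the event)
// and sum the values per group. Assumes non-negative timestamps.
def tumblingWindowSums(events: Seq[SensorEvent], sizeMs: Long): Map[(String, Long), Double] =
  events
    .groupBy(e => (e.sensor, e.timestampMs - e.timestampMs % sizeMs))
    .map { case (key, group) => key -> group.map(_.value).sum }

val readings = Seq(
  SensorEvent("s1", 1000L, 1.0),
  SensorEvent("s1", 4999L, 2.0),
  SensorEvent("s1", 5000L, 4.0), // first event of the next window [5000, 10000)
  SensorEvent("s2", 2000L, 8.0)
)
val sums = tumblingWindowSums(readings, sizeMs = 5000L)
```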
Stateful Event & Stream Processing
Scalable embedded state: access at memory speed and scales with parallel operators.
Event Time and Processing Time
(Figure: event producer → message queue (partition 1, partition 2) → Flink data source → Flink window operator, annotated with event time, broker time, ingestion time, and processing time.)
Event time and watermarks, as in the Dataflow model.
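A minimal sketch of how watermarks define completeness, assuming a bounded-out-of-orderness watermark (the largest timestamp seen so far minus a fixed delay) and tumbling windows that count events. The class `EventTimeWindowCounter` is hypothetical and only illustrates the idea; it is not Flink's watermark or window implementation. A window is emitted once the watermark passes its last timestamp, and events for an already-complete window are counted as late.

```scala
import scala.collection.mutable

// Hypothetical sketch (not Flink classes): event-time tumbling-window counts driven by
// a bounded-out-of-orderness watermark, watermark = max timestamp seen - maxDelayMs.
final class EventTimeWindowCounter(windowSizeMs: Long, maxDelayMs: Long) {
  private var maxTimestamp = Long.MinValue
  private val openWindows = mutable.TreeMap.empty[Long, Int] // window start -> count
  val fired = mutable.ArrayBuffer.empty[(Long, Int)]         // (window start, count)
  var lateEvents = 0

  def watermark: Long =
    if (maxTimestamp == Long.MinValue) Long.MinValue else maxTimestamp - maxDelayMs

  // A window [start, start + size) is complete once the watermark reaches its last timestamp.
  private def isComplete(start: Long): Boolean = start + windowSizeMs - 1 <= watermark

  def onEvent(timestampMs: Long): Unit = {
    val start = timestampMs - timestampMs % windowSizeMs // assumes non-negative timestamps
    if (isComplete(start)) {
      lateEvents += 1 // the window is already complete: late data
    } else {
      openWindows(start) = openWindows.getOrElse(start, 0) + 1
    }
    maxTimestamp = math.max(maxTimestamp, timestampMs)
    // Fire every window that the (possibly advanced) watermark has completed, in order.
    for (s <- openWindows.keys.toList if isComplete(s)) {
      fired += (s -> openWindows(s))
      openWindows.remove(s)
    }
  }
}

val counter = new EventTimeWindowCounter(windowSizeMs = 10L, maxDelayMs = 5L)
Seq(3L, 12L, 7L, 16L, 2L).foreach(counter.onEvent) // arrives out of order
```

With a 5 ms delay, the out-of-order event at 7 is still counted in window [0, 10), which fires when the event at 16 moves the watermark to 11; the event at 2 then arrives too late.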
Powerful Abstractions
Layered abstractions to navigate simple to complex use cases:
• Stream SQL / Tables (dynamic tables): high-level analytics API
• DataStream API (streams, windows): stream & batch data processing
• Process Function (events, state, time): stateful event-driven applications
```scala
// DataStream API: windowed aggregation per sensor
val stats = stream
  .keyBy("sensor")
  .timeWindow(Time.seconds(5))
  .reduce((a, b) => a.add(b))

// Process Function: full control over events, state, and time
def processElement(event: MyEvent, ctx: Context, out: Collector[Result]): Unit = {
  // work with event and state
  (event, state.value) match { … }

  out.collect(…)  // emit events
  state.update(…) // modify state

  // schedule a timer callback
  ctx.timerService.registerEventTimeTimer(event.timestamp + 500)
}
```
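To illustrate the Process Function layer, the sketch below simulates per-key state plus event-time timers in plain Scala. `KeyedCountWithTimer`, `processElement`, `advanceWatermark`, and `onTimer` are hypothetical stand-ins named after the concepts on the slide, not Flink's `KeyedProcessFunction` signatures. Each element updates a per-key count and registers a timer at its timestamp plus a delay (500 in the usage below, as on the slide), and a timer fires once the watermark reaches its time.

```scala
import scala.collection.mutable

// Hypothetical simulation of a keyed process function: per-key state, plus
// event-time timers that fire once the watermark passes their timestamp.
final class KeyedCountWithTimer(timerDelayMs: Long) {
  private val countState = mutable.Map.empty[String, Long]   // keyed state
  private val timers = mutable.TreeSet.empty[(Long, String)] // (fire time, key), ordered by time
  val output = mutable.ArrayBuffer.empty[String]

  def processElement(key: String, timestampMs: Long): Unit = {
    countState(key) = countState.getOrElse(key, 0L) + 1 // modify state
    timers += ((timestampMs + timerDelayMs, key))        // schedule a timer callback
  }

  // Fire all timers whose time is <= the new watermark, earliest first.
  def advanceWatermark(watermark: Long): Unit =
    while (timers.nonEmpty && timers.head._1 <= watermark) {
      val (time, key) = timers.head
      timers -= timers.head
      onTimer(time, key)
    }

  private def onTimer(time: Long, key: String): Unit =
    output += s"$key@$time count=${countState.getOrElse(key, 0L)}" // emit events
}

val fn = new KeyedCountWithTimer(timerDelayMs = 500L)
fn.processElement("a", 1000L)
fn.processElement("a", 1200L)
fn.processElement("b", 1100L)
fn.advanceWatermark(1600L) // fires the timers at 1500 ("a") and 1600 ("b")
```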
Distributed Snapshots
Event Sourcing + Memory Image
The event log persists events (temporarily). The process applies each event / command by updating local variables/structures in main memory, and periodically snapshots the memory.
Event Sourcing + Memory Image
Recovery: restore the snapshot and replay the events since the snapshot (the event log persists events temporarily).
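The recovery rule above (restore the snapshot, then replay what came after it) can be sketched in a few lines. This is a minimal illustration under assumed names (`Deposit`, `Snapshot`, `applyEvent`, and `recover` are hypothetical): the snapshot stores the in-memory state together with the log offset it reflects, so recovery replays only the suffix of the log and ends in the same state as replaying everything.

```scala
// Hypothetical event type and snapshot record for this sketch.
final case class Deposit(account: String, amount: Long)
final case class Snapshot(state: Map[String, Long], logOffset: Int)

// The memory image: apply one event to the in-memory state.
def applyEvent(state: Map[String, Long], e: Deposit): Map[String, Long] =
  state.updated(e.account, state.getOrElse(e.account, 0L) + e.amount)

// Recovery: restore the snapshot, then replay only the events logged after it.
def recover(log: IndexedSeq[Deposit], snapshot: Snapshot): Map[String, Long] =
  log.drop(snapshot.logOffset).foldLeft(snapshot.state)(applyEvent)

val log = Vector(Deposit("alice", 10), Deposit("bob", 5), Deposit("alice", 7))
// Snapshot taken after the first two events had been applied.
val snap = Snapshot(log.take(2).foldLeft(Map.empty[String, Long])(applyEvent), logOffset = 2)
val recovered = recover(log, snap)
```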
Consistent Distributed Snapshots
Scalable embedded state: access at memory speed and scales with parallel operators.
Checkpoint Barriers
Consistent Distributed Snapshots: trigger checkpoint
Inject a checkpoint barrier.
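When the injected barriers reach an operator with several inputs, the operator has to align them before snapshotting, so that its state reflects exactly the records before the barrier on every input. The sketch below models that alignment step for exactly-once checkpoints; the message types and `AligningSumOperator` are hypothetical, and forwarding the barrier downstream is only noted in a comment.

```scala
import scala.collection.mutable

// Hypothetical message types: a channel delivers records and checkpoint barriers.
sealed trait Msg
final case class Record(value: Long) extends Msg
final case class Barrier(checkpointId: Long) extends Msg

final class AligningSumOperator(numChannels: Int) {
  private var sum = 0L                                    // operator state
  private val blocked = mutable.Set.empty[Int]            // channels that already delivered the barrier
  private val buffered = mutable.Queue.empty[Record]      // post-barrier records held back
  val snapshots = mutable.ArrayBuffer.empty[(Long, Long)] // (checkpoint id, state)

  def currentSum: Long = sum

  def receive(channel: Int, msg: Msg): Unit = msg match {
    case r: Record if blocked.contains(channel) =>
      buffered.enqueue(r) // belongs after the barrier: hold it until alignment completes
    case Record(v) =>
      sum += v
    case Barrier(id) =>
      blocked += channel
      if (blocked.size == numChannels) {
        snapshots += (id -> sum) // all barriers arrived: state covers exactly the pre-barrier records
        blocked.clear()
        while (buffered.nonEmpty) sum += buffered.dequeue().value
        // a real operator would also forward the barrier downstream here
      }
  }
}

val op = new AligningSumOperator(numChannels = 2)
op.receive(0, Record(1))
op.receive(0, Barrier(1))
op.receive(0, Record(10)) // after the barrier on channel 0: buffered
op.receive(1, Record(2))
op.receive(1, Record(3))
op.receive(1, Barrier(1)) // alignment complete: snapshot, then release the buffer
op.receive(1, Record(20))
```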
Consistent Distributed Snapshots: take state snapshot
Trigger state copy-on-write.
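Copy-on-write is what lets the synchronous part of a snapshot be cheap. A simple way to see the effect, assuming an immutable Scala `Map` as the state structure (Flink's heap state backend uses its own copy-on-write structures, which this does not reproduce): taking the snapshot only keeps a reference to the current version, later updates create new versions, and the snapshot can be written out afterwards unchanged. `CowState` and `serialize` are hypothetical.

```scala
// Sketch of a copy-on-write state snapshot, assuming an immutable (persistent)
// Map as the state structure; an illustration, not Flink's state backends.
final class CowState {
  private var current: Map[String, Long] = Map.empty

  // Each update produces a new map version that shares structure with the old one.
  def add(key: String, delta: Long): Unit =
    current = current.updated(key, current.getOrElse(key, 0L) + delta)

  def get(key: String): Long = current.getOrElse(key, 0L)

  // Taking the snapshot is O(1): just keep a reference to the current version.
  def snapshot(): Map[String, Long] = current
}

// Stand-in for writing the snapshot to durable storage (hypothetical format).
def serialize(snap: Map[String, Long]): String =
  snap.toSeq.sorted.map { case (k, v) => s"$k=$v" }.mkString(",")

val state = new CowState
state.add("a", 1)
state.add("b", 2)
val snap = state.snapshot()   // synchronous part: capture the current version
state.add("a", 100)           // processing continues and modifies state
val written = serialize(snap) // the later write still sees the snapshot version
```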
Consistent Distributed Snapshots: persist state snapshots
Snapshots are persisted asynchronously while the processing pipeline continues.
Consistent Distributed Snapshots: rolling back computation
Re-load state and reset positions in the input streams, then re-process.
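The rollback can be sketched with a single operator whose checkpoint records the input position together with the state. `Job` and `Checkpoint` are hypothetical; the offset plays the role of, for example, a Kafka partition offset. After the simulated failure, state and position are reset together and the remaining records are re-processed, so the final state counts every record exactly once.

```scala
// Hypothetical checkpoint: the source position stored together with the operator state.
final case class Checkpoint(sourceOffset: Int, runningSum: Long)

final class Job(input: IndexedSeq[Long]) {
  var offset = 0 // position in the input stream
  var sum = 0L   // operator state

  def step(): Unit = { sum += input(offset); offset += 1 }
  def checkpoint(): Checkpoint = Checkpoint(offset, sum)
  def restore(cp: Checkpoint): Unit = { offset = cp.sourceOffset; sum = cp.runningSum }
  def runToEnd(): Unit = while (offset < input.length) step()
}

val input = Vector(1L, 2L, 3L, 4L, 5L)
val job = new Job(input)
job.step(); job.step()
val cp = job.checkpoint() // offset 2, sum 3
job.step()                // processes 3 (sum 6), then the job "fails"
job.restore(cp)           // re-load state and reset the input position
job.runToEnd()            // re-process 3, 4, 5
```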
Consistent Distributed Snapshots: restore to different programs
Checkpoints and Savepoints in Apache Flink
Speed or Operability? What to optimize for?
• Checkpoint: fast snapshots
• Savepoint: flexible operations on snapshots
Savepoints: Optimized for Operability
• Self-contained: no references to other checkpoints
• Canonical format: switch between state structures
• Efficiently re-scalable: indexed by key group
• Future: a more self-describing serialization format for archiving / versioning (like Avro, Thrift, etc.)
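The "indexed by key group" property can be illustrated as follows. Flink hashes each key into one of `maxParallelism` key groups and gives each parallel operator instance a contiguous range of key groups; the sketch mirrors that structure but, as a simplifying assumption, uses `hashCode` modulo `maxParallelism` directly instead of Flink's additional murmur hash of `hashCode`. All function names are hypothetical. Because state is organized per key group, a new parallelism only reassigns whole key groups rather than individual keys.

```scala
// Simplified key-to-key-group assignment (Flink additionally re-hashes hashCode).
def keyGroupFor(key: Any, maxParallelism: Int): Int =
  Math.floorMod(key.hashCode, maxParallelism)

// Contiguous ranges of key groups go to each parallel operator instance.
def operatorIndexFor(keyGroup: Int, maxParallelism: Int, parallelism: Int): Int =
  keyGroup * parallelism / maxParallelism

// Which key groups each operator instance reads when (re)starting with `parallelism`.
def assignKeyGroups(maxParallelism: Int, parallelism: Int): Map[Int, Seq[Int]] =
  (0 until maxParallelism).groupBy(kg => operatorIndexFor(kg, maxParallelism, parallelism))

val twoWay = assignKeyGroups(maxParallelism = 8, parallelism = 2)
val fourWay = assignKeyGroups(maxParallelism = 8, parallelism = 4)
```

Scaling from 2 to 4 instances here splits each old range in half, so every new instance reads its state from exactly one old instance.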
Checkpoints: Optimized for Efficiency
• Often incremental:
  • Snapshot only the diff from the last snapshot
  • Reference older snapshots, compaction over time
• Format specific to the state backend:
  • No extra copying or re-encoding
  • Not possible to switch to another state backend between checkpoints
• Compact serialization: optimized for speed/space, not long-term archival and evolution
• Key groups not indexed: re-distribution may be more expensive
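The difference between incremental checkpoints and a self-contained snapshot can be sketched with plain maps. In this hypothetical sketch (`IncrementalCheckpoint`, `takeIncremental`, `restore`, and `compact` are illustrative names, not Flink or RocksDB API), each checkpoint stores only changed and deleted keys plus a reference to its predecessor, restoring walks the chain, and compaction folds the chain into a full snapshot with no references, which is the self-contained property the savepoint slide asks for.

```scala
// Hypothetical incremental checkpoint: a diff plus a reference to the previous checkpoint.
final case class IncrementalCheckpoint(
    id: Long,
    changed: Map[String, Long],
    deleted: Set[String],
    previous: Option[IncrementalCheckpoint])

// Restoring walks the chain of references back to the first checkpoint.
def restore(cp: IncrementalCheckpoint): Map[String, Long] = {
  val base = cp.previous.map(restore).getOrElse(Map.empty[String, Long])
  (base -- cp.deleted) ++ cp.changed
}

// Store only the entries that differ from the state at the last checkpoint.
def takeIncremental(id: Long, lastState: Map[String, Long], current: Map[String, Long],
                    previous: Option[IncrementalCheckpoint]): IncrementalCheckpoint = {
  val changed = current.filter { case (k, v) => !lastState.get(k).contains(v) }
  val deleted = lastState.keySet -- current.keySet
  IncrementalCheckpoint(id, changed, deleted, previous)
}

// Compaction: one full snapshot without references to older checkpoints.
def compact(cp: IncrementalCheckpoint): IncrementalCheckpoint =
  IncrementalCheckpoint(cp.id, restore(cp), Set.empty, None)

val s1 = Map("a" -> 1L, "b" -> 2L)
val cp1 = takeIncremental(1L, Map.empty, s1, None)
val s2 = Map("b" -> 5L, "c" -> 3L) // "a" removed, "b" changed, "c" added
val cp2 = takeIncremental(2L, s1, s2, Some(cp1))
```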
What else are snapshots / checkpoints good for?
What users built on checkpoints
• Upgrades and rollbacks
• Cross-datacenter failover
• State archiving
• State bootstrapping
• Application migration
• Spot instance region arbitrage
• A/B testing
• …
Distributed Snapshots and Side Effects
Transaction coordination for side effects
One snapshot can transactionally move data between different systems: snapshots may include side effects.
Transaction coordination for side effects
• Similar to a distributed two-phase commit
• Coordinated by asynchronous checkpoints, no voting delays
• Basic algorithm:
  • Between checkpoints: produce into a transaction or a write-ahead log
  • On operator snapshot: flush the local transaction (vote-to-commit)
  • On checkpoint complete: commit transactions (commit)
  • On recovery: check and commit any pending transactions
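The basic algorithm can be sketched for the write-ahead-log variant: records are buffered per transaction, the buffer is pre-committed into the operator's snapshot state, written to the external system when the checkpoint completes, and re-committed from the restored state after a failure. `ExternalStore` and `TwoPhaseCommitSink` are hypothetical; the method names `snapshotState` and `notifyCheckpointComplete` echo Flink's checkpoint hooks, but the signatures here are simplified assumptions. Because a restored transaction may already have been committed before the failure, commit must be idempotent.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a transactional external system.
final class ExternalStore {
  val committed = mutable.ArrayBuffer.empty[String]
  private val committedTxns = mutable.Set.empty[Long]

  // Idempotent: committing the same transaction twice has no further effect.
  def commit(txnId: Long, records: Seq[String]): Unit =
    if (committedTxns.add(txnId)) committed ++= records
}

final class TwoPhaseCommitSink(store: ExternalStore) {
  private var nextTxnId = 0L
  private var currentTxn = mutable.ArrayBuffer.empty[String]
  // Pre-committed, not yet committed: checkpoint id -> (txn id, records)
  private val pending = mutable.Map.empty[Long, (Long, Seq[String])]

  // Between checkpoints: write into the current transaction (here: a log buffer).
  def write(record: String): Unit = currentTxn += record

  // On operator snapshot: pre-commit the transaction into the checkpointed state
  // (vote-to-commit) and start a new one. The returned map is the snapshot state.
  def snapshotState(checkpointId: Long): Map[Long, (Long, Seq[String])] = {
    pending(checkpointId) = (nextTxnId, currentTxn.toList)
    nextTxnId += 1
    currentTxn = mutable.ArrayBuffer.empty[String]
    pending.toMap
  }

  // On checkpoint complete: commit all transactions up to that checkpoint.
  def notifyCheckpointComplete(checkpointId: Long): Unit =
    pending.keys.filter(_ <= checkpointId).toList.sorted.foreach { id =>
      val (txnId, records) = pending.remove(id).get
      store.commit(txnId, records)
    }

  // On recovery: commit every pending transaction found in the restored state.
  def recover(restored: Map[Long, (Long, Seq[String])]): Unit =
    restored.toList.sortBy(_._1).foreach { case (_, (txnId, records)) => store.commit(txnId, records) }
}

val store = new ExternalStore
val sink = new TwoPhaseCommitSink(store)
sink.write("a"); sink.write("b")
sink.snapshotState(1)             // pre-commit txn 0
sink.write("c")
sink.notifyCheckpointComplete(1)  // commit txn 0: "a", "b" become visible
sink.write("d")
val restoredState = sink.snapshotState(2) // pre-commit txn 1 ("c", "d"), then a failure
val afterRestart = new TwoPhaseCommitSink(store)
afterRestart.recover(restoredState) // commits txn 1
afterRestart.recover(restoredState) // idempotent: no duplicates
```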
Distributed Snapshots and Application Architectures (A Philosophical Monologue)
Good old centralized architecture
(Figure: four applications sharing the big mean central database ($$$), guarded by the grumpy DBA.)
Stateful Stream Processing & Applications
(Figure: five applications.)
Decentralized infrastructure, DevOps, decentralized responsibilities, but it still involves managing databases.
Stateless Application Containers
State management is nasty, let's pretend we don't have to do it.
Stateless Application Containers
Kudos to Kiki Carter for the broccoli metaphor.
Broccoli (state management) is nasty, let's pretend we don't have to eat it.
Stateful stream processing to the rescue
(Figure: applications alongside sensors and APIs.)
Very simple: state is just part of the application.
Compute, State, and Storage
• Classic tiered architecture: a compute layer, plus a database layer holding application state + backup
• Streaming architecture: compute + application state together, plus stream storage and snapshot storage (backup)
Performance
• Classic tiered architecture: synchronous reads/writes across the tier boundary
• Streaming architecture: all modifications are local; asynchronous writes of large blobs
Consistency
• Classic tiered architecture: distributed transactions at scale; typically at-most once / at-least once
• Streaming architecture: exactly once per state
Scaling a Service
• Classic tiered architecture: provision compute, and separately provision additional database capacity
• Streaming architecture: provision compute and state together
Rolling out a New Service
• Classic tiered architecture: provision a new database (or add capacity to an existing one)
• Streaming architecture: provision compute and state together; the new service simply occupies some additional backup space
Time, Completeness, Out-of-order
• Classic tiered architecture: ?
• Streaming architecture: event time clocks define data completeness; event time timers handle actions for out-of-order data
Stateful Stream Processing
(Figure: applications alongside sensors and APIs.)
Very simple: state is just part of the application.
The Challenges with That
• Upgrades are stateful and need consistency
  • application evolution and bug fixes
• Migration of application state
  • cluster migration, A/B testing
• Re-processing and reinstatement
  • fix corrupt results, bootstrap new applications
• State evolution (schema evolution)
Consistent Distributed Snapshots: the answer (my personal and obviously biased take)
Demo Time! Payments Dashboard
Thank you very much
(shameless plug)
We are hiring! data-artisans.com/careers
Appendix
Details about Snapshots and Transactional Side Effects
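Exactly-once via Transactions
(Figure: timeline with checkpoints chk-1 and chk-2, transactions TXN-1 to TXN-3, local snapshot acknowledgements ✔chk-1 and ✔chk-2, global checkpoint completions (✔ global), a failure marker ✘, and the resulting side effects.)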
Transaction fails after local snapshot
(Figure: the same timeline with chk-1, chk-2, TXN-1 to TXN-3, ✔chk-1, a failure marker ✘, ✔ global, and the side effect.)
Transaction fails before commit…
(Figure: the same timeline with chk-1, chk-2, TXN-1 to TXN-3, ✔chk-1, a failure marker ✘, two ✔ global markers, and the side effect.)
… commit on recovery
(Figure: after recovery from chk-2, the recovered TXN handle is committed (✔ global); the timeline continues with TXN-2, TXN-3, chk-3, and the side effect.)