Stream & Batch Processing in One System
Apache Flink’s Streaming Data Flow Engine
Ufuk Celebi, uce@apache.org
Flink Forward, October 13, 2015
System Architecture
• Deployment: Local (Single JVM) · Cluster (Standalone, YARN)
• Runtime: Distributed Streaming Data Flow
• APIs: DataStream API (Unbounded Data) · DataSet API (Bounded Data)
• Libraries: Machine Learning · Graph Processing · SQL-like API
Today
Journey from APIs to Parallel Execution
A look behind the scenes. You don’t have to worry about this.
Components
• User: Client
• System: JobManager (Master), TaskManagers (Workers)
• Steps: Submit Program, Schedule, Execute

    // Imports assume the Flink 0.10-era package layout.
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class WordCount {

        public static void main(String[] args) throws Exception {
            // Flink’s entry point
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> data = env.fromElements(
                "O Romeo, Romeo! wherefore art thou Romeo?",
                "Deny thy father and refuse thy name",
                "Or, if thou wilt not, be but sworn my love,",
                "And I'll no longer be a Capulet.");

            // Split by whitespace to (word, 1) and sum up ones.
            // SplitByWhitespace is a user-defined function not shown on the slides.
            DataStream<Tuple2<String, Integer>> counts = data
                .flatMap(new SplitByWhitespace())
                .keyBy(0)
                .timeWindow(Time.of(10, TimeUnit.SECONDS))
                .sum(1);

            counts.print();

            // Today: What happens now?
            env.execute();
        }
    }
Client
Translates the API code to a data flow graph called JobGraph and submits it to the JobManager.
[Figure: the WordCount program is translated into a data flow Source → Transform → Sink]
JobGraph
[Figure: alternating chain of JobVertex and Intermediate Result. A JobVertex (computation) produces an Intermediate Result (data), which the next JobVertex consumes.]
The JobGraph
Vertices and results are combined into a directed acyclic graph (DAG) representing the user program.
[Figure: example DAG with the vertices Source, Source, Join, Map, Sink, Sink]
JobGraph Translation
• Translation includes optimizations like chaining: f → g becomes f · g
• DataSet API translation with cost-based optimization
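Chaining can be pictured as fusing two per-record functions into one, so a record is handed from f to g by a direct call instead of crossing a task boundary. The following is only a minimal sketch of that idea in plain Java; `ChainingSketch` and `chain` are hypothetical names and this is not Flink's actual chaining code.

```java
import java.util.function.Function;

// Hypothetical illustration of operator chaining; not Flink's implementation.
class ChainingSketch {

    // Fuse two per-record functions into one: apply f first, then g.
    static <A, B, C> Function<A, C> chain(Function<A, B> f, Function<B, C> g) {
        return f.andThen(g);
    }

    public static void main(String[] args) {
        Function<String, String> trim = String::trim;
        Function<String, Integer> length = String::length;

        // One fused function replaces the two separate steps.
        Function<String, Integer> chained = chain(trim, length);
        System.out.println(chained.apply("  Romeo  ")); // prints 5
    }
}
```

The fused function behaves like the two-step pipeline, which is why chaining can be applied during translation without changing the program's result.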
JobGraph
JobVertex Parameters
• Parallelism
• Code to run
• Consumed result(s)
• Connection pattern
Result Parameters
• Producer
• Type
The JobGraph is the common abstraction for both the DataStream and the DataSet API.
The runtime is agnostic to the respective API. It’s only a question of JobGraph parameterization.
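The parameter lists above can be read as a small data model. The sketch below mirrors them in plain Java to show how two programs can differ only in how the graph is parameterized; all class, enum, and method names are hypothetical stand-ins, not Flink's classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical data model mirroring the slide's parameter lists; not Flink's classes.
class JobGraphSketch {

    enum ResultType { PIPELINED, BLOCKING }
    enum ConnectionPattern { POINTWISE, ALL_TO_ALL }

    // Result parameters: producer and type.
    static final class Result {
        final Vertex producer;
        final ResultType type;

        Result(Vertex producer, ResultType type) {
            this.producer = producer;
            this.type = type;
        }
    }

    // A consumed result together with its connection pattern.
    static final class Input {
        final Result result;
        final ConnectionPattern pattern;

        Input(Result result, ConnectionPattern pattern) {
            this.result = result;
            this.pattern = pattern;
        }
    }

    // Vertex parameters: parallelism, code to run (here just a name), consumed results.
    static final class Vertex {
        final String name;
        final int parallelism;
        final List<Input> inputs = new ArrayList<>();

        Vertex(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
        }

        Result produce(ResultType type) {
            return new Result(this, type);
        }

        Vertex consume(Result result, ConnectionPattern pattern) {
            inputs.add(new Input(result, pattern));
            return this;
        }
    }

    // Build Source -> Map; only the result type parameter differs between calls.
    static Vertex sourceToMap(ResultType type) {
        Vertex source = new Vertex("Source", 4);
        Vertex map = new Vertex("Map", 4);
        return map.consume(source.produce(type), ConnectionPattern.POINTWISE);
    }

    public static void main(String[] args) {
        Vertex streaming = sourceToMap(ResultType.PIPELINED);
        Vertex batch = sourceToMap(ResultType.BLOCKING);
        System.out.println(streaming.inputs.get(0).result.type); // prints PIPELINED
        System.out.println(batch.inputs.get(0).result.type);     // prints BLOCKING
    }
}
```

Both graphs have the same shape and the same vertex classes; only one parameter changes.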
Coordination
• Coordination between components via Akka Actors
• Actors exchange asynchronous messages
• Each actor has its own isolated state
[Figure: Client, JobManager (Master), and TaskManagers, each running an Actor System]
JobManager
• All coordination via JobManager (master):
  • Scheduling programs for execution
  • Checkpoint coordination
  • Monitoring workers
[Figure: JobManager with Actor System, Scheduling, and Checkpoint Coordination components]
ExecutionGraph
• Receive the JobGraph and expand it into the ExecutionGraph
[Figure, Point to Point: a JobVertex becomes ExecutionVertices (EV) EV1 to EV4, its Result becomes ResultPartitions (RP) RP1 to RP4, and the consuming JobVertex becomes EV1 and EV2, each reading a subset of the partitions]
[Figure, All to All: the same graph, with every consuming EV reading every RP]
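The two connection patterns can be sketched as a function from consumer subtask to the partitions it reads. This is a plain-Java illustration only: `ExecutionGraphSketch` and `connect` are hypothetical names, and the point-to-point rule (contiguous groups, producer parallelism a multiple of consumer parallelism) is an assumption made to keep the sketch short, not necessarily Flink's distribution rule.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of expanding one JobGraph edge into partition connections.
class ExecutionGraphSketch {

    enum Pattern { POINT_TO_POINT, ALL_TO_ALL }

    // For each consumer subtask, list the indices of the result partitions it reads.
    static List<List<Integer>> connect(int producers, int consumers, Pattern pattern) {
        if (producers <= 0 || consumers <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        if (pattern == Pattern.POINT_TO_POINT && producers % consumers != 0) {
            throw new IllegalArgumentException(
                "sketch assumes producers is a multiple of consumers");
        }
        int perConsumer = producers / consumers;
        List<List<Integer>> inputs = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            List<Integer> partitions = new ArrayList<>();
            for (int p = 0; p < producers; p++) {
                // All to all: every partition. Point to point: one contiguous group.
                if (pattern == Pattern.ALL_TO_ALL || p / perConsumer == c) {
                    partitions.add(p);
                }
            }
            inputs.add(partitions);
        }
        return inputs;
    }

    public static void main(String[] args) {
        // Four producing subtasks, two consuming subtasks, as in the figures.
        System.out.println(connect(4, 2, Pattern.POINT_TO_POINT)); // [[0, 1], [2, 3]]
        System.out.println(connect(4, 2, Pattern.ALL_TO_ALL));     // [[0, 1, 2, 3], [0, 1, 2, 3]]
    }
}
```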
TaskManager
• All data processing in TaskManager (worker):
  • Communicate with JobManager via Actor messages
  • Exchange data between themselves via dedicated data connections
  • Expose task slots for execution
[Figure: TaskManager with Actor System, four Task Slots, I/O Manager, and Memory Manager]
Scheduling
• Each ExecutionVertex will be executed one or more times
• The JobManager maps each Execution to a task slot
• Pipelined execution in the same slot where applicable
[Figure: three vertices with p=4, p=4, p=3, connected pointwise and all to all, placed on the slots of TaskManager 1 and TaskManager 2]
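One way to picture "pipelined execution in the same slot" is to place subtask i of every vertex into slot i. The sketch below does exactly that and nothing more; the placement rule, `SlotAssignmentSketch`, and `assign` are assumptions for illustration, not a description of Flink's scheduler.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical slot placement: subtask i of every vertex shares slot i.
class SlotAssignmentSketch {

    // Returns one list per slot holding the names of the subtasks placed there.
    static List<List<String>> assign(String[] vertexNames, int[] parallelism) {
        if (vertexNames.length != parallelism.length) {
            throw new IllegalArgumentException("one parallelism per vertex required");
        }
        // The number of slots needed equals the highest parallelism.
        int slots = 0;
        for (int p : parallelism) {
            slots = Math.max(slots, p);
        }
        List<List<String>> result = new ArrayList<>();
        for (int s = 0; s < slots; s++) {
            result.add(new ArrayList<>());
        }
        for (int v = 0; v < vertexNames.length; v++) {
            for (int i = 0; i < parallelism[v]; i++) {
                result.get(i).add(vertexNames[v] + "[" + i + "]");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Parallelism values taken from the figure: p=4, p=4, p=3.
        List<List<String>> slots =
            assign(new String[] {"Source", "Map", "Sink"}, new int[] {4, 4, 3});
        for (int s = 0; s < slots.size(); s++) {
            System.out.println("slot " + s + ": " + slots.get(s));
        }
    }
}
```

With the figure's parallelism values, four slots are used and the last slot holds only two subtasks because the third vertex has parallelism 3.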
Scheduling
• Scheduling happens from the sources
• Later tasks are scheduled during runtime
  • Depending on the result type
[Figure: the JobManager (Master) sends Submit Task to the TaskManager (Worker), which sends State Updates back; both communicate through their Actor Systems]
Execution
• The ExecutionGraph tracks the state of each parallel Execution
• Asynchronous messages from the TaskManager and Client
[Figure: Execution states Created, Scheduled, Deploying, Running, Finished, Cancelling, Cancelled, Failed]
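State tracking of a single Execution can be sketched as a small state machine driven by incoming messages. The state names come from the slide; the allowed transitions in the table below are an assumption for illustration (the slide only lists the states), and `ExecutionStateSketch` is a hypothetical name.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;

// Hypothetical state machine for one parallel Execution.
class ExecutionStateSketch {

    enum State { CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELLING, CANCELLED, FAILED }

    // Assumed transition table; terminal states allow no further transitions.
    private static final Map<State, EnumSet<State>> ALLOWED = new EnumMap<>(State.class);
    static {
        ALLOWED.put(State.CREATED, EnumSet.of(State.SCHEDULED, State.CANCELLED, State.FAILED));
        ALLOWED.put(State.SCHEDULED, EnumSet.of(State.DEPLOYING, State.CANCELLED, State.FAILED));
        ALLOWED.put(State.DEPLOYING, EnumSet.of(State.RUNNING, State.CANCELLING, State.FAILED));
        ALLOWED.put(State.RUNNING, EnumSet.of(State.FINISHED, State.CANCELLING, State.FAILED));
        ALLOWED.put(State.CANCELLING, EnumSet.of(State.CANCELLED, State.FAILED));
        ALLOWED.put(State.FINISHED, EnumSet.noneOf(State.class));
        ALLOWED.put(State.CANCELLED, EnumSet.noneOf(State.class));
        ALLOWED.put(State.FAILED, EnumSet.noneOf(State.class));
    }

    private State current = State.CREATED;

    synchronized State current() {
        return current;
    }

    // Apply a state update; returns false and keeps the state if it is not allowed.
    synchronized boolean transitionTo(State next) {
        if (!ALLOWED.get(current).contains(next)) {
            return false;
        }
        current = next;
        return true;
    }

    public static void main(String[] args) {
        ExecutionStateSketch execution = new ExecutionStateSketch();
        execution.transitionTo(State.SCHEDULED);
        execution.transitionTo(State.DEPLOYING);
        execution.transitionTo(State.RUNNING);
        System.out.println(execution.transitionTo(State.CREATED)); // prints false
        System.out.println(execution.current());                   // prints RUNNING
    }
}
```

Rejecting unexpected transitions matters because the updates arrive as asynchronous messages and may be late or out of order.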
Task Execution
• TaskManager receives Task per Execution
• Task descriptor is limited to:
  • Location of consumed results
  • Produced results
  • Operator & user code
[Figure: a Task wraps an Operator, which wraps the User Code]
Task Execution

    DataStream<Tuple2<String, Integer>> counts = data.flatMap(new SplitByWhitespace());

• Task: one consumed and one produced result
• Operator: StreamTask with StreamFlatMap operator
• User code:

    for (…) {
        out.collect(new Tuple2<>(w, 1));
    }
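The user code layer is just a function that receives one record and emits zero or more records through a collector. A self-contained sketch of the splitting logic follows; `Collector` here is a local stand-in interface and `Map.Entry` replaces the tuple type so that the example runs without Flink on the classpath. The class name and the handling of empty tokens are assumptions.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java version of the user function; not the talk's SplitByWhitespace.
class SplitByWhitespaceSketch {

    // Local stand-in for the collector handed to the user function.
    interface Collector<T> {
        void collect(T record);
    }

    // Emit (word, 1) for every whitespace-separated word in the line.
    static void flatMap(String line, Collector<Map.Entry<String, Integer>> out) {
        for (String w : line.trim().split("\\s+")) {
            if (!w.isEmpty()) {
                out.collect(new SimpleEntry<>(w, 1));
            }
        }
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = new ArrayList<>();
        flatMap("Deny thy father", records::add);
        System.out.println(records); // prints [Deny=1, thy=1, father=1]
    }
}
```

The function knows nothing about tasks, results, or parallelism, which matches the layering of user code inside an operator inside a task.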
Data Connections
• Input Gates request input from local and remote channels on first read
• Remote request: 1. Initiate TCP connection, 2. Request, 3. Send via TCP
[Figure: the Input Gate of a Task in one TaskManager requests a Result held in another TaskManager; each TaskManager has a ResultManager and a NetworkManager]
Result Characteristics
• How and when to do data exchange? Pipelined vs. Blocking
• How long to keep results around? Ephemeral vs. Checkpointed
Pipelined Results
[Animation: Map writes buffers (1101, 0101, 0100) into a Pipelined Result; Reduce starts consuming them while Map is still producing]
Blocking Results
[Animation: Map writes buffers (1101, 0101, 0100) into a Blocking Result; Reduce starts consuming only after Map has produced the complete result]
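The difference between the two exchange modes is the order of produce and consume events. The sketch below replays a list of buffers under each mode and records that order; it runs single-threaded and only models ordering, not Flink's network stack. `ResultExchangeSketch` and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical event-order model of pipelined versus blocking results.
class ResultExchangeSketch {

    enum ResultType { PIPELINED, BLOCKING }

    // Returns the sequence of produce/consume events for the given result type.
    static List<String> run(List<String> buffers, ResultType type) {
        List<String> events = new ArrayList<>();
        List<String> stored = new ArrayList<>();
        for (String buffer : buffers) {
            events.add("produce " + buffer);
            if (type == ResultType.PIPELINED) {
                // Pipelined: the consumer sees each buffer as soon as it exists.
                events.add("consume " + buffer);
            } else {
                // Blocking: buffers are held back until the producer has finished.
                stored.add(buffer);
            }
        }
        for (String buffer : stored) {
            events.add("consume " + buffer);
        }
        return events;
    }

    public static void main(String[] args) {
        List<String> buffers = Arrays.asList("1101", "0101", "0100");
        System.out.println(run(buffers, ResultType.PIPELINED));
        System.out.println(run(buffers, ResultType.BLOCKING));
    }
}
```

In the pipelined run, produce and consume events alternate; in the blocking run, all produce events come first.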
Recap
                      Client                       JobManager                   TaskManager
Communication         Actor-only (coordination)    Actor-only (coordination)    Actor & Data Streams
Central Abstraction   JobGraph                     ExecutionGraph               Task
State Tracking        –                            Complete program             Single Task
Stream & Batch Processing
• Stream and Batch programs are different parameterizations of the JobGraph
• Everything goes down to the same runtime
• Streaming first, batch as special case
• Cost-based optimizer on translation
• Blocking results for less resource fragmentation
• But still profit from streaming
• DataSet and DataStream API are essentially all user code to the runtime
Stream & Batch Processing
                       DataStream          DataSet
JobGraph               Chaining            Chaining and cost-based optimisation
Intermediate Results   Pipelined           Pipelined and Blocking
Operators              Stream operators    Batch operators
User function          Common interface for map, reduce, … (both APIs)
Thank You!

Apache Flink Internals: Stream & Batch Processing in One System – Apache Flink's Streaming Data Flow Engine