Batch & Stream Graph Processing with Apache Flink
Vasia Kalavri, vasia@apache.org, @vkalavri
Apache Flink
• An open-source, distributed data analysis framework
• True streaming at its core
• Streaming & Batch API
[Figure: data sources and workloads. Sources: historic data (HDFS, JDBC, ...) and event logs (Kafka, RabbitMQ, ...). Workloads: ETL, graphs, machine learning, relational, ...; low latency, windowing, aggregations, ...]
Integration (picture not complete)
[Figure: connector logos; surviving labels: POSIX, Java/Scala Collections]
Why Stream Processing?
• Most problems have a streaming nature
• Stream processing gives lower latency
• Data volumes are more easily tamed
[Figure: event stream]
Batch and Streaming
[Figure: both APIs run on one streaming dataflow runtime.
 DataSet: relational optimizer; pipelined and blocking operators; batch parameters.
 DataStream: window optimization; pipelined and windowed operators; streaming parameters.
 Parameter labels in the figure: schedule lazily, schedule eagerly, recompute whole operators, periodic checkpoints, streaming data movement, stateful operations, DAG recovery, fully buffered streams, DAG resource management.]
Flink APIs
    case class Word (word: String, frequency: Int)
DataStream API (streaming):
    val lines: DataStream[String] = env.readFromKafka(...)
    lines.flatMap {line => line.split(" ").map(word => Word(word,1))}
      .keyBy("word").timeWindow(Time.of(5,SECONDS)).sum("frequency")
      .print()
DataSet API (batch):
    val lines: DataSet[String] = env.readTextFile(...)
    lines.flatMap {line => line.split(" ").map(word => Word(word,1))}
      .groupBy("word").sum("frequency")
      .print()
Working with Windows
Why windows? We are often interested in fresh data!
Highlight: Flink can form and trigger windows consistently under different notions of time and deal with late events!
1) Tumbling windows
    myKeyStream.timeWindow(Time.of(60, TimeUnit.SECONDS));
[Figure: events 15, 38, 65, 88, 110, 120 on a time axis in seconds (0 to 120); SUM #1 and SUM #2 cover consecutive 60-second windows.]
2) Sliding windows
    myKeyStream.timeWindow(Time.of(60, TimeUnit.SECONDS), Time.of(20, TimeUnit.SECONDS));
[Figure: the same time axis with window buckets/panes holding (15, 38), (38, 65), (65, 88); SUM #1, SUM #2, SUM #3 cover overlapping windows.]
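To make the two window types concrete, here is a minimal plain-Java sketch (no Flink dependency) that assigns timestamped values to tumbling and to sliding windows and sums each window. The class name, the `{timestamp, value}` event layout, and the sample values are hypothetical; it assumes non-negative timestamps and ignores triggers, event time, and late events, which Flink's real window operators handle.

```java
import java.util.Map;
import java.util.TreeMap;

final class WindowSums {
    // Tumbling windows: each event belongs to exactly one window [start, start + size).
    // events[i] = {timestampSeconds, value} (hypothetical layout).
    static Map<Long, Long> tumbling(long[][] events, long size) {
        Map<Long, Long> sums = new TreeMap<>();
        for (long[] e : events) {
            long start = (e[0] / size) * size;
            sums.merge(start, e[1], Long::sum);
        }
        return sums;
    }

    // Sliding windows: each event belongs to every window [start, start + size)
    // whose start is a multiple of `slide` and lies in (timestamp - size, timestamp].
    static Map<Long, Long> sliding(long[][] events, long size, long slide) {
        Map<Long, Long> sums = new TreeMap<>();
        for (long[] e : events) {
            long lastStart = (e[0] / slide) * slide; // latest window start <= timestamp
            for (long start = lastStart; start > e[0] - size; start -= slide) {
                sums.merge(start, e[1], Long::sum);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        // Made-up values at the timestamps shown on the slide's time axis.
        long[][] events = { {15, 1}, {38, 2}, {65, 3}, {88, 4} };
        System.out.println(tumbling(events, 60));    // {0=3, 60=7}
        System.out.println(sliding(events, 60, 20)); // window start -> sum
    }
}
```

Each map key is a window start in seconds. With a 60-second size and a 20-second slide, every event lands in three overlapping windows, which is why the sliding variant produces more, and partly repeated, sums.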
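Flink Stack
[Figure: layered stack. Libraries and compatibility layers: Gelly, Table, ML, SAMOA, Hadoop M/R, Cascading, Dataflow, Dataflow (WiP), Table, CEP. APIs: DataSet (Java/Scala), DataStream (Java/Scala). Core: streaming dataflow runtime. Deployment: Local, Remote, Yarn, Embedded.]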
Gelly: the Flink Graph API
Meet Gelly
• Java & Scala Graph APIs on top of Flink
  • graph transformations and utilities
  • iterative graph processing
  • library of graph algorithms
• Can be seamlessly mixed with the Flink DataSet API to easily implement applications that use both record-based and graph-based analysis
Hello, Gelly!
Java:
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env);
    Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, env);
    DataSet<Vertex<Long, Long>> verticesWithMinIds = graph.run(
        new ConnectedComponents(maxIterations));
Scala:
    val env = ExecutionEnvironment.getExecutionEnvironment
    val edges: DataSet[Edge[Long, NullValue]] = getEdgesDataSet(env)
    val graph = Graph.fromDataSet(edges, env)
    val components = graph.run(new ConnectedComponents(maxIterations))
Graph Methods
Graph Properties: getVertexIds, getEdgeIds, numberOfVertices, numberOfEdges, getDegrees, ...
Transformations: map, filter, join, subgraph, union, difference, reverse, undirected, getTriplets
Mutations: add vertex/edge, remove vertex/edge
Neighborhood Methods
    graph.reduceOnNeighbors(new MinValue, EdgeDirection.OUT)
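As a rough illustration of what a min-reduce over out-neighbors computes, the plain-Java sketch below reduces, for every vertex with outgoing edges, the values of its edge targets with `min`. It does not use Gelly; the class name, method name, and sample graph are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

final class NeighborMin {
    // edges[i] = {source, target}; vertexValues maps vertex ID -> vertex value.
    // Returns, per source vertex, the minimum value among its out-neighbors.
    static Map<Long, Long> minOverOutNeighbors(long[][] edges, Map<Long, Long> vertexValues) {
        Map<Long, Long> result = new TreeMap<>();
        for (long[] e : edges) {
            long source = e[0];
            long neighborValue = vertexValues.get(e[1]);
            result.merge(source, neighborValue, Long::min); // pairwise reduce
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical sample graph and vertex values.
        Map<Long, Long> values = new TreeMap<>();
        values.put(1L, 10L);
        values.put(2L, 7L);
        values.put(3L, 4L);
        long[][] edges = { {1, 2}, {1, 3}, {2, 3} };
        System.out.println(minOverOutNeighbors(edges, values)); // {1=4, 2=4}
    }
}
```

Vertex 3 has no outgoing edges, so it produces no result in this sketch.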
Iterative Graph Processing
• Gelly offers iterative graph processing abstractions on top of Flink’s Delta iterations
• Based on the BSP, vertex-centric model
  • scatter-gather
  • gather-sum-apply
  • vertex-centric (pregel)*
  • partition-centric*
Scatter-Gather Iterations
• MessagingFunction (Scatter): generate messages for other vertices
• VertexUpdateFunction (Gather): update vertex value based on received messages
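The two phases can be sketched in plain Java for single-source shortest paths. This is not Gelly's MessagingFunction/VertexUpdateFunction API, only an illustration of the superstep structure under the assumption of non-negative edge weights; the class name and the sample graph are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class ScatterGatherSssp {
    // edges[i] = {source, target, weight}. Returns the distance from src to each reachable vertex.
    static Map<Integer, Long> run(int[][] edges, int src, int maxIterations) {
        Map<Integer, Long> dist = new TreeMap<>();
        dist.put(src, 0L);
        Set<Integer> active = new HashSet<>();
        active.add(src);

        for (int i = 0; i < maxIterations && !active.isEmpty(); i++) {
            // Scatter: every vertex updated in the previous superstep messages its out-neighbors.
            Map<Integer, List<Long>> inbox = new HashMap<>();
            for (int[] e : edges) {
                if (active.contains(e[0])) {
                    inbox.computeIfAbsent(e[1], k -> new ArrayList<>()).add(dist.get(e[0]) + e[2]);
                }
            }
            // Gather: every vertex with messages keeps the minimum candidate;
            // it becomes active for the next superstep only if its value improved.
            active = new HashSet<>();
            for (Map.Entry<Integer, List<Long>> m : inbox.entrySet()) {
                long best = Long.MAX_VALUE;
                for (long candidate : m.getValue()) {
                    best = Math.min(best, candidate);
                }
                Long current = dist.get(m.getKey());
                if (current == null || best < current) {
                    dist.put(m.getKey(), best);
                    active.add(m.getKey());
                }
            }
        }
        return dist;
    }

    public static void main(String[] args) {
        int[][] edges = { {1, 2, 4}, {1, 3, 1}, {3, 2, 2}, {2, 4, 5} }; // hypothetical graph
        System.out.println(run(edges, 1, 10)); // {1=0, 2=3, 3=1, 4=8}
    }
}
```

The loop ends early once a superstep updates no vertex, which mirrors how only changed vertices produce messages.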
Gather-Sum-Apply Iterations
• Gather: compute one value per edge
• Sum: combine the partial values of Gather to a single value
• Apply: update the vertex value, based on the Sum and the current value
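A minimal plain-Java sketch of the three phases, again for single-source shortest paths. It is not Gelly's GSA API: the class name, the `INF` sentinel, and the sample graph are hypothetical, edge weights are assumed non-negative, and for simplicity every round scans all edges.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class GsaSssp {
    // "Unreached" marker, halved so that INF + weight cannot overflow.
    static final long INF = Long.MAX_VALUE / 2;

    // edges[i] = {source, target, weight}; all endpoints must be listed in `vertices`.
    static Map<Integer, Long> run(int[][] edges, int[] vertices, int src, int maxIterations) {
        Map<Integer, Long> dist = new TreeMap<>();
        for (int v : vertices) {
            dist.put(v, v == src ? 0L : INF);
        }
        for (int i = 0; i < maxIterations; i++) {
            Map<Integer, Long> summed = new HashMap<>();
            for (int[] e : edges) {
                long gathered = dist.get(e[0]) + e[2];   // Gather: one value per edge
                summed.merge(e[1], gathered, Long::min); // Sum: combine per target vertex
            }
            boolean changed = false;
            for (Map.Entry<Integer, Long> s : summed.entrySet()) {
                // Apply: compare the summed value with the current vertex value.
                if (s.getValue() < dist.get(s.getKey())) {
                    dist.put(s.getKey(), s.getValue());
                    changed = true;
                }
            }
            if (!changed) {
                break; // converged
            }
        }
        return dist;
    }

    public static void main(String[] args) {
        int[][] edges = { {1, 2, 4}, {1, 3, 1}, {3, 2, 2}, {2, 4, 5} }; // hypothetical graph
        System.out.println(run(edges, new int[] {1, 2, 3, 4}, 1, 10)); // {1=0, 2=3, 3=1, 4=8}
    }
}
```

Compared with scatter-gather, the per-edge work (Gather) and the combination step (Sum) are separate functions, so the combination can be applied pairwise and in any order.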
Library of Algorithms
• PageRank*
• Single Source Shortest Paths*
• Label Propagation
• Weakly Connected Components*
• Community Detection
• Triangle Count & Enumeration
• Graph Summarization
    val ranks = inputGraph.run(new PageRank(0.85, 20))
*: both scatter-gather and GSA implementations
Gelly-Stream: single-pass stream graph processing with Flink
Real Graphs are dynamic
Graphs are created from events happening in real-time
Gelly on Streams
[Figure: Gelly on top of the DataSet API and Gelly-Stream on top of the DataStream API, both above Distributed Dataflow and Deployment.]
Gelly (DataSet):
• Static Graphs
• Multi-Pass Algorithms
• Full Computations
Gelly-Stream (DataStream):
• Dynamic Graphs
• Single-Pass Algorithms
• Approximate Computations
Batch vs. Stream Graph Processing
              Batch                  Stream
Input graph   static                 dynamic
Analysis      on a snapshot          continuous
Response      after job completion   immediately
Graph Streaming Challenges
• Maintain the graph structure
  • How to apply state updates efficiently?
• Result updates
  • Re-run the analysis for each event?
  • Design an incremental algorithm?
  • Run separate instances on multiple snapshots?
• Computation on most recent events only
Single-Pass Graph Streaming
• Each event is an edge addition
• Maintain only a graph summary
• Recent events are grouped in graph windows
Graph Summaries
• spanners for distance estimation
• sparsifiers for cut estimation
• sketches for homomorphic properties
[Figure: running the algorithm on the graph gives result R1; running it on the graph summary gives result R2, with R1 ~ R2.]
Examples
Batch Connected Components
• State: the graph and a component ID per vertex (initially equal to vertex ID)
• Iterative Computation: For each vertex:
  • choose the min of neighbors’ component IDs and own component ID as new ID
  • if component ID changed since last iteration, notify neighbors
[Figure sequence: Batch Connected Components on vertices 1 to 8, iterations i=0 to i=3. At i=0 every vertex holds its own ID; the IDs shrink step by step until, at i=3, five vertices hold component ID 1 and three hold component ID 6.]
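The iterative computation can be sketched in plain Java as synchronous min-label propagation. This is a simplification, not Gelly's ConnectedComponents implementation: every superstep scans all edges instead of notifying only the neighbors of changed vertices. The class name is hypothetical, and the edge list in `main` is borrowed from the streaming example later in the deck, which is an assumption since the batch figure's edges did not survive.

```java
import java.util.Map;
import java.util.TreeMap;

final class BatchConnectedComponents {
    // edges[i] = {u, v}, treated as undirected. Returns vertex -> component ID,
    // where the component ID is the smallest vertex ID in the component.
    static Map<Integer, Integer> run(int[][] edges, int[] vertices) {
        Map<Integer, Integer> component = new TreeMap<>();
        for (int v : vertices) {
            component.put(v, v); // initially equal to the vertex ID
        }
        boolean changed = true;
        while (changed) {
            changed = false;
            // Read IDs of the previous superstep, write IDs of the next one.
            Map<Integer, Integer> next = new TreeMap<>(component);
            for (int[] e : edges) {
                int a = component.get(e[0]);
                int b = component.get(e[1]);
                if (b < next.get(e[0])) { next.put(e[0], b); changed = true; }
                if (a < next.get(e[1])) { next.put(e[1], a); changed = true; }
            }
            component = next;
        }
        return component;
    }

    public static void main(String[] args) {
        // Assumed edge list (taken from the streaming example).
        int[][] edges = { {3, 1}, {5, 2}, {5, 4}, {7, 6}, {8, 6}, {4, 2}, {4, 3}, {8, 7}, {4, 1} };
        System.out.println(run(edges, new int[] {1, 2, 3, 4, 5, 6, 7, 8}));
        // {1=1, 2=1, 3=1, 4=1, 5=1, 6=6, 7=6, 8=6}
    }
}
```

The loop terminates because IDs only ever decrease and are bounded below by the smallest vertex ID of each component.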
Stream Connected Components
• State: a disjoint set data structure for the components
• Computation: For each edge
  • if both endpoints are seen for the 1st time, create a component whose ID is the min of the two vertex IDs
  • if the endpoints are in different components, merge them and update the component ID to the min of the two component IDs
  • if only one of the endpoints belongs to a component, add the other one to the same component
[Figure sequence: Stream Connected Components on vertices 1 to 8.
 Edge stream, in arrival order: (3,1), (5,2), (5,4), (7,6), (8,6), (4,2), (4,3), (8,7), (4,1).
 Successive states of the ComponentID / Vertices table:
   1: {1, 3}
   1: {1, 3}; 2: {2, 5}
   1: {1, 3}; 2: {2, 4, 5}
   1: {1, 3}; 2: {2, 4, 5}; 6: {6, 7}
   1: {1, 3}; 2: {2, 4, 5}; 6: {6, 7, 8}
   1: {1, 2, 3, 4, 5}; 6: {6, 7, 8}]
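A minimal union-find sketch of the single-pass computation, in plain Java. Unseen vertices start as singleton components, so the three cases on the slide collapse into one `find` plus one merge; always keeping the smaller root makes the component ID the minimum vertex ID. The class and method names are hypothetical, and this is not Gelly-Stream's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

final class StreamConnectedComponents {
    private final Map<Integer, Integer> parent = new HashMap<>();

    // Returns the component ID (root) of v, registering unseen vertices as singletons.
    int find(int v) {
        parent.putIfAbsent(v, v);
        int root = v;
        while (parent.get(root) != root) {
            root = parent.get(root);
        }
        while (parent.get(v) != root) { // path compression
            int next = parent.get(v);
            parent.put(v, root);
            v = next;
        }
        return root;
    }

    // Processes one edge addition; the smaller root becomes the ID of the merged component.
    void addEdge(int u, int v) {
        int ru = find(u);
        int rv = find(v);
        if (ru != rv) {
            parent.put(Math.max(ru, rv), Math.min(ru, rv));
        }
    }

    // Snapshot of the summary: component ID -> member vertices.
    Map<Integer, TreeSet<Integer>> components() {
        Map<Integer, TreeSet<Integer>> out = new TreeMap<>();
        for (Integer v : new ArrayList<>(parent.keySet())) {
            out.computeIfAbsent(find(v), k -> new TreeSet<>()).add(v);
        }
        return out;
    }

    public static void main(String[] args) {
        // Edge stream from the slides, in arrival order.
        int[][] stream = { {3, 1}, {5, 2}, {5, 4}, {7, 6}, {8, 6}, {4, 2}, {4, 3}, {8, 7}, {4, 1} };
        StreamConnectedComponents cc = new StreamConnectedComponents();
        for (int[] e : stream) {
            cc.addEdge(e[0], e[1]);
        }
        System.out.println(cc.components()); // {1=[1, 2, 3, 4, 5], 6=[6, 7, 8]}
    }
}
```

The state is proportional to the number of vertices, not edges, and each edge is inspected exactly once.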
Distributed Stream Connected Components
API Requirements
• Continuous aggregations on edge streams
• Global graph aggregations
• Support for windowing
Introducing Gelly-Stream
Gelly-Stream enriches the DataStream API with two new ADTs:
• GraphStream:
  • A representation of a data stream of edges.
  • Edges can have state (e.g. weights).
  • Supports property streams, transformations and aggregations.
• GraphWindow:
  • A “time-slice” of a graph stream.
  • It enables neighborhood aggregations.
GraphStream Operations
Property Streams (GraphStream -> DataStream): .getEdges() .getVertices() .numberOfVertices() .numberOfEdges() .getDegrees() .inDegrees() .outDegrees()
Transformations (GraphStream -> GraphStream): .mapEdges(); .distinct(); .filterVertices(); .filterEdges(); .reverse(); .undirected(); .union();
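To illustrate what a property stream is, the plain-Java sketch below turns an edge stream into a stream of degree updates: each arriving edge emits the new degree of both endpoints. This only mirrors the idea behind an operation such as `.getDegrees()`; the class name, method name, and output format are hypothetical and do not reflect Gelly-Stream's actual types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DegreeStream {
    private final Map<Integer, Integer> degree = new HashMap<>();

    // Consumes one edge and returns the updates it causes, formatted as "vertex:degree".
    List<String> onEdge(int source, int target) {
        List<String> updates = new ArrayList<>();
        for (int v : new int[] {source, target}) {
            int d = degree.merge(v, 1, Integer::sum); // continuous aggregation per vertex
            updates.add(v + ":" + d);
        }
        return updates;
    }

    public static void main(String[] args) {
        DegreeStream degrees = new DegreeStream();
        System.out.println(degrees.onEdge(1, 2)); // [1:1, 2:1]
        System.out.println(degrees.onEdge(1, 3)); // [1:2, 3:1]
    }
}
```

The output is itself a stream of records, which is why property streams map a GraphStream to a DataStream.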
Graph Stream Aggregations
    graphStream.aggregate(
        new MyGraphAggregation(window, fold, combine, transform))
[Figure: the edges of a graph stream (window) are folded into local summaries (fold), the local summaries are combined into a global summary (combine, a reduce), and transform (a map) turns the global summary into the result, emitted as an aggregate property stream.]
Global aggregates can be persistent or transient.
Connected Components
[Figure sequence: counting #components over a graph stream on vertices 1 to 8.
 Window 1: edges (3,1), (5,2), (5,4), (7,6), (8,6) are folded into local summaries {1,3}, {2,5}, {4,5}, {6,7}, {6,8}. When the window triggers, they are combined into {1,3}, {2,4,5}, {6,7,8}, and #components = 3.
 Window 2: edges (4,2), (4,3), (4,1), (8,7) are folded into local summaries {1,2,4}, {3,4}, {7,8}. When the window triggers, they are combined with the global summary into {1,2,3,4,5}, {6,7,8}, and #components = 2.]
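The window, fold, combine, transform pipeline for this example can be sketched in plain Java. Everything here is a hypothetical stand-in for the Gelly-Stream aggregation: `Summary` is a bare disjoint set, edges of a window are spread round-robin over `parallelism` local summaries, and the global summary is kept across windows (a persistent aggregate).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class WindowedComponentCount {
    // Minimal disjoint-set summary: vertex -> parent.
    static final class Summary {
        final Map<Integer, Integer> parent = new HashMap<>();

        int find(int v) {
            parent.putIfAbsent(v, v);
            while (parent.get(v) != v) {
                v = parent.get(v);
            }
            return v;
        }

        void union(int u, int v) {
            int ru = find(u);
            int rv = find(v);
            if (ru != rv) {
                parent.put(Math.max(ru, rv), Math.min(ru, rv));
            }
        }
    }

    // fold: add one edge to a local (per-partition) summary.
    static Summary fold(Summary local, int[] edge) {
        local.union(edge[0], edge[1]);
        return local;
    }

    // combine: merge a local summary into the global one by replaying its parent links.
    static Summary combine(Summary global, Summary local) {
        for (Map.Entry<Integer, Integer> link : local.parent.entrySet()) {
            global.union(link.getKey(), link.getValue());
        }
        return global;
    }

    // transform: map the global summary to the emitted result (#components).
    static int transform(Summary global) {
        Set<Integer> roots = new HashSet<>();
        for (Integer v : new ArrayList<>(global.parent.keySet())) {
            roots.add(global.find(v));
        }
        return roots.size();
    }

    // One window trigger: fold per partition, combine into the global summary, transform.
    static int onWindow(Summary global, int[][] windowEdges, int parallelism) {
        List<Summary> locals = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            locals.add(new Summary());
        }
        for (int i = 0; i < windowEdges.length; i++) {
            fold(locals.get(i % parallelism), windowEdges[i]);
        }
        for (Summary local : locals) {
            combine(global, local);
        }
        return transform(global);
    }

    public static void main(String[] args) {
        Summary global = new Summary();
        int[][] window1 = { {3, 1}, {5, 2}, {5, 4}, {7, 6}, {8, 6} };
        int[][] window2 = { {4, 2}, {4, 3}, {4, 1}, {8, 7} };
        System.out.println(onWindow(global, window1, 2)); // 3
        System.out.println(onWindow(global, window2, 2)); // 2
    }
}
```

Because merging disjoint sets is associative and commutative, the result does not depend on how edges are distributed over partitions or on the order in which local summaries are combined.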
Slicing Graph Streams
    graphStream.slice(Time.of(1, MINUTE));
[Figure: the stream cut into one-minute slices at 11:40, 11:41, 11:42, 11:43.]
Aggregating Slices
    graphStream.slice(Time.of(1, MINUTE), direction)
• Slicing collocates edges by vertex information
• Neighborhood aggregations on sliced graphs
Aggregations: .reduceOnEdges(); .foldNeighbors(); .applyOnNeighbors();
[Figure: edges of a slice grouped by source or by target.]
Finding Matches Nearby
    graphStream.filterVertices(GraphGeeks())
      .slice(Time.of(15, MINUTE), EdgeDirection.IN)
      .applyOnNeighbors(FindPairs())
GraphStream :: graph geek check-ins
    wendy checked_in soap_bar
    steve checked_in soap_bar
    tom checked_in joe’s_grill
    sandra checked_in soap_bar
    rafa checked_in joe’s_grill
slice: GraphWindow :: user-place
    soap bar: wendy, steve, sandra
    joe’s grill: tom, rafa
FindPairs: {wendy, steve} {steve, sandra} {wendy, sandra} {tom, rafa}
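The slice-then-aggregate step of this example can be sketched in plain Java. Grouping check-ins by edge target stands in for slicing with EdgeDirection.IN, and the nested loop stands in for the FindPairs neighborhood function. The class name, method name, and string output format are hypothetical, and the vertex filter and the time window are left out: the input is assumed to be the check-ins of one 15-minute slice.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FindPairsSketch {
    // checkIns[i] = {user, place}: the edges that fall into one slice.
    static List<String> findPairs(String[][] checkIns) {
        // Collocate edges by target vertex: each place sees its incoming users.
        Map<String, List<String>> usersByPlace = new LinkedHashMap<>();
        for (String[] c : checkIns) {
            usersByPlace.computeIfAbsent(c[1], k -> new ArrayList<>()).add(c[0]);
        }
        // Neighborhood aggregation: emit every pair of users at the same place.
        List<String> pairs = new ArrayList<>();
        for (List<String> users : usersByPlace.values()) {
            for (int i = 0; i < users.size(); i++) {
                for (int j = i + 1; j < users.size(); j++) {
                    pairs.add("{" + users.get(i) + ", " + users.get(j) + "}");
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        String[][] checkIns = {
            {"wendy", "soap_bar"}, {"steve", "soap_bar"}, {"tom", "joe's_grill"},
            {"sandra", "soap_bar"}, {"rafa", "joe's_grill"}
        };
        System.out.println(findPairs(checkIns));
        // [{wendy, steve}, {wendy, sandra}, {steve, sandra}, {tom, rafa}]
    }
}
```

Collocating edges by place is what makes the pairing local: no pair needs data from more than one neighborhood.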
What’s next?
• Integration with Neo4j (Input / Output)
• OpenCypher on Flink/Gelly
• Pregel and Partition-Centric Iterations
• Integration with Graphalytics
Feeling Gelly?
• Gelly Guide: https://ci.apache.org/projects/flink/flink-docs-master/libs/gelly_guide.html
• Gelly-Stream Repository: https://github.com/vasia/gelly-streaming
• Gelly-Stream talk @FOSDEM16: https://fosdem.org/2016/schedule/event/graph_processing_apache_flink/
• An interesting read: http://people.cs.umass.edu/~mcgregor/papers/13-graphsurvey.pdf
• A cool thesis: http://urn.kb.se/resolve?urn=urn:nbn:se:kth:diva-170425
