GRADOOP: Scalable Graph Analytics with Apache Flink Martin Junghanns @kc1s Leipzig University
About the speaker and the team André, PhD StudentMartin, PhD Student Kevin, M.Sc. StudentNiklas, M.Sc. Student Prof. Dr. Erhard Rahm Database Chair
Motivation
𝑮𝑟𝑟𝑟𝑟 = (𝑽𝑒𝑒𝑒𝑒𝑒𝑒𝑒, 𝑬𝑑𝑑𝑑𝑑) “Graphs are everywhere”
𝐺𝐺𝐺𝐺𝐺 = (𝐔𝐔𝐔𝐔𝐔, 𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹) “Graphs are everywhere” Alice Bob Eve Dave Carol Mallory Peggy Trent
𝐺𝐺𝐺𝐺𝐺 = (𝐔𝐔𝐔𝐔𝐔, 𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹) “Graphs are everywhere” Alice Bob Eve Dave Carol Mallory Peggy Trent
“Graphs are heterogeneous” Alice Bob AC/DC Dave Carol Mallory Peggy Metallica 𝐺𝐺𝐺𝐺𝐺 = (𝐔𝐔𝐔𝐔𝐔 ∪ 𝐁𝐁𝐁𝐁𝐁, 𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹 ∪ 𝐿𝐿𝐿𝐿𝐿)
𝐺𝐺𝐺𝐺𝐺 = (𝐔𝐔𝐔𝐔𝐔 ∪ 𝐁𝐁𝐁𝐁𝐁, 𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹 ∪ 𝐿𝐿𝐿𝐿𝐿) “Graphs can be analyzed” Alice Bob AC/DC Dave Carol Mallory Peggy Metallica
0.2 0.28 0.26 0.33 0.25 0.26 “Graphs can be analyzed” Alice Bob AC/DC Dave Carol Mallory Peggy Metallica 3.6 2.82 𝐺𝐺𝐺𝐺𝐺 = (𝐔𝐔𝐔𝐔𝐔 ∪ 𝐁𝐁𝐁𝐁𝐁, 𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹 ∪ 𝐿𝐿𝐿𝐿𝐿)
“Graphs can be analyzed“ Assuming a social network
“Graphs can be analyzed“ Assuming a social network 1. Determine subgraph
“Graphs can be analyzed“ Assuming a social network 1. Determine subgraph 2. Find communities
“Graphs can be analyzed“ Assuming a social network 1. Determine subgraph 2. Find communities 3. Filter communities
“Graphs can be analyzed“ Assuming a social network 1. Determine subgraph 2. Find communities 3. Filter communities 4. Find common subgraph
“Graph data models must be expressive“ Assuming a social network  Heterogeneous data 1. Determine subgraph  Apply graph transformation 2. Find communities  Handle collections of graphs 3. Filter communities  Aggregation, Selection 4. Find common subgraph  Apply dedicated algorithms
„And let‘s not forget …“
“…Graphs are large”
A framework and research platform for efficient, distributed and domain independent graph data management and analytics.
High Level Architecture HDFS/YARN Cluster HBase Distributed Graph Store Extended Property Graph Model Flink Operator Implementation Graph Analytical DSL  Java  25K (33K) LOC  GPLv3
Extended Property Graph Model (EPGM)
EPGM – Graph Representation 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 knows since : 2014 knows since : 2014 knows since : 2013 hasInterest hasInterest hasInterest hasInterest hasModeratorhasModerator hasMember hasMember hasMember hasMember hasTag hasTaghasTag hasTag knows since : 2013 knows since : 2014 knows since : 2014 knows since : 2015 knows since : 2015 knows since : 2015 knows since : 2013 [0] Tag name : Databases [1] Tag name : Graphs [2] Tag name : Hadoop [3] Forum title : Graph Databases [4] Forum title : Graph Processing [5] Person name : Alice gender : f city : Leipzig age : 23 [6] Person name : Bob gender : m city : Leipzig age : 30 [7] Person name : Carol gender : f city : Dresden age : 30 [8] Person name : Dave gender : m city : Dresden age : 42 [9] Person name : Eve gender : f city : Dresden age : 35 speaks : en [10] Person name : Frank gender : m city : Berlin age : 23 IP: 169.32.1.3
[2] Community | interest : Graphs | vertexCount : 4 [1] Community | interest : Hadoop| vertexCount : 3[0] Community | interest : Databases | vertexCount : 3 EPGM – Graph Representation [0] Tag name : Databases [1] Tag name : Graphs [2] Tag name : Hadoop [3] Forum title : Graph Databases [4] Forum title : Graph Processing [5] Person name : Alice gender : f city : Leipzig age : 23 [6] Person name : Bob gender : m city : Leipzig age : 30 [7] Person name : Carol gender : f city : Dresden age : 30 [8] Person name : Dave gender : m city : Dresden age : 42 [9] Person name : Eve gender : f city : Dresden age : 35 speaks : en [10] Person name : Frank gender : m city : Berlin age : 23 IP: 169.32.1.3 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 knows since : 2014 knows since : 2014 knows since : 2013 hasInterest hasInterest hasInterest hasInterest hasModeratorhasModerator hasMember hasMember hasMember hasMember hasTag hasTaghasTag hasTag knows since : 2013 knows since : 2014 knows since : 2014 knows since : 2015 knows since : 2015 knows since : 2015 knows since : 2013
EPGM – Operators and Algorithms Operators Unary Binary GraphCollectionLogicalGraph Algorithms Aggregation Pattern Matching Transformation Grouping Equality Call * Combination Overlap Exclusion Equality Union Intersection Difference Gelly Library BTG Extraction Frequent Subgraphs Limit Selection Distinct Sort Apply * Reduce * Call * * auxiliary Adaptive Partitioning Subgraph
EPGM – Operators and Algorithms Operators Unary Binary GraphCollectionLogicalGraph Algorithms Aggregation Pattern Matching Transformation Grouping Equality Call * Combination Overlap Exclusion Equality Union Intersection Difference Gelly Library BTG Extraction Frequent Subgraphs Limit Selection Distinct Sort Apply * Reduce * Call * * auxiliary Adaptive Partitioning Subgraph
EPGM – Operators and Algorithms Operators Unary Binary GraphCollectionLogicalGraph Algorithms Aggregation Pattern Matching Transformation Grouping Equality Call * Combination Overlap Exclusion Equality Union Intersection Difference Gelly Library BTG Extraction Frequent Subgraphs Limit Selection Distinct Sort Apply * Reduce * Call * * auxiliary Adaptive Partitioning Subgraph
Combination 1: personGraph = db.G[0].combine(db.G[1]).combine(db.G[2]) [2] Community | interest : Graphs| vertexCount : 4 [1] Community | interest : Hadoop| vertexCount : 3[0] Community | interest : Databases | vertexCount : 3 [0] Tag name : Databases [1] Tag name : Graphs [2] Tag name : Hadoop [3] Forum title : Graph Databases [4] Forum title : Graph Processing [5] Person name : Alice gender : f city : Leipzig age : 23 [6] Person name : Bob gender : m city : Leipzig age : 30 [7] Person name : Carol gender : f city : Dresden age : 30 [8] Person name : Dave gender : m city : Dresden age : 42 [9] Person name : Eve gender : f city : Dresden age : 35 speaks : en [10] Person name : Frank gender : m city : Berlin age : 23 IP: 169.32.1.3 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 knows since : 2014 knows since : 2014 knows since : 2013 hasInterest hasInterest hasInterest hasInterest hasModeratorhasModerator hasMember hasMember hasMember hasMember hasTag hasTaghasTag hasTag knows since : 2013 knows since : 2014 knows since : 2014 knows since : 2015 knows since : 2015 knows since : 2015 knows since : 2013 DB
[0] Community | interest : Databases | vertexCount : 3 [1] Community | interest : Hadoop| vertexCount : 3[0] Community | interest : Databases | vertexCount : 3 [0] Tag name : Databases [1] Tag name : Graphs [2] Tag name : Hadoop [3] Forum title : Graph Databases [4] Forum title : Graph Processing 10 11 12 13 14 15 16 17 18 19 20 21 22 23 hasInterest hasInterest hasInterest hasInterest hasModeratorhasModerator hasMember hasMember hasMember hasMember hasTag hasTaghasTag hasTag DB Combination [4] [5] Person name : Alice gender : f city : Leipzig age : 23 [6] Person name : Bob gender : m city : Leipzig age : 30 [7] Person name : Carol gender : f city : Dresden age : 30 [8] Person name : Dave gender : m city : Dresden age : 42 [9] Person name : Eve gender : f city : Dresden age : 35 speaks : en [10] Person name : Frank gender : m city : Berlin age : 23 IP: 169.32.1.3 0 1 2 3 4 5 6 7 8 9 knows since : 2014 knows since : 2014 knows since : 2013 knows since : 2013 knows since : 2014 knows since : 2014 knows since : 2015 knows since : 2015 knows since : 2015 knows since : 2013 1: personGraph = db.G[0].combine(db.G[1]).combine(db.G[2])
EPGM – Operators and Algorithms Operators Unary Binary GraphCollectionLogicalGraph Algorithms Aggregation Pattern Matching Transformation Grouping Equality Call * Combination Overlap Exclusion Equality Union Intersection Difference Gelly Library BTG Extraction Frequent Subgraphs Limit Selection Distinct Sort Apply * Reduce * Call * * auxiliary Adaptive Partitioning Subgraph
[0] Community | interest : Databases | vertexCount : 3 [1] Community | interest : Hadoop| vertexCount : 3[0] Community | interest : Databases | vertexCount : 3 [0] Tag name : Databases [1] Tag name : Graphs [2] Tag name : Hadoop [3] Forum title : Graph Databases [4] Forum title : Graph Processing 10 11 12 13 14 15 16 17 18 19 20 21 22 23 hasInterest hasInterest hasInterest hasInterest hasModeratorhasModerator hasMember hasMember hasMember hasMember hasTag hasTaghasTag hasTag DB Combination + Grouping [4] [5] Person name : Alice gender : f city : Leipzig age : 23 [6] Person name : Bob gender : m city : Leipzig age : 30 [7] Person name : Carol gender : f city : Dresden age : 30 [8] Person name : Dave gender : m city : Dresden age : 42 [9] Person name : Eve gender : f city : Dresden age : 35 speaks : en [10] Person name : Frank gender : m city : Berlin age : 23 IP: 169.32.1.3 0 1 2 3 4 5 6 7 8 9 knows since : 2014 knows since : 2014 knows since : 2013 knows since : 2013 knows since : 2014 knows since : 2014 knows since : 2015 knows since : 2015 knows since : 2015 knows since : 2013 1: personGraph = db.G[0].combine(db.G[1]).combine(db.G[2]) 2: vertexGroupingKeys = [:label, “city”] 3: edgeGroupingKeys = [: label] 4: vertexAggFunc = (superVertex, vertices => superVertex[“count”] = |vertices|) 5: edgeAggFunc = (superEdge, edges => superEdge[“count”] = |edges|) 6: sumGraph = personGraph.groupBy(vertexGroupingKeys, vertexAggFunc, edgeGroupingKeys, edgeAggFunc)
[0] Community | interest : Databases | vertexCount : 3 [1] Community | interest : Hadoop| vertexCount : 3[0] Community | interest : Databases | vertexCount : 3 [0] Tag name : Databases [1] Tag name : Graphs [2] Tag name : Hadoop [3] Forum title : Graph Databases [4] Forum title : Graph Processing 10 11 12 13 14 15 16 17 18 19 20 21 22 23 hasInterest hasInterest hasInterest hasInterest hasModeratorhasModerator hasMember hasMember hasMember hasMember hasTag hasTaghasTag hasTag DB Combination + Grouping [4] [5] Person name : Alice gender : f city : Leipzig age : 23 [6] Person name : Bob gender : m city : Leipzig age : 30 [7] Person name : Carol gender : f city : Dresden age : 30 [8] Person name : Dave gender : m city : Dresden age : 42 [9] Person name : Eve gender : f city : Dresden age : 35 speaks : en [10] Person name : Frank gender : m city : Berlin age : 23 IP: 169.32.1.3 0 1 2 3 4 5 6 7 8 9 knows since : 2014 knows since : 2014 knows since : 2013 knows since : 2013 knows since : 2014 knows since : 2014 knows since : 2015 knows since : 2015 knows since : 2015 knows since : 2013 1: personGraph = db.G[0].combine(db.G[1]).combine(db.G[2]) 2: vertexGroupingKeys = [:label, “city”] 3: edgeGroupingKeys = [: label] 4: vertexAggFunc = (superVertex, vertices => superVertex[“count”] = |vertices|) 5: edgeAggFunc = (superEdge, edges => superEdge[“count”] = |edges|) 6: sumGraph = personGraph.groupBy(vertexGroupingKeys, vertexAggFunc, edgeGroupingKeys, edgeAggFunc) [5] [11] Person city : Leipzig count : 2 [12] Person city : Dresden count : 3 [13] Person city : Berlin count : 1 24 25 26 27 28 knows count : 3 knows count : 1 knows count : 2 knows count : 2 knows count : 2
[0] Community | interest : Databases | vertexCount : 3 [1] Community | interest : Hadoop| vertexCount : 3[0] Community | interest : Databases | vertexCount : 3 [0] Tag name : Databases [1] Tag name : Graphs [2] Tag name : Hadoop [3] Forum title : Graph Databases [4] Forum title : Graph Processing 10 11 12 13 14 15 16 17 18 19 20 21 22 23 hasInterest hasInterest hasInterest hasInterest hasModeratorhasModerator hasMember hasMember hasMember hasMember hasTag hasTaghasTag hasTag DB Combination + Grouping + Aggregation [4] [5] Person name : Alice gender : f city : Leipzig age : 23 [6] Person name : Bob gender : m city : Leipzig age : 30 [7] Person name : Carol gender : f city : Dresden age : 30 [8] Person name : Dave gender : m city : Dresden age : 42 [9] Person name : Eve gender : f city : Dresden age : 35 speaks : en [10] Person name : Frank gender : m city : Berlin age : 23 IP: 169.32.1.3 0 1 2 3 4 5 6 7 8 9 knows since : 2014 knows since : 2014 knows since : 2013 knows since : 2013 knows since : 2014 knows since : 2014 knows since : 2015 knows since : 2015 knows since : 2015 knows since : 2013 1: personGraph = db.G[0].combine(db.G[1]).combine(db.G[2]) 2: vertexGroupingKeys = [:label, “city”] 3: edgeGroupingKeys = [: label] 4: vertexAggFunc = (superVertex, vertices => superVertex[“count”] = |vertices|) 5: edgeAggFunc = (superEdge, edges => superEdge[“count”] = |edges|) 6: sumGraph = personGraph.groupBy(vertexGroupingKeys, vertexAggFunc, edgeGroupingKeys, edgeAggFunc) 7: aggFunc = (g => |g.E|) 8: aggGraph = sumGraph.aggregate(“edgeCount”, aggFunc) [5] [11] Person city : Leipzig count : 2 [12] Person city : Dresden count : 3 [13] Person city : Berlin count : 1 24 25 26 27 28 knows count : 3 knows count : 1 knows count : 2 knows count : 2 knows count : 2
[0] Community | interest : Databases | vertexCount : 3 [1] Community | interest : Hadoop| vertexCount : 3[0] Community | interest : Databases | vertexCount : 3 [0] Tag name : Databases [1] Tag name : Graphs [2] Tag name : Hadoop [3] Forum title : Graph Databases [4] Forum title : Graph Processing 10 11 12 13 14 15 16 17 18 19 20 21 22 23 hasInterest hasInterest hasInterest hasInterest hasModeratorhasModerator hasMember hasMember hasMember hasMember hasTag hasTaghasTag hasTag DB Combination + Grouping + Aggregation [4] [5] Person name : Alice gender : f city : Leipzig age : 23 [6] Person name : Bob gender : m city : Leipzig age : 30 [7] Person name : Carol gender : f city : Dresden age : 30 [8] Person name : Dave gender : m city : Dresden age : 42 [9] Person name : Eve gender : f city : Dresden age : 35 speaks : en [10] Person name : Frank gender : m city : Berlin age : 23 IP: 169.32.1.3 0 1 2 3 4 5 6 7 8 9 knows since : 2014 knows since : 2014 knows since : 2013 knows since : 2013 knows since : 2014 knows since : 2014 knows since : 2015 knows since : 2015 knows since : 2015 knows since : 2013 1: personGraph = db.G[0].combine(db.G[1]).combine(db.G[2]) 2: vertexGroupingKeys = [:label, “city”] 3: edgeGroupingKeys = [: label] 4: vertexAggFunc = (superVertex, vertices => superVertex[“count”] = |vertices|) 5: edgeAggFunc = (superEdge, edges => superEdge[“count”] = |edges|) 6: sumGraph = personGraph.groupBy(vertexGroupingKeys, vertexAggFunc, edgeGroupingKeys, edgeAggFunc) 7: aggFunc = (g => |g.E|) 8: aggGraph = sumGraph.aggregate(“edgeCount”, aggFunc) [5] edgeCount : 5 [11] Person city : Leipzig count : 2 [12] Person city : Dresden count : 3 [13] Person city : Berlin count : 1 24 25 26 27 28 knows count : 3 knows count : 1 knows count : 2 knows count : 2 knows count : 2
EPGM – Operators and Algorithms Operators Unary Binary GraphCollectionLogicalGraph Algorithms Aggregation Pattern Matching Transformation Grouping Equality Call * Combination Overlap Exclusion Equality Union Intersection Difference Gelly Library BTG Extraction Frequent Subgraphs Limit Selection Distinct Sort Apply * Reduce * Call * * auxiliary Adaptive Partitioning Subgraph
Selection 1: resultColl = db.G[0,1,2].select((g => g[“vertexCount”] > 3)) [2] Community | interest : Graphs | vertexCount : 4 [1] Community | interest : Hadoop| vertexCount : 3[0] Community | interest : Databases | vertexCount : 3 [0] Tag name : Databases [1] Tag name : Graphs [2] Tag name : Hadoop [3] Forum title : Graph Databases [4] Forum title : Graph Processing [5] Person name : Alice gender : f city : Leipzig age : 23 [6] Person name : Bob gender : m city : Leipzig age : 30 [7] Person name : Carol gender : f city : Dresden age : 30 [8] Person name : Dave gender : m city : Dresden age : 42 [9] Person name : Eve gender : f city : Dresden age : 35 speaks : en [10] Person name : Frank gender : m city : Berlin age : 23 IP: 169.32.1.3 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 knows since : 2014 knows since : 2014 knows since : 2013 hasInterest hasInterest hasInterest hasInterest hasModeratorhasModerator hasMember hasMember hasMember hasMember hasTag hasTaghasTag hasTag knows since : 2013 knows since : 2014 knows since : 2014 knows since : 2015 knows since : 2015 knows since : 2015 knows since : 2013 DB
Selection 1: resultColl = db.G[0,1,2].select((g => g[“vertexCount”] > 3)) [1] Community | interest : Hadoop| vertexCount : 3[0] Community | interest : Databases | vertexCount : 3 [0] Tag name : Databases [1] Tag name : Graphs [2] Tag name : Hadoop [3] Forum title : Graph Databases [4] Forum title : Graph Processing [9] Person name : Eve gender : f city : Dresden age : 35 speaks : en [10] Person name : Frank gender : m city : Berlin age : 23 IP: 169.32.1.3 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 hasInterest hasInterest hasInterest hasInterest hasModeratorhasModerator hasMember hasMember hasMember hasMember hasTag hasTaghasTag hasTag knows since : 2015 knows since : 2015 knows since : 2015 knows since : 2013 DB [2] Community | interest : Graphs | vertexCount : 4 [5] Person name : Alice gender : f city : Leipzig age : 23 [6] Person name : Bob gender : m city : Leipzig age : 30 [7] Person name : Carol gender : f city : Dresden age : 30 [8] Person name : Dave gender : m city : Dresden age : 42 0 1 2 3 4 5 knows since : 2014 knows since : 2014 knows since : 2013 knows since : 2013 knows since : 2014 knows since : 2014
EPGM – Operators and Algorithms Operators Unary Binary GraphCollectionLogicalGraph Algorithms Aggregation Pattern Matching Transformation Grouping Equality Call * Combination Overlap Exclusion Equality Union Intersection Difference Gelly Library BTG Extraction Frequent Subgraphs Limit Selection Distinct Sort Apply * Reduce * Call * * auxiliary Adaptive Partitioning Subgraph
Apache Flink
Apache Flink http://www.slideshare.net/robertmetzger1/apache-flink-meetup-munich-november-2015-flink-overview-architecture-integrations-and-use-case „Streaming Dataflow Engine that provides • data distribution, • communication, • and fault tolerance for distributed computations over data streams.“ HDFS LocalFS HBase JDBC Kafka RabbitMQ Flume (Neo4j) EmbeddedTezYarnClusterLocal Streaming Dataflow Runtime DataSet DataStream HadoopMR Table Gelly ML Table Zeppelin Cascading MRQL Dataflow Storm(wip) Dataflow(wip) SAMOA
Apache Flink – DataSet API  DataSet := Distributed Collection of Data  Transformation := Operation applied on DataSet  Flink Program := Composition of Transformations DataSet DataSet DataSet Transformation Transformation DataSet DataSet Transformation DataSet Flink Program
Apache Flink – DataSet Transformations  aggregate  coGroup  cross  distinct  filter  first-N  flatMap  groupBy  join  leftOuterJoin  rightOuterJoin  fullOuterJoin  map  mapPartition  project  reduce  reduceGroup  union  iterate  iterateDelta
The „Hello World“ of Big Data – Word Count 1: ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 2: 3: DataSet<String> text = env.fromElements( // or env.readTextFile(„hdfs://…“) 4: „He who controls the past controls the future.“, 5: „He who controls the present controls the past.“); 6: 7: DataSet<Tuple2<String, Integer>> wordCounts = text 8: .flatMap(new LineSplitter()) // splits the line and outputs (word, 1) tuples 9: .groupBy(0) 10: .sum(1); 11: 12: wordCounts.print(); // trigger execution flatMap „He who controls the past controls the future.“ „He who controls the present controls the past.“ (He,1) (who,1) (controls,1) (the,1) (past,1) // ... groupBy(0) [(He,1),(He,1)] [(who,1),(who,1)] [(future,1)] [(past,1),(past,1)] [(present,1)] // ... sum(1) (He,2) (who,2) (future,1) (past,2) (present,1) // ...
EPGM on Apache Flink
EPGM on Apache Flink – User facing API LogicalGraph fromCollections(…) : LogicalGraph fromDataSets(…) : LogicalGraph fromGellyGraph(…) : LogicalGraph getGraphHead() : DataSet<EPGMGraphHead> toGellyGraph() : Graph combine(…) : LogicalGraph intersect(…) : LogicalGraph groupBy(…) : LogicalGraph match(…) : GraphCollection // ... GraphCollection fromCollections(…) : GraphCollection fromDataSets(…) : GraphCollection getGraphHeads() : DataSet<EPGMGraphHead> getGraph(…) : LogicalGraph getGraphs(…) : GraphCollection select(…) : GraphCollection union(…) : GraphCollection distinct(…) : GraphCollection sortBy(…) : GraphCollection // ... GraphBase getVertices() : DataSet<EPGMVertex> getEdges() : DataSet<EPGMEdge> // ... graphHeads : DataSet<EPGMGraphHead> vertices : DataSet<EPGMVertex> edges : DataSet<EPGMEdge> EPGMDatabase fromCollections(…) : EPGMDatabase fromJSONFile(…) : EPGMDatabase fromHBase(…) : EPGMDatabase writeAsJSON(…) : void writeToHBase(…) : void getDatabaseGraph() : LogicalGraph // ...
EPGM on Apache Flink – DataSets Id Label Properties Graphs Id Label Properties SourceId TargetId Graphs EPGMGraphHead EPGMVertex EPGMEdge Id Label Properties POJO POJO POJO DataSet<EPGMGraphHead> DataSet<EPGMVertex> DataSet<EPGMEdge> Id Label Properties Graphs EPGMVertex GradoopId := UUID 128-bit String PropertyList := List<Property> Property := (String, PropertyValue) PropertyValue := byte[] GradoopIdSet := Set<GradoopId>
EPGM on Apache Flink – Exclusion // input: firstGraph (G[0]), secondGraph (G[2]) 1: DataSet<GradoopId> graphId = secondGraph.getGraphHead() 2: .map(new Id<G>()); 3: 4: DataSet<V> newVertices = firstGraph.getVertices() 5: .filter(new NotInGraphBroadCast<V>()) 6: .withBroadcastSet(graphId, GRAPH_ID); 7: 8: DataSet<E> newEdges = firstGraph.getEdges() 9: .filter(new NotInGraphBroadCast<E>()) 10: .withBroadcastSet(graphId, GRAPH_ID) 11: .join(newVertices) 12: .where(new SourceId<E>().equalTo(new Id<V>()) 13: .with(new LeftSide<E, V>()) 14: .join(newVertices) 15: .where(new TargetId<E>().equalTo(new Id<V>()) 16: .with(new LeftSide<E, V>()); db.G[0].exclude(db.G[2]) [2] Community | interest : Graphs| vertexCount : 4 [0] Community | interest : Databases | vertexCount : 3 [5] Person name : Alice gender : f city : Leipzig age : 23 [6] Person name : Bob gender : m city : Leipzig age : 30 [7] Person name : Carol gender : f city : Dresden age : 30 [8] Person name : Dave gender : m city : Dresden age : 42 [9] Person name : Eve gender : f city : Dresden age : 35 speaks : en 0 1 2 3 4 5 6 7 knows since : 2014 knows since : 2014 knows since : 2013 knows since : 2013 knows since : 2014 knows since : 2014 knows since : 2015 knows since : 2013
EPGM on Apache Flink – Exclusion Id Label Properties 2 Community interest: Graphs vertexCount: 4 graphId = secondGraph.getGraphHead() Id 2 newVertices = firstGraph.getVertices() Id Label Properties Graphs 5 Person name: Alice gender: f … [0, 2] 6 Person name: Bob gender: m … [0, 2] 9 Person name: Eve gender: f … [0] Id Label Properties Graphs 9 Person name: Eve gender: f … [0] .map(new Id<G>()); .filter(new NotInGraphBroadCast<V>()) .withBroadcastSet(graphId, GRAPH_ID);
EPGM in Apache Flink – Exclusion newEdges = firstGraph.getEdges() Id Label SourceId TargetId Properties Graphs 0 knows 5 6 since: 2014 [0, 2] 1 knows 6 5 since: 2014 [0, 2] 6 knows 9 5 since: 2013 [0] 7 knows 9 6 since: 2015 [0] Id Label SourceId TargetId Properties Graphs 6 knows 9 5 since: 2013 [0] 7 knows 9 6 since: 2015 [0] Id Label SourceId TargetId … Id Label … 6 knows 9 5 … 9 Person … 7 knows 9 6 … 9 Person … Id Label SourceId TargetId … 6 knows 9 5 … 7 knows 9 6 … Id Label SourceId TargetId … Id Label … Id Label SourceId TargetId ….with(new LeftSide<E, V>()); .join(newVertices) .where(new TargetId<E>().equalTo(new Id<V>()) .with(new LeftSide<E, V>()) .join(newVertices) .where(new SourceId<E>().equalTo(new Id<V>()) .filter(new NotInGraphBroadCast<E>()) .withBroadcastSet(graphId, GRAPH_ID)
Social Network Example
LDBC Social Network Data http://ldbcouncil.org/
LDBC Social Network Data socialNetwork .subgraph( (v => v.label == ‘Person‘), (e => e.label == ‘knows‘)) .transform( (gIn, gOut => gOut = gIn), (vIn, vOut => { vOut.label = vIn.label, vOut[‘city‘] = vIn[‘city‘] vOut[‘gender‘] = vIn[‘gender‘] vOut[‘key‘] = vIn[‘birthday‘]}), (eIn, eOut) => eOut.label = eIn.label)) .callForCollection(:LabelPropagation, [‘key‘, 4]) .apply(g => g.aggregate(‘vertexCount‘, (h => |h.V|)) .select(g => g[‘vertexCount‘] > 50000) .reduce(g, h => g.combine(h)) .groupBy( [‘city‘,‘gender‘], (superVertex, vertices => superVertex[‘count‘] = |vertices|), [], (superEdge, edges => superEdge[‘count‘] = |edges|) .aggregate(‘vCount‘, (g => |g.V|)) .aggregate(‘eCount‘, (g => |g.E|))
LDBC Social Network Data socialNetwork .subgraph( (v => v.label == ‘Person‘), (e => e.label == ‘knows‘)) .transform( (gIn, gOut => gOut = gIn), (vIn, vOut => { vOut.label = vIn.label, vOut[‘city‘] = vIn[‘city‘] vOut[‘gender‘] = vIn[‘gender‘] vOut[‘key‘] = vIn[‘birthday‘]}), (eIn, eOut) => eOut.label = eIn.label)) .callForCollection(:LabelPropagation, [‘key‘, 4]) .apply(g => g.aggregate(‘vertexCount‘, (h => |h.V|)) .select(g => g[‘vertexCount‘] > 50000) .reduce(g, h => g.combine(h)) .groupBy( [‘city‘,‘gender‘], (superVertex, vertices => superVertex[‘count‘] = |vertices|), [], (superEdge, edges => superEdge[‘count‘] = |edges|) .aggregate(‘vCount‘, (g => |g.V|)) .aggregate(‘eCount‘, (g => |g.E|))
Benchmark Results Dataset # Vertices # Edges Graphalytics.1 61,613 2,026,082 Graphalytics.10 260,613 16,600,778 Graphalytics.100 1,695,613 147,437,275 Graphalytics.1000 12,775,613 1,363,747,260 Graphalytics.10000 90,025,613 10,872,109,028  16x Intel(R) Xeon(R) 2.50GHz (6 Cores)  16x 48 GB RAM  1 Gigabit Ethernet  Hadoop 2.6.0  Flink 1.0-SNAPSHOT  slots (per worker) 12  jobmanager.heap.mb 2048  taskmanager.heap.mb 40960 0 200 400 600 800 1000 1200 1 2 4 8 16 Runtime[s] Number of workers Runtime Graphalytics.100
Benchmark Results Dataset # Vertices # Edges Graphalytics.1 61,613 2,026,082 Graphalytics.10 260,613 16,600,778 Graphalytics.100 1,695,613 147,437,275 Graphalytics.1000 12,775,613 1,363,747,260 Graphalytics.10000 90,025,613 10,872,109,028 1 2 4 8 16 1 2 4 8 16 Speedup Number of workers Speedup Graphalytics.100 Linear  16x Intel(R) Xeon(R) 2.50GHz (6 Cores)  16x 48 GB RAM  1 Gigabit Ethernet  Hadoop 2.6.0  Flink 1.0-SNAPSHOT  slots (per worker) 12  jobmanager.heap.mb 2048  taskmanager.heap.mb 40960
Benchmark Results Dataset # Vertices # Edges Graphalytics.1 61,613 2,026,082 Graphalytics.10 260,613 16,600,778 Graphalytics.100 1,695,613 147,437,275 Graphalytics.1000 12,775,613 1,363,747,260 Graphalytics.10000 90,025,613 10,872,109,028 1 10 100 1000 10000 Runtime[s] Datasets  16x Intel(R) Xeon(R) 2.50GHz (6 Cores)  16x 48 GB RAM  1 Gigabit Ethernet  Hadoop 2.6.0  Flink 1.0-SNAPSHOT  slots (per worker) 12  jobmanager.heap.mb 2048  taskmanager.heap.mb 40960
Current State & Future Work
Current State Operators Unary Binary GraphCollectionLogicalGraph Algorithms Aggregation Pattern Matching Transformation Grouping Equality Call * Combination Overlap Exclusion Equality Union Intersection Difference Gelly Library BTG Extraction Frequent Subgraphs Limit Selection Distinct Sort Apply * Reduce * Call * * auxiliary Adaptive Partitioning Subgraph
Current State  0.0.1 First Prototype (May 2015)  Hadoop MapReduce and Giraph for operator implementations  Too much complexity  Performance loss through serialization in HDFS/HBase  0.0.2 Using Flink as execution layer (June 2015)  Basic operators  0.1 December 2015  System-side identifiers (UUID)  Improved property handling  More operator implementations (e.g., Equality, Bool operators)  Code refactoring  0.2-SNAPSHOT  Graph Pattern Matching  Frequent Subgraph Mining  Memory optimization (96-bit ID, Dictionary Encoding, …)  Tuple Implementation
Contributions to Flink  FLINK-2411 Add basic graph summarization algorithm  FLINK-2590 DataSetUtils.zipWithUniqueID creates duplicate Ids  FLINK-2905 Add intersect method to Graph class  FLINK-2910 Combine tests for binary graph operators  FLINK-2941 Implement a neo4j - Flink/Gelly connector  FLINK-2981 Update README for building docs  FLINK-3064 Missing size check in GroupReduceOperatorBase leads to NPE  FLINK-3118 Check if MessageFunction implements ResultTypeQueryable  FLINK-3122 Generalize value type in LabelPropagation  FLINK-3272 Generalize vertex value type in ConnectedComponents  Flink Forward (October 2015)  Meetup Big Data Usergroup Saxony (December 2015)
Contributions welcome!  Code  Operator implementations  Performance Tuning  Extend HBase Storage  People  Bachelor / Master Thesis  Open PhD positions in Leipzig, Germany  Data! and Use Cases  We are researchers, we assume ...
Thank you! www.gradoop.com https://flink.apache.org http://ldbcouncil.org/ http://dbs.uni-leipzig.de/file/GradoopTR.pdf http://dbs.uni-leipzig.de/file/biiig-vldb2014.pdf https://github.com/dbs-leipzig/gradoop https://github.com/s1ck/gdl https://github.com/s1ck/ldbc-flink-import https://github.com/s1ck/flink-neo4j

Gradoop: Scalable Graph Analytics with Apache Flink @ FOSDEM 2016

  • 1.
    GRADOOP: Scalable Graph Analyticswith Apache Flink Martin Junghanns @kc1s Leipzig University
  • 2.
    About the speakerand the team André, PhD StudentMartin, PhD Student Kevin, M.Sc. StudentNiklas, M.Sc. Student Prof. Dr. Erhard Rahm Database Chair
  • 3.
  • 4.
    𝑮𝑟𝑟𝑟𝑟 = (𝑽𝑒𝑒𝑒𝑒𝑒𝑒𝑒,𝑬𝑑𝑑𝑑𝑑) “Graphs are everywhere”
  • 5.
    𝐺𝐺𝐺𝐺𝐺 = (𝐔𝐔𝐔𝐔𝐔,𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹) “Graphs are everywhere” Alice Bob Eve Dave Carol Mallory Peggy Trent
  • 6.
    𝐺𝐺𝐺𝐺𝐺 = (𝐔𝐔𝐔𝐔𝐔,𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹) “Graphs are everywhere” Alice Bob Eve Dave Carol Mallory Peggy Trent
  • 7.
    “Graphs are heterogeneous” Alice Bob AC/DC Dave Carol Mallory Peggy Metallica 𝐺𝐺𝐺𝐺𝐺= (𝐔𝐔𝐔𝐔𝐔 ∪ 𝐁𝐁𝐁𝐁𝐁, 𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹 ∪ 𝐿𝐿𝐿𝐿𝐿)
  • 8.
    𝐺𝐺𝐺𝐺𝐺 = (𝐔𝐔𝐔𝐔𝐔∪ 𝐁𝐁𝐁𝐁𝐁, 𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹 ∪ 𝐿𝐿𝐿𝐿𝐿) “Graphs can be analyzed” Alice Bob AC/DC Dave Carol Mallory Peggy Metallica
  • 9.
    0.2 0.28 0.26 0.33 0.25 0.26 “Graphs can beanalyzed” Alice Bob AC/DC Dave Carol Mallory Peggy Metallica 3.6 2.82 𝐺𝐺𝐺𝐺𝐺 = (𝐔𝐔𝐔𝐔𝐔 ∪ 𝐁𝐁𝐁𝐁𝐁, 𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹𝐹 ∪ 𝐿𝐿𝐿𝐿𝐿)
  • 10.
    “Graphs can beanalyzed“ Assuming a social network
  • 11.
    “Graphs can beanalyzed“ Assuming a social network 1. Determine subgraph
  • 12.
    “Graphs can beanalyzed“ Assuming a social network 1. Determine subgraph 2. Find communities
  • 13.
    “Graphs can beanalyzed“ Assuming a social network 1. Determine subgraph 2. Find communities 3. Filter communities
  • 14.
    “Graphs can beanalyzed“ Assuming a social network 1. Determine subgraph 2. Find communities 3. Filter communities 4. Find common subgraph
  • 15.
    “Graph data modelsmust be expressive“ Assuming a social network  Heterogeneous data 1. Determine subgraph  Apply graph transformation 2. Find communities  Handle collections of graphs 3. Filter communities  Aggregation, Selection 4. Find common subgraph  Apply dedicated algorithms
  • 16.
    „And let‘s notforget …“
  • 17.
  • 18.
    A framework andresearch platform for efficient, distributed and domain independent graph data management and analytics.
  • 19.
    High Level Architecture HDFS/YARN Cluster HBaseDistributed Graph Store Extended Property Graph Model Flink Operator Implementation Graph Analytical DSL  Java  25K (33K) LOC  GPLv3
  • 20.
  • 21.
    EPGM – GraphRepresentation 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 knows since : 2014 knows since : 2014 knows since : 2013 hasInterest hasInterest hasInterest hasInterest hasModeratorhasModerator hasMember hasMember hasMember hasMember hasTag hasTaghasTag hasTag knows since : 2013 knows since : 2014 knows since : 2014 knows since : 2015 knows since : 2015 knows since : 2015 knows since : 2013 [0] Tag name : Databases [1] Tag name : Graphs [2] Tag name : Hadoop [3] Forum title : Graph Databases [4] Forum title : Graph Processing [5] Person name : Alice gender : f city : Leipzig age : 23 [6] Person name : Bob gender : m city : Leipzig age : 30 [7] Person name : Carol gender : f city : Dresden age : 30 [8] Person name : Dave gender : m city : Dresden age : 42 [9] Person name : Eve gender : f city : Dresden age : 35 speaks : en [10] Person name : Frank gender : m city : Berlin age : 23 IP: 169.32.1.3
  • 22.
    [2] Community |interest : Graphs | vertexCount : 4 [1] Community | interest : Hadoop| vertexCount : 3[0] Community | interest : Databases | vertexCount : 3 EPGM – Graph Representation [0] Tag name : Databases [1] Tag name : Graphs [2] Tag name : Hadoop [3] Forum title : Graph Databases [4] Forum title : Graph Processing [5] Person name : Alice gender : f city : Leipzig age : 23 [6] Person name : Bob gender : m city : Leipzig age : 30 [7] Person name : Carol gender : f city : Dresden age : 30 [8] Person name : Dave gender : m city : Dresden age : 42 [9] Person name : Eve gender : f city : Dresden age : 35 speaks : en [10] Person name : Frank gender : m city : Berlin age : 23 IP: 169.32.1.3 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 knows since : 2014 knows since : 2014 knows since : 2013 hasInterest hasInterest hasInterest hasInterest hasModeratorhasModerator hasMember hasMember hasMember hasMember hasTag hasTaghasTag hasTag knows since : 2013 knows since : 2014 knows since : 2014 knows since : 2015 knows since : 2015 knows since : 2015 knows since : 2013
  • 23.
    EPGM – Operatorsand Algorithms Operators Unary Binary GraphCollectionLogicalGraph Algorithms Aggregation Pattern Matching Transformation Grouping Equality Call * Combination Overlap Exclusion Equality Union Intersection Difference Gelly Library BTG Extraction Frequent Subgraphs Limit Selection Distinct Sort Apply * Reduce * Call * * auxiliary Adaptive Partitioning Subgraph
  • 24.
    EPGM – Operatorsand Algorithms Operators Unary Binary GraphCollectionLogicalGraph Algorithms Aggregation Pattern Matching Transformation Grouping Equality Call * Combination Overlap Exclusion Equality Union Intersection Difference Gelly Library BTG Extraction Frequent Subgraphs Limit Selection Distinct Sort Apply * Reduce * Call * * auxiliary Adaptive Partitioning Subgraph
  • 25.
    EPGM – Operatorsand Algorithms Operators Unary Binary GraphCollectionLogicalGraph Algorithms Aggregation Pattern Matching Transformation Grouping Equality Call * Combination Overlap Exclusion Equality Union Intersection Difference Gelly Library BTG Extraction Frequent Subgraphs Limit Selection Distinct Sort Apply * Reduce * Call * * auxiliary Adaptive Partitioning Subgraph
  • 26.
    Combination 1: personGraph =db.G[0].combine(db.G[1]).combine(db.G[2]) [2] Community | interest : Graphs| vertexCount : 4 [1] Community | interest : Hadoop| vertexCount : 3[0] Community | interest : Databases | vertexCount : 3 [0] Tag name : Databases [1] Tag name : Graphs [2] Tag name : Hadoop [3] Forum title : Graph Databases [4] Forum title : Graph Processing [5] Person name : Alice gender : f city : Leipzig age : 23 [6] Person name : Bob gender : m city : Leipzig age : 30 [7] Person name : Carol gender : f city : Dresden age : 30 [8] Person name : Dave gender : m city : Dresden age : 42 [9] Person name : Eve gender : f city : Dresden age : 35 speaks : en [10] Person name : Frank gender : m city : Berlin age : 23 IP: 169.32.1.3 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 knows since : 2014 knows since : 2014 knows since : 2013 hasInterest hasInterest hasInterest hasInterest hasModeratorhasModerator hasMember hasMember hasMember hasMember hasTag hasTaghasTag hasTag knows since : 2013 knows since : 2014 knows since : 2014 knows since : 2015 knows since : 2015 knows since : 2015 knows since : 2013 DB
  • 27.
    [0] Community |interest : Databases | vertexCount : 3 [1] Community | interest : Hadoop| vertexCount : 3[0] Community | interest : Databases | vertexCount : 3 [0] Tag name : Databases [1] Tag name : Graphs [2] Tag name : Hadoop [3] Forum title : Graph Databases [4] Forum title : Graph Processing 10 11 12 13 14 15 16 17 18 19 20 21 22 23 hasInterest hasInterest hasInterest hasInterest hasModeratorhasModerator hasMember hasMember hasMember hasMember hasTag hasTaghasTag hasTag DB Combination [4] [5] Person name : Alice gender : f city : Leipzig age : 23 [6] Person name : Bob gender : m city : Leipzig age : 30 [7] Person name : Carol gender : f city : Dresden age : 30 [8] Person name : Dave gender : m city : Dresden age : 42 [9] Person name : Eve gender : f city : Dresden age : 35 speaks : en [10] Person name : Frank gender : m city : Berlin age : 23 IP: 169.32.1.3 0 1 2 3 4 5 6 7 8 9 knows since : 2014 knows since : 2014 knows since : 2013 knows since : 2013 knows since : 2014 knows since : 2014 knows since : 2015 knows since : 2015 knows since : 2015 knows since : 2013 1: personGraph = db.G[0].combine(db.G[1]).combine(db.G[2])
  • 28.
    EPGM – Operatorsand Algorithms Operators Unary Binary GraphCollectionLogicalGraph Algorithms Aggregation Pattern Matching Transformation Grouping Equality Call * Combination Overlap Exclusion Equality Union Intersection Difference Gelly Library BTG Extraction Frequent Subgraphs Limit Selection Distinct Sort Apply * Reduce * Call * * auxiliary Adaptive Partitioning Subgraph
  • 29.
    [0] Community |interest : Databases | vertexCount : 3 [1] Community | interest : Hadoop| vertexCount : 3[0] Community | interest : Databases | vertexCount : 3 [0] Tag name : Databases [1] Tag name : Graphs [2] Tag name : Hadoop [3] Forum title : Graph Databases [4] Forum title : Graph Processing 10 11 12 13 14 15 16 17 18 19 20 21 22 23 hasInterest hasInterest hasInterest hasInterest hasModeratorhasModerator hasMember hasMember hasMember hasMember hasTag hasTaghasTag hasTag DB Combination + Grouping [4] [5] Person name : Alice gender : f city : Leipzig age : 23 [6] Person name : Bob gender : m city : Leipzig age : 30 [7] Person name : Carol gender : f city : Dresden age : 30 [8] Person name : Dave gender : m city : Dresden age : 42 [9] Person name : Eve gender : f city : Dresden age : 35 speaks : en [10] Person name : Frank gender : m city : Berlin age : 23 IP: 169.32.1.3 0 1 2 3 4 5 6 7 8 9 knows since : 2014 knows since : 2014 knows since : 2013 knows since : 2013 knows since : 2014 knows since : 2014 knows since : 2015 knows since : 2015 knows since : 2015 knows since : 2013 1: personGraph = db.G[0].combine(db.G[1]).combine(db.G[2]) 2: vertexGroupingKeys = [:label, “city”] 3: edgeGroupingKeys = [: label] 4: vertexAggFunc = (superVertex, vertices => superVertex[“count”] = |vertices|) 5: edgeAggFunc = (superEdge, edges => superEdge[“count”] = |edges|) 6: sumGraph = personGraph.groupBy(vertexGroupingKeys, vertexAggFunc, edgeGroupingKeys, edgeAggFunc)
  • 30.
    [0] Community |interest : Databases | vertexCount : 3 [1] Community | interest : Hadoop| vertexCount : 3[0] Community | interest : Databases | vertexCount : 3 [0] Tag name : Databases [1] Tag name : Graphs [2] Tag name : Hadoop [3] Forum title : Graph Databases [4] Forum title : Graph Processing 10 11 12 13 14 15 16 17 18 19 20 21 22 23 hasInterest hasInterest hasInterest hasInterest hasModeratorhasModerator hasMember hasMember hasMember hasMember hasTag hasTaghasTag hasTag DB Combination + Grouping [4] [5] Person name : Alice gender : f city : Leipzig age : 23 [6] Person name : Bob gender : m city : Leipzig age : 30 [7] Person name : Carol gender : f city : Dresden age : 30 [8] Person name : Dave gender : m city : Dresden age : 42 [9] Person name : Eve gender : f city : Dresden age : 35 speaks : en [10] Person name : Frank gender : m city : Berlin age : 23 IP: 169.32.1.3 0 1 2 3 4 5 6 7 8 9 knows since : 2014 knows since : 2014 knows since : 2013 knows since : 2013 knows since : 2014 knows since : 2014 knows since : 2015 knows since : 2015 knows since : 2015 knows since : 2013 1: personGraph = db.G[0].combine(db.G[1]).combine(db.G[2]) 2: vertexGroupingKeys = [:label, “city”] 3: edgeGroupingKeys = [: label] 4: vertexAggFunc = (superVertex, vertices => superVertex[“count”] = |vertices|) 5: edgeAggFunc = (superEdge, edges => superEdge[“count”] = |edges|) 6: sumGraph = personGraph.groupBy(vertexGroupingKeys, vertexAggFunc, edgeGroupingKeys, edgeAggFunc) [5] [11] Person city : Leipzig count : 2 [12] Person city : Dresden count : 3 [13] Person city : Berlin count : 1 24 25 26 27 28 knows count : 3 knows count : 1 knows count : 2 knows count : 2 knows count : 2
  • 31.
    [0] Community |interest : Databases | vertexCount : 3 [1] Community | interest : Hadoop| vertexCount : 3[0] Community | interest : Databases | vertexCount : 3 [0] Tag name : Databases [1] Tag name : Graphs [2] Tag name : Hadoop [3] Forum title : Graph Databases [4] Forum title : Graph Processing 10 11 12 13 14 15 16 17 18 19 20 21 22 23 hasInterest hasInterest hasInterest hasInterest hasModeratorhasModerator hasMember hasMember hasMember hasMember hasTag hasTaghasTag hasTag DB Combination + Grouping + Aggregation [4] [5] Person name : Alice gender : f city : Leipzig age : 23 [6] Person name : Bob gender : m city : Leipzig age : 30 [7] Person name : Carol gender : f city : Dresden age : 30 [8] Person name : Dave gender : m city : Dresden age : 42 [9] Person name : Eve gender : f city : Dresden age : 35 speaks : en [10] Person name : Frank gender : m city : Berlin age : 23 IP: 169.32.1.3 0 1 2 3 4 5 6 7 8 9 knows since : 2014 knows since : 2014 knows since : 2013 knows since : 2013 knows since : 2014 knows since : 2014 knows since : 2015 knows since : 2015 knows since : 2015 knows since : 2013 1: personGraph = db.G[0].combine(db.G[1]).combine(db.G[2]) 2: vertexGroupingKeys = [:label, “city”] 3: edgeGroupingKeys = [: label] 4: vertexAggFunc = (superVertex, vertices => superVertex[“count”] = |vertices|) 5: edgeAggFunc = (superEdge, edges => superEdge[“count”] = |edges|) 6: sumGraph = personGraph.groupBy(vertexGroupingKeys, vertexAggFunc, edgeGroupingKeys, edgeAggFunc) 7: aggFunc = (g => |g.E|) 8: aggGraph = sumGraph.aggregate(“edgeCount”, aggFunc) [5] [11] Person city : Leipzig count : 2 [12] Person city : Dresden count : 3 [13] Person city : Berlin count : 1 24 25 26 27 28 knows count : 3 knows count : 1 knows count : 2 knows count : 2 knows count : 2
  • 32.
    [0] Community |interest : Databases | vertexCount : 3 [1] Community | interest : Hadoop| vertexCount : 3[0] Community | interest : Databases | vertexCount : 3 [0] Tag name : Databases [1] Tag name : Graphs [2] Tag name : Hadoop [3] Forum title : Graph Databases [4] Forum title : Graph Processing 10 11 12 13 14 15 16 17 18 19 20 21 22 23 hasInterest hasInterest hasInterest hasInterest hasModeratorhasModerator hasMember hasMember hasMember hasMember hasTag hasTaghasTag hasTag DB Combination + Grouping + Aggregation [4] [5] Person name : Alice gender : f city : Leipzig age : 23 [6] Person name : Bob gender : m city : Leipzig age : 30 [7] Person name : Carol gender : f city : Dresden age : 30 [8] Person name : Dave gender : m city : Dresden age : 42 [9] Person name : Eve gender : f city : Dresden age : 35 speaks : en [10] Person name : Frank gender : m city : Berlin age : 23 IP: 169.32.1.3 0 1 2 3 4 5 6 7 8 9 knows since : 2014 knows since : 2014 knows since : 2013 knows since : 2013 knows since : 2014 knows since : 2014 knows since : 2015 knows since : 2015 knows since : 2015 knows since : 2013 1: personGraph = db.G[0].combine(db.G[1]).combine(db.G[2]) 2: vertexGroupingKeys = [:label, “city”] 3: edgeGroupingKeys = [: label] 4: vertexAggFunc = (superVertex, vertices => superVertex[“count”] = |vertices|) 5: edgeAggFunc = (superEdge, edges => superEdge[“count”] = |edges|) 6: sumGraph = personGraph.groupBy(vertexGroupingKeys, vertexAggFunc, edgeGroupingKeys, edgeAggFunc) 7: aggFunc = (g => |g.E|) 8: aggGraph = sumGraph.aggregate(“edgeCount”, aggFunc) [5] edgeCount : 5 [11] Person city : Leipzig count : 2 [12] Person city : Dresden count : 3 [13] Person city : Berlin count : 1 24 25 26 27 28 knows count : 3 knows count : 1 knows count : 2 knows count : 2 knows count : 2
  • 33.
    EPGM – Operatorsand Algorithms Operators Unary Binary GraphCollectionLogicalGraph Algorithms Aggregation Pattern Matching Transformation Grouping Equality Call * Combination Overlap Exclusion Equality Union Intersection Difference Gelly Library BTG Extraction Frequent Subgraphs Limit Selection Distinct Sort Apply * Reduce * Call * * auxiliary Adaptive Partitioning Subgraph
  • 34.
    Selection 1: resultColl =db.G[0,1,2].select((g => g[“vertexCount”] > 3)) [2] Community | interest : Graphs | vertexCount : 4 [1] Community | interest : Hadoop| vertexCount : 3[0] Community | interest : Databases | vertexCount : 3 [0] Tag name : Databases [1] Tag name : Graphs [2] Tag name : Hadoop [3] Forum title : Graph Databases [4] Forum title : Graph Processing [5] Person name : Alice gender : f city : Leipzig age : 23 [6] Person name : Bob gender : m city : Leipzig age : 30 [7] Person name : Carol gender : f city : Dresden age : 30 [8] Person name : Dave gender : m city : Dresden age : 42 [9] Person name : Eve gender : f city : Dresden age : 35 speaks : en [10] Person name : Frank gender : m city : Berlin age : 23 IP: 169.32.1.3 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 knows since : 2014 knows since : 2014 knows since : 2013 hasInterest hasInterest hasInterest hasInterest hasModeratorhasModerator hasMember hasMember hasMember hasMember hasTag hasTaghasTag hasTag knows since : 2013 knows since : 2014 knows since : 2014 knows since : 2015 knows since : 2015 knows since : 2015 knows since : 2013 DB
  • 35.
    Selection 1: resultColl =db.G[0,1,2].select((g => g[“vertexCount”] > 3)) [1] Community | interest : Hadoop| vertexCount : 3[0] Community | interest : Databases | vertexCount : 3 [0] Tag name : Databases [1] Tag name : Graphs [2] Tag name : Hadoop [3] Forum title : Graph Databases [4] Forum title : Graph Processing [9] Person name : Eve gender : f city : Dresden age : 35 speaks : en [10] Person name : Frank gender : m city : Berlin age : 23 IP: 169.32.1.3 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 hasInterest hasInterest hasInterest hasInterest hasModeratorhasModerator hasMember hasMember hasMember hasMember hasTag hasTaghasTag hasTag knows since : 2015 knows since : 2015 knows since : 2015 knows since : 2013 DB [2] Community | interest : Graphs | vertexCount : 4 [5] Person name : Alice gender : f city : Leipzig age : 23 [6] Person name : Bob gender : m city : Leipzig age : 30 [7] Person name : Carol gender : f city : Dresden age : 30 [8] Person name : Dave gender : m city : Dresden age : 42 0 1 2 3 4 5 knows since : 2014 knows since : 2014 knows since : 2013 knows since : 2013 knows since : 2014 knows since : 2014
  • 36.
    EPGM – Operatorsand Algorithms Operators Unary Binary GraphCollectionLogicalGraph Algorithms Aggregation Pattern Matching Transformation Grouping Equality Call * Combination Overlap Exclusion Equality Union Intersection Difference Gelly Library BTG Extraction Frequent Subgraphs Limit Selection Distinct Sort Apply * Reduce * Call * * auxiliary Adaptive Partitioning Subgraph
  • 37.
  • 38.
    Apache Flink http://www.slideshare.net/robertmetzger1/apache-flink-meetup-munich-november-2015-flink-overview-architecture-integrations-and-use-case „Streaming DataflowEngine that provides • data distribution, • communication, • and fault tolerance for distributed computations over data streams.“ HDFS LocalFS HBase JDBC Kafka RabbitMQ Flume (Neo4j) EmbeddedTezYarnClusterLocal Streaming Dataflow Runtime DataSet DataStream HadoopMR Table Gelly ML Table Zeppelin Cascading MRQL Dataflow Storm(wip) Dataflow(wip) SAMOA
  • 39.
    Apache Flink –DataSet API  DataSet := Distributed Collection of Data  Transformation := Operation applied on DataSet  Flink Program := Composition of Transformations DataSet DataSet DataSet Transformation Transformation DataSet DataSet Transformation DataSet Flink Program
  • 40.
    Apache Flink –DataSet Transformations  aggregate  coGroup  cross  distinct  filter  first-N  flatMap  groupBy  join  leftOuterJoin  rightOuterJoin  fullOuterJoin  map  mapPartition  project  reduce  reduceGroup  union  iterate  iterateDelta
  • 41.
    The „Hello World“of Big Data – Word Count 1: ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 2: 3: DataSet<String> text = env.fromElements( // or env.readTextFile(„hdfs://…“) 4: „He who controls the past controls the future.“, 5: „He who controls the present controls the past.“); 6: 7: DataSet<Tuple2<String, Integer>> wordCounts = text 8: .flatMap(new LineSplitter()) // splits the line and outputs (word, 1) tuples 9: .groupBy(0) 10: .sum(1); 11: 12: wordCounts.print(); // trigger execution flatMap „He who controls the past controls the future.“ „He who controls the present controls the past.“ (He,1) (who,1) (controls,1) (the,1) (past,1) // ... groupBy(0) [(He,1),(He,1)] [(who,1),(who,1)] [(future,1)] [(past,1),(past,1)] [(present,1)] // ... sum(1) (He,2) (who,2) (future,1) (past,2) (present,1) // ...
  • 42.
  • 43.
    EPGM on ApacheFlink – User facing API LogicalGraph fromCollections(…) : LogicalGraph fromDataSets(…) : LogicalGraph fromGellyGraph(…) : LogicalGraph getGraphHead() : DataSet<EPGMGraphHead> toGellyGraph() : Graph combine(…) : LogicalGraph intersect(…) : LogicalGraph groupBy(…) : LogicalGraph match(…) : GraphCollection // ... GraphCollection fromCollections(…) : GraphCollection fromDataSets(…) : GraphCollection getGraphHeads() : DataSet<EPGMGraphHead> getGraph(…) : LogicalGraph getGraphs(…) : GraphCollection select(…) : GraphCollection union(…) : GraphCollection distinct(…) : GraphCollection sortBy(…) : GraphCollection // ... GraphBase getVertices() : DataSet<EPGMVertex> getEdges() : DataSet<EPGMEdge> // ... graphHeads : DataSet<EPGMGraphHead> vertices : DataSet<EPGMVertex> edges : DataSet<EPGMEdge> EPGMDatabase fromCollections(…) : EPGMDatabase fromJSONFile(…) : EPGMDatabase fromHBase(…) : EPGMDatabase writeAsJSON(…) : void writeToHBase(…) : void getDatabaseGraph() : LogicalGraph // ...
  • 44.
    EPGM on ApacheFlink – DataSets Id Label Properties Graphs Id Label Properties SourceId TargetId Graphs EPGMGraphHead EPGMVertex EPGMEdge Id Label Properties POJO POJO POJO DataSet<EPGMGraphHead> DataSet<EPGMVertex> DataSet<EPGMEdge> Id Label Properties Graphs EPGMVertex GradoopId := UUID 128-bit String PropertyList := List<Property> Property := (String, PropertyValue) PropertyValue := byte[] GradoopIdSet := Set<GradoopId>
  • 45.
    EPGM on ApacheFlink – Exclusion // input: firstGraph (G[0]), secondGraph (G[2]) 1: DataSet<GradoopId> graphId = secondGraph.getGraphHead() 2: .map(new Id<G>()); 3: 4: DataSet<V> newVertices = firstGraph.getVertices() 5: .filter(new NotInGraphBroadCast<V>()) 6: .withBroadcastSet(graphId, GRAPH_ID); 7: 8: DataSet<E> newEdges = firstGraph.getEdges() 9: .filter(new NotInGraphBroadCast<E>()) 10: .withBroadcastSet(graphId, GRAPH_ID) 11: .join(newVertices) 12: .where(new SourceId<E>().equalTo(new Id<V>()) 13: .with(new LeftSide<E, V>()) 14: .join(newVertices) 15: .where(new TargetId<E>().equalTo(new Id<V>()) 16: .with(new LeftSide<E, V>()); db.G[0].exclude(db.G[2]) [2] Community | interest : Graphs| vertexCount : 4 [0] Community | interest : Databases | vertexCount : 3 [5] Person name : Alice gender : f city : Leipzig age : 23 [6] Person name : Bob gender : m city : Leipzig age : 30 [7] Person name : Carol gender : f city : Dresden age : 30 [8] Person name : Dave gender : m city : Dresden age : 42 [9] Person name : Eve gender : f city : Dresden age : 35 speaks : en 0 1 2 3 4 5 6 7 knows since : 2014 knows since : 2014 knows since : 2013 knows since : 2013 knows since : 2014 knows since : 2014 knows since : 2015 knows since : 2013
  • 46.
    EPGM on ApacheFlink – Exclusion Id Label Properties 2 Community interest: Graphs vertexCount: 4 graphId = secondGraph.getGraphHead() Id 2 newVertices = firstGraph.getVertices() Id Label Properties Graphs 5 Person name: Alice gender: f … [0, 2] 6 Person name: Bob gender: m … [0, 2] 9 Person name: Eve gender: f … [0] Id Label Properties Graphs 9 Person name: Eve gender: f … [0] .map(new Id<G>()); .filter(new NotInGraphBroadCast<V>()) .withBroadcastSet(graphId, GRAPH_ID);
  • 47.
    EPGM in ApacheFlink – Exclusion newEdges = firstGraph.getEdges() Id Label SourceId TargetId Properties Graphs 0 knows 5 6 since: 2014 [0, 2] 1 knows 6 5 since: 2014 [0, 2] 6 knows 9 5 since: 2013 [0] 7 knows 9 6 since: 2015 [0] Id Label SourceId TargetId Properties Graphs 6 knows 9 5 since: 2013 [0] 7 knows 9 6 since: 2015 [0] Id Label SourceId TargetId … Id Label … 6 knows 9 5 … 9 Person … 7 knows 9 6 … 9 Person … Id Label SourceId TargetId … 6 knows 9 5 … 7 knows 9 6 … Id Label SourceId TargetId … Id Label … Id Label SourceId TargetId ….with(new LeftSide<E, V>()); .join(newVertices) .where(new TargetId<E>().equalTo(new Id<V>()) .with(new LeftSide<E, V>()) .join(newVertices) .where(new SourceId<E>().equalTo(new Id<V>()) .filter(new NotInGraphBroadCast<E>()) .withBroadcastSet(graphId, GRAPH_ID)
  • 48.
  • 49.
    LDBC Social NetworkData http://ldbcouncil.org/
  • 50.
    LDBC Social NetworkData socialNetwork .subgraph( (v => v.label == ‘Person‘), (e => e.label == ‘knows‘)) .transform( (gIn, gOut => gOut = gIn), (vIn, vOut => { vOut.label = vIn.label, vOut[‘city‘] = vIn[‘city‘] vOut[‘gender‘] = vIn[‘gender‘] vOut[‘key‘] = vIn[‘birthday‘]}), (eIn, eOut) => eOut.label = eIn.label)) .callForCollection(:LabelPropagation, [‘key‘, 4]) .apply(g => g.aggregate(‘vertexCount‘, (h => |h.V|)) .select(g => g[‘vertexCount‘] > 50000) .reduce(g, h => g.combine(h)) .groupBy( [‘city‘,‘gender‘], (superVertex, vertices => superVertex[‘count‘] = |vertices|), [], (superEdge, edges => superEdge[‘count‘] = |edges|) .aggregate(‘vCount‘, (g => |g.V|)) .aggregate(‘eCount‘, (g => |g.E|))
  • 51.
    LDBC Social NetworkData socialNetwork .subgraph( (v => v.label == ‘Person‘), (e => e.label == ‘knows‘)) .transform( (gIn, gOut => gOut = gIn), (vIn, vOut => { vOut.label = vIn.label, vOut[‘city‘] = vIn[‘city‘] vOut[‘gender‘] = vIn[‘gender‘] vOut[‘key‘] = vIn[‘birthday‘]}), (eIn, eOut) => eOut.label = eIn.label)) .callForCollection(:LabelPropagation, [‘key‘, 4]) .apply(g => g.aggregate(‘vertexCount‘, (h => |h.V|)) .select(g => g[‘vertexCount‘] > 50000) .reduce(g, h => g.combine(h)) .groupBy( [‘city‘,‘gender‘], (superVertex, vertices => superVertex[‘count‘] = |vertices|), [], (superEdge, edges => superEdge[‘count‘] = |edges|) .aggregate(‘vCount‘, (g => |g.V|)) .aggregate(‘eCount‘, (g => |g.E|))
  • 52.
    Benchmark Results Dataset #Vertices # Edges Graphalytics.1 61,613 2,026,082 Graphalytics.10 260,613 16,600,778 Graphalytics.100 1,695,613 147,437,275 Graphalytics.1000 12,775,613 1,363,747,260 Graphalytics.10000 90,025,613 10,872,109,028  16x Intel(R) Xeon(R) 2.50GHz (6 Cores)  16x 48 GB RAM  1 Gigabit Ethernet  Hadoop 2.6.0  Flink 1.0-SNAPSHOT  slots (per worker) 12  jobmanager.heap.mb 2048  taskmanager.heap.mb 40960 0 200 400 600 800 1000 1200 1 2 4 8 16 Runtime[s] Number of workers Runtime Graphalytics.100
  • 53.
    Benchmark Results Dataset #Vertices # Edges Graphalytics.1 61,613 2,026,082 Graphalytics.10 260,613 16,600,778 Graphalytics.100 1,695,613 147,437,275 Graphalytics.1000 12,775,613 1,363,747,260 Graphalytics.10000 90,025,613 10,872,109,028 1 2 4 8 16 1 2 4 8 16 Speedup Number of workers Speedup Graphalytics.100 Linear  16x Intel(R) Xeon(R) 2.50GHz (6 Cores)  16x 48 GB RAM  1 Gigabit Ethernet  Hadoop 2.6.0  Flink 1.0-SNAPSHOT  slots (per worker) 12  jobmanager.heap.mb 2048  taskmanager.heap.mb 40960
  • 54.
    Benchmark Results Dataset #Vertices # Edges Graphalytics.1 61,613 2,026,082 Graphalytics.10 260,613 16,600,778 Graphalytics.100 1,695,613 147,437,275 Graphalytics.1000 12,775,613 1,363,747,260 Graphalytics.10000 90,025,613 10,872,109,028 1 10 100 1000 10000 Runtime[s] Datasets  16x Intel(R) Xeon(R) 2.50GHz (6 Cores)  16x 48 GB RAM  1 Gigabit Ethernet  Hadoop 2.6.0  Flink 1.0-SNAPSHOT  slots (per worker) 12  jobmanager.heap.mb 2048  taskmanager.heap.mb 40960
  • 55.
    Current State &Future Work
  • 56.
    Current State Operators Unary Binary GraphCollectionLogicalGraph Algorithms Aggregation PatternMatching Transformation Grouping Equality Call * Combination Overlap Exclusion Equality Union Intersection Difference Gelly Library BTG Extraction Frequent Subgraphs Limit Selection Distinct Sort Apply * Reduce * Call * * auxiliary Adaptive Partitioning Subgraph
  • 57.
    Current State  0.0.1First Prototype (May 2015)  Hadoop MapReduce and Giraph for operator implementations  Too much complexity  Performance loss through serialization in HDFS/HBase  0.0.2 Using Flink as execution layer (June 2015)  Basic operators  0.1 December 2015  System-side identifiers (UUID)  Improved property handling  More operator implementations (e.g., Equality, Bool operators)  Code refactoring  0.2-SNAPSHOT  Graph Pattern Matching  Frequent Subgraph Mining  Memory optimization (96-bit ID, Dictionary Encoding, …)  Tuple Implementation
  • 58.
    Contributions to Flink FLINK-2411 Add basic graph summarization algorithm  FLINK-2590 DataSetUtils.zipWithUniqueID creates duplicate Ids  FLINK-2905 Add intersect method to Graph class  FLINK-2910 Combine tests for binary graph operators  FLINK-2941 Implement a neo4j - Flink/Gelly connector  FLINK-2981 Update README for building docs  FLINK-3064 Missing size check in GroupReduceOperatorBase leads to NPE  FLINK-3118 Check if MessageFunction implements ResultTypeQueryable  FLINK-3122 Generalize value type in LabelPropagation  FLINK-3272 Generalize vertex value type in ConnectedComponents  Flink Forward (October 2015)  Meetup Big Data Usergroup Saxony (December 2015)
  • 59.
    Contributions welcome!  Code Operator implementations  Performance Tuning  Extend HBase Storage  People  Bachelor / Master Thesis  Open PhD positions in Leipzig, Germany  Data! and Use Cases  We are researchers, we assume ...
  • 60.