Maps and Meaning: Graph-based Entity Resolution
Hendrik Frentrup, systemati.co
#UnifiedDataAnalytics #SparkAISummit
"Data is the new oil" (image source: Jordi Guzmán, Creative Commons)
Building Value Streams (image source: Malcolm Manners, Creative Commons): Data Extraction -> Data Refining -> Data Warehousing
Data Pipeline: Source 1 … Source N -> Data Extraction, Transformation, Integration, Data Modelling -> Visualisation (Presentation, Dashboards) and Machine Learning (Statistical Analysis, Inference, Predictions)
Upstream integrations (Source 1 … Source N):
• First-order transformations:
  – Deduplication -> df.distinct()
  – Transformations -> df.withColumn(col, expr(col))
  – Mapping -> df.withColumnRenamed(old, new)
• Second-order transformation:
  – Denormalisation -> lhs.join(rhs, key)
• Nth-order transformation:
  – Merge N sources -> Entity Resolution
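As a hedged illustration of these transformation orders in PySpark (df, lhs, rhs and the column names are hypothetical, not from the talk's repo):

from pyspark.sql.functions import expr

# First order: single-source transformations
deduped = df.distinct()                               # deduplication
cleaned = df.withColumn("name", expr("upper(name)"))  # per-column transformation
mapped  = df.withColumnRenamed("e_mail", "email")     # rename/mapping

# Second order: denormalise two sources over a shared key
denorm = lhs.join(rhs, "key")

# Nth order: merging N sources without a shared key is entity resolution,
# which the rest of this talk covers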
Outline
• Motivation
• Entity Resolution Example
• Graph-based Entity Resolution Algorithm
• Data Pipeline Architecture
• Implementation
  – In GraphFrames (Python API)
  – In GraphX (Scala API)
• The Role of Machine Learning in Entity Resolution
Example: Find Duplicates — merge records in your address book

Before:
ID | First Name | Last Name | Email                  | Mobile Number | Phone Number
1  | Harry      | Mulisch   | harry@mulisch.nl       | +31 101 1001  |
2  | HKV        | Mulisch   | Harry.Mulish@gmail.com |               | +31 666 7777
3  |            |           | author@heaven.nl       | +31 101 1001  |
4  | Harry      | Mulisch   |                        | +31 123 4567  | +31 666 7777

After:
ID | First Name | Last Name | Email                                                      | Mobile Number              | Phone Number
1  | Harry/HKV  | Mulisch   | harry@mulisch.nl, Harry.Mulish@gmail.com, author@heaven.nl | +31 101 1001, +31 123 4567 | +31 666 7777
…such as Google Contacts
Example: Resolving records

ID | First Name | Last Name | Email                  | Mobile Number | Phone Number | Source
1  | Harry      | Mulisch   | harry@mulisch.nl       | +31 101 1001  |              | Phone
2  | S          | Nadolny   |                        | +49 899 9898  |              | Phone
3  | Harry      | Mulisch   |                        | +31 123 4567  | +31 666 7777 | Phone
4  |            |           | author@heaven.nl       | +31 101 1001  |              | Gmail
5  | Sten       | Nadolny   | sten@slow.de           | +49 899 9898  |              | Gmail
6  | Max        | Frisch    | max@andorra.ch         |               |              | Outlook
7  | HKV        |           | Harry.Mulish@gmail.com |               | +31 666 7777 | Outlook
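To experiment with this example, the table could be sketched as a PySpark DataFrame like this (the name "contacts" is mine, and the split of numbers between the mobile and phone columns is inferred from the merged result above):

contacts = spark.createDataFrame(
    [
        (1, "Harry", "Mulisch", "harry@mulisch.nl", "+31 101 1001", None, "Phone"),
        (2, "S", "Nadolny", None, "+49 899 9898", None, "Phone"),
        (3, "Harry", "Mulisch", None, "+31 123 4567", "+31 666 7777", "Phone"),
        (4, None, None, "author@heaven.nl", "+31 101 1001", None, "Gmail"),
        (5, "Sten", "Nadolny", "sten@slow.de", "+49 899 9898", None, "Gmail"),
        (6, "Max", "Frisch", "max@andorra.ch", None, None, "Outlook"),
        (7, "HKV", None, "Harry.Mulish@gmail.com", None, "+31 666 7777", "Outlook"),
    ],
    ["id", "first_name", "last_name", "email", "mobile", "phone", "source"],
)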
Graph Algorithm Walkthrough
(Diagram: the seven records above laid out as graph nodes, with edges drawn between records that share an identifier such as an email address or phone number, or a matching name)
• Each record is a node
• Create edges based on similarities
• Collect connected nodes
• Consolidate information in records
Result after consolidation:
• Harry Mulisch/HKV: harry@mulisch.nl, author@heaven.nl, Harry.Mulish@gmail.com; +31 101 1001, +31 123 4567; +31 666 7777
• Sten/S Nadolny: sten@slow.de; +49 899 9898
• Max Frisch: max@andorra.ch
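Before the Spark implementations below, a minimal pure-Python union-find makes the "collect connected nodes" step concrete; the edge list here is hand-derived from the shared identifiers and matching names in the example:

from collections import defaultdict

parent = {i: i for i in range(1, 8)}  # one set per record id

def find(x):
    while parent[x] != x:
        parent[x] = parent[parent[x]]  # path compression
        x = parent[x]
    return x

def union(a, b):
    parent[find(a)] = find(b)

# (1,4): shared +31 101 1001; (1,3): same name; (3,7): shared +31 666 7777;
# (2,5): shared +49 899 9898
edges = [(1, 4), (1, 3), (3, 7), (2, 5)]
for a, b in edges:
    union(a, b)

components = defaultdict(list)
for i in parent:
    components[find(i)].append(i)
print(sorted(components.values()))  # [[1, 3, 4, 7], [2, 5], [6]]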
Entity Resolution Pipeline Architecture:
Source 1 … Source N -> Extract -> Data Hub/Lake/Warehouse (source copy -> clean records -> appended records) -> Resolve Entities (consolidated nodes) -> Merge Entities (resolved records)
Technical Implementation
Graphs in Apache Spark:
           | GraphX | GraphFrames
Python API |        | 👍
Scala API  | 👍     | 👍
With GraphFrames
Create nodes
• Add an id column to the dataframe of records

from pyspark.sql.functions import monotonically_increasing_id

nodes = records.withColumn("id", monotonically_increasing_id())

+---+------------+-----------+-----------+---------+----------+--------------+
| id|         ssn|      email|      phone|  address|       DoB|          Name|
+---+------------+-----------+-----------+---------+----------+--------------+
|  0| 714-12-4462| len@sma.ll| 6088881234|      ...| 15/4/1937| Lennie Small |
|  1| 481-33-1024| geo@mil.tn| 6077654980|      ...| 15/4/1937| Goerge Milton|

(Identifiers: ssn, email, phone. Attributes: address, DoB, Name.)
Edge creation

from functools import reduce
from pyspark.sql.functions import col

match_cols = ["ssn", "email"]
# Mirror the node dataframe with prefixed column names for a self-join
mirrorColNames = [f"_{c}" for c in nodes.columns]
mirror = nodes.toDF(*mirrorColNames)
# Match on any of the identifier columns, excluding self-matches
mcond = [col(c) == col(f"_{c}") for c in match_cols]
cond = (col("id") != col("_id")) & reduce(lambda x, y: x | y, mcond)
edges = (nodes.join(mirror, cond)
              .select(col("id").alias("src"), col("_id").alias("dst")))

cond renders as: Column<b'((NOT (id = _id)) AND ((ssn = _ssn) OR (email = _email)))'>
Resolve entities and consolidation
• Connected Components

import graphframes as gf
from pyspark.sql.functions import collect_set

sc.setCheckpointDir("/tmp/checkpoints")  # required by connectedComponents
graph = gf.GraphFrame(nodes, edges)
cc = graph.connectedComponents()  # adds a "component" column to the vertices
entities = cc.groupBy("component").agg(collect_set("name"))

• Consolidate Components
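The consolidation step itself is not spelled out on the slide; a minimal sketch (naming is mine) collects every attribute per component and leaves picking canonical values to downstream logic:

import pyspark.sql.functions as F

# cc carries all original vertex columns plus the "component" id
consolidated = cc.groupBy("component").agg(
    *[F.collect_set(c).alias(c) for c in nodes.columns if c != "id"]
)
consolidated.show(truncate=False)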
With GraphX
Strongly Typed Scala
• Defining the schema of our data

val record_schema = StructType(Seq(
  StructField(name = "id",    dataType = LongType,   nullable = false),
  StructField(name = "name",  dataType = StringType, nullable = true),
  StructField(name = "email", dataType = StringType, nullable = true),
  StructField(name = "ssn",   dataType = LongType,   nullable = true),
  StructField(name = "attr",  dataType = StringType, nullable = true)
))
Node creation
• Add an ID column to records
• Turn the DataFrame into an RDD of (VertexId, attribute) pairs

import org.apache.spark.graphx.VertexId

val nodesRDD = records.map(r => (r.getAs[VertexId]("id"), 1)).rdd
Edge creation

val mirrorColNames = for (col <- records.columns) yield "_" + col
val mirror = records.toDF(mirrorColNames: _*)

def conditions(matchCols: Seq[String]): Column = {
  col("id") =!= col("_id") &&
    matchCols.map(c => col(c) === col("_" + c)).reduce(_ || _)
}

val edges = records.join(mirror, conditions(Seq("ssn", "email")))
val edgesRDD = edges
  .select("id", "_id")
  .map(r => Edge(r.getAs[VertexId](0), r.getAs[VertexId](1), null))
  .rdd
Resolve entities and consolidation
• Connected Components

val graph = Graph(nodesRDD, edgesRDD)
val cc = graph.connectedComponents()
val entities = cc.vertices.toDF()  // columns _1 (vertex id), _2 (component id)

• Consolidate Components

val resolved_records = records.join(entities, $"id" === $"_1")
val res_records = resolved_records
  .withColumnRenamed("_2", "e_id")
  .groupBy("e_id")
  .agg(collect_set($"name"))
Resolve operation
• Input: DataFrame
• Columns to match: ["ssn", "email"]
• Output: DataFrame
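Putting the GraphFrames pieces together, this operation could be wrapped as the following hypothetical helper (names are mine, not necessarily the repo's; it assumes GraphFrames is available and a checkpoint dir is set, as connectedComponents requires):

from functools import reduce
import pyspark.sql.functions as F
from graphframes import GraphFrame

def resolve(records, match_cols):
    """DataFrame in, DataFrame out: original columns plus a 'component' entity id."""
    nodes = records.withColumn("id", F.monotonically_increasing_id())
    mirror = nodes.toDF(*[f"_{c}" for c in nodes.columns])
    same = reduce(lambda a, b: a | b,
                  [F.col(c) == F.col(f"_{c}") for c in match_cols])
    edges = (nodes.join(mirror, (F.col("id") != F.col("_id")) & same)
                  .select(F.col("id").alias("src"), F.col("_id").alias("dst")))
    return GraphFrame(nodes, edges).connectedComponents()

entities = resolve(records, ["ssn", "email"])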
Evaluation
• Number of source records per entity
• Business logic: conflicts (e.g. multiple SSNs within one entity)
• Distribution of matches
(Chart: "Entities by Nr of Sources": counts of entities found in one, two, three, or four sources)
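A sketch of these checks, assuming the resolved records keep a source column and the component id from above:

import pyspark.sql.functions as F

per_entity = entities.groupBy("component").agg(
    F.countDistinct("source").alias("n_sources"),
    F.countDistinct("ssn").alias("n_ssn"),
)
# Distribution: how many entities were found in 1, 2, 3, 4 sources
per_entity.groupBy("n_sources").count().orderBy("n_sources").show()
# Business-logic conflicts: entities that ended up with multiple SSNs
per_entity.filter("n_ssn > 1").show()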
Evolving Entity Resolution
Machine learning in Entity Resolution
• Pairwise comparison
  – String matching / distance measures
  – Incorporate temporal data into edge creation
• Edge creation is the most computationally heavy step
(Illustration: comparing "H Muiisch, Harry.Mulish@gmail.com" against record 1, "Harry Mulisch, harry@mulisch.nl", yields a binary label { 1, 0 } or a score such as P(match) = 0.8762)
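As one hedged example of a distance measure (not necessarily what the talk's repo does), PySpark's built-in levenshtein can relax the exact-equality join so near-misses like "Muiisch" vs "Mulisch" still produce candidate edges; note the pairwise comparison remains quadratic, which is why edge creation dominates the cost:

import pyspark.sql.functions as F

candidates = (nodes.join(mirror, F.col("id") < F.col("_id"))  # each pair once
                   .withColumn("name_dist", F.levenshtein("name", "_name"))
                   .filter(F.col("name_dist") <= 2))  # "Muiisch" vs "Mulisch" is 1
weighted_edges = candidates.select(
    F.col("id").alias("src"), F.col("_id").alias("dst"),
    (1.0 / (1.0 + F.col("name_dist"))).alias("weight"),  # feature for a link classifier
)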
Machine learning in Entity Resolution
• Structuring connected data
• Partitioning of the graph based on clustering of records
• Using weighted edges and learning a classifier to evaluate links between records
Feeding a knowledge graph
Human Interface:
• Analytics
• Forensics
• Discovery
Iterative Improvements:
• Data Quality
• Contextual Information
• Use case driven
Get started yourself
• GitHub project (resolver & notebook): https://github.com/hendrikfrentrup/maps-meaning
• Docker container with pySpark & GraphFrames: https://hub.docker.com/r/hendrikfrentrup/pyspark-graphframes
Key Takeaways
• The data pipeline coalesces into a single record table
• Connected Components is at the core of resolving
• Edge creation is the expensive operation
• Batch operation over a single corpus
Thanks! Any questions? Comments? Observations?