Unifying Batch and Stream Data Processing with Apache Calcite and Apache Beam
Khai Tran, Big Data @LinkedIn
Agenda
1. Computation convergence problems
2. LinkedIn metrics platform
3. From offline to nearline
4. Deep dive
Computation convergence problems
Online, nearline, and offline computation
[Diagram components: application servers and OLTP databases (Online Processing, OLTP engines); messaging systems carrying tracking events and DB changes (Near Real Time Processing, streaming engines); HDFS holding DB dumps (Offline Processing, batch engines).]
Convergence problems
Online - offline
● Execute OLTP query logic in batch engines
Example
● Online query: compute the public profile of a LinkedIn member from the Profile table
● Batch computation: execute the same public-profile logic for all LinkedIn members
Online - nearline
● Execute OLTP query logic in streaming engines
Example
● Online query: compute the public profile of a LinkedIn member from the Profile table
● Streaming computation: incrementally compute public profiles from database changes captured from the Profile table
Offline - nearline
● Execute the logic of batch scripts in streaming engines
Example
● Batch scripts: compute metrics from raw tracking events
● Streaming computation: deliver the same metrics, using the same transformation logic as the batch scripts, with low latency
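The common thread in all three problems is running one piece of logic in two execution modes. A minimal plain-Java sketch of the online-nearline case, assuming a hypothetical `PUBLIC_PROFILE` function and a map-based row shape (neither is LinkedIn's actual schema): the same function is applied once over a full snapshot (batch) and again to each captured change (streaming).

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

/** Sketch: one piece of logic run in batch (snapshot) and nearline (per change). Names are hypothetical. */
class ConvergenceSketch {
    // Hypothetical "public profile" logic: keep only publicly visible fields.
    static final Function<Map<String, String>, Map<String, String>> PUBLIC_PROFILE = row -> {
        Map<String, String> out = new TreeMap<>();
        out.put("name", row.get("name"));
        out.put("headline", row.getOrDefault("headline", ""));
        return out; // private fields such as "email" are dropped
    };

    // Offline: apply the logic to every member in a full snapshot of the Profile table.
    static Map<String, Map<String, String>> batch(Map<String, Map<String, String>> snapshot) {
        Map<String, Map<String, String>> result = new TreeMap<>();
        snapshot.forEach((id, row) -> result.put(id, PUBLIC_PROFILE.apply(row)));
        return result;
    }

    // Nearline: apply the same logic to one captured change and update the materialized view.
    static void onChange(Map<String, Map<String, String>> view, String memberId, Map<String, String> newRow) {
        view.put(memberId, PUBLIC_PROFILE.apply(newRow));
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> snapshot = new TreeMap<>();
        snapshot.put("m1", Map.of("name", "Ann", "headline", "Engineer", "email", "ann@example.com"));
        Map<String, Map<String, String>> view = batch(snapshot);
        onChange(view, "m1", Map.of("name", "Ann", "headline", "Staff Engineer"));
        System.out.println(view); // {m1={headline=Staff Engineer, name=Ann}}
    }
}
```

Because both paths call the same function, a change event produces exactly the row a full batch recomputation would; the hard part in practice, which this sketch ignores, is that real batch and streaming engines expose different APIs for the same logic.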
Metrics platform at LinkedIn
LinkedIn Unified Metrics Platform (UMP)
A platform for engineers and data scientists to define and onboard their metrics.
[Diagram: raw tracking data flows into the Unified Metrics Platform, which feeds experimentation, reporting, and site-facing apps.]
Example - Metrics in reporting
[Chart: number of RPC calls to the HDFS NameNode, by command type]
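The onboarding process
[Diagram: Define, Declare, Onboard. The user defines the metric logic as code (# code LOAD … # data # transformation # code STORE …) and declares a config (Metrics: A = SUM(A’), B = Unique(id); Dimensions: C, D), then onboards both to UMP. UMP runs the user code together with platform-generated code and sends data and metadata to downstream apps. Other components shown: Raptor.]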
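To make the declared config concrete, here is a plain-Java sketch of what it asks for, assuming `A = SUM(A’)` sums an input column `a`, `B = Unique(id)` counts distinct `id` values, and both are grouped by dimensions `C` and `D`. The `Row` shape and column names are hypothetical; in UMP this aggregation is part of the platform-generated code, not something the user writes.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Sketch of the declared metrics A = SUM(A') and B = Unique(id), grouped by dimensions C, D. */
class MetricsSketch {
    /** One row produced by the user code (hypothetical shape). */
    static final class Row {
        final String c, d, id;
        final long a;
        Row(String c, String d, String id, long a) { this.c = c; this.d = d; this.id = id; this.a = a; }
    }

    /** Returns "c|d" -> {A, B}. */
    static Map<String, long[]> compute(List<Row> rows) {
        Map<String, Long> sums = new TreeMap<>();
        Map<String, Set<String>> ids = new TreeMap<>();
        for (Row r : rows) {
            String key = r.c + "|" + r.d;                              // group by dimensions C, D
            sums.merge(key, r.a, Long::sum);                           // A = SUM(A')
            ids.computeIfAbsent(key, k -> new HashSet<>()).add(r.id);  // B = Unique(id)
        }
        Map<String, long[]> out = new TreeMap<>();
        sums.forEach((key, sum) -> out.put(key, new long[] {sum, ids.get(key).size()}));
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
            new Row("US", "web", "u1", 3), new Row("US", "web", "u1", 2),
            new Row("US", "web", "u2", 1), new Row("FR", "app", "u3", 4));
        compute(rows).forEach((k, v) -> System.out.println(k + " A=" + v[0] + " B=" + v[1]));
        // FR|app A=4 B=1
        // US|web A=6 B=2
    }
}
```

SUM is cheap to maintain incrementally, but an exact `Unique(id)` needs a growing per-group set; streaming systems often trade exactness for bounded state with approximate structures such as HyperLogLog.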
Moving from offline to nearline
UMP offline computation flows
[Diagram: user-code jobs feed metric union, dimension decoration, and cubing/rollup stages, executed by Azkaban. Data sources: HDFS tables, Dali views, and Espresso, Oracle, and MySQL. Outputs: Pinot and Presto. End-to-end latency: at least 2-3 hours.]
Espresso: LinkedIn distributed document store
Gobblin: LinkedIn universal data ingestion framework
Dali view: LinkedIn abstraction layer on top of HDFS
Azkaban: LinkedIn batch workflow job scheduler
Pinot: LinkedIn real-time OLAP engine
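The "Cubing, Rollup" stage precomputes aggregates at coarser granularities so that OLAP stores can answer queries without scanning raw rows. A minimal plain-Java sketch of a cube over two dimensions, assuming a single SUM metric, rows given as `{dim1, dim2, value}`, and `"*"` as a hypothetical marker for "all values of this dimension":

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sketch: a SUM cube over two dimensions; "*" marks a rolled-up dimension (hypothetical convention). */
class CubeSketch {
    static final String ALL = "*";

    /** rows: {dim1, dim2, value}. Returns a total for every combination of kept/rolled-up dimensions. */
    static Map<String, Long> cube(List<String[]> rows) {
        Map<String, Long> out = new TreeMap<>();
        for (String[] r : rows) {
            long v = Long.parseLong(r[2]);
            for (int mask = 0; mask < 4; mask++) {        // 2^2 subsets: bit set = keep, bit clear = roll up
                String d1 = (mask & 1) != 0 ? r[0] : ALL;
                String d2 = (mask & 2) != 0 ? r[1] : ALL;
                out.merge(d1 + "," + d2, v, Long::sum);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> rows = List.of(
            new String[] {"US", "web", "3"},
            new String[] {"US", "app", "2"},
            new String[] {"FR", "web", "4"});
        System.out.println(cube(rows).get("*,*"));  // 9: grand total
        System.out.println(cube(rows).get("US,*")); // 5: all platforms for US
    }
}
```

A rollup computes only the hierarchical prefixes of the dimension list, here (dim1, dim2), (dim1, *), and (*, *), so it is a subset of the full cube.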
What we want for nearline flows
[Diagram: the same user code, metric union, and dimension decoration stages running as Samza jobs and writing to Pinot]
Samza: LinkedIn streaming engine
Latency is not the only requirement
• Low latency (~minutes)
• Easy to onboard
• Easy to maintain
Putting things together
Lambda architecture with a single codebase
[Diagram: one metrics definition (code + config) drives both the UMP offline platform (batch jobs, HDFS) and the UMP nearline platform (Samza jobs, Pinot). Other components shown: Raptor.]
Deep dive on offline-nearline conversion
10,000-foot view
[Diagram: Pig user code is converted (Pig to Calcite) into Calcite relational algebra, which serves as the intermediate representation (IR). The IR is optimized (Calcite to Beam) into a Beam physical plan, and Beam Java API code is generated from that plan plus a streaming config. The flow covers the user code, metric union, and dimension decoration stages.]
Check out this blog post for details: https://engineering.linkedin.com/blog/2019/01/bridging-offline-and-nearline-computations-with-apache-calcite
Pig to Calcite
[Diagram: user scripts (# code LOAD … LOAD ... COGROUP ... STORE …) are parsed by GruntParser into a Pig logical plan (LOAD, LOAD, COGROUP). PigRelConverter then translates it into a Calcite logical plan (relational algebra): a TABLE SCAN and an AGGREGATE for each input, a FULL OUTER JOIN of the two aggregates, and a PROJECT.]
Code will be available in Calcite 1.21.
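Why COGROUP becomes a FULL OUTER JOIN: Pig's COGROUP emits one row per key that appears in either input, with an empty bag on the side that lacks it, which is exactly full-outer-join semantics once each input has been grouped into bags. A toy plain-Java sketch of that rewrite, assuming a hypothetical `Node` class and using Calcite's `COLLECT` aggregate to stand for bag construction; it mirrors the shape in the figure, not the actual `PigRelConverter` code.

```java
import java.util.List;

/** Sketch: COGROUP a BY k, b BY k rewritten as relational algebra (toy IR, hypothetical names). */
class CogroupToCalcite {
    /** Minimal logical-plan node: an operator description plus its inputs. */
    static final class Node {
        final String op;
        final List<Node> inputs;
        Node(String op, Node... inputs) { this.op = op; this.inputs = List.of(inputs); }

        /** Pretty-prints the tree top-down, two spaces per level. */
        String explain(String indent) {
            StringBuilder sb = new StringBuilder(indent).append(op).append('\n');
            for (Node in : inputs) sb.append(in.explain(indent + "  "));
            return sb.toString();
        }
    }

    static Node convertCogroup(String tableA, String tableB, String key) {
        // Each input is grouped on the key, collecting its rows into a bag.
        Node aggA = new Node("AGGREGATE(group=" + key + ", bag=COLLECT(" + tableA + "))", new Node("TABLE SCAN " + tableA));
        Node aggB = new Node("AGGREGATE(group=" + key + ", bag=COLLECT(" + tableB + "))", new Node("TABLE SCAN " + tableB));
        // Full outer join keeps keys present in only one input.
        Node join = new Node("FULL OUTER JOIN(on " + key + ")", aggA, aggB);
        // Output schema (group, a, b); group would be COALESCE of the two join keys.
        return new Node("PROJECT(group, " + tableA + ", " + tableB + ")", join);
    }

    public static void main(String[] args) {
        System.out.print(convertCogroup("a", "b", "k").explain(""));
    }
}
```

Running `main` prints the tree with `PROJECT(group, a, b)` at the root, the `FULL OUTER JOIN(on k)` beneath it, and one `AGGREGATE` over a `TABLE SCAN` per input.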
Calcite to Beam
Planner/optimizer
• Calcite logical plan: what to do.
• Beam physical plan: how to do it.
• Calcite Beam planner: optimizes Calcite logical plans into Beam physical plans (using the Calcite Volcano optimizer).
Code generator
• Generates Beam Java API code from the Beam physical plan and the streaming config.
Mappings:
• Beam physical nodes to Beam APIs
• Relational expressions to Java implementation code
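The two-stage translation can be sketched as two lookup tables: logical operator to physical node (the planner's job) and physical node to a Beam API call template (the code generator's job). This is a deliberate simplification: the real planner uses Volcano rules and cost-based search, not a fixed table. Physical node names follow the example slides; the emitted strings are illustrative assumptions, although `Filter.by`, `MapElements.via`, `Combine.perKey`, and `CoGroupByKey.create` are real Beam Java transforms.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Sketch: logical -> physical mapping plus physical -> Beam API code templates (illustrative only). */
class BeamCodegenSketch {
    // "Planner": Calcite logical operator -> Beam physical node.
    static final Map<String, String> LOGICAL_TO_PHYSICAL = Map.of(
        "TableScan", "InputStream",
        "Filter", "BeamFilter",
        "Project", "BeamProject",
        "Aggregate", "BeamAggregate",
        "Join", "StreamStreamJoin");

    // "Code generator": Beam physical node -> Java source template; %s is the operator argument.
    static final Map<String, String> PHYSICAL_TO_API = Map.of(
        "InputStream", "pipeline.apply(/* read %s from the streaming source in config */)",
        "BeamFilter", "Filter.by(row -> %s)",
        "BeamProject", "MapElements.via(/* project: %s */)",
        "BeamAggregate", "Combine.perKey(/* aggregate: %s */)",
        "StreamStreamJoin", "CoGroupByKey.create() /* join on %s */");

    /** Plans a linear chain of {operatorName, argument} pairs and emits one code line per node. */
    static List<String> generate(List<String[]> logicalChain) {
        List<String> code = new ArrayList<>();
        for (String[] op : logicalChain) {
            String physical = LOGICAL_TO_PHYSICAL.get(op[0]);
            if (physical == null) throw new IllegalArgumentException("No rule for " + op[0]);
            code.add(String.format(PHYSICAL_TO_API.get(physical), op[1]));
        }
        return code;
    }

    public static void main(String[] args) {
        List<String[]> chain = List.of(
            new String[] {"TableScan", "events"},
            new String[] {"Filter", "status == 200"},
            new String[] {"Project", "memberId, pageKey"});
        generate(chain).forEach(System.out::println);
    }
}
```

Keeping the two mappings separate is what lets the IR stay engine-neutral: swapping the second table for another engine's templates would, in principle, retarget the same plan.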
Example - Pig script
Example - Calcite logical plan
Example - Calcite logical plan
[Plan: two branches, each a Table Scan with a Project and a Filter, feed an Inner Join; Project, Aggregate, and Project operators sit above the join.]
Example - Calcite Beam Planner
[Diagram: the Calcite Beam planner turns the Calcite logical plan into a Beam physical plan. The two Table Scans become a single Input Stream; each Filter and Project becomes a Beam Filter and a Beam Project; the Inner Join becomes a Stream Stream SelfJoin; the Project, Aggregate, and Project above it become Beam Project, Beam Aggregate, and Beam Project.]
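The notable step in this example is that both join inputs scan the same table, so the streaming plan needs only one input stream feeding a self-join instead of two independent sources. A toy version of such a rule, assuming a hypothetical `Node` class and walking only through single-input operators (Filter, Project) to find each side's source table:

```java
import java.util.List;

/** Sketch: detect a join whose two sides read the same table and plan it as a self-join. */
class SelfJoinRuleSketch {
    static final class Node {
        final String op;
        final String table; // only set on TableScan nodes
        final List<Node> inputs;
        Node(String op, String table, Node... inputs) { this.op = op; this.table = table; this.inputs = List.of(inputs); }
    }

    /** Follows single-input operators down to the source TableScan; null if the branch is not a simple chain. */
    static String sourceTable(Node n) {
        while (!n.op.equals("TableScan")) {
            if (n.inputs.size() != 1) return null;
            n = n.inputs.get(0);
        }
        return n.table;
    }

    /** Picks a physical join node: a self-join over one input stream when both sides share a source. */
    static String planJoin(Node join) {
        String left = sourceTable(join.inputs.get(0));
        String right = sourceTable(join.inputs.get(1));
        return left != null && left.equals(right)
            ? "StreamStreamSelfJoin(input=" + left + ")"
            : "StreamStreamJoin(" + left + ", " + right + ")";
    }

    public static void main(String[] args) {
        Node left = new Node("Filter", null, new Node("Project", null, new Node("TableScan", "events")));
        Node right = new Node("Filter", null, new Node("Project", null, new Node("TableScan", "events")));
        System.out.println(planJoin(new Node("Join", null, left, right))); // StreamStreamSelfJoin(input=events)
    }
}
```

Reading the stream once matters more in streaming than in batch: two separate consumers of the same topic could observe events at different times, whereas a single input keeps both sides of the self-join consistent.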
Example - Beam autogen code for LOAD
[Side by side: original Pig script and generated Beam API code]
Details:
● Pig script: https://gist.github.com/khaitranq/1d06c27832f15fa52a4a7e2fa7bec340
● Beam code: https://gist.github.com/khaitranq/785dbb8495cd382788f3ca8200231d84
Example - Beam autogen code for FILTER
[Side by side: original Pig script and generated Beam API code]
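A FILTER depends on the second mapping above: turning a relational predicate (a Calcite `RexNode` in the real system) into Java. A minimal sketch of that idea, assuming a hypothetical three-construct expression language (field reference, integer literal, comparison/AND) compiled into closures over a map-based row; the actual generated Beam code may look quite different.

```java
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.ToIntFunction;

/** Sketch: compiling a Pig-style predicate such as "status == 200 AND latency > 100" into Java closures. */
class FilterCompileSketch {
    // Hypothetical expression types over a row represented as Map<String, Integer>.
    interface IntExpr extends ToIntFunction<Map<String, Integer>> {}
    interface BoolExpr extends Predicate<Map<String, Integer>> {}

    static IntExpr field(String name) { return row -> row.get(name); }   // column reference
    static IntExpr lit(int value) { return row -> value; }              // constant
    static BoolExpr eq(IntExpr a, IntExpr b) { return row -> a.applyAsInt(row) == b.applyAsInt(row); }
    static BoolExpr gt(IntExpr a, IntExpr b) { return row -> a.applyAsInt(row) > b.applyAsInt(row); }
    static BoolExpr and(BoolExpr p, BoolExpr q) { return row -> p.test(row) && q.test(row); }

    public static void main(String[] args) {
        // Pig: good = FILTER events BY status == 200 AND latency > 100;
        BoolExpr cond = and(eq(field("status"), lit(200)), gt(field("latency"), lit(100)));
        System.out.println(cond.test(Map.of("status", 200, "latency", 150))); // true
        System.out.println(cond.test(Map.of("status", 500, "latency", 150))); // false
    }
}
```

In a Beam pipeline, a predicate of this kind would be wrapped in a serializable function and handed to a filtering transform such as `Filter.by`; whether the autogen code does exactly that is not shown on the slide.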
Example - Beam autogen code for FOREACH
[Side by side: original Pig script and generated Beam API code]
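Pig's `FOREACH … GENERATE` is a per-row projection: each output column is an expression over the input row. A plain-Java sketch, assuming a hypothetical `latencyMs` column and a map-based row, where an insertion-ordered map of output name to expression fixes the output column order:

```java
import java.util.*;
import java.util.function.Function;

/** Sketch: Pig "FOREACH events GENERATE memberId, latencyMs / 1000 AS latencySec" as a per-row projection. */
class ForeachProjectSketch {
    /** Applies each output expression to the row; insertion order of exprs = output column order. */
    static Map<String, Object> project(Map<String, Object> row,
                                       LinkedHashMap<String, Function<Map<String, Object>, Object>> exprs) {
        Map<String, Object> out = new LinkedHashMap<>();
        exprs.forEach((name, expr) -> out.put(name, expr.apply(row)));
        return out;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Function<Map<String, Object>, Object>> exprs = new LinkedHashMap<>();
        exprs.put("memberId", row -> row.get("memberId"));                   // plain field reference
        exprs.put("latencySec", row -> (Long) row.get("latencyMs") / 1000);  // computed column (integer division)
        Map<String, Object> in = Map.of("memberId", "m1", "latencyMs", 2500L, "pageKey", "feed");
        System.out.println(project(in, exprs)); // {memberId=m1, latencySec=2}
    }
}
```

Since a projection touches one element at a time and keeps no state, it maps naturally onto an element-wise Beam transform such as `MapElements` or a `ParDo`, and behaves identically in batch and streaming.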
Example - Beam autogen code for JOIN
[Side by side: original Pig script and generated Beam API code]
Example - Beam autogen code for JOIN (continued)
[Side by side: original Pig script and generated Beam API code]
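Unlike a batch join, a stream-stream join has to bound how long each side waits for matches. In Beam this is commonly done by windowing both inputs (for example `Window.into(FixedWindows.of(...))`) before joining with `CoGroupByKey`. A plain-Java sketch of an inner join within fixed windows, assuming each hypothetical `Event` carries a key and a non-negative epoch-millisecond timestamp; the window size and the join semantics of the generated code are assumptions here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: inner join of two event streams within fixed (tumbling) windows. */
class WindowedJoinSketch {
    static final class Event {
        final String key;
        final long ts; // epoch millis, assumed non-negative
        final String payload;
        Event(String key, long ts, String payload) { this.key = key; this.ts = ts; this.payload = payload; }
    }

    /** Window start for a timestamp: events in [start, start + windowMs) share a window. */
    static long windowStart(long ts, long windowMs) { return (ts / windowMs) * windowMs; }

    /** Events match only if they share a key AND fall into the same window. */
    static List<String> join(List<Event> left, List<Event> right, long windowMs) {
        Map<String, List<Event>> index = new HashMap<>(); // "windowStart|key" -> right-side events
        for (Event r : right) {
            index.computeIfAbsent(windowStart(r.ts, windowMs) + "|" + r.key, k -> new ArrayList<>()).add(r);
        }
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : index.getOrDefault(windowStart(l.ts, windowMs) + "|" + l.key, List.of())) {
                out.add(l.key + ":" + l.payload + "+" + r.payload);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> views = List.of(new Event("m1", 10_000, "view"), new Event("m2", 70_000, "view"));
        List<Event> clicks = List.of(new Event("m1", 50_000, "click"), new Event("m2", 130_000, "click"));
        System.out.println(join(views, clicks, 60_000)); // [m1:view+click]
    }
}
```

With one-minute windows, `m2`'s view (window starting at 60 s) and click (window starting at 120 s) never meet, even though a batch join over the whole dataset would match them. This is the kind of semantic gap the offline-to-nearline conversion has to account for.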
Example - Beam autogen code for GROUP BY
[Side by side: original Pig script and generated Beam API code]
Example - Beam autogen code for GROUP BY (continued)
[Side by side: original Pig script and generated Beam API code, with callouts marking the Initialize, Aggregate, and Return steps]
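The Initialize, Aggregate, and Return steps appear to correspond to the accumulator lifecycle of Beam's `Combine.CombineFn`: `createAccumulator`, `addInput`, and `extractOutput`, plus `mergeAccumulators` for combining partial results from different workers. Whether the generated code uses `CombineFn` is an assumption. The following plain-Java mimic reproduces only that structure (no Beam dependency) for a hypothetical AVG(latency) grouped by NameNode command type.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Mimics the shape of a Beam CombineFn for AVG; not the Beam class itself. */
class AvgCombineSketch {
    static final class Acc { long sum; long count; }

    static Acc createAccumulator() { return new Acc(); }                                      // Initialize
    static Acc addInput(Acc acc, long value) { acc.sum += value; acc.count++; return acc; }   // Aggregate
    static Acc mergeAccumulators(Iterable<Acc> accs) {                                        // combine partials
        Acc merged = new Acc();
        for (Acc a : accs) { merged.sum += a.sum; merged.count += a.count; }
        return merged;
    }
    static double extractOutput(Acc acc) {                                                    // Return
        return acc.count == 0 ? Double.NaN : (double) acc.sum / acc.count;
    }

    /** GROUP BY key: one accumulator per key, updated as each element arrives. */
    static Map<String, Double> groupByAvg(List<Map.Entry<String, Long>> input) {
        Map<String, Acc> accs = new TreeMap<>();
        for (Map.Entry<String, Long> e : input) {
            addInput(accs.computeIfAbsent(e.getKey(), k -> createAccumulator()), e.getValue());
        }
        Map<String, Double> out = new TreeMap<>();
        accs.forEach((k, acc) -> out.put(k, extractOutput(acc)));
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> in = List.of(
            Map.entry("getBlock", 10L), Map.entry("getBlock", 20L), Map.entry("mkdir", 5L));
        System.out.println(groupByAvg(in)); // {getBlock=15.0, mkdir=5.0}
    }
}
```

Keeping (sum, count) rather than a running average is what makes the accumulator mergeable: two partial averages cannot be combined correctly without their counts.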
Example - Beam autogen code for STORE
[Side by side: original Pig script and generated Beam API code]
Example - Beam autogen code for Pig UDFs
[Side by side: original Pig script and generated Beam API code, with callouts marking the Declare, Init, and Use steps]
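The Declare, Init, and Use steps match the usual way to call a heavyweight object from a Beam `DoFn`: declare it as a field, create it once per instance in a setup method (Beam's `@Setup`), and call it per element (Beam's `@ProcessElement`). A plain-Java mimic without Beam or Pig dependencies, assuming `UpperUdf` as a hypothetical stand-in for a Pig `EvalFunc` (real Pig UDFs implement `exec(Tuple)`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Sketch of the Declare / Init / Use pattern for wrapping a UDF in a DoFn-like class. */
class UdfLifecycleSketch {
    /** Hypothetical stand-in for a Pig EvalFunc<String>. */
    static final class UpperUdf {
        static int instances = 0;                 // counts constructions to show init happens once
        UpperUdf() { instances++; }
        String exec(String input) { return input == null ? null : input.toUpperCase(Locale.ROOT); }
    }

    /** Mimics a DoFn that wraps the UDF. */
    static final class UdfDoFn {
        private transient UpperUdf udf;           // Declare: transient, so not serialized with the DoFn
        void setup() { udf = new UpperUdf(); }    // Init: once per instance (Beam: @Setup)
        String processElement(String element) {   // Use: once per element (Beam: @ProcessElement)
            return udf.exec(element);
        }
    }

    /** Runs one DoFn instance over a bounded input. */
    static List<String> run(List<String> input) {
        UdfDoFn fn = new UdfDoFn();
        fn.setup();
        List<String> out = new ArrayList<>();
        for (String s : input) out.add(fn.processElement(s));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("feed", "jobs"))); // [FEED, JOBS]
        System.out.println(UpperUdf.instances);           // 1
    }
}
```

Constructing the UDF in setup rather than per element avoids repeating expensive initialization (loading resources, building lookup tables) for every record in the stream.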
Convergences at LinkedIn (1)
[Diagram: offline, online, and nearline computation connected through a shared intermediate representation]
Convergences at LinkedIn (2)
Implemented
• Pig - Calcite - Beam (on Samza)
• Hive - Calcite - Presto
• Hive - Calcite - Spark
• GraphQL - Calcite - Spark
• Spark - Calcite - Beam (on Samza)
Considering
• Hive - Calcite - Pig
• Pig - Calcite - Spark
• GraphQL - Calcite - Beam (on Samza)
AORA principle: Author Once, Run Anywhere
Thank you

Beam Summit 2019 - Unifying Batch and Stream Data Processing with Apache Calcite and Apache Beam
