Table & SQL API: unified APIs for batch and stream processing
Timo Walther, Apache Flink PMC, @twalthr
With slides from Fabian Hueske
Flink Meetup @ Amsterdam, March 2nd, 2017
Original creators of Apache Flink®
Providers of the dA Platform, a supported Flink distribution
Motivation
DataStream API is not for Everyone
§ Writing DataStream programs is not easy
  • Stream processing technology spreads rapidly
§ Requires Knowledge & Skill
  • Stream processing concepts (time, state, windows, ...)
  • Programming experience (Java / Scala)
§ Program logic goes into UDFs
  • great for expressiveness
  • bad for optimization - need for manual tuning
Why not a Relational API?
§ Relational APIs are declarative
  • User says what is needed
  • System decides how to compute it
§ Users do not specify implementation
§ Queries are efficiently executed
§ “Everybody” knows SQL!
Goals
§ Flink is a platform for distributed stream and batch data processing
§ Relational APIs as a unifying layer
  • Queries on batch tables terminate and produce a finite result
  • Queries on streaming tables run continuously and produce a result stream
§ Same syntax & semantics for both kinds of queries
Table API & SQL
Table API & SQL
§ Flink features two relational APIs
  • Table API: LINQ-style API for Java & Scala (since Flink 0.9.0)
  • SQL: Standard SQL (since Flink 1.1.0)
§ Equivalent feature set (at the moment)
  • Table API and SQL can be mixed
§ Both are tightly integrated with Flink’s core APIs
  • DataStream
  • DataSet
Table API Example

val sensorData: DataStream[(String, Long, Double)] = ???

// convert DataStream into Table
val sensorTable: Table = sensorData
  .toTable(tableEnv, 'location, 'time, 'tempF)

// define query on Table
val avgTempCTable: Table = sensorTable
  .window(Tumble over 1.day on 'rowtime as 'w)
  .groupBy('location, 'w)
  .select('w.start as 'day,
          'location,
          (('tempF.avg - 32) * 0.556) as 'avgTempC)
  .where('location like "room%")
SQL Example

val sensorData: DataStream[(String, Long, Double)] = ???

// register DataStream
tableEnv.registerDataStream(
  "sensorData", sensorData, 'location, 'time, 'tempF)

// query registered Table
val avgTempCTable: Table = tableEnv.sql("""
  SELECT FLOOR(rowtime() TO DAY) AS day,
         location,
         AVG((tempF - 32) * 0.556) AS avgTempC
  FROM sensorData
  WHERE location LIKE 'room%'
  GROUP BY location, FLOOR(rowtime() TO DAY)
""")
Architecture
2 APIs [SQL, Table API] × 2 backends [DataStream, DataSet] = 4 different translation paths?
Architecture
Architecture
§ Table API and SQL queries are translated into a common logical plan representation.
§ Logical plans are translated and optimized depending on the execution backend.
§ Plans are transformed into DataSet or DataStream programs.
Translation to Logical Plan

sensorTable
  .window(Tumble over 1.day on 'rowtime as 'w)
  .groupBy('location, 'w)
  .select('w.start as 'day,
          'location,
          (('tempF.avg - 32) * 0.556) as 'avgTempC)
  .where('location like "room%")
Translation to Optimized Plan

Translation to Flink Program
Current State (in master)
§ Batch SQL & Table API support
  • Selection, Projection, Sort, Inner & Outer Joins, Set operations
  • Windows for Slide, Tumble, Session
§ Streaming Table API support
  • Selection, Projection, Union
  • Windows for Slide, Tumble, Session
§ Streaming SQL
  • Selection, Projection, Union, Tumble, but ...
Use Cases for Streaming SQL
§ Continuous ETL & Data Import
§ Live Dashboards & Reports
§ Ad-hoc Analytics & Exploration
Outlook: Dynamic Tables
Dynamic Tables
§ Dynamic tables change over time
§ Dynamic tables are treated like static batch tables
  • Dynamic tables are queried with standard SQL
  • A query returns another dynamic table
§ Stream ←→ Dynamic Table conversions without information loss
  • “Stream / Table Duality”
Stream to Dynamic Tables
§ Append:
§ Replace by key:
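The two conversion modes above ("append" and "replace by key") can be sketched in plain Scala. This is a conceptual model of the semantics only, not Flink API; the Event type and sample data are invented for illustration:

```scala
// A stream of (user, url) click events.
case class Event(user: String, url: String)

val stream = List(
  Event("Alice", "/home"),
  Event("Bob",   "/cart"),
  Event("Alice", "/checkout")
)

// Append: every stream record becomes a new row of the dynamic table.
val appendTable: List[Event] = stream

// Replace by key: each record upserts the row for its key,
// so the table holds only the latest value per user.
val upsertTable: Map[String, String] =
  stream.foldLeft(Map.empty[String, String]) { (table, e) =>
    table + (e.user -> e.url)
  }
```

With the sample input, the append table has three rows, while the replace-by-key table has two rows and Alice's row holds her latest URL.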
Querying Dynamic Tables
§ Dynamic tables change over time
  • A[t]: Table A at time t
§ Dynamic tables are queried with regular SQL
  • Result of a query changes as the input table changes
  • q(A[t]): Evaluate query q on table A at time t
§ Query result is continuously updated as t progresses
  • Similar to maintaining a materialized view
  • t is the current event time
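The notation q(A[t]) can be made concrete with a small self-contained Scala sketch. Again a conceptual model rather than Flink code; the Click type and sample rows are assumptions made for illustration:

```scala
// Rows of dynamic table A, each carrying an event timestamp.
case class Click(time: Long, user: String)

val a = List(Click(1, "Alice"), Click(2, "Bob"), Click(3, "Alice"))

// A[t]: the table at time t contains all rows with timestamp <= t.
def tableAt(t: Long): List[Click] = a.filter(_.time <= t)

// q: count clicks per user.
def q(table: List[Click]): Map[String, Int] =
  table.groupBy(_.user).map { case (u, rows) => u -> rows.size }

// q(A[t]) changes as t progresses, like a maintained materialized view:
val resultAt1 = q(tableAt(1)) // Map(Alice -> 1)
val resultAt3 = q(tableAt(3)) // Map(Alice -> 2, Bob -> 1)
```

Evaluating q at a later t never recomputes history from scratch in a real system; the engine incrementally maintains the result, which is what makes this view of streaming SQL practical.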
Querying Dynamic Tables
Querying Dynamic Tables
§ Can we run any query on dynamic tables? No!
§ State must not grow infinitely as more data arrives
  • Set a clean-up timeout or key constraints.
§ An input update may only trigger a partial re-computation
§ Queries with possibly unbounded state or computation are rejected
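To illustrate the distinction, here are two assumed example queries in the deck's own SQL style (these are illustrative strings, not output of any Flink validator):

```scala
// Bounded state: a per-day aggregate only needs state per location and day,
// and old days can eventually be cleaned up.
val boundedQuery =
  """SELECT location, FLOOR(rowtime() TO DAY) AS day, AVG(tempF)
    |FROM sensorData
    |GROUP BY location, FLOOR(rowtime() TO DAY)""".stripMargin

// Unbounded state: a global sort over a never-ending stream would have to
// buffer the entire input, so a query like this would be rejected.
val unboundedQuery =
  "SELECT * FROM sensorData ORDER BY tempF"
```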
Dynamic Tables to Stream
§ Update:
Dynamic Tables to Stream
§ Add/Retract:
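The add/retract conversion can be sketched conceptually in plain Scala (a model of the semantics, not Flink API): whenever an aggregate row changes, the old row is retracted before the new one is added.

```scala
// Change messages emitted by the add/retract conversion.
sealed trait Change
case class Add(user: String, cnt: Int) extends Change
case class Retract(user: String, cnt: Int) extends Change

// Maintain a per-user click count and emit the change stream.
def toRetractStream(clicks: List[String]): List[Change] = {
  var counts = Map.empty[String, Int]
  clicks.flatMap { user =>
    val old = counts.getOrElse(user, 0)
    counts += user -> (old + 1)
    if (old == 0) List(Add(user, 1))
    else List(Retract(user, old), Add(user, old + 1))
  }
}

val changes = toRetractStream(List("Alice", "Bob", "Alice"))
// Add(Alice,1), Add(Bob,1), Retract(Alice,1), Add(Alice,2)
```

A downstream consumer that applies each Retract and Add in order reconstructs exactly the current state of the dynamic table.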
Result computation & refinement
Contributions welcome!
§ Huge interest and many contributors
  • Adding more window operators
  • Introducing dynamic tables
§ And there is a lot more to do
  • New operators and features for streaming and batch
  • Performance improvements
  • Tooling and integration
§ Try it out, give feedback, and start contributing!
One day of hands-on Flink training
One day of conference
Tickets are on sale
Please visit our website: http://sf.flink-forward.org
Follow us on Twitter: @FlinkForward
We are hiring! data-artisans.com/careers
Thank you! @twalthr @ApacheFlink @dataArtisans