A Smarter Pig: Building a SQL interface to Pig using Apache Calcite
Eli Levine & Julian Hyde
Apache: Big Data, Miami, 2017/05/17

About us
Julian Hyde @julianhyde: original developer of Calcite; PMC member of Calcite, Drill, Eagle, Kylin; ASF member
Eli Levine @teleturn: PMC member of Phoenix; ASF member

Apache Calcite
Apache top-level project since October 2015
Query planning framework
➢ Relational algebra, rewrite rules
➢ Cost model & statistics
➢ Federation via adapters
➢ Extensible
Packaging
➢ Library

Apache Pig
Apache top-level project
Platform for analyzing large datasets
➢ Uses the Pig Latin language
  ○ Relational operators (join, filter)
  ○ Functional operators (mapreduce)
➢ Runs as MapReduce (also Tez)
➢ ETL
➢ Extensible

Outline
- Batch compute on Force.com Platform (Eli Levine)
- Apache Calcite deep dive (Julian Hyde)
- Building Pig adapter for Calcite (Eli Levine)
- Q&A

Salesforce Platform
- Object-relational data model in the cloud
- Contains standard objects that users can customize, or users can add their own objects
- SQL-like query language: SOQL
  - Real-time
  - Batch compute
- Federated data store: Oracle, HBase, external
- User queries span data sources (federated joins)
  SELECT DEPT.NAME FROM EMPLOYEE WHERE FIRST_NAME = 'Eli'

Salesforce Platform - Batch Compute
- Called Async SOQL
  - REST API
  - Users supply SOQL and info about where to deposit results
- SOQL -> Pig Latin script
- Pig loaders move data/computation to HDFS for federated query execution
- Own SOQL parsing, no Calcite

Query Planning in Async SOQL
[Diagram: current vs. next-generation query planning]

Apache Calcite for Next-Gen Optimizer
- Strong relational algebra foundation
- Support for different physical engines
- Pluggable cost model
- Optimization rules
- Federation-aware

Architecture
[Diagram: conventional database architecture vs. Calcite architecture]

Calcite design
Design goals:
- Not-just-SQL front end
- Federation
- Extensibility
- Caching / hybrid storage
- Materialized views
Design points (diagram): tables on disk; in-memory materializations, e.g.
  select x, sum(y) from t group by x

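The in-memory materialization design point can be made concrete with a toy example. This is a minimal plain-Java sketch, not Calcite's materialized-view machinery; the class MaterializationSketch and its methods are hypothetical. It materializes select x, sum(y) from t group by x once, then answers select sum(y) from t where x = ? by a lookup in that small result instead of rescanning t.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy illustration of answering a query from an in-memory materialization.
// Hypothetical names; this is not Calcite's materialized-view API.
final class MaterializationSketch {
    // One row of the base table t(x, y).
    static final class Row {
        final String x;
        final int y;
        Row(String x, int y) { this.x = x; this.y = y; }
    }

    // Materialize "select x, sum(y) from t group by x" once, keyed by x.
    static Map<String, Integer> materialize(List<Row> t) {
        Map<String, Integer> sums = new TreeMap<>();
        for (Row r : t) {
            sums.merge(r.x, r.y, Integer::sum);
        }
        return sums;
    }

    // Answer "select sum(y) from t where x = ?" by a lookup in the
    // materialization instead of rescanning t. Returns null when no row of t
    // has that x, matching SQL's NULL for SUM over an empty set.
    static Integer sumForX(Map<String, Integer> materialized, String x) {
        return materialized.get(x);
    }

    public static void main(String[] args) {
        List<Row> t = List.of(new Row("a", 1), new Row("b", 2), new Row("a", 3));
        Map<String, Integer> mv = materialize(t);
        System.out.println(mv);               // {a=4, b=2}
        System.out.println(sumForX(mv, "a")); // 4
    }
}
```

A real optimizer must also prove that such a rewrite is valid, for instance that the query's filter only references the materialization's grouping column; this sketch simply assumes it.
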
Planning queries
Query:
  select p.productName, count(*) as c
  from splunk.splunk as s
  join mysql.products as p on s.productId = p.productId
  where s.action = 'purchase'
  group by p.productName
  order by c desc
Initial plan (diagram, top to bottom):
  sort (Key: c desc)
    group (Key: productName; Agg: count)
      filter (Condition: action = 'purchase')
        join (Key: productId)
          scan (Table: splunk)     [Splunk]
          scan (Table: products)   [MySQL]

Optimized query
Query: same as on the previous slide.
Optimized plan (diagram, top to bottom):
  sort (Key: c desc)
    group (Key: productName; Agg: count)
      join (Key: productId)
        filter (Condition: action = 'purchase')   [Splunk]
          scan (Table: splunk)                    [Splunk]
        scan (Table: products)                    [MySQL]

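The move from the first plan to the optimized one can be sketched in plain Java. The sketch is hypothetical (its Rel, Scan, Filter and Join classes are not Calcite's) and implements only the first of the two rules named in the speaker notes: a filter sitting on a join is moved onto the join input that owns the filtered column. Column names are assumed unambiguous apart from the join key.

```java
import java.util.ArrayList;
import java.util.List;

// Toy plan nodes plus one rewrite rule that moves a filter below a join,
// the idea behind the filter-join-transpose step described in the notes.
// All class and method names are hypothetical, not Calcite's API.
final class PushDownSketch {
    interface Rel {
        List<String> columns(); // output column names
        String explain();       // one-line rendering of the subtree
    }

    static final class Scan implements Rel {
        final String table;
        final List<String> cols;
        Scan(String table, List<String> cols) { this.table = table; this.cols = cols; }
        public List<String> columns() { return cols; }
        public String explain() { return "Scan(" + table + ")"; }
    }

    static final class Filter implements Rel {
        final Rel input;
        final String column;
        final String value;
        Filter(Rel input, String column, String value) {
            this.input = input; this.column = column; this.value = value;
        }
        public List<String> columns() { return input.columns(); }
        public String explain() {
            return "Filter(" + column + " = '" + value + "', " + input.explain() + ")";
        }
    }

    static final class Join implements Rel {
        final Rel left;
        final Rel right;
        final String key;
        Join(Rel left, Rel right, String key) { this.left = left; this.right = right; this.key = key; }
        public List<String> columns() {
            List<String> all = new ArrayList<>(left.columns());
            all.addAll(right.columns());
            return all;
        }
        public String explain() {
            return "Join(" + key + ", " + left.explain() + ", " + right.explain() + ")";
        }
    }

    // Rule: Filter(Join(L, R)) => Join(Filter(L), R) when the filtered column
    // belongs to L (symmetrically for R). Returns the input unchanged when the
    // rule does not match.
    static Rel pushFilterIntoJoin(Rel rel) {
        if (!(rel instanceof Filter) || !(((Filter) rel).input instanceof Join)) {
            return rel;
        }
        Filter f = (Filter) rel;
        Join j = (Join) f.input;
        if (j.left.columns().contains(f.column)) {
            return new Join(new Filter(j.left, f.column, f.value), j.right, j.key);
        }
        if (j.right.columns().contains(f.column)) {
            return new Join(j.left, new Filter(j.right, f.column, f.value), j.key);
        }
        return rel;
    }

    public static void main(String[] args) {
        Rel splunk = new Scan("splunk", List.of("productId", "action"));
        Rel products = new Scan("products", List.of("productId", "productName"));
        Rel before = new Filter(new Join(splunk, products, "productId"), "action", "purchase");
        System.out.println(before.explain());
        System.out.println(pushFilterIntoJoin(before).explain());
    }
}
```

Running main prints Filter(action = 'purchase', Join(productId, Scan(splunk), Scan(products))) and then Join(productId, Filter(action = 'purchase', Scan(splunk)), Scan(products)). Executing the pushed-down filter inside Splunk itself would be a second, adapter-specific rule.
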
Calcite framework
Relational algebra
  • RelNode (operator): TableScan, Filter, Project, Union, Aggregate, …
  • RelDataType (type)
  • RexNode (expression)
  • RelTrait (physical property): RelConvention (calling convention), RelCollation (sortedness), RelDistribution (partitioning)
RelBuilder
SQL parser
  • SqlNode, SqlParser, SqlValidator
Transformation rules
  • RelOptRule: FilterMergeRule, AggregateUnionTransposeRule, 100+ more
Global transformations
  • Unification (materialized view), column trimming, de-correlation
Cost, statistics
  • RelOptCost, RelOptCostFactory
  • RelMetadataProvider: RelMdColumnUniqueness, RelMdDistinctRowCount, RelMdSelectivity
JDBC driver
Metadata
  • Schema, Table
  • Function: TableFunction, TableMacro
  • Lattice

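To illustrate what the cost and statistics components are for, here is a toy cost model that chooses between the two equivalent plans for the Splunk/MySQL query. It is a sketch under stated assumptions, not Calcite's RelOptCost or metadata API: the statistics (1,000,000 Splunk rows, 10,000 products, 10% selectivity for action = 'purchase'), the foreign-key join estimate, and the "rows processed" cost formula are all invented for illustration.

```java
// Toy cost-based choice between two equivalent plans. The row counts,
// selectivity, and cost formula are illustrative assumptions; Calcite's
// RelOptCost and metadata providers are far richer.
final class CostSketch {
    abstract static class Rel {
        abstract double rowCount(); // estimated output rows (cf. row-count metadata)
        abstract double cost();     // cumulative rows processed by this subtree
    }

    static final class Scan extends Rel {
        final double rows; // table statistic
        Scan(double rows) { this.rows = rows; }
        double rowCount() { return rows; }
        double cost() { return rows; }
    }

    static final class Filter extends Rel {
        final Rel input;
        final double selectivity; // fraction of rows kept (cf. selectivity metadata)
        Filter(Rel input, double selectivity) { this.input = input; this.selectivity = selectivity; }
        double rowCount() { return input.rowCount() * selectivity; }
        double cost() { return input.cost() + input.rowCount(); }
    }

    // Assumes a foreign-key join: every left row matches exactly one right row.
    static final class Join extends Rel {
        final Rel left;
        final Rel right;
        Join(Rel left, Rel right) { this.left = left; this.right = right; }
        double rowCount() { return left.rowCount(); }
        double cost() {
            return left.cost() + right.cost() + left.rowCount() + right.rowCount();
        }
    }

    // Pick the plan with the lowest estimated cost.
    static Rel cheapest(Rel... plans) {
        Rel best = plans[0];
        for (Rel p : plans) {
            if (p.cost() < best.cost()) {
                best = p;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        // Assumed statistics: 1,000,000 Splunk events, 10,000 products,
        // and 10% of events have action = 'purchase'.
        Rel filterAboveJoin = new Filter(new Join(new Scan(1_000_000), new Scan(10_000)), 0.1);
        Rel filterBelowJoin = new Join(new Filter(new Scan(1_000_000), 0.1), new Scan(10_000));
        System.out.println(filterAboveJoin.cost()); // 3020000.0
        System.out.println(filterBelowJoin.cost()); // 2120000.0
        System.out.println(cheapest(filterAboveJoin, filterBelowJoin) == filterBelowJoin); // true
    }
}
```

With these assumed numbers the filter-below-join plan costs 2,120,000 against 3,020,000 for filter-above-join, so the pushed-down plan wins because the join sees 100,000 Splunk rows instead of 1,000,000.
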
Adapter
- Implement SchemaFactory interface
- Connect to a data source using parameters
- Extract schema: return a list of tables
- Push down processing to the data source:
  - A set of planner rules
  - Calling convention (optional)
Model (fragment):
  "schemas": [
    {
      "name": "HR",
      "type": "custom",
      "factory": "org.apache.calcite.adapter.file.FileSchemaFactory",
      "operand": {
        "directory": "hr-csv"
      }
    }
  ]
Session:
  $ ls -l hr-csv
  -rw-r--r-- 1 jhyde staff  62 Mar 29 12:57 DEPTS.csv
  -rw-r--r-- 1 jhyde staff 262 Mar 29 12:57 EMPS.csv.gz
  $ ./sqlline -u jdbc:calcite:model=hr.json -n scott -p tiger
  sqlline> select count(*) as c from emp;
  'C'
  '5'
  1 row selected (0.135 seconds)

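The first three adapter steps can be sketched without Calcite. This is a hypothetical, dependency-free analogue: Calcite's actual SchemaFactory.create(SchemaPlus parentSchema, String name, Map<String, Object> operand) returns a Schema, whereas this toy FileSchemaSketch returns only table names. It reads the "directory" operand from the model and exposes one table per CSV file, assuming (for illustration) that a table is named after its file without the .csv or .csv.gz suffix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy analogue of an adapter's schema factory, for illustration only.
// Calcite's real SchemaFactory.create takes (SchemaPlus parentSchema,
// String name, Map<String, Object> operand) and returns a Schema; here we
// just return table names. The suffix-stripping naming rule is an assumption.
final class FileSchemaSketch {
    // "EMPS.csv.gz" -> "EMPS", "DEPTS.csv" -> "DEPTS"
    static String tableName(String fileName) {
        String name = fileName;
        if (name.endsWith(".gz")) {
            name = name.substring(0, name.length() - ".gz".length());
        }
        if (name.endsWith(".csv")) {
            name = name.substring(0, name.length() - ".csv".length());
        }
        return name;
    }

    // Read the "directory" operand from the model, list that directory
    // (the lister is injected so the sketch needs no real files), and
    // expose one table per CSV file.
    static List<String> create(Map<String, Object> operand,
                               Function<String, List<String>> listDirectory) {
        String directory = (String) operand.get("directory");
        List<String> tables = new ArrayList<>();
        for (String file : listDirectory.apply(directory)) {
            if (file.endsWith(".csv") || file.endsWith(".csv.gz")) {
                tables.add(tableName(file));
            }
        }
        return tables;
    }

    public static void main(String[] args) {
        Map<String, Object> operand = Map.of("directory", "hr-csv");
        List<String> tables = create(operand, dir -> List.of("DEPTS.csv", "EMPS.csv.gz"));
        System.out.println(tables); // [DEPTS, EMPS]
    }
}
```

Injecting the directory lister keeps the example runnable without real files; a real adapter would list the directory on disk and would also contribute planner rules for push-down.
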
Calcite Pig Adapter
SQL:
  SELECT DEPT_ID FROM EMPLOYEE GROUP BY DEPT_ID HAVING COUNT(DEPT_ID) > 10
Generated Pig Latin:
  EMPLOYEE = LOAD 'EMPLOYEE' ... ;
  EMPLOYEE = GROUP EMPLOYEE BY (DEPT_ID);
  EMPLOYEE = FOREACH EMPLOYEE GENERATE COUNT(EMPLOYEE.DEPT_ID) as DEPT_ID__COUNT_, group as DEPT_ID;
  EMPLOYEE = FILTER EMPLOYEE BY (DEPT_ID__COUNT_ > 10);

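The generated script on the Calcite Pig Adapter slide follows a simple pattern: each relational operator becomes one Pig Latin statement that reassigns the same alias, and HAVING turns into a FILTER after the aggregation. The following minimal Java sketch reproduces the GROUP, FOREACH and FILTER statements from that slide; the PigOp interface and its helper methods are hypothetical, not the adapter's code, and the LOAD statement (whose details the slide elides) is not generated.

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of the operator-to-statement mapping: each relational operator
// becomes one Pig Latin statement that reassigns the same alias.
// Hypothetical names; the LOAD statement is deliberately not generated.
final class PigScriptSketch {
    interface PigOp {
        String toPig(String alias);
    }

    // GROUP BY key
    static PigOp groupBy(String key) {
        return alias -> alias + " = GROUP " + alias + " BY (" + key + ");";
    }

    // COUNT(key) per group; Pig names the grouping field "group".
    static PigOp countPerGroup(String key) {
        String countField = key + "__COUNT_";
        return alias -> alias + " = FOREACH " + alias + " GENERATE COUNT(" + alias + "." + key
                + ") as " + countField + ", group as " + key + ";";
    }

    // HAVING becomes a FILTER applied after the aggregation.
    static PigOp filter(String condition) {
        return alias -> alias + " = FILTER " + alias + " BY (" + condition + ");";
    }

    static List<String> generate(String alias, List<PigOp> ops) {
        List<String> script = new ArrayList<>();
        for (PigOp op : ops) {
            script.add(op.toPig(alias));
        }
        return script;
    }

    public static void main(String[] args) {
        // SELECT DEPT_ID FROM EMPLOYEE GROUP BY DEPT_ID HAVING COUNT(DEPT_ID) > 10
        List<String> script = generate("EMPLOYEE", List.of(
                groupBy("DEPT_ID"),
                countPerGroup("DEPT_ID"),
                filter("DEPT_ID__COUNT_ > 10")));
        script.forEach(System.out::println);
    }
}
```

Reassigning a single alias means each step only needs the current alias, not the names of earlier intermediate relations.
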
Building the Pig Adapter
1. Implement Pig-specific RelNodes, e.g. PigFilter
2. RelNode factories
3. Write RelOptRules for converting abstract RelNodes to Pig RelNodes
4. Schema implementation
5. Unit tests that run Pig locally

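Steps 1 and 3 can be sketched as follows. In this hypothetical, dependency-free version, PigScan and PigFilter are Pig-specific operators that append their own Pig Latin statements, and toPig plays the role of the converter rules that turn abstract operators into Pig ones. A single recursive pass stands in for Calcite's rule-driven planner, and the loader clause is left elided as on the adapter slide.

```java
import java.util.ArrayList;
import java.util.List;

// Toy version of steps 1 and 3: Pig-specific operators that can emit Pig
// Latin, and a conversion from engine-neutral operators to them. A single
// recursive pass stands in for Calcite's rule-driven planner; all names are
// hypothetical apart from echoing PigFilter from the slide.
final class ConverterSketch {
    interface Rel {}

    // Engine-neutral ("abstract") operators.
    static final class LogicalScan implements Rel {
        final String table;
        LogicalScan(String table) { this.table = table; }
    }

    static final class LogicalFilter implements Rel {
        final Rel input;
        final String condition;
        LogicalFilter(Rel input, String condition) { this.input = input; this.condition = condition; }
    }

    // Pig operators know their alias and how to append their statement(s).
    interface PigRel extends Rel {
        String alias();
        void implement(List<String> script);
    }

    static final class PigScan implements PigRel {
        final String table;
        PigScan(String table) { this.table = table; }
        public String alias() { return table; }
        public void implement(List<String> script) {
            // Loader details are elided here, as they are on the slide.
            script.add(table + " = LOAD '" + table + "' ... ;");
        }
    }

    static final class PigFilter implements PigRel {
        final PigRel input;
        final String condition;
        PigFilter(PigRel input, String condition) { this.input = input; this.condition = condition; }
        public String alias() { return input.alias(); }
        public void implement(List<String> script) {
            input.implement(script); // emit the input's statements first
            script.add(alias() + " = FILTER " + alias() + " BY (" + condition + ");");
        }
    }

    // "Converter rules": each logical operator maps to its Pig counterpart.
    static PigRel toPig(Rel rel) {
        if (rel instanceof LogicalScan) {
            return new PigScan(((LogicalScan) rel).table);
        }
        if (rel instanceof LogicalFilter) {
            LogicalFilter f = (LogicalFilter) rel;
            return new PigFilter(toPig(f.input), f.condition);
        }
        throw new IllegalArgumentException("No conversion for " + rel.getClass().getSimpleName());
    }

    public static void main(String[] args) {
        Rel logical = new LogicalFilter(new LogicalScan("EMPLOYEE"), "DEPT_ID > 10");
        List<String> script = new ArrayList<>();
        toPig(logical).implement(script);
        script.forEach(System.out::println);
    }
}
```

Keeping Pig knowledge inside the Pig operators means the conversion step only decides which operator to build, which mirrors the separation between RelNodes and RelOptRules in the steps above.
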
Lessons Learned
- Calcite is very flexible (both good and bad)
- A "recipe list" would be useful
- Lots of examples if you delve into existing adapters, e.g. Druid and Cassandra
- Lots available out of the box
- Dynamic code generation using Janino: cryptic errors
- RelBuilder was really useful (if you are building a non-SQL engine)

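RelBuilder's appeal for a non-SQL engine is that a plan can be built directly, without SQL text. Calcite's RelBuilder keeps a stack of relational expressions and offers methods such as scan, filter, aggregate, join and build; the toy BuilderSketch below is a hypothetical imitation of that stack discipline that produces strings instead of RelNodes.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy stack-based plan builder in the spirit of Calcite's RelBuilder, which
// also keeps a stack of relational expressions. This version builds strings
// rather than RelNodes; the class and its methods are hypothetical.
final class BuilderSketch {
    private final Deque<String> stack = new ArrayDeque<>();

    // Push a table scan.
    BuilderSketch scan(String table) {
        stack.push("Scan(" + table + ")");
        return this;
    }

    // Pop the top expression and push a filter over it.
    BuilderSketch filter(String condition) {
        String input = stack.pop();
        stack.push("Filter(" + condition + ", " + input + ")");
        return this;
    }

    // Pop the top expression and push an aggregate over it.
    BuilderSketch aggregate(String groupKey, String aggCall) {
        String input = stack.pop();
        stack.push("Aggregate(" + groupKey + ", " + aggCall + ", " + input + ")");
        return this;
    }

    // Pop two expressions (the right input was pushed last) and push their join.
    BuilderSketch join(String key) {
        String right = stack.pop();
        String left = stack.pop();
        stack.push("Join(" + key + ", " + left + ", " + right + ")");
        return this;
    }

    // Pop and return the finished expression.
    String build() {
        return stack.pop();
    }

    public static void main(String[] args) {
        // The HAVING query from the Pig adapter slide, built without SQL parsing.
        String plan = new BuilderSketch()
                .scan("EMPLOYEE")
                .aggregate("DEPT_ID", "COUNT(DEPT_ID)")
                .filter("COUNT(DEPT_ID) > 10")
                .build();
        System.out.println(plan);
    }
}
```

Running main prints Filter(COUNT(DEPT_ID) > 10, Aggregate(DEPT_ID, COUNT(DEPT_ID), Scan(EMPLOYEE))). The stack lets multi-input operators such as join simply consume the two most recently built inputs.
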
[Image: Florida calcite]

Thank you!
Eli Levine @teleturn
Julian Hyde @julianhyde
http://calcite.apache.org
http://pig.apache.org

Editor's Notes

  • #5 ETL flow: a directed acyclic graph of data transformations.
  • #7 SFDC sells enterprise cloud software (CRM, Service, etc.) plus a Platform. Extensibility and customization are important: the Platform lets users extend and manipulate data for CRM, Service, etc., and build fully custom business applications (query, reporting, CRUD, permissions/security, etc.). Customers and ISVs define a custom data model and operate on it. SOQL maps nicely to SQL and is similar to OQL/HQL. It uses implicit join notation (DEPT.NAME in the example query), and EMPLOYEE and DEPT could be backed by different data stores.
  • #9 The current implementation translates SOQL into Pig; the Pig script is essentially the physical execution plan. Pig works well for large queries, SFDC has made a large investment in Pig, and it is operationally mature. We have started looking at modern engines such as Spark and Flink, which are more efficient, especially for smaller data sets that can be cached in memory and reused between steps. We also want more flexible optimization capabilities based on runtime information (e.g. statistics).
  • #10 Strong relational algebra foundation: writing query optimizers is hard. A less obvious benefit of Calcite is that it helps you write better optimizer code: separation of concerns, clear responsibilities for each component, and extensibility for adding custom rules, operators, etc. The next-generation query planner will allow a choice of physical execution engines (Pig for MapReduce, Phoenix for simple, small queries, Spark) without having to rewrite everything. We can define our own optimization rules, and cost models can use whatever runtime information (statistics, other signals) is available.
  • #14 Two rules have fired, filter-join-transpose-rule followed by splunk-filter-rule, and the query has become more efficient because far fewer rows come out of Splunk. To truly optimize the query we would need to fire many more rules and apply a cost model to choose the best of the many equivalent plans.
  • #17 The adapter transforms SQL queries into Pig scripts.
  • #18 Pig-specific RelNodes enable Calcite to build a tree of relational operators that know how to execute in Pig. You can construct a relational expression tree using Calcite’s RelBuilder instead of parsing SQL, which is useful if you don’t need parsing. Calcite creates abstract expression trees; RelOptRule converters transform them into concrete ones (e.g. Cassandra, Pig). A schema implementation is needed to map your data model onto relational tables for Calcite. Unit tests verify script and data correctness.
  • #19 Calcite is a library with numerous components, which is both good and bad. It is good that this functionality is available; the challenge is finding which components are actually needed to achieve a specific goal. Something like a recipe list would help, e.g. “To instruct Calcite how to run a physical plan and retrieve results, you need to implement EnumerableConverter.” Looking into existing adapters (e.g. Cassandra) is useful. Besides the major components, such as the optimization engine and the SQL parser, there are lots of reusable parts, especially optimizer rules. Pig has nested collections (e.g. after GROUP), which are easy to flatten for the Pig adapter. Pig also allows a collection to be reused multiple times in the same script; that flow does not map to (mostly) linear SQL.