Using Delta Lake to Transform a Legacy Spark SQL Deployment to Support Complex CRUD Operations
Lantao Jin, Staff Software Engineer @ eBay
About Me
Lantao Jin is a software engineer on eBay's Infrastructure Data Platform team, with 8+ years of big data infrastructure development experience, focusing on Spark internals optimization and efficient platform building.
https://www.linkedin.com/in/lantaojin
https://github.com/LantaoJin
https://databricks.com/speaker/lantao-jin
Previous presentation: https://databricks.com/session/managing-apache-spark-workload-and-automatic-optimizing
Agenda
1. Background: our requirements and technical selection
2. Implementations: cross-table update/delete and insertion
3. Optimizations: 10X faster and reduced memory
4. Management: auto vacuum and UI
Background
▪ A blocker for offloading a commercial data warehouse to open source
▪ SQL syntax compatible with the commercial product
▪ For example, migrating the ad-hoc workload from the MPP engine to Spark SQL
▪ CRUD is a fundamental requirement in data processing
▪ Developed a FileFormat (Hive ACID)
▪ Left join with incremental data to perform updates
▪ Databaselization is a trend for analytic datasets
▪ Google BigQuery
▪ Provides an option with a new approach in many scenarios
Requirements
▪ Fully support the commercial data warehouse SQL syntax
▪ Complex update/delete SQL syntax
▪ Match it in performance
▪ Based on Spark 2.3.0 (legacy Spark)
▪ Deliver in a short time
Project Timeline
▪ Started in Nov. 2019, based on Spark 2.3.0 + Delta Lake 0.4.0
▪ Delivered to customers in Mar. 2020
▪ Migration signed off in May 2020
▪ Forward-ported to Spark 3.0.0 + Delta Lake 0.7.0 in Sep. 2020
Usage data as of the end of Sep. 2020
▪ Overall, 5x ~ 10x faster than the open-source version in our scenarios
▪ 10+ business units are using Delta tables
▪ 2000+ production tables converted to Delta tables
▪ 3000+ update/delete statements per day
Why did we choose Delta Lake? (https://delta.io/)
Why did we choose Delta Lake? Evaluated in Nov. 2019
What did we do?
▪ Stage 1 (Nov. 2019 to Mar. 2020)
  ▪ Refactor Delta Lake to support Spark 2.3
  ▪ Cross-table update/delete syntax
  ▪ Auto vacuuming
  ▪ Rollback and At via SQL
▪ Stage 2 (Apr. 2020 to Jun. 2020)
  ▪ Bug fixes
  ▪ Support bucketing join
  ▪ Performance improvements
  ▪ Delta table UI
▪ Stage 3 (Jul. 2020 to Oct. 2020)
  ▪ Migrate to Spark 3.0 + Delta Lake 0.7
  ▪ Reduce memory consumption
  ▪ Support subquery in WHERE
  ▪ Support index for Delta
▪ Stage 4 (from Nov. 2020)
  ▪ Support Kudu
  ▪ Runtime Filter
  ▪ Z-ordering
  ▪ Native engine
Challenges – Stage 1
▪ SQL hadn't been supported in Delta Lake 0.4
▪ Delta Lake requires Apache Spark 2.4 and above
▪ Spark 3.0 only supports single-table update/delete syntax
▪ Integration with our internal features
Stage 1
Implementations
- Support SQL (Delta Lake 0.4)
- Cross-table update/delete
- Insertion
- SQL-based Time Travel
Management
- Auto Vacuuming
Delta Lake 0.4 + Spark 2.3
▪ Added some patches to Spark 2.3
▪ Backported update/delete code to Spark 2.3
▪ Downgraded some codegen interfaces in Delta Lake 0.4
▪ Rewrote the resolution code with Data Source V1
Cross-table update/delete
Support SQL (based on Delta Lake 0.4 + Spark 2.3.0)
▪ Implemented in Catalyst
▪ SparkSessionExtensions
▪ Store essential metadata in the Hive Metastore
▪ Rewritten with DataSource V1
Support SQL: UPDATE internals
1. SqlBase.g4 / SparkSqlParser.scala: visitUpdateTable()
2. Parse the FROM clause and build a cross-table join context
3. Package the assignments and condition
4. Generate an UpdateTable node
5. Inject the resolution rule via SparkSessionExtensions
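As a minimal sketch of how such a rule can be wired into Catalyst through SparkSessionExtensions (the rule body here is a placeholder, not the fork's actual code; UpdateTable and UpdateWithJoinTable are the node names from these slides, and the grammar change itself lives in SqlBase.g4 / SparkSqlParser.scala):

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

class UpdateSupportExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // The fork also injects a custom parser here (extensions.injectParser(...))
    // so that UPDATE ... FROM ... produces an unresolved UpdateTable node.
    extensions.injectResolutionRule { session =>
      new Rule[LogicalPlan] {
        override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
          // In the real rule: case u: UpdateTable => resolve assignments and
          // condition, then rewrite to a single-table update or UpdateWithJoinTable.
          case other => other
        }
      }
    }
  }
}

// Registered when the session is built, e.g.
//   spark.sql.extensions = <fully qualified name of UpdateSupportExtensions>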
Resolve Assignments and Conditions
▪ If the assignments are foldable and the condition is empty: return a single-table update.
▪ Otherwise, if the source side contains a join: infer all conditions that only reference the source side and push them down.
▪ Use the join conditions and the attributes in the assignments to add a projection pushdown on the source side.
▪ Generate an UpdateWithJoinTable node.
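As an illustration of this decision (the cross-table FROM form below is sketched from the slide examples, not the exact production dialect):

// Foldable assignment, no source table, no condition: resolved as a plain
// single-table update command.
val fastPath =
  "UPDATE target t SET t.status = 0"

// Assignment references another table: resolved to an UpdateWithJoinTable
// plan. The source-only predicate s.dt = '2020-09-01' is inferred and pushed
// below the join, and only the columns needed by the assignment and the join
// key (s.id, s.amount) are kept from the source side (projection pushdown).
val joinPath =
  """UPDATE target t
    |SET t.amount = s.amount
    |FROM source s
    |WHERE t.id = s.id AND s.dt = '2020-09-01'""".stripMargin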
UpdateWithJoinCommand execution
1. Match the TahoeFileIndex and build UpdateWithJoinCommand.
2. Filter out the files that are not correlated with the target table.
3. Get the touched files via an inner join; if multiple source rows match one target row, throw an exception.
4. Get filesToRewrite and mark them as RemoveFiles.
5. Build a left join plan; mapPartitions over the plan: if a row matched, write the output of the source side, otherwise write the output of the target side.
6. If the target is a bucketed table, repartition the plan.
7. FileFormatWriter.write and mark the new files as AddedFiles.
8. Commit to the transaction log.
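A rough sketch of the "get touched files by inner join" step, in the same spirit as Delta's own update/merge commands: tag each target row with its input file, inner-join with the source on the update condition, and collect the distinct file names; only those files are rewritten. Paths and the join column are placeholders.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.input_file_name

val spark = SparkSession.builder().appName("touched-files-sketch").getOrCreate()
import spark.implicits._

val target = spark.read.format("delta").load("/data/target")
  .withColumn("_touched_file", input_file_name())
val source = spark.read.format("delta").load("/data/source")

// Files with at least one matched row become filesToRewrite (RemoveFiles);
// every other file stays untouched in the transaction log.
val filesToRewrite: Array[String] = target
  .join(source, target("id") === source("id"))  // the UPDATE join condition
  .select($"_touched_file")
  .distinct()
  .as[String]
  .collect()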
Support SQL: INSERT INTO / INSERT OVERWRITE ... internals
▪ DataSourceStrategy, case InsertIntoTable
▪ If the statement contains static partitions: add a static-partition projection and build InsertIntoDataSource with the plan as its child
▪ Otherwise: get the actual query and package it into InsertIntoDataSourceCommand
▪ SparkStrategy (BasicOperators), case InsertIntoDataSource:
  ▪ If the target is not a bucketed table: InsertIntoDataSourceCommand.run()
  ▪ If the target is a bucketed table: generate InsertIntoDataSourceExec and add a HashClusteredDistribution requirement
▪ EnsureRequirements.ensureDistributionAndOrdering() / EnsurePartitionForWriting adds a ShuffleExchangeExec to satisfy the distribution
InsertableRelation.insert()
▪ If inserting into a static partition with overwrite: fill replaceWhere, assemble the predicates, use snapshot.filesForScan to get the files to delete, and mark them as RemoveFiles
▪ Otherwise: OptimisticTransaction.write
▪ FileFormatWriter.write and mark the new files as AddedFiles
▪ Commit to the transaction log
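A minimal sketch of the static-partition overwrite branch, assuming it maps onto Delta's replaceWhere option: the static partition values become the predicate that decides which existing files are marked as RemoveFiles, while the newly written files are committed as AddedFiles in the same transaction. Paths and the partition column are placeholders.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("replace-where-sketch").getOrCreate()
val newRows = spark.read.parquet("/staging/new_rows")

// INSERT OVERWRITE target PARTITION (dt = '2020-09-01') SELECT ...
newRows.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "dt = '2020-09-01'")  // derived from the static partition spec
  .save("/data/target")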
Time Travel via SQL: ROLLBACK & AT
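The AT and ROLLBACK statements are custom SQL in this fork; the capability they expose is ordinary Delta time travel by version or timestamp, sketched below with the standard DataFrame options (path, version, and timestamp are placeholders). The ROLLBACK side is implemented in the fork as a commit that restores an earlier snapshot; open-source Delta 0.4/0.7 only exposes the read side shown here.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("time-travel-sketch").getOrCreate()

// "AT": read the table as of an earlier snapshot.
val asOfVersion = spark.read.format("delta")
  .option("versionAsOf", 42L)
  .load("/data/target")

val asOfTimestamp = spark.read.format("delta")
  .option("timestampAsOf", "2020-09-01 00:00:00")
  .load("/data/target")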
Auto Vacuuming: architecture background
[Architecture diagram: JDBC/ODBC clients (Alation, Tableau, Zeta), LB, Gateways, tenant queues (Tenant A/B/C) with Spark Thrift Servers, Hive Metastore, Shuffle Cache, clusters Carmel (SSD), Hercules (HDD), Apollo, Hermes, plus Prod DBs and VDM]
▪ 1 BU, 1 queue (YARN)
▪ 1 queue, 1 or N STS(s)
▪ 1 queue (1 STS) is reserved
Auto Vacuuming implementation
▪ Every STS uses a listener to store Delta metadata to third-party storage asynchronously:
  o Convert to Delta
  o Create table using Delta
  o Rename table
  o Alter table
  o Drop table
▪ The STS in the reserved queue double-checks whether any events were lost
▪ The STS in the reserved queue triggers auto vacuuming and attaches the Delta UI
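A minimal sketch of the vacuum trigger run by the reserved-queue STS, assuming the per-STS listeners have already persisted the set of Delta table paths to the external store (represented here as a plain sequence). Retention and paths are placeholders; DeltaTable.vacuum is the standard Delta API.

import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("auto-vacuum-sketch").getOrCreate()

// In the real implementation this list comes from the metadata persisted by
// the listeners (convert/create/rename/alter/drop events).
val registeredDeltaTables: Seq[String] = Seq("/data/target", "/data/source")

registeredDeltaTables.foreach { path =>
  // Delete files no longer referenced by the transaction log and older than
  // the retention window (168 hours = Delta's 7-day default).
  DeltaTable.forPath(spark, path).vacuum(168)
}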
Main contributions (based on Delta Lake 0.4 + Spark 2.3.0)
▪ Support cross-table update/delete
▪ Support update/delete with multiple-table joins
▪ Support join condition inference
Main contributions (based on Delta Lake 0.4 + Spark 2.3.0)
▪ Insertion with static/dynamic partitions
▪ Auto vacuuming
▪ Time travel via SQL
Challenges – Stage 2
▪ The update/delete performance of Delta Lake still has a gap compared with the commercial product.
▪ In a long-running Spark Thrift Server, a big query on a Delta table easily causes a Spark driver OOM.
▪ Managing the capacity of Delta tables and the small-files problem.
Stage 2
Optimizations
- Support bucketing join
- Resolve the small-files problem automatically
- Rewrite outer join to reduce shuffle
- More filter pushdown
Management
- Delta UI
Support bucketing join
▪ Store the bucketSpec in the Delta table metadata
▪ requiredChildDistribution
▪ HashClusteredDistribution
▪ Example: UPDATE a big table (4.7 TB) joined with another table (200 GB), without AQE
  ▪ Before: OOM
  ▪ After: 180s
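The effect of declaring the bucketSpec plus a HashClusteredDistribution requirement can be seen with vanilla Spark bucketed tables; the sketch below only illustrates that mechanism (it is not the fork's Delta bucketing code, and the tables are toy placeholders rather than the 4.7 TB production ones).

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("bucketing-sketch").getOrCreate()
import spark.implicits._

Seq((1, "a"), (2, "b")).toDF("id", "v")
  .write.bucketBy(8, "id").sortBy("id").saveAsTable("big_target")
Seq((1, 10), (2, 20)).toDF("id", "amount")
  .write.bucketBy(8, "id").sortBy("id").saveAsTable("small_source")

// Force a sort-merge join so the plan shows the bucketing effect.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// With matching bucket specs on the join key, EnsureRequirements does not add
// a ShuffleExchangeExec on either side of the sort-merge join, which is what
// keeps the big update join from shuffling (and OOMing) the large side.
spark.table("big_target")
  .join(spark.table("small_source"), "id")
  .explain()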
Auto resolving the small-files problem
▪ Community approach (0.7.0)
▪ Our solution
Rewrite heavy outer join to reduce shuffle data
▪ In a right outer join, even if there are predicates on the right side, all rows in filesToRewrite are still shuffled to perform the join.
▪ We move right-side-only predicates from the join condition into filters on the right side, then union the join result with the right-side rows that match the negated predicates.
▪ In our testing and practice, after applying this patch the sort-merge join can be 5~10 times faster, depending on how much data is skipped from the shuffle.
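A toy sketch of the rewrite (null-safe predicate handling elided; table and column names are placeholders): only the right-side rows satisfying the pushed-down predicate are shuffled into the join, and the remaining rows are unioned back with the source columns null-extended.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

val spark = SparkSession.builder().appName("outer-join-rewrite-sketch").getOrCreate()
import spark.implicits._

val source = Seq((1, 10), (2, 20)).toDF("sid", "amount")
val target = Seq((1, "2020-09-01"), (2, "2020-08-31"), (3, "2020-09-01")).toDF("id", "dt")
val rightOnlyPredicate = col("dt") === "2020-09-01"

// Original shape: every target row goes through the shuffle for the join.
val original = source.join(target,
  col("sid") === col("id") && rightOnlyPredicate, "right_outer")

// Rewritten shape: only rows passing the predicate are shuffled; the rest
// bypass the join and come back with the source side null-extended.
val joined = source.join(target.filter(rightOnlyPredicate),
  col("sid") === col("id"), "right_outer")
val bypass = target.filter(!rightOnlyPredicate)
  .select(lit(null).cast("int").as("sid"), lit(null).cast("int").as("amount"),
    col("id"), col("dt"))
val rewritten = joined.union(bypass)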
Delta UI
Our contributions (based on Delta Lake 0.4 + Spark 2.3.0)
▪ Support bucketed Delta tables
▪ Support bucketing join
▪ Auto resolving of the small-files problem
Our contributions (based on Delta Lake 0.4 + Spark 2.3.0)
▪ Rewrite heavy outer join to reduce shuffle data (https://github.com/delta-io/delta/pull/435)
▪ Apply filter pushdown to source rows for the matched-only right outer join case (https://github.com/delta-io/delta/pull/438)
▪ Apply filter pushdown to the inner join to avoid scanning all rows in Parquet files (https://github.com/delta-io/delta/pull/432)
Challenges – Stage 3
▪ Planned to upgrade to Spark 3.0 this year
▪ Subquery statements not supported
▪ File index, materialized views, range partitioning not supported
▪ Availability & robustness
Stage 3
Implementations
- Migrate our changes to Spark 3.0 + Delta 0.7
- Support subquery in WHERE
Optimization
- Reduce memory consumption
Support Subquery in WHERE
Supported:
§ IN
§ EXISTS
§ NOT IN with IS NOT NULL
§ NOT EXISTS
§ Correlated subquery
§ Nested subquery
§ Multiple subqueries combined conjunctively (AND)
Unsupported:
§ NOT IN without IS NOT NULL
§ Scalar subquery
§ Multiple subqueries combined disjunctively (OR)
Support Subquery in WHERE: example with IN
UPDATE target t
SET t.b = 0
WHERE t.a IN (SELECT s.a FROM source s WHERE s.a % 2 = 0)
Support Subquery in WHERE: example with NOT EXISTS
UPDATE target t
SET t.b = 0
WHERE NOT EXISTS (SELECT * FROM source s WHERE t.a = s.a AND s.a % 2 = 0)
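A sketch of how these WHERE subqueries can be planned once resolved, assuming the usual rewrite to joins: the IN subquery effectively becomes a left semi join that selects the rows (and hence the files) to rewrite, and NOT EXISTS becomes a left anti join. Toy data; the real rule rewrites the resolved update condition rather than raw DataFrames.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("subquery-rewrite-sketch").getOrCreate()
import spark.implicits._

val target = Seq((1, 1), (2, 1), (3, 1)).toDF("a", "b")
val source = Seq(1, 2, 4).toDF("a")
val evenSource = source.filter($"a" % 2 === 0)

// WHERE t.a IN (SELECT s.a FROM source s WHERE s.a % 2 = 0)
val toUpdateIn = target.join(evenSource, Seq("a"), "left_semi")

// WHERE NOT EXISTS (SELECT * FROM source s WHERE t.a = s.a AND s.a % 2 = 0)
val toUpdateNotExists = target.join(evenSource, Seq("a"), "left_anti")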
Our contributions (based on Delta Lake 0.7 + Spark 3.0.0)
▪ Migrated all changes and improvements to the latest version
▪ Support subquery in WHERE
▪ Reduce memory consumption in the driver
  ▪ [SPARK-32994][CORE] Update external heavy accumulators before they enter the listener event loop
▪ Skip schema inference and merging when the table schema can be read from the catalog
▪ Fall back to a simple update if all SET expressions are foldable and there is no join
[Performance comparison chart: Commercial Data Warehouse vs. Stage 2 vs. Stage 3] PS: Pulse is our regular release
Future work
Future work – Stage 4
▪ Range partition for Delta (WIP)
▪ File index for Delta (WIP)
▪ Runtime Filter join optimization (WIP)
▪ Support Kudu (WIP)
▪ Z-ordering
▪ Native engine
Feedback Your feedback is important to us. Don’t forget to rate and review the sessions.
