The document discusses the implementation of dynamic partition pruning in Apache Spark to improve the performance of SQL analytics workloads. It outlines the benefits of the approach, which yields significant speedups in query execution, particularly on TPC-DS benchmarks such as query 98. The optimization lets Spark handle star-schema queries efficiently, reducing the need for complex ETL processes that maintain denormalized tables.
About Us: BI Experience team in the Databricks Amsterdam European Development Centre
● Working on improving the experience and performance of Business Intelligence / SQL analytics workloads using Databricks
  ○ JDBC / ODBC connectivity to Databricks clusters
  ○ Integrations with BI tools such as Tableau
  ○ But also: core performance improvements in Apache Spark for common SQL analytics query patterns
Bogdan Ghit, Juliusz Sompolski
TPCDS Q98 on 10 TB: How to Make a Query 100x Faster?
4.
Static Partition Pruning
SELECT * FROM Sales WHERE day_of_week = 'Mon'
Diagram: basic data flow (Scan followed by Filter) versus filter push-down (the filter pushed into the Scan), over partition files with multi-columnar data.
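As a minimal sketch of the idea (hypothetical sales table, schema, and column names; Scala spark-shell code, not part of the slides), a table partitioned by day_of_week can be pruned at planning time because the predicate is a literal on the partition column:

  // Hypothetical example: write a table partitioned by day_of_week and filter
  // on that column. The literal predicate is pushed into the scan, so only the
  // 'Mon' partition directories are read (visible as PartitionFilters in explain()).
  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("static-pruning").getOrCreate()

  spark.range(0, 1000)
    .selectExpr(
      "id AS sale_id",
      "id % 100 AS amount",
      "CASE WHEN id % 2 = 0 THEN 'Mon' ELSE 'Tue' END AS day_of_week")
    .write.partitionBy("day_of_week").mode("overwrite").saveAsTable("sales")

  spark.sql("SELECT * FROM sales WHERE day_of_week = 'Mon'").explain()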
5.
Table Denormalization
SELECT * FROM Sales JOIN Date WHERE Date.day_of_week = 'Mon'
Static pruning is not possible: the filter sits on the Date dimension, so the plan must scan Sales fully and join it with the filtered scan of Date (day_of_week = 'Mon').
Simple workaround: denormalize the table so that day_of_week becomes a column of Sales and the static filter applies again.
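A hedged sketch of that workaround (hypothetical sales_raw and date_dim tables and column names, not from the slides): fold the dimension attribute into the fact table at ETL time and partition by it, so the plain static filter from the previous slide works again.

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("denormalize").getOrCreate()

  val sales = spark.table("sales_raw")   // hypothetical fact table
  val dates = spark.table("date_dim")    // hypothetical date dimension

  // Pre-join the dimension attribute into the fact table and partition by it.
  sales
    .join(dates, sales("sold_date_sk") === dates("d_date_sk"))
    .select(sales("sale_id"), sales("amount"), dates("day_of_week"))
    .write.partitionBy("day_of_week").mode("overwrite").saveAsTable("sales_denorm")

  // The cost: a larger, duplicated copy of the data that must be kept in sync.
  spark.sql("SELECT * FROM sales_denorm WHERE day_of_week = 'Mon'").explain()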
Spark in a Nutshell
Query (via the APIs) → Logical Plan → Optimization (rule-based transformations) → Physical Plan Selection (stats-based cost model) → RDD batches scheduled on cluster slots
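A small illustration of this pipeline (a toy query, not from the slides): explain(true) prints the parsed and analyzed logical plans, the rule-optimized logical plan, and the selected physical plan.

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("plan-stages").getOrCreate()
  spark.range(100).createOrReplaceTempView("t")

  // Prints all planning stages: parsed, analyzed, optimized, and physical plan.
  spark.sql("SELECT id FROM t WHERE id > 10").explain(true)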
A Simple Approach
Diagram: Scan FACT TABLE (partition files with multi-columnar data) joined on partition id with Scan DIM TABLE → Filter DIM (non-partitioned dataset); a duplicate Scan DIM TABLE → Filter DIM is pushed to the fact side to compute the partition filter.
Drawbacks: work duplication may be expensive, and the heuristics are based on inaccurate stats.
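A hand-written equivalent of this rewrite on the query-98 tables (assuming store_sales is partitioned by ss_sold_date_sk; the date predicate is simplified from query 98): the filtered date_dim scan is duplicated as an IN subquery on the fact table's partition column. The pruning works, but the dimension side is scanned and filtered twice, which is the duplication cost noted above.

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("strawman-pruning").getOrCreate()

  spark.sql("""
    SELECT ss.*
    FROM store_sales ss JOIN date_dim d ON ss.ss_sold_date_sk = d.d_date_sk
    WHERE cast(d.d_date AS date) = cast('1999-02-22' AS date)
      AND ss.ss_sold_date_sk IN (
        SELECT d_date_sk FROM date_dim
        WHERE cast(d_date AS date) = cast('1999-02-22' AS date))
  """).explain()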
Broadcast Hash Join
Diagram: FileScan (fact table) and FileScan with Dim Filter (non-partitioned dataset) → BroadcastExchange → Broadcast Hash Join.
Steps: execute the build side of the join, place the result in a broadcast variable, broadcast the build-side results, and execute the join locally without a shuffle.
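A sketch of a broadcast hash join on the query-98 tables (the broadcast() hint forces the small, filtered date_dim to be the build side; the date predicate is simplified from query 98): the build side is executed once, placed in a broadcast variable, shipped to the executors, and the join runs locally without a shuffle.

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.broadcast

  val spark = SparkSession.builder().appName("bhj").getOrCreate()

  val sales = spark.table("store_sales")
  val dates = spark.table("date_dim")
    .where("cast(d_date as date) = cast('1999-02-22' as date)")

  // explain() shows a BroadcastExchange feeding a BroadcastHashJoin.
  sales.join(broadcast(dates), sales("ss_sold_date_sk") === dates("d_date_sk"))
    .explain()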
Reusing Broadcast Results
Diagram: the FileScan over partition files with multi-columnar data receives a Dynamic Filter fed by the BroadcastExchange of the FileScan with Dim Filter (non-partitioned dataset), before the Broadcast Hash Join.
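A sketch of observing this on the query-98 tables (assuming store_sales is partitioned by ss_sold_date_sk; the feature is on by default in Spark 3.0, and the flag is set here only to make it explicit): in the resulting plan, the FileScan of store_sales carries a dynamic pruning subquery in its PartitionFilters that reuses the broadcast of the filtered date_dim.

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("dpp")
    .config("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
    .getOrCreate()

  spark.sql("""
    SELECT ss.*
    FROM store_sales ss JOIN date_dim d ON ss.ss_sold_date_sk = d.d_date_sk
    WHERE cast(d.d_date AS date) = cast('1999-02-22' AS date)
  """).explain()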
Query 98
SELECT i_item_desc, i_category, i_class, i_current_price,
       sum(ss_ext_sales_price) as itemrevenue,
       sum(ss_ext_sales_price)*100/sum(sum(ss_ext_sales_price)) over (partition by i_class) as revenueratio
FROM store_sales, item, date_dim
WHERE ss_item_sk = i_item_sk
  and i_category in ('Sports', 'Books', 'Home')
  and ss_sold_date_sk = d_date_sk
  and cast(d_date as date) between cast('1999-02-22' as date)
      and (cast('1999-02-22' as date) + interval '30' day)
GROUP BY i_item_id, i_item_desc, i_category, i_class, i_current_price
ORDER BY i_category, i_class, i_item_id, i_item_desc, revenueratio
TPCDS 10 TB: a highly selective dimension filter that retains only one month out of 5 years of data.
Conclusion
Apache Spark 3.0 introduces Dynamic Partition Pruning
- Strawman approach at logical planning time
- Optimized approach during execution time
Significant speedups, exhibited in many TPC-DS queries.
With this optimization Spark now works well with star-schema queries, making it unnecessary to ETL denormalized tables.