Improving Spark SQL at LinkedIn
Fangshi Li, Staff Software Engineer, LinkedIn
Agenda
1. Automated column pruning for Dataset
2. 2D partitioned join
3. Adaptive Execution
4. Cost-based optimizer
Spark SQL adoption at LinkedIn
60% of the jobs running on our cluster are Spark jobs.
Spark jobs: ⅔ Spark SQL, ⅓ RDD.
Spark SQL jobs: ⅔ DataFrame/SQL API, ⅓ Dataset API.
Goals
- Enable computations that could not be completed before
- Make every job run faster
Spark SQL roadmap at LinkedIn: 3-level optimization
- Operator-level: Dataset ser-de, joins
- Plan-level: Adaptive Execution, CBO
- Cluster-level: multi-query optimization
1. Automated column pruning for Dataset
Dataset performance
val ds: Dataset[TrackingEvent] = createDataset()
val ds2 = ds.filter(x => x.id > 0).map(x => (x.id, x.key))
Dataset has performance issues due to:
1. Excessive conversion (ser-de) overhead between the internal row format and user objects
2. No column pruning for ORC/Parquet
Solutions
Apple (Spark + AI 2019 talk, "Bridging the Gap Between Datasets and DataFrames"): using a bytecode analyzer, convert the user lambda functions into SQL expressions, e.g. x.id > 0 ----> isLargerThan(col("id"), Literal(0)).
LinkedIn: using a bytecode analyzer, find out which columns are used in the user lambdas, and prune the columns that are not needed:
val ds: Dataset[TrackingEvent] = createDataset()
val ds2 = ds.filter(x => x.id > 0).map(x => (x.id, x.key))
Big performance boost for ORC/Parquet, since the pruned columns can be pushed down to the readers.
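As an illustration of the bytecode-analysis idea (not LinkedIn's actual implementation, which analyzes JVM bytecode), Python's `dis` module can recover which columns a lambda touches; `TrackingEvent` and `referenced_columns` are hypothetical names used only for this sketch:

```python
import dis

# Hypothetical record type standing in for TrackingEvent.
class TrackingEvent:
    def __init__(self, id, key, payload):
        self.id, self.key, self.payload = id, key, payload

def referenced_columns(fn):
    """Scan a function's bytecode and collect attribute (column) accesses,
    mimicking the bytecode analyzer that prunes unused Dataset columns."""
    return {ins.argval for ins in dis.get_instructions(fn)
            if ins.opname == "LOAD_ATTR"}

pred = lambda x: x.id > 0
proj = lambda x: (x.id, x.key)

# Union of columns used by filter and map: only these survive pruning.
needed = referenced_columns(pred) | referenced_columns(proj)
# 'payload' is never accessed, so the reader never has to materialize it.
```

The same per-lambda analysis, done on JVM bytecode, is what lets the pruned schema be pushed all the way to the ORC/Parquet readers.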
2. 2D partitioned join
A recommendation use case at LinkedIn
1. The pair feature table joins with the viewer feature table
2. The intermediate result joins with the entity feature table
3. Score each joined record with an ML model
4. Rank the top N entities for each viewer
Exploding intermediate data
Can we perform the 3-way join and scoring in a single step, without exploding the intermediate data?
2D partitioned join
- Partition the left, right, and pair tables into M, N, and M*N partitions respectively
- The left and pair tables are sorted within each partition
- For each partition of the pair table:
  - join with the left table using a sort-merge join
  - join with the right table using a shuffle-hash join
- For each joined record, perform scoring right away and output the scorable
- Rank the scorables
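The steps above can be sketched in plain Python. This is a minimal illustration of the M×N grid idea only: it uses hash partitioning for everything (the sort-merge and shuffle-hash details are elided), and `score` is a trivial stand-in for the ML model:

```python
from collections import defaultdict

M, N = 2, 3  # number of left / right partitions (illustrative)

def grid_partition(pairs):
    """Place each (viewer, entity, pair_feat) row in one of M*N grid cells,
    so cell (i, j) only ever needs left partition i and right partition j."""
    grid = defaultdict(list)
    for viewer, entity, feat in pairs:
        grid[(hash(viewer) % M, hash(entity) % N)].append((viewer, entity, feat))
    return grid

def score(viewer_feat, entity_feat, pair_feat):
    return viewer_feat + entity_feat + pair_feat  # stand-in for the ML model

def two_d_join(left, right, pairs, top_n=1):
    lparts, rparts = defaultdict(dict), defaultdict(dict)
    for k, v in left.items():
        lparts[hash(k) % M][k] = v
    for k, v in right.items():
        rparts[hash(k) % N][k] = v
    ranked = defaultdict(list)
    for (i, j), cell in grid_partition(pairs).items():
        for viewer, entity, feat in cell:
            # Join both sides and score in one pass: the fully joined
            # 3-way record is never materialized as an intermediate table.
            s = score(lparts[i][viewer], rparts[j][entity], feat)
            ranked[viewer].append((s, entity))
    return {v: sorted(es, reverse=True)[:top_n] for v, es in ranked.items()}
```

Because scoring happens per joined record inside each grid cell, only the small per-viewer top-N results flow downstream, which is what avoids the exploding intermediate data.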
Before: 10+ hours. After: 1 hour.
3. Adaptive Execution (AE)
Adaptive Execution (AE) at LinkedIn
AE optimizes the query plan while the job is running (SPARK-23128).
- Handling data skew in joins: works great!
- Converting a shuffle-based join to a broadcast join at runtime: requires the shuffle map stage to run before the conversion can happen.
Should we use Adaptive Execution to optimize the join plan at runtime?
4. Cost-based optimization (CBO)
CBO(Cost-based optimizer) CBO in Spark can optimize the query plan based on the operators cost(data size, # of records). Benefits: Choose best join strategy: broadcast vs shuffle-hash vs sort-merge Multi-Join reordering
CBO (cost-based optimizer)
The native CBO in Spark has usability issues:
- It requires detailed stats (count, min, max, distinct count, histograms) to be available for the input datasets.
- That requires scheduled jobs to compute stats on all datasets, which is very expensive.
CBO (cost-based optimizer)
Can we learn the stats from history? Yes!
Learning-based CBO
- Eliminates the CBO's dependency on pre-computed stats by learning stats from job histories
- A general approach that can benefit all SQL engines
Learning-based CBO
Approach 1: instance-based learning (ref: "LEO: DB2's Learning Optimizer")
Approach 2: model-based learning (ref: "SageDB: A Learned Database System")
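The instance-based (LEO-style) approach can be sketched in Python as follows; the class and its API are hypothetical, standing in for a store of per-(table, predicate) cardinalities observed in past job runs:

```python
from collections import defaultdict

class HistoryStatsStore:
    """Instance-based learning sketch: remember the actual cardinality each
    (table, predicate) produced in past runs, and answer the optimizer's
    estimate requests from those observations instead of pre-computed stats."""

    def __init__(self):
        self.observed = defaultdict(list)

    def record(self, table, predicate, actual_rows):
        """Called after a job finishes, with the true output row count."""
        self.observed[(table, predicate)].append(actual_rows)

    def estimate(self, table, predicate, default=None):
        """Average of past observations; fall back to default when unseen."""
        runs = self.observed.get((table, predicate))
        return sum(runs) / len(runs) if runs else default

store = HistoryStatsStore()
store.record("events", "id > 0", 90)
store.record("events", "id > 0", 110)
```

Because the store is fed by job histories that already exist, no scheduled stats-collection jobs are needed, which is the usability gap this approach closes.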
Learning-based CBO vs. no-CBO
Summary
1. Automated column pruning for Dataset
2. 2D partitioned join
3. Adaptive Execution
4. History-based CBO (cost-based optimizer)
Thank you
