Vectorized Query Execution in Apache Spark at Facebook
Chen Yang
Spark Summit | 04/24/2019
About me
Chen Yang
• Software Engineer at Facebook (Data Warehouse Team)
• Working on Spark execution engine improvements
• Worked on Hive and ORC in the past
Agenda • Spark at Facebook • Row format vs Columnar format • Row-at-a-time vs Vector-at-a-time processing • Performance results • Road ahead
Agenda • Spark at Facebook • Row format vs Columnar format • Row-at-a-time vs Vector-at-a-time processing • Performance results • Road ahead
Spark at Facebook
• Largest SQL query engine at Facebook (by CPU usage)
• We use Spark on disaggregated compute/storage clusters
  • Scale/upgrade clusters independently
• Efficiency is top priority for Spark at Facebook given the scale
  • Compute efficiency: optimize CPU and memory usage
  • Storage efficiency: optimize on-disk size and IOPS
Spark at Facebook
• Compute efficiency: optimize CPU and memory usage
  • A significant percentage (>40%) of CPU time is spent reading and writing data
• Storage efficiency: optimize on-disk size and IOPS
  • The storage format has a big impact on on-disk size and IOPS
  • The Facebook data warehouse uses the ORC format
Agenda • Spark at Facebook • Row format vs Columnar format • Row-at-a-time vs Vector-at-a-time processing • Performance results • Road ahead
Row format vs Columnar format
Logical table (user_id, os): (1, android), (1, ios), (3, ios), (6, android)
• Row format stores whole rows together: 1 android | 1 ios | 3 ios | 6 android
• Columnar format stores each column together: user_id = [1, 1, 3, 6], os = [android, ios, ios, android]
• On-disk row format: CSV. On-disk columnar formats: Parquet, ORC
• In-memory row formats: UnsafeRow, OrcLazyRow. In-memory columnar formats: Arrow, VectorizedRowBatch
(A sketch of both in-memory layouts follows.)
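To make the layout difference concrete, here is a minimal Java sketch of the same four-row table in both layouts (the class and field names are illustrative, not Spark's actual types):

// Hypothetical illustration of the two in-memory layouts for the
// (user_id, os) table above; not Spark's actual classes.
class RowLayout {
    // One object per row: fields from different columns sit next to
    // each other in memory.
    static class Row {
        long userId;
        String os;
        Row(long userId, String os) { this.userId = userId; this.os = os; }
    }
    Row[] rows = {
        new Row(1, "android"), new Row(1, "ios"),
        new Row(3, "ios"), new Row(6, "android")
    };
}

class ColumnarLayout {
    // One array per column: values of the same column are contiguous,
    // which compresses better and scans faster.
    long[] userId = {1, 1, 3, 6};
    String[] os = {"android", "ios", "ios", "android"};
}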
Columnar on-disk format
• Better compression ratio
  • Type-specific encoding
• Minimize I/O
  • Projection push down (column pruning)
  • Predicate push down (filtering based on stats)
Better compression ratio
• Encoding: dictionary encoding, run-length encoding, delta encoding
• Compression: zstd, zlib, snappy
Dictionary encoding example: the os column [android, ios, ios, android] is stored as indices [0, 1, 1, 0] plus the dictionary {0: android, 1: ios} (see the sketch below)
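As a sketch of how dictionary encoding works (illustrative Java, not the actual ORC writer code), the os column becomes a small dictionary plus an array of integer ids:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal dictionary encoder: replaces repeated strings with small
// integer ids, so the column stores {android, ios} once plus [0, 1, 1, 0].
class DictionaryEncoder {
    private final Map<String, Integer> ids = new HashMap<>();
    private final List<String> dictionary = new ArrayList<>();

    int encode(String value) {
        Integer id = ids.get(value);
        if (id == null) {
            id = dictionary.size();
            dictionary.add(value);
            ids.put(value, id);
        }
        return id;
    }

    String decode(int id) { return dictionary.get(id); }
}

// Encoding [android, ios, ios, android] yields ids [0, 1, 1, 0], which
// run-length and general-purpose compression (zstd, zlib, snappy) shrink further.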
Minimize I/O, e.g. for SELECT COUNT(*) FROM user_os_info WHERE user_id = 3
• Projection push down: column pruning reads only the user_id column
• Predicate push down: filtering based on stats skips chunks whose min/max statistics show that user_id = 3 cannot match (see the sketch below)
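A minimal sketch of stats-based filtering, under the assumption of simple per-chunk min/max statistics (ORC keeps similar statistics per stripe and row group); the names here are hypothetical:

// Hypothetical per-chunk statistics; ORC stores similar min/max stats
// per stripe and per row group in its metadata.
class ChunkStats {
    final long min, max;
    ChunkStats(long min, long max) { this.min = min; this.max = max; }
}

class StatsFilter {
    // Returns true only if some row in the chunk *could* satisfy
    // user_id = target, so non-matching chunks are never read or decoded.
    static boolean mightContain(ChunkStats stats, long target) {
        return target >= stats.min && target <= stats.max;
    }

    public static void main(String[] args) {
        ChunkStats chunk1 = new ChunkStats(1, 3);    // rows with user_id in 1..3
        ChunkStats chunk2 = new ChunkStats(6, 6);    // rows with user_id 6
        System.out.println(mightContain(chunk1, 3)); // true  -> read chunk
        System.out.println(mightContain(chunk2, 3)); // false -> skip chunk
    }
}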
ORC in open source vs at Facebook
Open source:
• ORC (Optimized Row Columnar) is a self-describing, type-aware columnar file format designed for Hadoop workloads
• It is optimized for large streaming reads, with integrated support for finding required rows quickly
Facebook:
• A fork of Apache ORC, also called DWRF
• Various perf improvements (IOPS, memory, etc.)
• Some special formats for Facebook-specific use cases (FlatMap)
Agenda • Spark at Facebook • Row format vs Columnar format • Row-at-a-time vs Vector-at-a-time processing • Performance results • Road ahead
Row-at-a-time vs vector-at-a-time
Running example: SELECT COUNT(*) FROM user_os_info WHERE user_id = 3
Row-at-a-time processing
Orc Reader → Row → … → Row → Orc Writer
Row-at-a-time processing
Orc Reader → OrcLazyRow → UnsafeRow → … → UnsafeRow → OrcRow → Orc Writer
Row-at-a-time processing (generated aggregation loop)
private void agg_doAggregateWithoutKey() {
  // One iterator call and one virtual getLong() call per row
  while (inputadapter_input.hasNext()) {
    inputadapter_row = inputadapter_input.next();
    value = inputadapter_row.getLong(0);
    // do aggr
  }
}
Vector-at-a-time processing
Orc Reader → Vector → … → Vector → Orc Writer
Vector-at-a-time processing
Orc Reader → VectorizedRowBatch → … → VectorizedRowBatch → Orc Writer
Each VectorizedRowBatch carries the user_id column as a vector: [1, 1, 3, 6] (a simplified sketch of the batch structure follows)
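For reference, a simplified sketch of what such a batch looks like, modeled on ORC's VectorizedRowBatch and reduced to the fields the generated code below relies on:

// Simplified sketch of a columnar batch, modeled on ORC's
// VectorizedRowBatch: one vector per projected column, plus a row count.
class LongColumnVector {
    long[] vector = new long[1024];    // values for one column
}

class SimpleRowBatch {
    int size;                          // number of valid rows in this batch
    LongColumnVector[] cols;           // one vector per projected column
}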
Vector-at-a-time processing (generated aggregation loop)
private void agg_doAggregateWithoutKey() {
  while (orc_scan_batch != null) {
    int numRows = orc_scan_batch.size;
    // Tight inner loop over the column vector: no virtual call per row
    while (orc_scan_batch_idx < numRows) {
      value = orc_scan_col_0.vector[orc_scan_batch_idx];
      orc_scan_batch_idx++;
      // do aggr
    }
    nextBatch();
  }
}
private void nextBatch() {
  // Advance to the next VectorizedRowBatch, or end the scan
  if (orc_scan_input.hasNext()) {
    orc_scan_batch = orc_scan_input.next();
    orc_scan_col_0 = orc_scan_batch.cols[0];
    orc_scan_batch_idx = 0;
  } else {
    orc_scan_batch = null;
  }
}
Row-at-a-time vs vector-at-a-time

Row-at-a-time processing:
private void agg_doAggregateWithoutKey() {
  while (inputadapter_input.hasNext()) {
    inputadapter_row = inputadapter_input.next();
    value = inputadapter_row.getLong(0);
    // do aggr
  }
}

Vector-at-a-time processing:
private void agg_doAggregateWithoutKey() {
  while (orc_scan_batch != null) {
    int numRows = orc_scan_batch.size;
    while (orc_scan_batch_idx < numRows) {
      value = orc_scan_col_0.vector[orc_scan_batch_idx];
      orc_scan_batch_idx++;
      // do aggr
    }
    nextBatch();
  }
}
private void nextBatch() {
  if (orc_scan_input.hasNext()) {
    orc_scan_batch = orc_scan_input.next();
    orc_scan_col_0 = orc_scan_batch.cols[0];
    orc_scan_batch_idx = 0;
  } else {
    orc_scan_batch = null;
  }
}

• Lower overhead per row
  • Avoid virtual function dispatch cost per row
  • Better cache locality
• Avoid unnecessary copy/conversion
  • No need to convert between OrcLazyRow and UnsafeRow
Row-at-a-time vs vector-at-a-time
• Lower overhead per row
  • Avoid virtual function dispatch cost per row
  • Better cache locality
• Avoid unnecessary copy/conversion
  • No need to convert between OrcLazyRow and UnsafeRow
Row-at-a-time processing: Orc Reader → OrcLazyRow → UnsafeRow → WholeStage CodeGen
Vector-at-a-time processing: Orc Reader → RowBatch → WholeStage CodeGen
Vectorized ORC reader/writer in open source vs at Facebook
Open source:
• Vectorized reader only supports simple types
• Vectorized writer is not supported
Facebook:
• Vectorized reader supports all types and is integrated with codegen
• Vectorized writer supports all types; integration with codegen is planned
Vector-at-a-time + WholeStage CodeGen
• Currently only HiveTableScan and InsertIntoHiveTable understand the ORC columnar format
• Most Spark operators still process one row at a time (see the sketch below)
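Because of that, a scan that produces batches has to unpack them at the boundary to row-based operators. A minimal illustrative sketch of such a batch-to-row adapter (hypothetical classes, not Spark's actual transition code):

// Illustrative batch-to-row adapter: operators that only understand rows
// force the scan's output batches to be unpacked one row at a time,
// giving back the per-row overhead that vectorization removed.
class BatchOfLongs {
    long[] col0;
    int size;
}

class BatchToRowIterator {
    private final java.util.Iterator<BatchOfLongs> batches;
    private BatchOfLongs current;
    private int idx;

    BatchToRowIterator(java.util.Iterator<BatchOfLongs> batches) {
        this.batches = batches;
    }

    boolean hasNext() {
        // Skip exhausted (or empty) batches until a row is available
        while (current == null || idx >= current.size) {
            if (!batches.hasNext()) return false;
            current = batches.next();
            idx = 0;
        }
        return true;
    }

    long nextValue() { return current.col0[idx++]; }
}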
Agenda • Spark at Facebook • Row format vs Columnar format • Row-at-a-time vs Vector-at-a-time processing • Performance results • Road ahead
Vectorized reader + vector-at-a-time microbenchmark
Up to 8x speedup when reading 10M rows from a single-column table
Vectorized reader and writer + vector-at-a-time microbenchmark
Up to 3.5x speedup when reading and writing 10M rows from a single-column table
Agenda • Spark at Facebook • Row format vs Columnar format • Row-at-a-time vs Vector-at-a-time processing • Performance results • Road ahead
Road ahead
• SIMD / avoid branching / prefetching
  • Optimize generated code to trigger auto-vectorization in the JVM (see the sketch below)
  • Customize the JVM to make better compiler optimization decisions for Spark
SIMD example for SELECT COUNT(*) FROM user_os_info WHERE user_id > 1: compare the vector [1, 1, 3, 6] against [1, 1, 1, 1] in one step, producing the mask [0, 0, 1, 1], then sum the mask → 2
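For example, a branch-free counting loop over a column vector is the kind of shape the JVM's JIT can auto-vectorize into SIMD; a sketch, assuming a plain long[] column as in VectorizedRowBatch:

class CountGreaterThan {
    // Branch-free predicate evaluation over a column vector. The simple
    // loop shape (no virtual calls, no data-dependent branches the JIT
    // cannot flatten) is what enables SIMD auto-vectorization in the JVM.
    static long countGreaterThan(long[] userId, long threshold) {
        long count = 0;
        for (int i = 0; i < userId.length; i++) {
            // The comparison contributes 0 or 1 instead of taking a branch
            count += userId[i] > threshold ? 1 : 0;
        }
        return count;
    }

    public static void main(String[] args) {
        long[] userId = {1, 1, 3, 6};
        System.out.println(countGreaterThan(userId, 1)); // prints 2
    }
}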
Road ahead
• Efficient in-memory data format
  • Use Apache Arrow as the in-memory format
  • Encoded columnar in-memory format
  • Use columnar format for shuffle
• Filter/projection push down to the reader
  • Avoid decoding cost (see the sketch below)
Dictionary encoding example for SELECT COUNT(*) FROM user_os_info WHERE os = 'ios': with os stored as indices [0, 1, 1, 0] and dictionary {0: android, 1: ios}, the filter compares integer indices against 1 and never decodes the strings
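A sketch of why staying encoded helps (illustrative code, not Spark's or ORC's): with a dictionary-encoded os column, the filter os = 'ios' becomes one dictionary lookup plus integer comparisons per row:

class DictionaryFilter {
    // Evaluate os = 'ios' directly on the encoded representation: look up
    // the literal's id once, then compare small ints per row instead of
    // decoding and comparing strings.
    static long countEquals(String[] dictionary, int[] encoded, String literal) {
        int targetId = -1;
        for (int i = 0; i < dictionary.length; i++) {
            if (dictionary[i].equals(literal)) { targetId = i; break; }
        }
        if (targetId < 0) return 0; // literal not in dictionary: no rows match
        long count = 0;
        for (int i = 0; i < encoded.length; i++) {
            if (encoded[i] == targetId) count++;
        }
        return count;
    }

    public static void main(String[] args) {
        String[] dictionary = {"android", "ios"};
        int[] encoded = {0, 1, 1, 0};
        System.out.println(countEquals(dictionary, encoded, "ios")); // prints 2
    }
}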