The Spark File Format Ecosystem Vinoo Ganesh Chief Technology Officer, Veraset
Agenda About Veraset Session Goals On-Disk Storage OLTP/OLAP Workflows File Format Deep Dive Case: Veraset Case: Parquet Pruning Looking Forward Questions
About Veraset About Me ▪ CTO at Veraset ▪ (Formerly) Lead of Compute / Spark at Palantir Technologies Data-as-a-Service (DaaS) Startup Anonymized Geospatial Data ▪ Centered around population movement ▪ Model training at scale ▪ Heavily used during COVID-19 investigations / analyses Process, Cleanse, Optimize, and Deliver >2 PB Data Yearly Data is Our Product ▪ We don’t build analytical tools ▪ No fancy visualizations ▪ Optimized data storage, retrieval, and processing are our lifeblood ▪ “Just Data” We’re Hiring! vinoo@veraset.com
Session Goals On-disk storage ▪ Row, Column, Hybrid Introduce OLTP / OLAP workflows Explain feature set of formats Inspect formats Explore configuration for formats Look forward We can’t cover everything about file formats in 30 minutes, so let’s hit the high points.
File Formats ▪ Text ▪ CSV * ▪ TSV * ▪ JSON ▪ XML Semi-StructuredUnstructured ▪ Avro ▪ ORC ▪ Parquet Structured Bolded formats will be covered in this session * Can be considered ”semi-structured”
On-Disk Storage Data is stored on hard drives in “blocks” Disk loads a “block” into memory at a time ▪ Block is the minimum amount of data read during a read Reading unnecessary data == expensive! Reading fragmented data == expensive! Random seek == expensive! Sequential read/writes strongly preferred Insight: Lay data on disk in a manner optimized for your workflows ▪ Common categorizations for these workflows: OLTP/OLAP https://bit.ly/2TG7SJw
Example Data A0 B0 C0 A1 B1 C1 A2 B2 C2 A3 B3 C3 Column A Column B Column C Row 0 Row 1 Row 2 Row 3
Example Data Column A Column B Column C Row 0 Row 1 Row 2 Row 3 A0 C0B0 A1 A2 A3 B1 B2 B3 C1 C2 C3
Row-wise Storage A0 C0B0 A1 A0 B0 C0 A1 B1 C1 A2 B2 C2 A3 B3 C3 Visually Block 1 B1 A2C1 B2 Block 2 C2 B3A3 C3 Block 3 On Disk
Columnar (Column-wise) Storage A0 A2A1 A3 Block 1 B0 B2B1 B3 Block 2 C0 C2C1 C3 Block 3 A0 B0 C0 A1 B1 C1 A2 B2 C2 A3 B3 C3 Visually On Disk
Hybrid Storage A0 B0A1 B1 Row Group 1 C0 A2C1 A3 Row Group 2 B2 C2B3 C3 A0 B0 C0 A1 B1 C1 A2 B2 C2 A3 B3 C3 Visually Logical Row Groups
Hybrid Storage A0 B0A1 B1 Block 1 C0 A2C1 A3 Block 2 B2 C2B3 C3 Block 3 On Disk A0 B0A1 B1 Row Group 1 C0 A2C1 A3 Row Group 2 B2 C2B3 C3 Logical Row Groups In Parquet – aim to fit one row group in one block
Summary: Physical Layout Row-wise formats are best for write-heavy (transactional) workflows Columnar formats are best optimized for read-heavy (analytical) workflows Hybrid formats combine both methodologies
OLTP / OLAP
OLTP / OLAP Workflows Online Transaction Processing (OLTP) ▪ Larger numbers of short queries / transactions ▪ More processing than analysis focused ▪ Geared towards record (row) based processing than column based ▪ More frequent data updates / deletes (transactions) Online Analytical Processing (OLAP) ▪ More analysis than processing focused ▪ Geared towards column-based data analytics processing ▪ Less frequent transactions ▪ More analytic complexity per query Insight: Data access patterns should inform the selection of file formats
Example Data student_id subject score Row 0 Row 1 Row 2 Row 3 71 97.44math 33 101 13 history geography physics 88.32 73.11 87.78
Unstructured File Format: CSV
About: CSV CSV developed by IBM in 1972 ▪ Ease of typing CSV lists on punched cards Flexible (not always good) Row-based Human Readable Compressible Splittable ▪ When raw / using spittable format Supported Natively Fast (from a write perspective) Comma Separated Value (CSV) $ cat myTable.csv "student_id","subject","score" 71,"math",97.44 33,"history",88.32 101,"geography",73.11 13,"physics",87.78 scala> val table = spark.read.option("header","true") .option("inferSchema", "true").csv("myTable.csv") table: org.apache.spark.sql.DataFrame = [student_id: int, subject: string ... 1 more field] scala> table.printSchema root |-- student_id: integer (nullable = true) |-- subject: string (nullable = true) |-- score: double (nullable = true) scala> table.show +----------+---------+-----+ |student_id| subject|score| +----------+---------+-----+ | 71| math|97.44| | 33| history|88.32| | 101|geography|73.11| | 13| physics|87.78| +----------+---------+-----+ * Some formatting applied
Semi-Structured File Format: JSON
About: JSON Specified in early 2000s Self-Describing Row-based Human Readable Compressible Splittable (in some cases) Supported natively in Spark Supports Complex Data Types Fast (from a write perspective) JSON (JavaScript Object Notation) $ cat myTable.json {"student_id":71,"subject":"math","score":97.44} {"student_id":33,"subject":"history","score":88.32} {"student_id":101,"subject":"geography","score":73.11} {"student_id":13,"subject":"physics","score":87.78} scala> val table = spark.read.json("myTable.json") table: org.apache.spark.sql.DataFrame = [score: double, student_id: bigint ... 1 more field] scala> table.show +-----+----------+---------+ |score|student_id| subject| +-----+----------+---------+ |97.44| 71| math| |88.32| 33| history| |73.11| 101|geography| |87.78| 13| physics| +-----+----------+---------+ * Some formatting applied
Structured File Formats: Avro, ORC, Parquet
About: Avro Data Format + Serialization Format Self-Describing ▪ Schema evolution Row-based ▪ Optimized for write-intensive applications Binary Format – Schema stored inside of file (as JSON) Compressible Splittable Supported by external library in Spark Supports rich data structures
Inspecting: Avro $ avro-tools tojson part-00000-tid-8566179657485710941-115d547d-6b9a-43cf-957a-c549d3243afb-3-1-c000.avro {"student_id":{"int":71},"subject":{"string":"math"},"score":{"double":97.44}} {"student_id":{"int":33},"subject":{"string":"history"},"score":{"double":88.32}} {"student_id":{"int":101},"subject":{"string":"geography"},"score":{"double":73.11}} {"student_id":{"int":13},"subject":{"string":"physics"},"score":{"double":87.78}} $ avro-tools getmeta part-00000-tid-8566179657485710941-115d547d-6b9a-43cf-957a-c549d3243afb-3-1-c000.avro avro.schema { "type" : "record", "name" : "topLevelRecord", "fields" : [ { "name" : "student_id", "type" : [ "int", "null" ] }, { "name" : "subject", "type" : [ "string", "null" ] }, { "name" : "score", "type" : [ "double", "null" ] } ] } avro.codec snappy * Some formatting applied
Config: Avro spark.sql.avro.compression.codec ▪ What: Compression codec used in writing of AVRO files ▪ Options: {uncompressed, deflate, snappy, bzip2, xz} spark.sql.avro.deflate.level ▪ What: Compression level for the deflate codec ▪ Options: {-1,1..9} * Default value is underlined
About: ORC Next iteration of Hive RCFile ▪ Created in 2013 as part of Stinger initiative to speed up Hive Self-Describing Hybrid-Based (rows grouped by row groups, then column partitioned) ▪ Optimized for read-intensive applications Binary Format – Schema stored inside of file (in metadata) Compressible Splittable Supported by natively in Spark Supports rich data structures ▪ Hive data Type Support (including compound types): struct, list, map, union
Structure: ORC Row groups called Stripes Index Data contain column min/max values and row positions within each column ▪ Bit field / bloom filter as well (if included) ▪ Used for selection of stripes / row groups, not for answering queries Row Data contains the actual data Stripe Footer contain directory of stream locations Postscript contains compression parameters and size of the compressed footer https://bit.ly/2A7AlS1
Inspecting: ORC $ orc-tools meta part-00000-34aef610-c8d4-46fb-84c9- b43887d2b37e-c000.snappy.orc Processing data file part-00000-34aef610-c8d4-46fb-84c9- b43887d2b37e-c000.snappy.orc [length: 574] Structure for part-00000-34aef610-c8d4-46fb-84c9-b43887d2b37e- c000.snappy.orc File Version: 0.12 with ORC_135 Rows: 4 Compression: SNAPPY Compression size: 262144 Type: struct<score:double,student_id:bigint,subject:string> Stripe Statistics: Stripe 1: Column 0: count: 4 hasNull: false Column 1: count: 4 hasNull: false bytesOnDisk: 35 min: 73.11 max: 97.44 sum: 346.65 Column 2: count: 4 hasNull: false bytesOnDisk: 9 min: 13 max: 101 sum: 218 Column 3: count: 4 hasNull: false bytesOnDisk: 37 min: geography max: physics sum: 27 File Statistics: Column 0: count: 4 hasNull: false Column 1: count: 4 hasNull: false bytesOnDisk: 35 min: 73.11 max: 97.44 sum: 346.65 Column 2: count: 4 hasNull: false bytesOnDisk: 9 min: 13 min: 101 sum: 218 Column 3: count: 4 hasNull: false bytesOnDisk: 37 min: geography max: physics sum: 27 Stripes: Stripe: offset: 3 data: 81 rows: 4 tail: 75 index: 123 Stream: column 0 section ROW_INDEX start: 3 length 11 Stream: column 1 section ROW_INDEX start: 14 length 44 Stream: column 2 section ROW_INDEX start: 58 length 26 Stream: column 3 section ROW_INDEX start: 84 length 42 Stream: column 1 section DATA start: 126 length 35 Stream: column 2 section DATA start: 161 length 9 Stream: column 3 section DATA start: 170 length 30 Stream: column 3 section LENGTH start: 200 length 7 Encoding column 0: DIRECT Encoding column 1: DIRECT Encoding column 2: DIRECT_V2 Encoding column 3: DIRECT_V2 File length: 574 bytes Padding length: 0 bytes Padding ratio: 0% * Some formatting applied
Config: ORC spark.sql.orc.impl ▪ What: The name of ORC implementation ▪ Options: {native, hive} spark.sql.orc.compression.codec ▪ What: Compression codec used when writing ORC files ▪ Options: {none, uncompressed, snappy, zlib, lzo} spark.sql.orc.mergeSchema ▪ What: (3.0+) ORC data source should merge schemas from all files (else, picked at random) ▪ Options: {true, false} spark.sql.orc.columnarReaderBatchSize ▪ What: Number of rows to include in a ORC vectorized reader batch. ▪ Options: Int ▪ Default: 4096 spark.sql.orc.filterPushdown ▪ What: Enable filter pushdown for ORC files ▪ Options: {true, false} spark.sql.orc.enableVectorizedReader ▪ What: Enables vectorized orc decoding ▪ Options: {true, false} * Default value is underlined
About: Parquet Originally built by Twitter and Cloudera Self-Describing Hybrid-Based (rows grouped by row groups, then column partitioned) ▪ Optimized for read-intensive applications Binary Format – Schema stored inside of file Compressible Splittable Supported by natively in Spark Supports rich data structures
Structure: Parquet Row Groups are a logical horizontal partitioning of the data into rows ▪ Consists of a column chunk for each column in the dataset Column chunk are chunks of the data for a column ▪ Guaranteed to be contiguous in the file Pages make up column chunks ▪ A page is conceptually an indivisible unit (in terms of compression and encoding) File metadata contains the locations of all the column metadata start locations
Inspecting: Parquet (1) $ parquet-tools meta part-00000-5adea6d5-53ae-49cc-8f70-a7365519b6bf-c000.snappy.parquet file: file:part-00000-5adea6d5-53ae-49cc-8f70-a7365519b6bf-c000.snappy.parquet creator: parquet-mr version 1.10.1 (build a89df8f9932b6ef6633d06069e50c9b7970bebd1) extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"score","type":"double","nullable":true,"metadata":{}},{"name":"student _id","type":"long","nullable":true,"metadata":{}},{"name":"subject","type":"string","nullable":true,"metad ata":{}}]} file schema: spark_schema -------------------------------------------------------------------------------- score: OPTIONAL DOUBLE R:0 D:1 student_id: OPTIONAL INT64 R:0 D:1 subject: OPTIONAL BINARY O:UTF8 R:0 D:1 row group 1: RC:4 TS:288 OFFSET:4 -------------------------------------------------------------------------------- score: DOUBLE SNAPPY DO:0 FPO:4 SZ:101/99/0.98 VC:4 ENC:PLAIN,BIT_PACKED,RLE ST:[min: 73.11, max: 97.44, num_nulls: 0] student_id: INT64 SNAPPY DO:0 FPO:105 SZ:95/99/1.04 VC:4 ENC:PLAIN,BIT_PACKED,RLE ST:[min: 13, max: 101, num_nulls: 0] subject: BINARY SNAPPY DO:0 FPO:200 SZ:92/90/0.98 VC:4 ENC:PLAIN,BIT_PACKED,RLE ST:[min: geography, max: physics, num_nulls: 0] * Some formatting applied
Inspecting: Parquet (2) $ parquet-tools dump part-00000-5adea6d5-53ae-49cc-8f70-a7365519b6bf- c000.snappy.parquet row group 0 ------------------------------------------------------------------------- score: DOUBLE SNAPPY DO:0 FPO:4 SZ:101/99/0.98 VC:4 ENC:RLE,BIT_ [more]... student_id: INT64 SNAPPY DO:0 FPO:105 SZ:95/99/1.04 VC:4 ENC:RLE,BIT_ [more]... subject: BINARY SNAPPY DO:0 FPO:200 SZ:92/90/0.98 VC:4 ENC:RLE,BIT [more]... score TV=4 RL=0 DL=1 --------------------------------------------------------------------- page 0: DLE:RLE RLE:BIT_PACKED VLE:PLAIN ST:[min: 73.11, max: 97. [more]... VC:4 student_id TV=4 RL=0 DL=1 --------------------------------------------------------------------- page 0: DLE:RLE RLE:BIT_PACKED VLE:PLAIN ST:[min: 13, max: 101, n [more]... VC:4 subject TV=4 RL=0 DL=1 --------------------------------------------------------------------- page 0: DLE:RLE RLE:BIT_PACKED VLE:PLAIN ST:[min: geography, max: [more]... VC:4 DOUBLE score --------------------------------------- *** row group 1 of 1, values 1 to 4 *** value 1: R:0 D:1 V:97.44 value 2: R:0 D:1 V:88.32 value 3: R:0 D:1 V:73.11 value 4: R:0 D:1 V:87.78 INT64 student_id --------------------------------------- *** row group 1 of 1, values 1 to 4 *** value 1: R:0 D:1 V:71 value 2: R:0 D:1 V:33 value 3: R:0 D:1 V:101 value 4: R:0 D:1 V:13 BINARY subject --------------------------------------- *** row group 1 of 1, values 1 to 4 *** value 1: R:0 D:1 V:math value 2: R:0 D:1 V:history value 3: R:0 D:1 V:geography value 4: R:0 D:1 V:physics * Some formatting applied
Config: Parquet spark.sql.parquet.compression.codec ▪ What: Compression codec used when writing Parquet files ▪ Options: {none, uncompressed, snappy, gzip, lzo, lz4, brotli, zstd} spark.sql.parquet.mergeSchema ▪ What: Parquet data source merges schemas collected from all data file (else, picked at random) ▪ Options: {true, false} spark.sql.parquet.columnarReaderBatchSize ▪ What: Number of rows to include in a parquet vectorized reader batch ▪ Options: Int ▪ Default: 4096 spark.sql.parquet.enableVectorizedReader ▪ What: Enables vectorized parquet decoding ▪ Options: {true, false} spark.sql.parquet.filterPushdown ▪ What: Enables Parquet filter push-down optimization ▪ Options: {true, false} ▪ Similar: ▪ spark.sql.parquet.filterPushdown.date ▪ spark.sql.parquet.filterPushdown.timestamp ▪ spark.sql.parquet.filterPushdown.decimal ▪ spark.sql.parquet.filterPushdown.string.startsWith ▪ spark.sql.parquet.pushdown.inFilterThreshold ▪ spark.sql.parquet.recordLevelFilter.enabled * Default value is underlined
Case Study: Veraset
Case Study: Veraset Veraset processes and delivers 3+ TB data daily Historically processed and delivered data in CSV ▪ Pipeline runtime ~5.5 hours OLAP Workflow ▪ Data used by read-intensive applications ▪ Schema fixed (no schema evolution) ▪ Strictly typed and fixed columns ▪ Heavy analytics / aggregations performed on data ▪ Processing-heavy workflow ▪ Frequently read data – Snappy > GZip Migration to snappy compressed parquet ▪ Pipeline runtime ~2.12 hours Migration from CSV -> snappy compressed Parquet
Disclaimer: Software can have bugs.
Case Study: Parquet Bug
Case Study: Parquet Partition Pruning Bug Data formats are software and can have bugs - PARQUET-1246 Sort order not specified for -0.0/+0.0 and NaN, leading to incorrect partition pruning If NaN or -0.0/+0.0 first row in group, entire row groups would be pruned out Conclusion: Make sure you are frequently updating your data format version to get bug fixes and performance improvements
Looking Forward: Apache Arrow Complements (not competes with) on-disk formats and storage technologies to promote data exchange / interoperability ▪ Interfaces between systems (ie. Python <> JVM) Columnar layout in memory, optimized for data locality Zero-Copy reads + minimizes SerDe Overhead Cache-efficient in OLAP workloads Organized for SIMD optimizations Flexible data model In Memory Data Format
Final Thoughts Think critically about your workflows and needs – OLTP and OLAP, schema evolution, etc.. Migrating to formats optimized for your workflows can be easy performance wins Perform load and scale testing of your format before moving to production Don’t neglect the impact of compression codecs on your IO performance Keep format libraries up-to-date
Thank you! vinoo.ganesh@gmail.com
Feedback Your feedback is important to us. Don’t forget to rate and review the sessions.
The Apache Spark File Format Ecosystem

The Apache Spark File Format Ecosystem

  • 2.
    The Spark FileFormat Ecosystem Vinoo Ganesh Chief Technology Officer, Veraset
  • 3.
    Agenda About Veraset Session Goals On-DiskStorage OLTP/OLAP Workflows File Format Deep Dive Case: Veraset Case: Parquet Pruning Looking Forward Questions
  • 4.
    About Veraset About Me ▪CTO at Veraset ▪ (Formerly) Lead of Compute / Spark at Palantir Technologies Data-as-a-Service (DaaS) Startup Anonymized Geospatial Data ▪ Centered around population movement ▪ Model training at scale ▪ Heavily used during COVID-19 investigations / analyses Process, Cleanse, Optimize, and Deliver >2 PB Data Yearly Data is Our Product ▪ We don’t build analytical tools ▪ No fancy visualizations ▪ Optimized data storage, retrieval, and processing are our lifeblood ▪ “Just Data” We’re Hiring! vinoo@veraset.com
  • 5.
    Session Goals On-disk storage ▪Row, Column, Hybrid Introduce OLTP / OLAP workflows Explain feature set of formats Inspect formats Explore configuration for formats Look forward We can’t cover everything about file formats in 30 minutes, so let’s hit the high points.
  • 6.
    File Formats ▪ Text ▪CSV * ▪ TSV * ▪ JSON ▪ XML Semi-StructuredUnstructured ▪ Avro ▪ ORC ▪ Parquet Structured Bolded formats will be covered in this session * Can be considered ”semi-structured”
  • 7.
    On-Disk Storage Data isstored on hard drives in “blocks” Disk loads a “block” into memory at a time ▪ Block is the minimum amount of data read during a read Reading unnecessary data == expensive! Reading fragmented data == expensive! Random seek == expensive! Sequential read/writes strongly preferred Insight: Lay data on disk in a manner optimized for your workflows ▪ Common categorizations for these workflows: OLTP/OLAP https://bit.ly/2TG7SJw
  • 8.
    Example Data A0 B0C0 A1 B1 C1 A2 B2 C2 A3 B3 C3 Column A Column B Column C Row 0 Row 1 Row 2 Row 3
  • 9.
    Example Data Column AColumn B Column C Row 0 Row 1 Row 2 Row 3 A0 C0B0 A1 A2 A3 B1 B2 B3 C1 C2 C3
  • 10.
    Row-wise Storage A0 C0B0A1 A0 B0 C0 A1 B1 C1 A2 B2 C2 A3 B3 C3 Visually Block 1 B1 A2C1 B2 Block 2 C2 B3A3 C3 Block 3 On Disk
  • 11.
    Columnar (Column-wise) Storage A0A2A1 A3 Block 1 B0 B2B1 B3 Block 2 C0 C2C1 C3 Block 3 A0 B0 C0 A1 B1 C1 A2 B2 C2 A3 B3 C3 Visually On Disk
  • 12.
    Hybrid Storage A0 B0A1B1 Row Group 1 C0 A2C1 A3 Row Group 2 B2 C2B3 C3 A0 B0 C0 A1 B1 C1 A2 B2 C2 A3 B3 C3 Visually Logical Row Groups
  • 13.
    Hybrid Storage A0 B0A1B1 Block 1 C0 A2C1 A3 Block 2 B2 C2B3 C3 Block 3 On Disk A0 B0A1 B1 Row Group 1 C0 A2C1 A3 Row Group 2 B2 C2B3 C3 Logical Row Groups In Parquet – aim to fit one row group in one block
  • 14.
    Summary: Physical Layout Row-wiseformats are best for write-heavy (transactional) workflows Columnar formats are best optimized for read-heavy (analytical) workflows Hybrid formats combine both methodologies
  • 15.
  • 16.
    OLTP / OLAPWorkflows Online Transaction Processing (OLTP) ▪ Larger numbers of short queries / transactions ▪ More processing than analysis focused ▪ Geared towards record (row) based processing than column based ▪ More frequent data updates / deletes (transactions) Online Analytical Processing (OLAP) ▪ More analysis than processing focused ▪ Geared towards column-based data analytics processing ▪ Less frequent transactions ▪ More analytic complexity per query Insight: Data access patterns should inform the selection of file formats
  • 17.
    Example Data student_id subjectscore Row 0 Row 1 Row 2 Row 3 71 97.44math 33 101 13 history geography physics 88.32 73.11 87.78
  • 18.
  • 19.
    About: CSV CSV developedby IBM in 1972 ▪ Ease of typing CSV lists on punched cards Flexible (not always good) Row-based Human Readable Compressible Splittable ▪ When raw / using spittable format Supported Natively Fast (from a write perspective) Comma Separated Value (CSV) $ cat myTable.csv "student_id","subject","score" 71,"math",97.44 33,"history",88.32 101,"geography",73.11 13,"physics",87.78 scala> val table = spark.read.option("header","true") .option("inferSchema", "true").csv("myTable.csv") table: org.apache.spark.sql.DataFrame = [student_id: int, subject: string ... 1 more field] scala> table.printSchema root |-- student_id: integer (nullable = true) |-- subject: string (nullable = true) |-- score: double (nullable = true) scala> table.show +----------+---------+-----+ |student_id| subject|score| +----------+---------+-----+ | 71| math|97.44| | 33| history|88.32| | 101|geography|73.11| | 13| physics|87.78| +----------+---------+-----+ * Some formatting applied
  • 20.
  • 21.
    About: JSON Specified inearly 2000s Self-Describing Row-based Human Readable Compressible Splittable (in some cases) Supported natively in Spark Supports Complex Data Types Fast (from a write perspective) JSON (JavaScript Object Notation) $ cat myTable.json {"student_id":71,"subject":"math","score":97.44} {"student_id":33,"subject":"history","score":88.32} {"student_id":101,"subject":"geography","score":73.11} {"student_id":13,"subject":"physics","score":87.78} scala> val table = spark.read.json("myTable.json") table: org.apache.spark.sql.DataFrame = [score: double, student_id: bigint ... 1 more field] scala> table.show +-----+----------+---------+ |score|student_id| subject| +-----+----------+---------+ |97.44| 71| math| |88.32| 33| history| |73.11| 101|geography| |87.78| 13| physics| +-----+----------+---------+ * Some formatting applied
  • 22.
    Structured File Formats:Avro, ORC, Parquet
  • 23.
    About: Avro Data Format+ Serialization Format Self-Describing ▪ Schema evolution Row-based ▪ Optimized for write-intensive applications Binary Format – Schema stored inside of file (as JSON) Compressible Splittable Supported by external library in Spark Supports rich data structures
  • 24.
    Inspecting: Avro $ avro-toolstojson part-00000-tid-8566179657485710941-115d547d-6b9a-43cf-957a-c549d3243afb-3-1-c000.avro {"student_id":{"int":71},"subject":{"string":"math"},"score":{"double":97.44}} {"student_id":{"int":33},"subject":{"string":"history"},"score":{"double":88.32}} {"student_id":{"int":101},"subject":{"string":"geography"},"score":{"double":73.11}} {"student_id":{"int":13},"subject":{"string":"physics"},"score":{"double":87.78}} $ avro-tools getmeta part-00000-tid-8566179657485710941-115d547d-6b9a-43cf-957a-c549d3243afb-3-1-c000.avro avro.schema { "type" : "record", "name" : "topLevelRecord", "fields" : [ { "name" : "student_id", "type" : [ "int", "null" ] }, { "name" : "subject", "type" : [ "string", "null" ] }, { "name" : "score", "type" : [ "double", "null" ] } ] } avro.codec snappy * Some formatting applied
  • 25.
    Config: Avro spark.sql.avro.compression.codec ▪ What:Compression codec used in writing of AVRO files ▪ Options: {uncompressed, deflate, snappy, bzip2, xz} spark.sql.avro.deflate.level ▪ What: Compression level for the deflate codec ▪ Options: {-1,1..9} * Default value is underlined
  • 26.
    About: ORC Next iterationof Hive RCFile ▪ Created in 2013 as part of Stinger initiative to speed up Hive Self-Describing Hybrid-Based (rows grouped by row groups, then column partitioned) ▪ Optimized for read-intensive applications Binary Format – Schema stored inside of file (in metadata) Compressible Splittable Supported by natively in Spark Supports rich data structures ▪ Hive data Type Support (including compound types): struct, list, map, union
  • 27.
    Structure: ORC Row groupscalled Stripes Index Data contain column min/max values and row positions within each column ▪ Bit field / bloom filter as well (if included) ▪ Used for selection of stripes / row groups, not for answering queries Row Data contains the actual data Stripe Footer contain directory of stream locations Postscript contains compression parameters and size of the compressed footer https://bit.ly/2A7AlS1
  • 28.
    Inspecting: ORC $ orc-toolsmeta part-00000-34aef610-c8d4-46fb-84c9- b43887d2b37e-c000.snappy.orc Processing data file part-00000-34aef610-c8d4-46fb-84c9- b43887d2b37e-c000.snappy.orc [length: 574] Structure for part-00000-34aef610-c8d4-46fb-84c9-b43887d2b37e- c000.snappy.orc File Version: 0.12 with ORC_135 Rows: 4 Compression: SNAPPY Compression size: 262144 Type: struct<score:double,student_id:bigint,subject:string> Stripe Statistics: Stripe 1: Column 0: count: 4 hasNull: false Column 1: count: 4 hasNull: false bytesOnDisk: 35 min: 73.11 max: 97.44 sum: 346.65 Column 2: count: 4 hasNull: false bytesOnDisk: 9 min: 13 max: 101 sum: 218 Column 3: count: 4 hasNull: false bytesOnDisk: 37 min: geography max: physics sum: 27 File Statistics: Column 0: count: 4 hasNull: false Column 1: count: 4 hasNull: false bytesOnDisk: 35 min: 73.11 max: 97.44 sum: 346.65 Column 2: count: 4 hasNull: false bytesOnDisk: 9 min: 13 min: 101 sum: 218 Column 3: count: 4 hasNull: false bytesOnDisk: 37 min: geography max: physics sum: 27 Stripes: Stripe: offset: 3 data: 81 rows: 4 tail: 75 index: 123 Stream: column 0 section ROW_INDEX start: 3 length 11 Stream: column 1 section ROW_INDEX start: 14 length 44 Stream: column 2 section ROW_INDEX start: 58 length 26 Stream: column 3 section ROW_INDEX start: 84 length 42 Stream: column 1 section DATA start: 126 length 35 Stream: column 2 section DATA start: 161 length 9 Stream: column 3 section DATA start: 170 length 30 Stream: column 3 section LENGTH start: 200 length 7 Encoding column 0: DIRECT Encoding column 1: DIRECT Encoding column 2: DIRECT_V2 Encoding column 3: DIRECT_V2 File length: 574 bytes Padding length: 0 bytes Padding ratio: 0% * Some formatting applied
  • 29.
    Config: ORC spark.sql.orc.impl ▪ What:The name of ORC implementation ▪ Options: {native, hive} spark.sql.orc.compression.codec ▪ What: Compression codec used when writing ORC files ▪ Options: {none, uncompressed, snappy, zlib, lzo} spark.sql.orc.mergeSchema ▪ What: (3.0+) ORC data source should merge schemas from all files (else, picked at random) ▪ Options: {true, false} spark.sql.orc.columnarReaderBatchSize ▪ What: Number of rows to include in a ORC vectorized reader batch. ▪ Options: Int ▪ Default: 4096 spark.sql.orc.filterPushdown ▪ What: Enable filter pushdown for ORC files ▪ Options: {true, false} spark.sql.orc.enableVectorizedReader ▪ What: Enables vectorized orc decoding ▪ Options: {true, false} * Default value is underlined
  • 30.
    About: Parquet Originally builtby Twitter and Cloudera Self-Describing Hybrid-Based (rows grouped by row groups, then column partitioned) ▪ Optimized for read-intensive applications Binary Format – Schema stored inside of file Compressible Splittable Supported by natively in Spark Supports rich data structures
  • 31.
    Structure: Parquet Row Groupsare a logical horizontal partitioning of the data into rows ▪ Consists of a column chunk for each column in the dataset Column chunk are chunks of the data for a column ▪ Guaranteed to be contiguous in the file Pages make up column chunks ▪ A page is conceptually an indivisible unit (in terms of compression and encoding) File metadata contains the locations of all the column metadata start locations
  • 32.
    Inspecting: Parquet (1) $parquet-tools meta part-00000-5adea6d5-53ae-49cc-8f70-a7365519b6bf-c000.snappy.parquet file: file:part-00000-5adea6d5-53ae-49cc-8f70-a7365519b6bf-c000.snappy.parquet creator: parquet-mr version 1.10.1 (build a89df8f9932b6ef6633d06069e50c9b7970bebd1) extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"score","type":"double","nullable":true,"metadata":{}},{"name":"student _id","type":"long","nullable":true,"metadata":{}},{"name":"subject","type":"string","nullable":true,"metad ata":{}}]} file schema: spark_schema -------------------------------------------------------------------------------- score: OPTIONAL DOUBLE R:0 D:1 student_id: OPTIONAL INT64 R:0 D:1 subject: OPTIONAL BINARY O:UTF8 R:0 D:1 row group 1: RC:4 TS:288 OFFSET:4 -------------------------------------------------------------------------------- score: DOUBLE SNAPPY DO:0 FPO:4 SZ:101/99/0.98 VC:4 ENC:PLAIN,BIT_PACKED,RLE ST:[min: 73.11, max: 97.44, num_nulls: 0] student_id: INT64 SNAPPY DO:0 FPO:105 SZ:95/99/1.04 VC:4 ENC:PLAIN,BIT_PACKED,RLE ST:[min: 13, max: 101, num_nulls: 0] subject: BINARY SNAPPY DO:0 FPO:200 SZ:92/90/0.98 VC:4 ENC:PLAIN,BIT_PACKED,RLE ST:[min: geography, max: physics, num_nulls: 0] * Some formatting applied
  • 33.
    Inspecting: Parquet (2) $parquet-tools dump part-00000-5adea6d5-53ae-49cc-8f70-a7365519b6bf- c000.snappy.parquet row group 0 ------------------------------------------------------------------------- score: DOUBLE SNAPPY DO:0 FPO:4 SZ:101/99/0.98 VC:4 ENC:RLE,BIT_ [more]... student_id: INT64 SNAPPY DO:0 FPO:105 SZ:95/99/1.04 VC:4 ENC:RLE,BIT_ [more]... subject: BINARY SNAPPY DO:0 FPO:200 SZ:92/90/0.98 VC:4 ENC:RLE,BIT [more]... score TV=4 RL=0 DL=1 --------------------------------------------------------------------- page 0: DLE:RLE RLE:BIT_PACKED VLE:PLAIN ST:[min: 73.11, max: 97. [more]... VC:4 student_id TV=4 RL=0 DL=1 --------------------------------------------------------------------- page 0: DLE:RLE RLE:BIT_PACKED VLE:PLAIN ST:[min: 13, max: 101, n [more]... VC:4 subject TV=4 RL=0 DL=1 --------------------------------------------------------------------- page 0: DLE:RLE RLE:BIT_PACKED VLE:PLAIN ST:[min: geography, max: [more]... VC:4 DOUBLE score --------------------------------------- *** row group 1 of 1, values 1 to 4 *** value 1: R:0 D:1 V:97.44 value 2: R:0 D:1 V:88.32 value 3: R:0 D:1 V:73.11 value 4: R:0 D:1 V:87.78 INT64 student_id --------------------------------------- *** row group 1 of 1, values 1 to 4 *** value 1: R:0 D:1 V:71 value 2: R:0 D:1 V:33 value 3: R:0 D:1 V:101 value 4: R:0 D:1 V:13 BINARY subject --------------------------------------- *** row group 1 of 1, values 1 to 4 *** value 1: R:0 D:1 V:math value 2: R:0 D:1 V:history value 3: R:0 D:1 V:geography value 4: R:0 D:1 V:physics * Some formatting applied
  • 34.
    Config: Parquet spark.sql.parquet.compression.codec ▪ What:Compression codec used when writing Parquet files ▪ Options: {none, uncompressed, snappy, gzip, lzo, lz4, brotli, zstd} spark.sql.parquet.mergeSchema ▪ What: Parquet data source merges schemas collected from all data file (else, picked at random) ▪ Options: {true, false} spark.sql.parquet.columnarReaderBatchSize ▪ What: Number of rows to include in a parquet vectorized reader batch ▪ Options: Int ▪ Default: 4096 spark.sql.parquet.enableVectorizedReader ▪ What: Enables vectorized parquet decoding ▪ Options: {true, false} spark.sql.parquet.filterPushdown ▪ What: Enables Parquet filter push-down optimization ▪ Options: {true, false} ▪ Similar: ▪ spark.sql.parquet.filterPushdown.date ▪ spark.sql.parquet.filterPushdown.timestamp ▪ spark.sql.parquet.filterPushdown.decimal ▪ spark.sql.parquet.filterPushdown.string.startsWith ▪ spark.sql.parquet.pushdown.inFilterThreshold ▪ spark.sql.parquet.recordLevelFilter.enabled * Default value is underlined
  • 35.
  • 36.
    Case Study: Veraset Verasetprocesses and delivers 3+ TB data daily Historically processed and delivered data in CSV ▪ Pipeline runtime ~5.5 hours OLAP Workflow ▪ Data used by read-intensive applications ▪ Schema fixed (no schema evolution) ▪ Strictly typed and fixed columns ▪ Heavy analytics / aggregations performed on data ▪ Processing-heavy workflow ▪ Frequently read data – Snappy > GZip Migration to snappy compressed parquet ▪ Pipeline runtime ~2.12 hours Migration from CSV -> snappy compressed Parquet
  • 37.
  • 38.
  • 39.
    Case Study: ParquetPartition Pruning Bug Data formats are software and can have bugs - PARQUET-1246 Sort order not specified for -0.0/+0.0 and NaN, leading to incorrect partition pruning If NaN or -0.0/+0.0 first row in group, entire row groups would be pruned out Conclusion: Make sure you are frequently updating your data format version to get bug fixes and performance improvements
  • 40.
    Looking Forward: ApacheArrow Complements (not competes with) on-disk formats and storage technologies to promote data exchange / interoperability ▪ Interfaces between systems (ie. Python <> JVM) Columnar layout in memory, optimized for data locality Zero-Copy reads + minimizes SerDe Overhead Cache-efficient in OLAP workloads Organized for SIMD optimizations Flexible data model In Memory Data Format
  • 41.
    Final Thoughts Think criticallyabout your workflows and needs – OLTP and OLAP, schema evolution, etc.. Migrating to formats optimized for your workflows can be easy performance wins Perform load and scale testing of your format before moving to production Don’t neglect the impact of compression codecs on your IO performance Keep format libraries up-to-date
  • 42.
  • 43.
    Feedback Your feedback isimportant to us. Don’t forget to rate and review the sessions.