Data Reliability for Data Lakes Michael Armbrust @michaelarmbrust
The Promise of the Data Lake
1. Collect Everything
2. Store it all in the Data Lake
3. Data Science & Machine Learning
   • Recommendation Engines
   • Risk, Fraud Detection
   • IoT & Predictive Maintenance
   • Genomics & DNA Sequencing
Garbage In, Garbage Stored, Garbage Out 🔥
What does a typical data lake project look like?
Evolution of a Cutting-Edge Data Lake [Diagram: Events, Data Lake, Streaming Analytics, AI & Reporting]
Challenge #1: Historical Queries? [Diagram: Events, Data Lake, Streaming Analytics, AI & Reporting; step 1 adds a λ-arch]
Challenge #2: Messy Data? [Diagram: step 2 adds Validation]
Challenge #3: Mistakes and Failures? [Diagram: step 3 adds Reprocessing over a Partitioned data lake]
Challenge #4: Updates? [Diagram: step 4 adds Updates: UPDATE & MERGE, Scheduled to Avoid Modifications]
Wasting Time & Money Solving Systems Problems Instead of Extracting Value From Data
Data Lake Distractions
✗ No atomicity means failed production jobs leave data in a corrupt state, requiring tedious recovery
✗ No quality enforcement creates inconsistent and unusable data
✗ No consistency / isolation makes it almost impossible to mix appends and reads, batch and streaming
Let’s try it instead with Delta Lake
Challenges of the Data Lake [Diagram: Events, Data Lake, Streaming Analytics, AI & Reporting, with λ-arch (1), Validation (2), Reprocessing over a Partitioned data lake (3), and Updates (4): UPDATE & MERGE, Scheduled to Avoid Modifications]
The Architecture [Diagram: CSV, JSON, TXT… and Kinesis inputs; Data Lake; Streaming Analytics and AI & Reporting outputs]
The Architecture: Full ACID Transactions. Focus on your data flow, instead of worrying about failures.
The Architecture: Open Standards, Open Source (Apache License). Store petabytes of data without worries of lock-in. Growing community including Presto, Spark and more.
The Architecture: Powered by Apache Spark. Unifies Streaming / Batch. Convert existing jobs with minimal modifications.
Data Quality Levels [Diagram: CSV, JSON, TXT… and Kinesis → Bronze (Raw Ingestion) → Silver (Filtered, Cleaned, Augmented) → Gold (Business-level Aggregates) → Streaming Analytics and AI & Reporting] Delta Lake allows you to incrementally improve the quality of your data until it is ready for consumption.
Bronze (Raw Ingestion) • Dumping ground for raw data • Often with long retention (years) • Avoid error-prone parsing 🔥
Silver (Filtered, Cleaned, Augmented) Intermediate data with some cleanup applied. Queryable for easy debugging!
Gold (Business-level Aggregates) Clean data, ready for consumption. Read with Spark or Presto* (*Coming Soon)
Streams move data through the Delta Lake • Low-latency or manually triggered • Eliminates management of schedules and jobs
Delta Lake also supports batch jobs and standard DML*: INSERT, UPDATE, DELETE, MERGE, OVERWRITE • Retention • Corrections • GDPR • UPSERTS (*DML Coming in 0.3.0)
Easy to recompute when business logic changes: • Clear tables (DELETE) • Restart streams
Who is using Delta Lake?
Used by 1000s of organizations worldwide. > 1 exabyte processed last month alone.
• Improved reliability: Petabyte-scale jobs • 10x lower compute: 640 instances to 64! • Simpler, faster ETL: 84 jobs → 3 jobs, halved data latency
How do I use Delta Lake?
Get Started with Delta using Spark APIs
Instead of parquet...
    dataframe
      .write
      .format("parquet")
      .save("/data")
… simply say delta
    dataframe
      .write
      .format("delta")
      .save("/data")
Add Spark Package
    pyspark --packages io.delta:delta-core_2.12:0.1.0
    bin/spark-shell --packages io.delta:delta-core_2.12:0.1.0
Maven
    <dependency>
      <groupId>io.delta</groupId>
      <artifactId>delta-core_2.12</artifactId>
      <version>0.1.0</version>
    </dependency>
Data Quality* Enforce metadata, storage, and quality declaratively. (*Coming Soon)
    table("warehouse")
      .location(…)       // Location on DBFS
      .schema(…)         // Optional strict schema checking
      .metastoreName(…)  // Registration in Hive Metastore
      .description(…)    // Human readable description for users
      .expect("validTimestamp",               // Expectations on data quality
              "timestamp > 2012-01-01 AND …",
              "fail / alert / quarantine")
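The slide names three reactions to a violated expectation (fail, alert, quarantine) but does not define them. The sketch below is one plausible reading in plain Python, not the Delta Lake API: the function `apply_expectation`, the exception `ExpectationFailed`, and the exact semantics of each action are assumptions made for illustration.

```python
from datetime import datetime


class ExpectationFailed(Exception):
    """Raised when an expectation with action "fail" is violated (hypothetical)."""


def apply_expectation(rows, name, predicate, action):
    """Check every row against `predicate` and react according to `action`.

    Returns (kept_rows, quarantined_rows). Assumed semantics:
      "fail"       -> raise on any violation
      "alert"      -> report violations but keep every row
      "quarantine" -> keep passing rows, divert failing rows
    """
    rows = list(rows)
    good = [r for r in rows if predicate(r)]
    bad = [r for r in rows if not predicate(r)]
    if not bad:
        return good, []
    if action == "fail":
        raise ExpectationFailed(f"{name}: {len(bad)} row(s) violated")
    if action == "alert":
        print(f"ALERT {name}: {len(bad)} row(s) violated")
        return rows, []
    if action == "quarantine":
        return good, bad
    raise ValueError(f"unknown action: {action!r}")


def valid_timestamp(row):
    # Mirrors the slide's "timestamp > 2012-01-01" condition.
    return row["timestamp"] > datetime(2012, 1, 1)
```

Calling `apply_expectation(rows, "validTimestamp", valid_timestamp, "quarantine")` splits the input into rows that pass and rows set aside for inspection, which fits the slide's idea of raising quality step by step rather than silently storing bad data.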
How does Delta Lake work?
Delta On Disk
    my_table/
      _delta_log/           Transaction Log
        00000.json          Table Versions
        00001.json
      date=2019-01-01/      (Optional) Partition Directories
        file-1.parquet      Data Files
Table = result of a set of actions
• Change Metadata – name, schema, partitioning, etc
• Add File – adds a file (with optional statistics)
• Remove File – removes a file
Result: Current Metadata, List of Files, List of Txns, Version
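The idea that a table is the result of a set of actions can be sketched as a fold over the ordered commits. This is a minimal illustration in plain Python, not Delta Lake's implementation; the action keys (`metaData`, `add`, `remove`, `txn`) and the shape of the state dictionary are assumptions chosen for the example.

```python
def replay(commits):
    """Fold an ordered list of commits into the current table state.

    Each commit is a list of action dicts. The keys used here are
    illustrative stand-ins for the action types named on the slide.
    """
    state = {"metadata": None, "files": set(), "txns": {}, "version": -1}
    for version, actions in enumerate(commits):
        for action in actions:
            if "metaData" in action:      # Change Metadata
                state["metadata"] = action["metaData"]
            elif "add" in action:         # Add File
                state["files"].add(action["add"]["path"])
            elif "remove" in action:      # Remove File
                state["files"].discard(action["remove"]["path"])
            elif "txn" in action:         # transaction marker per application
                state["txns"][action["txn"]["appId"]] = action["txn"]["version"]
        state["version"] = version
    return state


commits = [
    [{"metaData": {"schema": "id INT"}},
     {"add": {"path": "1.parquet"}},
     {"add": {"path": "2.parquet"}}],
    [{"remove": {"path": "1.parquet"}},
     {"remove": {"path": "2.parquet"}},
     {"add": {"path": "3.parquet"}}],
]
state = replay(commits)
```

After replaying both commits, `state` holds the four results listed on the slide: current metadata, the list of live files, the recorded transactions, and the version.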
Implementing Atomicity Changes to the table are stored as ordered, atomic units called commits. 000000.json: Add 1.parquet, Add 2.parquet. 000001.json: Remove 1.parquet, Remove 2.parquet, Add 3.parquet. …
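One way to make a commit all-or-nothing on a local POSIX-style filesystem is to write the actions to a temporary file and then publish it under its version name in a single step. The sketch below assumes hard links are available and uses the six-digit file names from the slide; the helper `write_commit` is hypothetical, and object stores generally need a different mechanism.

```python
import json
import os
import tempfile


def write_commit(log_dir, version, actions):
    """Publish one commit as <version>.json, all-or-nothing.

    Returns True if this writer created the version, False if a file
    for that version already exists.
    """
    final_path = os.path.join(log_dir, f"{version:06d}.json")
    fd, tmp_path = tempfile.mkstemp(dir=log_dir, suffix=".tmp")
    try:
        # Write the full commit to a private temp file first.
        with os.fdopen(fd, "w") as f:
            for action in actions:
                f.write(json.dumps(action) + "\n")
        try:
            # Linking publishes the complete file at once and
            # fails if the version name is already taken.
            os.link(tmp_path, final_path)
        except FileExistsError:
            return False
        return True
    finally:
        os.remove(tmp_path)
```

A reader that lists the log directory sees either the whole commit file or nothing for that version, so a job that crashes mid-write leaves no partial commit behind.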
Ensuring Serializability Need to agree on the order of changes, even when there are multiple writers. [Diagram: User 1 and User 2 both writing after 000000.json, 000001.json, 000002.json]
Solving Conflicts Optimistically
1. Record start version
2. Record reads/writes
3. Attempt commit
4. If someone else wins, check if anything you read has changed.
5. Try again.
[Diagram: User 1 and User 2 each Read: Schema and Write: Append, against 000000.json, 000001.json, 000002.json]
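The five steps above can be sketched with an in-memory log. This is a simplified, single-process model for illustration only: the classes `Log` and `Transaction`, and the representation of reads and writes as sets of labels such as "schema" and "append", are assumptions rather than Delta Lake's actual data structures.

```python
class ConflictError(Exception):
    """Raised when a winning commit changed something this transaction read."""


class Log:
    def __init__(self):
        self.commits = []  # list index is the version number

    def latest_version(self):
        return len(self.commits) - 1

    def try_commit(self, version, writes):
        """Put-if-absent: succeed only if `version` is the next free slot."""
        if version != len(self.commits):
            return False
        self.commits.append(frozenset(writes))
        return True


class Transaction:
    def __init__(self, log, reads, writes):
        self.log = log
        self.start_version = log.latest_version()  # 1. record start version
        self.reads = frozenset(reads)               # 2. record reads/writes
        self.writes = frozenset(writes)

    def commit(self):
        version = self.start_version + 1
        while True:
            if self.log.try_commit(version, self.writes):  # 3. attempt commit
                return version
            # 4. someone else won: did they change anything we read?
            if self.log.commits[version] & self.reads:
                raise ConflictError(f"version {version} changed {set(self.reads)}")
            version += 1                                   # 5. try again


log = Log()
log.try_commit(0, {"schema"})  # table creation sets the schema
user1 = Transaction(log, reads={"schema"}, writes={"append"})
user2 = Transaction(log, reads={"schema"}, writes={"append"})
v1 = user1.commit()
v2 = user2.commit()
```

Both users read only the schema and write only appends, so the loser of the race finds nothing it read has changed and simply retries at the next version, matching the slide's picture of two appends landing in consecutive log entries.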
Handling Massive Metadata Large tables can have millions of files in them! How do we scale the metadata? Use Spark for scaling! [Diagram: Add 1.parquet, Add 2.parquet, Remove 1.parquet, Remove 2.parquet, Add 3.parquet → Checkpoint]
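The checkpoint in the diagram can be read as a snapshot that collapses a run of commits, so a reader only replays what came afterwards. The sketch below shows that idea in plain Python on a tiny log; the slide states that Spark does this work at scale, which this example does not attempt, and the function names and tuple-based action format are assumptions.

```python
def apply_actions(files, actions):
    """Apply ("add" | "remove", path) actions to a set of live files."""
    for op, path in actions:
        if op == "add":
            files.add(path)
        elif op == "remove":
            files.discard(path)
    return files


def make_checkpoint(commits, upto):
    """Collapse commits[0..upto] into one snapshot of the live files."""
    files = set()
    for actions in commits[: upto + 1]:
        apply_actions(files, actions)
    return {"version": upto, "files": sorted(files)}


def load_state(checkpoint, commits):
    """Rebuild the file list from a checkpoint plus only the later commits."""
    files = set(checkpoint["files"])
    for actions in commits[checkpoint["version"] + 1:]:
        apply_actions(files, actions)
    return files


commits = [
    [("add", "1.parquet"), ("add", "2.parquet")],
    [("remove", "1.parquet"), ("remove", "2.parquet"), ("add", "3.parquet")],
    [("add", "4.parquet")],
]
checkpoint = make_checkpoint(commits, upto=1)
live_files = load_state(checkpoint, commits)
```

Here the five actions from the slide reduce to a checkpoint containing a single file, and loading the table touches the checkpoint plus one later commit instead of the whole history.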
Road Map
• 0.2.0 – Released!
  • S3 Support
  • Azure Blob Store and ADLS Support
• 0.3.0 (~July)
  • UPDATE (Scala)
  • DELETE (Scala)
  • MERGE (Scala)
  • VACUUM (Scala)
• Rest of Q3
  • DDL Support / Hive Metastore
  • SQL DML Support
Build your own Delta Lake at https://delta.io

Open Source Reliability for Data Lake with Apache Spark by Michael Armbrust
