Apache Spark Programming with Databricks
Welcome
Course Agenda
Here’s where we’re headed
▪ Spark Core: Databricks Platform · DataFrames Introduction · Spark SQL · Reader & Writer · DataFrame & Column
▪ Functions: Transformations · Aggregation · Datetimes · Complex Types · Additional Functions · User-Defined Functions
▪ Performance: Spark Architecture · Query Optimization · Partitioning
▪ Structured Streaming and Delta: Streaming Query · Aggregating Streams · Delta Lake
Lesson Objectives
By the end of this course, you should be able to:
1 Identify core features of Spark and Databricks
2 Describe how DataFrames are created and evaluated in Spark
3 Apply the DataFrame API to process and analyze data
4 Demonstrate how Spark is optimized and executed on a cluster
5 Apply Delta Lake and Structured Streaming to process data
Module 1 Apache Spark Programming with Databricks
Introductions
Welcome!
Let’s get to know you
▪ Name
▪ Role and team
▪ Programming experience
▪ Motivation for attending
▪ Personal interest
Module 2 Apache Spark Programming with Databricks
Spark Core
Spark Core
Databricks Ecosystem
Spark Overview
Spark SQL
Reader & Writer
DataFrame & Column
Databricks Ecosystem
▪ Customers: 5000+ across the globe
▪ Lakehouse: one simple platform to unify all of your data, analytics, and AI workloads
▪ Original creators of Apache Spark, Delta Lake, and MLflow
Lakehouse
One platform to unify all of your data, analytics, and AI workloads
(diagram: the lakehouse combines the data lake and the data warehouse)
An open approach to bringing data management and governance to data lakes:
▪ Better reliability with transactions
▪ 48x faster data processing with indexing
▪ Data governance at scale with fine-grained access control lists
The Databricks Lakehouse Platform
✓ Simple ✓ Open ✓ Collaborative
(diagram: Data Engineering, BI and SQL Analytics, Data Science and ML, and Real-Time Data Applications run on Data Management and Governance, the Open Data Lake, and Platform Security & Administration, over unstructured, semi-structured, structured, and streaming data)
The Databricks Lakehouse Platform
✓ Simple
Unify your data, analytics, and AI on one common platform for all data use cases.
The Databricks Lakehouse Platform
✓ Open
Unify your data ecosystem with open source standards and formats. Built on the innovation of some of the most successful open source data projects in the world, with 30 million+ monthly downloads.
The Databricks Lakehouse Platform
✓ Open
450+ partners across the data landscape integrate with the Lakehouse Platform:
▪ Visual ETL & Data Ingestion: Azure Data Factory
▪ Business Intelligence: Azure Synapse, Google BigQuery, Amazon Redshift
▪ Machine Learning: Amazon SageMaker, Azure Machine Learning, Google AI Platform
▪ Centralized Governance: AWS Glue
▪ Data Providers, plus top consulting & SI partners
The Databricks Lakehouse Platform
✓ Collaborative
Unify your data teams to collaborate across the entire data and AI workflow.
(diagram: data analysts, data engineers, and data scientists share datasets, notebooks, dashboards, and models)
TPC-DS
Databricks SQL set the official data warehousing performance record, outperforming the previous record by 2.2x.
Spark Overview
• De facto standard unified analytics engine for big data processing
• Largest open-source project in data processing
• Technology created by the founders of Databricks
Spark Benefits
Fast · Easy to Use · Unified
Spark API
(diagram: Spark SQL + DataFrames, Streaming, and MLlib are built on the Spark Core API, accessible from R, SQL, Python, Scala, and Java)
Spark Execution
(diagram: a Spark application runs one or more jobs; each job is divided into stages, and each stage into tasks)
Spark Cluster
(diagram: the Driver coordinates the Workers; each Worker hosts an Executor, and each Executor runs one task per core)
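You can see this layering from a notebook. A minimal sketch (assumes a spark SparkSession is already defined, as it is in Databricks notebooks):

# Total number of tasks that can run at once = total executor cores
print(spark.sparkContext.defaultParallelism)

# An action such as count() submits a job, which Spark splits into stages and tasks
df = spark.range(1_000_000)
print(df.rdd.getNumPartitions())  # one task per partition in each stage
print(df.count())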
Bonus: Magic commands in Notebook cells
Magic commands let you override a cell's default language and run auxiliary utilities. For example:
1. %python, %r, %scala, %sql Switch languages in a command cell
2. %sh Run shell code (runs only on Spark Driver, and not the Workers)
3. %fs Shortcut for dbutils filesystem commands
4. %md Markdown for styling the display
5. %run Execute a remote Notebook from a Notebook
6. %pip Install new Python libraries
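For instance, %fs is shorthand for the dbutils filesystem API; the two cells below are equivalent (a sketch; assumes a Databricks notebook, where dbutils and display are predefined, and uses the built-in /databricks-datasets mount):

%fs ls /databricks-datasets

display(dbutils.fs.ls("/databricks-datasets"))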
CSV
Comma Separated Values
item_id, name, price, qty
M_PREM_Q, Premium Queen Mattress, 1795, 35
M_STAN_F, Standard Full Mattress, 945, 24
M_PREM_F, Premium Full Mattress, 1695, 45
M_PREM_T, Premium Twin Mattress, 1095, 18
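Reading a file like this with the DataFrame reader (a sketch; the path is hypothetical):

df = (spark.read
      .option("header", True)
      .option("inferSchema", True)  # or supply an explicit schema to avoid the extra scan
      .csv("/tmp/products.csv"))
df.show()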
Parquet
A columnar storage format
Columns: Name, Score, ID
Row-oriented data on disk: Kit, 4.2, 1 | Alex, 4.5, 2 | Terry, 4.1, 3
Column-oriented data on disk: Kit, Alex, Terry | 4.2, 4.5, 4.1 | 1, 2, 3
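Writing and reading Parquet with the DataFrame API (a sketch; the path is hypothetical):

df.write.mode("overwrite").parquet("/tmp/products.parquet")
df2 = spark.read.parquet("/tmp/products.parquet")  # the schema travels with the files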
Delta Lake
Technology designed to be used with Apache Spark to build robust data lakes
Open-source Storage Layer
Delta Lake’s Key Features
▪ ACID transactions
▪ Time travel (data versioning)
▪ Schema enforcement and evolution
▪ Audit history
▪ Parquet format
▪ Compatible with Apache Spark API
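A first look at those features in code (a sketch; the path is hypothetical):

# Delta stores the data as Parquet files plus a transaction log
df.write.format("delta").mode("overwrite").save("/tmp/delta/products")
delta_df = spark.read.format("delta").load("/tmp/delta/products")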
Module 3 Apache Spark Programming with Databricks
Functions
Functions
Aggregation
Datetimes
Complex Types
Additional Functions
User-Defined Functions
Module 4 Apache Spark Programming with Databricks
Performance
Performance
Spark Architecture
Query Optimization
Partitioning
Spark Architecture
Spark Cluster
Spark Execution
Scenario 1: Filter out brown pieces from candy bags
Cluster
(diagram: a Driver plus Executors; each Executor has cores)
Data
(diagram: the data is divided into partitions)
We need to filter out the brown pieces from these candy bags.
(diagram: twelve candy bags, numbered 1–12, and twelve students, labeled A–L)
Student A gets bag #1, student B gets bag #2, student C gets bag #3, and so on.
(diagram: each student takes one bag)
Eliminate the brown candy pieces and pile the rest in the corner.
(diagram: all twelve students filter their own bags in parallel)
Students A, E, H, and J finish first and get bags #13, #14, #15, and #16.
(diagram: the students who finished early pick up the remaining bags while the others keep working)
All done!
(diagram: every bag has been filtered; faster students simply processed more bags)
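In Spark terms, each student is a core executing one task per partition, and the whole scenario is a single narrow transformation that needs no shuffle. A minimal sketch (candy_df and its color column are hypothetical):

from pyspark.sql.functions import col

# Each task filters its own partition independently; no data moves between executors
filtered_df = candy_df.filter(col("color") != "brown")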
Scenario 2: Count total pieces in candy bags
Stage 1: Local Count
Stage 1: Local Count
We need to count the total pieces in these candy bags.
(diagram: twelve bags and students A–L)
Students B, E, I, and L get these four bags and count the pieces in them.
(diagram: the four students count their bags in parallel)
Students B, E, I, and L commit their findings: 5, 6, 4, and 5 pieces.
Stage 1 is complete!
Stage 2: Global Count
Student G fetches the counts from students B, E, I, and L: 5, 6, 4, and 5.
Adding the partial counts gives the global total: 20 pieces.
Stage 1 (Local Count): each student counts their own bags (5, 6, 4, 5). Stage 2 (Global Count): one student combines the partial counts into the total of 20.
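This two-stage pattern is exactly how Spark runs an aggregation: each task produces a partial result for its partition, the partials are shuffled to one place, and a final task combines them. A minimal sketch (assumes a spark session):

df = spark.range(0, 1_000_000, 1, 12)  # 12 partitions, like 12 candy bags

# Stage 1: each task counts its own partition; Stage 2: the partial counts are combined
print(df.count())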
Query Optimization
Catalyst Optimizer
Adaptive Query Execution
Query Optimization
(diagram: Query → Unresolved Logical Plan → ANALYSIS (with the Metadata Catalog) → Logical Plan → LOGICAL OPTIMIZATION (with the Catalyst Catalog) → Optimized Logical Plan → PHYSICAL PLANNING → Physical Plans → COST-BASED OPTIMIZATION (with the Cost Model) → Selected Physical Plan → WHOLE-STAGE CODE GENERATION → RDDs)
Query Optimization with AQE
New in Spark 3.0, enabled by default as of Spark 3.2
(diagram: the same Catalyst pipeline as above, but ADAPTIVE QUERY EXECUTION feeds runtime statistics back into logical optimization, so plans can be re-optimized between stages)
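You can toggle AQE and inspect the plans Catalyst produces (a sketch; df is any DataFrame):

spark.conf.set("spark.sql.adaptive.enabled", "true")  # already the default on Spark 3.2+

# Prints the parsed, analyzed, and optimized logical plans plus the physical plan
df.explain(mode="extended")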
Module 5 Apache Spark Programming with Databricks
Structured Streaming
Structured Streaming
Streaming Query
Stream Aggregations
Streaming Query
Advantages
Use Cases
Sources
Batch Processing vs. Stream Processing
Advantages of Stream Processing
▪ Lower latency
▪ Efficient updates: automatic bookkeeping on new data
Stream Processing Use Cases
▪ Notifications
▪ Real-time reporting
▪ Incremental ETL
▪ Updating data to serve in real-time
▪ Real-time decision making
▪ Online ML
Micro-Batch Processing
Structured Streaming
Micro-batches = new rows appended to an unbounded table
Input Sources
▪ Kafka, Event Hubs, Files
▪ Sockets and Generator (for testing)
Sinks
▪ Kafka, Event Hubs, Files, Foreach
▪ Console and Memory (for debugging)
Output Modes
▪ APPEND: add new records only
▪ UPDATE: update changed records in place
▪ COMPLETE: rewrite the full output
Trigger Types
▪ Default: process each micro-batch as soon as the previous one has been processed
▪ Fixed interval: micro-batch processing kicked off at the user-specified interval
▪ One-time: process all of the available data as a single micro-batch and then automatically stop the query
▪ Continuous Processing*: long-running tasks that continuously read, process, and write data as soon as events are available
*Experimental; see the Structured Streaming Programming Guide
End-to-end fault tolerance
Guaranteed in Structured Streaming by
Checkpointing and write-ahead logs
Idempotent sinks
Replayable data sources
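Putting the pieces together: a minimal streaming query with a source, sink, output mode, trigger, and checkpoint (a sketch; the paths are hypothetical):

stream_df = (spark.readStream
             .format("rate")              # generator source for testing: emits timestamped rows
             .option("rowsPerSecond", 10)
             .load())

query = (stream_df.writeStream
         .format("console")                     # debugging sink
         .outputMode("append")                  # add new records only
         .trigger(processingTime="10 seconds")  # fixed-interval trigger
         .option("checkpointLocation", "/tmp/checkpoints/rate_demo")  # enables recovery
         .start())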
Stream Aggregations
Aggregations
Windows
Watermarking
Real-time Aggregations
Errors in IoT data by device type
Anomalous behavior in server log files by country
Behavior analysis on messages by hashtags
Time-Based Windows
▪ Tumbling windows: no window overlap; any given event gets aggregated into only one window group (e.g., 1:00–2:00 am, 2:00–3:00 am, 3:00–4:00 am, ...)
▪ Sliding windows: windows overlap; any given event gets aggregated into multiple window groups (e.g., 1:00–2:00 am, 1:30–2:30 am, 2:00–3:00 am, ...)
Sliding Windows Example
Windowing
from pyspark.sql.functions import col, window

(streamingDF
  .groupBy(col("device"),
           window(col("time"), "1 hour"))  # one-hour tumbling windows keyed by device
  .count())

Why are we seeing 200 tasks for this stage?
Control the Shuffle Repartitioning
Shuffles default to 200 partitions (spark.sql.shuffle.partitions), which is why the previous stage showed 200 tasks. Matching the setting to the cluster's parallelism avoids scheduling tiny tasks:

spark.conf.set("spark.sql.shuffle.partitions", spark.sparkContext.defaultParallelism)
Event-Time Processing
▪ Event-time data: process based on event time (time fields embedded in the data) rather than receipt time
▪ Watermarks: handle late data and limit how long to remember old data
Handling Late Data and Watermarking
Watermarking
from pyspark.sql.functions import col, window

(streamingDF
  .withWatermark("time", "2 hours")  # drop state for windows more than 2 hours behind the latest event time
  .groupBy(col("device"),
           window(col("time"), "1 hour"))
  .count()
)
Module 6 Apache Spark Programming with Databricks
Delta Lake
Delta Lake
Using Spark with Delta Lake
Delta Lake Concepts
What is Delta Lake?
▪ Technology designed to be used with Apache Spark to build robust data lakes
▪ Open source project at delta.io
▪ Databricks Delta Lake documentation
Delta Lake features
▪ ACID transactions on Spark
▪ Scalable metadata handling
▪ Streaming and batch unification
▪ Schema enforcement
▪ Time travel
▪ Upserts and deletes
▪ Fully configurable/optimizable
▪ Structured streaming support
Delta Lake components
▪ Delta Lake storage layer
▪ Delta tables
▪ Delta Engine
Delta Lake Storage Layer
▪ Highly performant and persistent
▪ Low-cost, easily scalable object storage
▪ Ensures consistency
▪ Allows for flexibility
Delta tables
▪ Contain data in Parquet files that are kept in object storage
▪ Keep transaction logs in object storage
▪ Can be registered in a metastore (optional)
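Creating a Delta table and, optionally, registering it in the metastore (a sketch; the path and table name are hypothetical):

# The data lands as Parquet files plus a _delta_log directory in object storage
df.write.format("delta").mode("overwrite").save("/tmp/delta/events")

# Optional: register the table in the metastore so it can be queried by name
spark.sql("CREATE TABLE IF NOT EXISTS events USING DELTA LOCATION '/tmp/delta/events'")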
Delta Engine
Databricks feature; not available in open-source Apache Spark
▪ File management optimizations
▪ Auto-optimized writes
▪ Performance optimization via Delta caching
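For example, file management optimizations are exposed through the OPTIMIZE command (a sketch; events is the hypothetical table registered above, and device a hypothetical column):

# Compact small files and co-locate related data; Databricks-specific at the time of this course
spark.sql("OPTIMIZE events ZORDER BY (device)")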
What is the Delta transaction log?
▪ Ordered record of the transactions performed on a Delta table
▪ Single source of truth for that table
▪ Mechanism that the Delta Engine uses to guarantee atomicity
How does the transaction log work?
▪ Delta Lake breaks operations down into one or more of these steps:
▪ Add file
▪ Remove file
▪ Update metadata
▪ Set transaction
▪ Change protocol
▪ Commit info
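Each commit lands as a zero-padded JSON file in the table's _delta_log directory, which you can list directly (a sketch, reusing the hypothetical path from earlier):

display(dbutils.fs.ls("/tmp/delta/events/_delta_log"))
# 00000000000000000000.json, 00000000000000000001.json, ...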
Delta transaction log at the file level
Adding commits to the transaction log
DESCRIBE vs. DESCRIBE HISTORY
▪ DESCRIBE: returns the metadata of an existing table (e.g., column names, data types, comments)
▪ DESCRIBE HISTORY: returns a more complete set of metadata for a Delta table (operation, user, operation metrics)
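Both commands can be run from Python as well as SQL (a sketch; events is the hypothetical table from earlier):

spark.sql("DESCRIBE events").show()
spark.sql("DESCRIBE HISTORY events").show(truncate=False)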
Delta Lake Time Travel
▪ Query an older snapshot of a Delta table
▪ Re-creating analysis, reports or outputs
▪ Writing complex temporal queries
▪ Fixing mistakes in data
▪ Providing snapshot isolation
Time Travel SQL syntax
SELECT * FROM events TIMESTAMP AS OF timestamp_expression
SELECT * FROM events VERSION AS OF version
Time Travel SQL syntax
SELECT * FROM events TIMESTAMP AS OF timestamp_expression
▪ timestamp_expression can be:
▪ A string that can be cast to a timestamp
▪ A string that can be cast to a date
▪ An explicit timestamp type (the result of a cast)
▪ Simple time or date expressions (see the Databricks documentation)
Time Travel SQL syntax
SELECT * FROM events VERSION AS OF version
▪ version can be obtained from the output of DESCRIBE HISTORY events.
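The DataFrame reader supports the same time travel through options (a sketch; the path is the hypothetical one from earlier):

df_v0 = (spark.read.format("delta")
         .option("versionAsOf", 0)
         .load("/tmp/delta/events"))

df_then = (spark.read.format("delta")
           .option("timestampAsOf", "2022-01-01")
           .load("/tmp/delta/events"))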
Thank you! Congrats!