Scaling Pandas Workflows with PySpark


Introduction

A common challenge data scientists face is that pandas struggles with large datasets that exceed memory limits. When working with big data, local computation can become unbearably slow or even crash.

On the other hand, PySpark provides distributed computing but requires users to learn a new syntax based on Spark DataFrames.

For example, here’s how you would compute the average value of a column in pandas and PySpark:

Pandas syntax:

import pandas as pd

pandas_df = pd.DataFrame({"value": [1, 2, 3, 4, 5]})
pandas_df["value"].mean()  # Output: 3.0

PySpark syntax:

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

spark = SparkSession.builder.getOrCreate()
spark_df = spark.createDataFrame([(1,), (2,), (3,), (4,), (5,)], ["value"])
spark_df.select(avg("value")).show()

Output:

+----------+
|avg(value)|
+----------+
|       3.0|
+----------+

This example shows that simple tasks in pandas often require more verbose operations in PySpark. This learning curve and context switching usually slow down rapid prototyping and data exploration.

The Pandas API on Spark (pyspark.pandas) bridges this gap, allowing you to write familiar pandas-like code that scales across a Spark cluster. You get the best of both worlds: the ease of use of pandas and the scalability of Spark.
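
For example, computing the same column average with the Pandas API on Spark looks almost identical to plain pandas (a quick sketch; the DataFrame name is illustrative):

import pyspark.pandas as ps

# Same computation as the pandas example above, but distributed by Spark
ps_demo_df = ps.DataFrame({"value": [1, 2, 3, 4, 5]})
ps_demo_df["value"].mean()  # Output: 3.0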

In this article, we’ll explore the advantages of using the Pandas API on Spark, guide you through practical usage examples, and discuss its differences from pandas and PySpark.

The source code of this article can be found here:

Source Code

Advantages of Using Pandas API on Spark Over Pandas

  • Faster query execution: Pandas on Spark uses all available CPU cores to parallelize computations, significantly speeding up queries compared to pandas, which is limited to a single core.
  • Scalable to larger-than-memory datasets: Unlike pandas, which requires the entire dataset to fit in memory and often fails with memory errors, Spark can work with datasets that are bigger than your computer’s memory by processing small parts at a time.
  • Provides access to Spark’s battle-tested query optimizer: Pandas on Spark uses Spark’s Catalyst optimizer, which automatically improves queries by selecting only the needed columns and filtering rows early, as the sketch after this list shows.
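
To see the optimizer at work, you can build a query lazily, convert it to a Spark DataFrame, and print the plan that Catalyst produced (a minimal sketch; the data and column names are illustrative):

import pyspark.pandas as ps

psdf = ps.DataFrame({"a": range(1_000), "b": range(1_000)})

# Select one column and filter rows; nothing runs yet
result = psdf[psdf["a"] > 500][["a"]]

# Inspect the optimized plan generated by Catalyst
result.to_spark().explain()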

Pandas on Spark Architecture

Setup

First, install PySpark if you haven’t:

pip install pyspark 

Then, start a local Spark session:

from pyspark.sql import SparkSession
import pyspark.pandas as ps

spark = SparkSession.builder.getOrCreate()
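
If you want more control over the local session, you can configure the builder before calling getOrCreate() (a minimal sketch; the master URL and application name are illustrative):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")            # use all local CPU cores
    .appName("pandas-on-spark")    # illustrative application name
    .getOrCreate()
)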

Object Creation

You can create pandas-on-Spark Series and DataFrames using the same syntax as pandas:

Create a pandas-on-Spark Series:

ps_s = ps.Series([1, 3, 5, 6, 8])

Create a pandas-on-Spark DataFrame:

import numpy as np

ps_df = ps.DataFrame(
    {"id": np.arange(1, 1_000_001), "value": np.random.randn(1_000_000)}
)

You can also convert an existing pandas object to a pandas-on-Spark one:

ps_df_from_pandas = ps.from_pandas(pandas_df)

Basic Operations

You can perform operations just as you would with pandas, but the computation is now distributed. Here are some examples of basic operations using the Pandas API on Spark:

Compute basic statistics:

ps_df.describe()

Output:

                   id           value
count  1000000.000000  1000000.000000
mean    500000.500000       -0.000697
std     288675.278932        0.999534
min          1.000000       -5.051222
25%     250000.750000       -0.674671
50%     500000.500000       -0.000586
75%     750000.250000        0.672834
max    1000000.000000        4.553696

Get the first few rows:

ps_df.head()

Output:

   id     value
0   1 -3.334066
1   2  0.966236
2   3 -1.148075
3   4  1.108155
4   5 -0.049615

Filter rows and drop any NaN values:

filtered_df = ps_df.where(ps_df.value > 0).dropna()
filtered_df.head()

Output:

      id     value
1    2.0  0.966236
3    4.0  1.108155
9   10.0  0.562544
12  13.0  0.809431
13  14.0  1.478501

GroupBy

Grouping works the same way but happens in parallel across partitions:

# Create a sample DataFrame
ps_df_2 = ps.DataFrame(
    {"category": ["A", "B", "A", "C", "B"], "value": [10, 20, 15, 30, 25]}
)

# Compute mean value by category
ps_df_2.groupby("category").value.mean()

Output:

category
A    12.5
B    22.5
C    30.0
Name: value, dtype: float64
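
If you need more than one statistic per group, the familiar agg interface is also available (a sketch, assuming the dict-of-functions form supported by pandas-on-Spark groupby):

# Compute several aggregations per category in one pass
ps_df_2.groupby("category").agg({"value": ["mean", "max"]})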

Plotting

Basic plotting is supported. Below are some examples:

Plot a histogram:

ps_df["value"].plot.hist()

Histogram Plot

Plot a bar graph:

ps_df_2.plot.bar(x="category", y="value")

Bar Graph
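
By default, recent PySpark releases render these plots with plotly. If you prefer matplotlib, the backend can be switched through the pandas-on-Spark options (a sketch, assuming the ps.options.plotting.backend setting):

import pyspark.pandas as ps

# Switch the plotting backend from the default (plotly) to matplotlib
ps.options.plotting.backend = "matplotlib"
ps_df["value"].plot.hist()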

Reading and Writing Data

You can easily load and save datasets in common formats. For example, you can:

Read and write to CSV:

# Write to CSV
ps_df.to_csv("output_data.csv")

# Read back
new_df = ps.read_csv("output_data.csv")
print(new_df.head())

Read and write to Parquet:

# Write to Parquet
ps_df.to_parquet("output_data.parquet")

# Read back
new_parquet_df = ps.read_parquet("output_data.parquet")
print(new_parquet_df.head())
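
Unlike pandas, Spark treats the output path as a directory and may write several part files into it. If a single output file is more convenient, pandas-on-Spark exposes a num_files argument (a sketch, assuming the num_files parameter of to_csv; the path is illustrative):

# Write the result as a single part file inside the output directory
ps_df.to_csv("output_single_csv", num_files=1)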

Using Pandas API on Spark with Regular Pandas

Combining Pandas API on Spark with pandas to get the best of both worlds is often useful. For example, you can clean and aggregate a large dataset with Pandas API on Spark to benefit from fast, parallel processing:

import pyspark.pandas as ps
import pandas as pd
from sklearn.linear_model import LinearRegression

# Create a large Pandas API on Spark DataFrame
psdf = ps.DataFrame({
    "feature1": range(1_000_000),
    "feature2": range(1_000_000, 2_000_000),
    "target": range(500_000, 1_500_000),
})
print(f"Length of the original DataFrame: {len(psdf):,}")

# Aggregate the data to a smaller size
aggregated = psdf.groupby(psdf.feature1 // 10000).mean()
print(f"Length of the aggregated DataFrame: {len(aggregated):,}")

Output:

Length of the original DataFrame: 1,000,000
Length of the aggregated DataFrame: 100

Once the dataset is small enough, you can convert it to a pandas DataFrame using .to_pandas() and then apply a scikit-learn machine learning model:

# Convert to pandas DataFrame
small_pdf = aggregated.to_pandas()

# Train a scikit-learn model
model = LinearRegression()
X = small_pdf[["feature1", "feature2"]]
y = small_pdf["target"]
model.fit(X, y)

This approach works well if the cleaned and aggregated data fits comfortably into memory.
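
One way to stay on the safe side is to check the size of the aggregated result before collecting it (a minimal sketch; the row threshold is arbitrary):

# Only collect to pandas when the aggregated result is small enough
MAX_ROWS = 1_000_000  # arbitrary threshold for this sketch
if len(aggregated) <= MAX_ROWS:
    small_pdf = aggregated.to_pandas()
else:
    raise ValueError("Aggregated result is still too large to collect into memory")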

Pandas API on Spark vs. Pandas: Query Execution Model

Pandas API on Spark executes queries differently than pandas:

  • Pandas API on Spark uses lazy evaluation. It builds a logical query plan, optimizes it, and only executes when results are requested
  • Pandas uses eager evaluation. It loads data into memory immediately and performs each operation as it is called, without optimizations

Example in pandas (eager execution):

pandas_df["value"] = pandas_df["value"] + 1  # Operation executes immediately
print(pandas_df)

Output:

   value
0      2
1      3
2      4
3      5
4      6

Example in Pandas API on Spark (lazy execution):

# Using Pandas API on Spark
updated_psdf = ps_df.assign(a=ps_df["value"] + 1)  # Lazy operation
print(updated_psdf.head())  # Triggers actual computation

Output:

   id     value         a
0   1 -0.002641  0.997359
1   2 -1.818039 -0.818039
2   3  2.371413  3.371413
3   4  0.909148  1.909148
4   5  2.365013  3.365013

Pandas API on Spark vs. PySpark Differences

Both Pandas API on Spark and PySpark generate logical query plans and optimize execution with Spark. As a result, their performance is often similar.

The main difference is syntax: Pandas API on Spark follows a pandas-like syntax, while PySpark follows Spark SQL/DataFrame syntax.

Pandas API on Spark syntax:

pandas_spark_df = ps.DataFrame({"col1": [1, 2, 3], "col2": [4, 5, 6]})
(pandas_spark_df["col1"] + pandas_spark_df["col2"]).head()

PySpark syntax:

from pyspark.sql.functions import col

pyspark_df = spark.createDataFrame([(1, 4), (2, 5), (3, 6)], ["col1", "col2"])
pyspark_df.select((col("col1") + col("col2")).alias("sum")).show()

You can easily convert a pandas-on-Spark DataFrame to a Spark DataFrame:

# Convert Pandas API on Spark DataFrame to PySpark DataFrame
spark_native_df = pandas_spark_df.to_spark()

# Now you can use full PySpark functionality
spark_native_df.select((col("col1") + col("col2")).alias("sum")).show()
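
Conversely, a PySpark DataFrame can be brought back into the pandas API with pandas_api(), available in PySpark 3.2 and later:

# Convert the PySpark DataFrame back to a pandas-on-Spark DataFrame
back_to_ps = spark_native_df.pandas_api()
back_to_ps.head()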

Final Thoughts

Being able to use pandas syntax while leveraging PySpark to handle big data feels like a dream combination. If you’ve been hesitant to start using PySpark because of its different syntax, give Pandas API on Spark a try. It makes working with large datasets much more approachable without losing the comfort of familiar code.
