Abdelrahman Adnan
🚀 MLOps Zoomcamp 2025: Week 3 - Workflow Orchestration with Prefect 📊

🔄 What is Workflow Orchestration?

Workflow orchestration is the process of coordinating, scheduling, and integrating various tasks in a data pipeline. For ML pipelines, this includes:

  • Data ingestion and preprocessing
  • Feature engineering
  • Model training and evaluation
  • Model deployment
  • Retraining schedules
  • Monitoring

Orchestration tools help manage complex dependencies, handle failures gracefully, and provide visibility into pipeline execution.
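As a small taste of what this looks like in practice (Prefect is introduced in depth below), an orchestrated pipeline is often just plain Python with decorated functions. A minimal sketch, with illustrative task names:

```python
from prefect import task, flow

@task(retries=2)  # failed tasks are retried instead of crashing the pipeline
def ingest() -> list:
    return [1, 2, 3]

@task
def train(data: list) -> float:
    return sum(data) / len(data)

@flow  # the flow tracks state, logs, and task dependencies
def pipeline():
    data = ingest()
    return train(data)

if __name__ == "__main__":
    pipeline()
```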

🧩 Prefect: A Modern Orchestration Tool for ML

Prefect is designed specifically for data-intensive applications with a Python-first approach. Unlike older orchestration tools, Prefect allows you to convert your existing Python code into production-ready data pipelines with minimal changes.

Why Prefect for MLOps?

  1. Pythonic: Works with your existing Python code
  2. Dynamic: DAGs can be created at runtime
  3. Resilient: Built-in error handling and recovery
  4. Observable: Comprehensive UI and monitoring
  5. Distributed: Can scale across multiple machines
  6. Modern: Actively maintained and feature-rich

๐Ÿ—๏ธ Prefect Architecture Explained

Prefect has a distributed architecture with distinct components:

```
┌─────────────────┐      ┌─────────────────┐      ┌─────────────────┐
│   Prefect API   │◄─────┤   Prefect UI    │      │     Storage     │
│ (Orchestrator)  │      │   (Dashboard)   │      │ (S3, GCS, etc)  │
└────────┬────────┘      └─────────────────┘      └────────┬────────┘
         │                                                 │
         │     ┌──────────────────────────────────┐        │
         └────►│          Prefect Agents          │◄───────┘
               │   (Workers that execute flows)   │
               └──────────────────────────────────┘
```

Core Components:

  1. Prefect API (Orion): The orchestration engine that:

    • Schedules and triggers flows
    • Tracks flow and task states
    • Stores execution history
    • Manages concurrency and dependencies
  2. Prefect UI: Web-based dashboard that provides:

    • Visual representation of flows and tasks
    • Execution history and logs
    • Performance metrics
    • Administrative controls
  3. Storage: Where flow code and artifacts are stored:

    • Local filesystem
    • Cloud object storage (S3, GCS, etc.)
    • Git repositories
    • Docker images
  4. Agents: Workers that:

    • Poll for scheduled flows
    • Pull flow code from storage
    • Execute flows in the specified environment
    • Report results back to the API

🧠 Prefect Core Concepts in Detail

1. Tasks 📋

Tasks are the atomic units of work in Prefect. They encapsulate individual operations and can be composed into larger workflows.

Task Definition:

```python
from datetime import timedelta

import pandas as pd
from prefect import task


@task(
    name="extract_data",     # Custom name for the task
    retries=3,               # Retry automatically on failure
    retry_delay_seconds=30,  # Wait between retries
    cache_key_fn=lambda context, parameters: parameters["url"],  # Cache by URL parameter
    cache_expiration=timedelta(hours=12),  # Cache results for 12 hours
)
def extract_data(url: str) -> pd.DataFrame:
    """Extract data from a given URL"""
    return pd.read_csv(url)
```

Task Properties:

  • Name: Human-readable identifier for the task
  • Retries: Number of times to retry on failure
  • Timeout: Maximum execution time
  • Tags: Metadata for filtering and organization
  • Cache: Store and reuse task results
  • Result Storage: Where to store task outputs
  • Result Handlers: How to serialize/deserialize outputs
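Several of these properties can be combined on a single decorator. A minimal sketch (the task name, timeout value, and tags are illustrative assumptions):

```python
from prefect import task

@task(
    name="score_batch",
    timeout_seconds=600,           # fail the task if it runs longer than 10 minutes
    tags=["ml", "batch-scoring"],  # metadata for filtering and concurrency limits
    retries=2,
)
def score_batch(batch_id: int) -> int:
    # placeholder work; a real task would load a model and score data
    return batch_id * 2
```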

Task States:

Tasks can be in various states during execution:

  • Pending: Ready to run
  • Running: Currently executing
  • Completed: Successfully finished
  • Failed: Encountered an error
  • Retrying: Failed but will retry
  • Cancelled: Manually stopped
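You rarely manage states by hand, but they are easy to inspect: calling a task with `return_state=True` returns the final `State` object instead of raising on failure. A small sketch:

```python
from prefect import task, flow

@task(retries=1, retry_delay_seconds=1)
def flaky():
    raise ValueError("boom")

@flow
def inspect_states():
    # return_state=True hands back the final State instead of raising
    state = flaky(return_state=True)
    print(state.type)        # e.g. StateType.FAILED once retries are exhausted
    print(state.is_failed())

if __name__ == "__main__":
    inspect_states()
```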

2. Flows 🌊

Flows are the main unit of work in Prefect. They coordinate the execution of tasks and manage dependencies.

Flow Definition:

```python
from datetime import datetime

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

# extract_data, transform_data, and load_data are tasks defined as in the previous section


@flow(
    name="Data Processing Pipeline",
    description="Extracts, transforms, and loads data",
    version="1.0.0",
    task_runner=ConcurrentTaskRunner(),  # Run submitted tasks concurrently
    retries=2,                           # Retry the entire flow if it fails
)
def process_data(date: str = None):
    """Main flow to process data"""
    # If no date is provided, use today's date
    if date is None:
        date = datetime.today().strftime("%Y-%m-%d")

    # Calling tasks automatically creates the dependency graph
    data = extract_data(f"data/data-{date}.csv")
    processed = transform_data(data)
    load_data(processed)

    return processed
```

Flow Features:

  • Task Dependencies: Automatically determined from function calls
  • Error Handling: Custom exception handling
  • Subflows: Nested flows for better organization
  • Parameters: Runtime configuration
  • State Handlers: Custom logic on state changes
  • Logging: Structured logging for monitoring
  • Concurrency: Parallel task execution
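Subflows, for example, are just flows called from inside another flow; each call gets its own tracked run in the UI. A minimal sketch:

```python
from prefect import flow

@flow
def validate_data(row_count: int) -> bool:
    # a subflow: gets its own flow run, logs, and state
    return row_count > 0

@flow
def parent_pipeline(row_count: int = 100):
    if not validate_data(row_count):
        raise ValueError("validation failed")
    return "ok"
```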

Flow Execution:

Flows can be executed in various ways:

  • Directly calling the function
  • Using flow.serve() to run the flow as a long-lived, schedulable service
  • Through deployments for scheduled runs
  • Via the Prefect UI or API
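For instance, `.serve()` (available in newer Prefect 2 releases) turns the current process into a long-running server for the flow. A sketch, assuming the `process_data` flow defined above:

```python
if __name__ == "__main__":
    # Serves the flow from this process and runs it every hour (3600 seconds)
    process_data.serve(name="process-data-local", interval=3600)
```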

3. Task Runners 🏃‍♂️

Task runners determine how tasks within a flow are executed.

Types of Task Runners:

```python
from prefect.task_runners import SequentialTaskRunner  # Default: run tasks in sequence
from prefect.task_runners import ConcurrentTaskRunner  # Run submitted tasks concurrently

# DaskTaskRunner ships in the prefect-dask collection (pip install prefect-dask)
from prefect_dask import DaskTaskRunner                 # Distributed execution with Dask
```

Use Cases:

  • SequentialTaskRunner: Simple workflows, easy debugging
  • ConcurrentTaskRunner: Independent tasks that can run in parallel
  • DaskTaskRunner: Large-scale data processing, distributed computing
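Note that a task runner only applies to tasks started with `.submit()`; tasks called directly run one after another. A sketch of concurrent execution with an illustrative task:

```python
from prefect import task, flow
from prefect.task_runners import ConcurrentTaskRunner

@task
def fetch_partition(i: int) -> int:
    return i * 2

@flow(task_runner=ConcurrentTaskRunner())
def parallel_flow():
    # .submit() returns futures; independent tasks run concurrently
    futures = [fetch_partition.submit(i) for i in range(5)]
    return [f.result() for f in futures]

if __name__ == "__main__":
    print(parallel_flow())
```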

4. Deployments 🚢

Deployments make flows executable outside your local Python environment.

Creating a Deployment:

```python
from prefect.deployments import Deployment
from prefect.orion.schemas.schedules import CronSchedule
from prefect.infrastructure.process import Process

# Create a deployment from a flow
deployment = Deployment.build_from_flow(
    flow=process_data,
    name="daily-data-processing",
    version="1",
    schedule=CronSchedule(cron="0 0 * * *"),  # Daily at midnight
    infrastructure=Process(),                 # Run as a local process
    tags=["production", "data-pipeline"],
    parameters={"date": None},                # Default parameters
)

# Register the deployment with the Prefect API
deployment.apply()
```

Deployment Properties:

  • Name: Identifier for the deployment
  • Schedule: When to execute the flow
  • Infrastructure: Where to run the flow
  • Storage: Where to store the flow code
  • Parameters: Default parameters for the flow
  • Tags: For organizing and filtering deployments
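Once applied, a deployment can also be triggered on demand from the CLI, referenced as `<flow name>/<deployment name>`:

```bash
prefect deployment run "Data Processing Pipeline/daily-data-processing"
```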

5. Schedules ⏰

Schedules determine when flows should be executed automatically.

Types of Schedules:

```python
from datetime import timedelta

from prefect.orion.schemas.schedules import (
    CronSchedule,      # Based on cron expressions
    IntervalSchedule,  # Run at regular intervals
    RRuleSchedule,     # Based on iCalendar recurrence rules
)

# Examples
cron_schedule = CronSchedule(cron="0 9 * * 1-5")                   # Weekdays at 9 AM
interval_schedule = IntervalSchedule(interval=timedelta(hours=4))  # Every 4 hours
```
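`RRuleSchedule` covers calendar patterns that cron cannot express. A sketch, assuming familiarity with iCalendar RRULE strings:

```python
from prefect.orion.schemas.schedules import RRuleSchedule

# Run on the last weekday of every month
rrule_schedule = RRuleSchedule(rrule="FREQ=MONTHLY;BYDAY=MO,TU,WE,TH,FR;BYSETPOS=-1")
```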

6. Results and Persistence 💾

Prefect can store and track results from task and flow runs.

Persisting Results:

```python
from prefect import task
from prefect.serializers import JSONSerializer
from prefect_aws import S3Bucket  # from the prefect-aws collection


@task(
    persist_result=True,                        # Store the result
    result_serializer=JSONSerializer(),         # How to serialize the result
    result_storage=S3Bucket.load("my-bucket"),  # Where to store the result
)
def calculate_metrics(data):
    # Process the data and return metrics
    return {"accuracy": 0.95, "precision": 0.92}
```

🛠️ Building an ML Pipeline with Prefect: Complete Example

The following example demonstrates a complete ML pipeline for NYC taxi trip duration prediction:

```python
import os
import pickle
from datetime import datetime, timedelta

import pandas as pd
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

from prefect import task, flow
from prefect.task_runners import SequentialTaskRunner
from prefect.deployments import Deployment
from prefect.orion.schemas.schedules import CronSchedule
from prefect.logging import get_run_logger


# Data ingestion task with retries
@task(retries=3, retry_delay_seconds=5, name="download_taxi_data")
def get_data(date):
    """Download taxi data for the given date range"""
    logger = get_run_logger()

    train_date = datetime.strptime(date, '%Y-%m-%d')
    val_date = train_date + timedelta(days=28)

    train_month = train_date.month
    val_month = val_date.month
    train_year = train_date.year
    val_year = val_date.year

    train_file = f"yellow_tripdata_{train_year}-{train_month:02d}.parquet"
    val_file = f"yellow_tripdata_{val_year}-{val_month:02d}.parquet"

    logger.info(f"Downloading training data: {train_file}")
    train_url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/{train_file}"
    train_data = pd.read_parquet(train_url)

    logger.info(f"Downloading validation data: {val_file}")
    val_url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/{val_file}"
    val_data = pd.read_parquet(val_url)

    return train_data, val_data


# Data preprocessing task
@task(name="prepare_taxi_features")
def prepare_features(df, categorical_features):
    """Prepare the features for training"""
    logger = get_run_logger()

    # Calculate trip duration in minutes
    df['duration'] = (df.tpep_dropoff_datetime - df.tpep_pickup_datetime).dt.total_seconds() / 60

    # Filter outliers
    logger.info(f"Initial shape: {df.shape}")
    df = df[(df.duration >= 1) & (df.duration <= 60)]
    logger.info(f"Filtered shape: {df.shape}")

    # Create features
    df['PU_DO'] = df['PULocationID'].astype(str) + '_' + df['DOLocationID'].astype(str)

    # Process categoricals
    df[categorical_features] = df[categorical_features].fillna(-1).astype('str')

    return df


# Model training task
@task(name="train_regression_model")
def train_model(df, categorical_features):
    """Train the model with the given data"""
    logger = get_run_logger()

    dicts = df[categorical_features].to_dict(orient='records')

    dv = DictVectorizer()
    X_train = dv.fit_transform(dicts)
    y_train = df['duration'].values

    logger.info(f"Training on {X_train.shape[0]} examples")
    lr = LinearRegression()
    lr.fit(X_train, y_train)

    return lr, dv


# Model evaluation task
@task(name="evaluate_model_performance")
def evaluate_model(model, dv, df, categorical_features):
    """Evaluate the model performance"""
    logger = get_run_logger()

    dicts = df[categorical_features].to_dict(orient='records')
    X_val = dv.transform(dicts)
    y_val = df['duration'].values

    y_pred = model.predict(X_val)
    rmse = mean_squared_error(y_val, y_pred, squared=False)
    logger.info(f"RMSE: {rmse:.3f}")

    return rmse


# Model artifacts storage task
@task(name="save_model_artifacts")
def save_model(model, dv, date):
    """Save the model and DictVectorizer"""
    logger = get_run_logger()

    # Create the directory if it doesn't exist
    os.makedirs("models", exist_ok=True)

    with open(f'models/model-{date}.pkl', 'wb') as f:
        pickle.dump(model, f)
    with open(f'models/dv-{date}.pkl', 'wb') as f:
        pickle.dump(dv, f)

    logger.info(f"Model saved: models/model-{date}.pkl")
    return True


# Main flow that orchestrates all tasks
@flow(name="taxi-duration-prediction", task_runner=SequentialTaskRunner())
def main(date=None):
    """Main flow for taxi duration prediction model training"""
    if date is None:
        date = datetime.today().strftime('%Y-%m-%d')

    logger = get_run_logger()
    logger.info(f"Starting training pipeline for date: {date}")

    categorical_features = ['PULocationID', 'DOLocationID', 'PU_DO']

    # 1. Get data
    train_data, val_data = get_data(date)

    # 2. Process training data
    df_train = prepare_features(df=train_data, categorical_features=categorical_features)

    # 3. Train model
    model, dv = train_model(df=df_train, categorical_features=categorical_features)

    # 4. Process validation data
    df_val = prepare_features(df=val_data, categorical_features=categorical_features)

    # 5. Evaluate model
    rmse = evaluate_model(model=model, dv=dv, df=df_val, categorical_features=categorical_features)

    # 6. Save model
    save_model(model=model, dv=dv, date=date)

    logger.info("Pipeline completed successfully!")
    return rmse


# Define a deployment for production
def create_deployment():
    return Deployment.build_from_flow(
        flow=main,
        name="nyc-taxi-monthly-training",
        schedule=CronSchedule(cron="0 0 1 * *"),  # First day of each month at midnight
        tags=["production", "mlops", "taxi-duration"],
    )


# For local development/testing
if __name__ == "__main__":
    main("2023-01-01")
```

📊 MLOps Workflow Integration

In the MLOps lifecycle, Prefect serves as the workflow orchestration layer that connects:

  1. Data Engineering: Extracting and preparing data for ML
  2. Experimentation: Running and tracking model experiments
  3. Continuous Training: Automating regular model retraining
  4. Deployment: Pushing models to production
  5. Monitoring: Tracking model performance over time

Integration with MLflow

Prefect works well with MLflow for experiment tracking:

```python
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

from prefect import task


@task
def train_with_mlflow(X_train, y_train, params):
    with mlflow.start_run():
        # Log parameters
        mlflow.log_params(params)

        # Train the model
        model = RandomForestRegressor(**params)
        model.fit(X_train, y_train)

        # Log metrics
        train_rmse = mean_squared_error(y_train, model.predict(X_train), squared=False)
        mlflow.log_metric("train_rmse", train_rmse)

        # Log the model
        mlflow.sklearn.log_model(model, "model")

    return model
```
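A flow can then point MLflow at a tracking store before calling the task. A sketch, where the tracking URI, experiment name, and hyperparameters are illustrative assumptions:

```python
import mlflow
from prefect import flow

@flow
def experiment_flow(X_train, y_train):
    mlflow.set_tracking_uri("sqlite:///mlflow.db")   # assumed local tracking store
    mlflow.set_experiment("taxi-duration")
    params = {"n_estimators": 100, "max_depth": 10}  # illustrative hyperparameters
    return train_with_mlflow(X_train, y_train, params)
```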

🚦 Prefect vs. Other Orchestration Tools

Prefect vs. Airflow

| Feature | Prefect | Airflow |
| --- | --- | --- |
| Programming Model | Python-native, tasks as functions | DAGs as configuration |
| Dynamic Workflows | Yes, DAGs can change at runtime | Limited, mostly static DAGs |
| Error Handling | Rich, automated retries and custom handlers | Basic retry policies |
| UI/UX | Modern, task-oriented | Traditional, DAG-oriented |
| Parametrization | First-class support | More complex implementation |
| Learning Curve | Lower for Python developers | Steeper |
| Community | Growing | Large, established |

Prefect vs. Kubeflow

| Feature | Prefect | Kubeflow |
| --- | --- | --- |
| Focus | General workflow orchestration | Kubernetes-native ML pipelines |
| Deployment | Multiple options (local, cloud, k8s) | Primarily Kubernetes |
| Integration | Python ecosystem | Container-based components |
| Complexity | Lower barrier to entry | Higher complexity |
| ML Specific | General purpose, adaptable to ML | Built specifically for ML |

🛫 Getting Started with Prefect in 5 Minutes

Installation

```bash
pip install prefect
```

Start the Prefect Server

```bash
prefect server start
```

Create a Simple Flow

```python
from prefect import task, flow


@task
def say_hello(name):
    return f"Hello, {name}!"


@flow
def hello_flow(name="world"):
    result = say_hello(name)
    print(result)
    return result


if __name__ == "__main__":
    hello_flow("MLOps Zoomcamp")
```

Create a Deployment

```bash
prefect deployment build hello_flow.py:hello_flow -n my-first-deployment -q default
prefect deployment apply hello_flow-deployment.yaml
```

Start an Agent

```bash
prefect agent start -q default
```

🧪 Best Practices for ML Workflows with Prefect

  1. Task Granularity: Create tasks at the right level - not too fine-grained, not too coarse
  2. Error Boundaries: Place task boundaries around operations that might fail
  3. Parameterize Flows: Make flows configurable with parameters
  4. Logging: Use the Prefect logger to capture important information
  5. Resource Management: Clean up resources in task teardown
  6. Caching Strategy: Cache expensive computations but be mindful of data changes (see the sketch after this list)
  7. Testing: Test flows and tasks independently
  8. Version Control: Track flow code in version control
  9. Documentation: Document flow purpose, inputs, and outputs
  10. Monitoring: Set up notifications for critical flow failures
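For the caching strategy in particular, Prefect ships `task_input_hash`, which reuses a stored result only while the task's inputs are unchanged. A minimal sketch with a hypothetical feature-building task:

```python
from datetime import timedelta

from prefect import task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def build_features(path: str):
    # re-runs only when `path` changes or the cache entry expires
    ...
```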

🔮 Advanced Prefect Features for ML

Dask Integration for Distributed Training

```python
from prefect import flow
from prefect_dask import DaskTaskRunner  # pip install prefect-dask


@flow(task_runner=DaskTaskRunner())
def distributed_training_flow():
    # Submitted tasks are executed on a Dask cluster; a temporary local
    # cluster is created if no address is configured.
    # train_model_fold is a task assumed to be defined elsewhere.
    results = []
    for i in range(10):
        results.append(train_model_fold.submit(fold_id=i))
    return results
```

Storage Options for Large Models

```python
from prefect.deployments import Deployment
from prefect.filesystems import S3

# Register an S3 block
s3_block = S3(bucket_path="my-model-registry")
s3_block.save("model-storage")

# Use it in a deployment (train_flow is a flow defined elsewhere)
deployment = Deployment.build_from_flow(
    flow=train_flow,
    name="distributed-training",
    storage=S3.load("model-storage"),
)
```

Notifications for Critical Failures

```python
from prefect import flow
from prefect.blocks.notifications import SlackWebhook

# Register the webhook block once
slack_webhook = SlackWebhook(url="https://hooks.slack.com/services/XXX/YYY/ZZZ")
slack_webhook.save("ml-alerts")


def notify_failure(flow, flow_run, state):
    # on_failure hooks receive the flow, the flow run, and the final state
    SlackWebhook.load("ml-alerts").notify(body=f"Flow run {flow_run.name} failed: {state.message}")


@flow(on_failure=[notify_failure])
def critical_training_flow():
    # This flow sends a Slack message if it fails
    ...
```
