DEV Community

Dhanvina N
Dhanvina N

Posted on

Automate Your Data Workflows: Why Pressing Download Button Isn’t Always Enough!

Ever found yourself downloading datasets from Kaggle or other online sources, only to get bogged down by repetitive tasks like data cleaning and splitting? Imagine if you could automate these processes, making data management as breezy as a click of a button! That’s where Apache Airflow comes into play. Let’s dive into how you can set up an automated pipeline for handling massive datasets, complete with a NAS (Network-Attached Storage) for seamless data management. 🚀

Why Automate?

Before we dive into the nitty-gritty, let’s explore why automating data workflows can save you time and sanity:

Reduce Repetition: Automate repetitive tasks to focus on more exciting aspects of your project.
Increase Efficiency: Quickly handle updates or new data without manual intervention.
Ensure Consistency: Maintain consistent data processing standards every time.

Step-by-Step Guide to Your Data Pipeline

Let’s walk through setting up a data pipeline using Apache Airflow, focusing on automating dataset downloads, data cleaning, and splitting—all while leveraging your NAS for storage.

File structure

/your_project/ │ ├── dags/ │ └── kaggle_data_pipeline.py # Airflow DAG script for automation │ ├── scripts/ │ ├── cleaning_script.py # Data cleaning script │ └── split_script.py # Data splitting script │ ├── data/ │ ├── raw/ # Raw dataset files │ ├── processed/ # Cleaned and split dataset files │ └── external/ # External files or archives │ ├── airflow_config/ │ └── airflow.cfg # Airflow configuration file (if customized) │ ├── Dockerfile # Optional: Dockerfile for containerizing ├── docker-compose.yml # Optional: Docker Compose configuration ├── requirements.txt # Python dependencies for your project └── README.md # Project documentation 
Enter fullscreen mode Exit fullscreen mode

1. Set Up Apache Airflow
First things first, let’s get Airflow up and running.

Install Apache Airflow:

# Create and activate a virtual environment python3 -m venv airflow_env source airflow_env/bin/activate # Install Airflow pip install apache-airflow 
Enter fullscreen mode Exit fullscreen mode

Initialize the Airflow Database:

airflow db init 
Enter fullscreen mode Exit fullscreen mode

Create an Admin User:

airflow users create --username admin --firstname Admin --lastname User --role Admin --email admin@example.com 
Enter fullscreen mode Exit fullscreen mode

Start Airflow:

airflow webserver --port 8080 airflow scheduler 
Enter fullscreen mode Exit fullscreen mode

Access Airflow UI: Go to http://localhost:8080 in your web browser.

2. Connect Your NAS
Mount NAS Storage: Ensure your NAS is mounted on your system. For instance:

sudo mount -t nfs <NAS_IP>:/path/to/nas /mnt/nas 
Enter fullscreen mode Exit fullscreen mode

3. Create Your Data Pipeline DAG
Create a Python file (e.g., kaggle_data_pipeline.py) in the ~/airflow/dags directory with the following code:

from airflow import DAG from airflow.operators.python_operator import PythonOperator from datetime import datetime, timedelta import os import subprocess # Default arguments default_args = { 'owner': 'your_name', 'depends_on_past': False, 'start_date': datetime(2024, 8, 1), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } # Define the DAG dag = DAG( 'kaggle_data_pipeline', default_args=default_args, description='Automated Pipeline for Kaggle Datasets', schedule_interval=timedelta(days=1), ) # Define Python functions for each task def download_data(**kwargs): # Replace with your Kaggle dataset URL and credentials subprocess.run(["kaggle", "datasets", "download", "-d", "<DATASET_ID>", "-p", "/mnt/nas/data"]) def extract_data(**kwargs): # Extract data if it's in a compressed format subprocess.run(["unzip", "/mnt/nas/data/dataset.zip", "-d", "/mnt/nas/data"]) def clean_data(**kwargs): # Example cleaning script call subprocess.run(["python", "/path/to/cleaning_script.py", "--input", "/mnt/nas/data"]) def split_data(**kwargs): # Example splitting script call subprocess.run(["python", "/path/to/split_script.py", "--input", "/mnt/nas/data"]) # Define tasks download_task = PythonOperator( task_id='download_data', python_callable=download_data, dag=dag, ) extract_task = PythonOperator( task_id='extract_data', python_callable=extract_data, dag=dag, ) clean_task = PythonOperator( task_id='clean_data', python_callable=clean_data, dag=dag, ) split_task = PythonOperator( task_id='split_data', python_callable=split_data, dag=dag, ) # Set task dependencies download_task >> extract_task >> clean_task >> split_task 
Enter fullscreen mode Exit fullscreen mode

Create Data Processing Scripts
scripts/cleaning_script.py

import argparse import os def clean_data(input_path): # Implement your data cleaning logic here print(f"Cleaning data in {input_path}...") if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('--input', required=True, help="Path to the data directory") args = parser.parse_args() clean_data(args.input) 
Enter fullscreen mode Exit fullscreen mode

scripts/split_script.py

import argparse import os def split_data(input_path): # Implement your data splitting logic here print(f"Splitting data in {input_path}...") if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument('--input', required=True, help="Path to the data directory") args = parser.parse_args() split_data(args.input) 
Enter fullscreen mode Exit fullscreen mode

Dockerize Your Setup

FROM apache/airflow:2.5.1 USER root # Install any additional packages RUN pip install kaggle # Copy DAGs and scripts COPY dags/ /opt/airflow/dags/ COPY scripts/ /opt/airflow/scripts/ USER airflow 
Enter fullscreen mode Exit fullscreen mode

docker-compose.yml

version: '3' services: airflow-webserver: image: apache/airflow:2.5.1 ports: - "8080:8080" environment: - AIRFLOW__CORE__SQL_ALCHEMY_DATABASE_URI=sqlite:///airflow.db - AIRFLOW__CORE__EXECUTOR=LocalExecutor volumes: - ./dags:/opt/airflow/dags - ./scripts:/opt/airflow/scripts command: webserver airflow-scheduler: image: apache/airflow:2.5.1 environment: - AIRFLOW__CORE__SQL_ALCHEMY_DATABASE_URI=sqlite:///airflow.db - AIRFLOW__CORE__EXECUTOR=LocalExecutor volumes: - ./dags:/opt/airflow/dags - ./scripts:/opt/airflow/scripts command: scheduler 
Enter fullscreen mode Exit fullscreen mode

Run Your Pipeline
Start Airflow Services:

docker-compose up 
Enter fullscreen mode Exit fullscreen mode

Monitor Pipeline:

Access the Airflow UI at http://localhost:8080 to trigger and monitor the pipeline 
Enter fullscreen mode Exit fullscreen mode

GitHub Actions Setup
GitHub Actions allows you to automate workflows directly within your GitHub repository. Here’s how you can set it up to run your Dockerized pipeline:

Create GitHub Actions Workflow
Create a .github/workflows Directory:

mkdir -p .github/workflows 
Enter fullscreen mode Exit fullscreen mode

Create a Workflow File:

.github/workflows/ci-cd.yml

name: CI/CD Pipeline on: push: branches: - main pull_request: branches: - main jobs: build: runs-on: ubuntu-latest steps: - name: Checkout code uses: actions/checkout@v3 - name: Set up Docker Buildx uses: docker/setup-buildx-action@v2 - name: Build and push Docker image uses: docker/build-push-action@v4 with: context: . push: true tags: your_dockerhub_username/your_image_name:latest - name: Run Docker container run: | docker run -d --name airflow_container -p 8080:8080 your_dockerhub_username/your_image_name:latest 
Enter fullscreen mode Exit fullscreen mode

4. What’s Happening Here?

  • download_data: Automatically downloads the dataset from Kaggle to your NAS.
  • extract_data: Unzips the dataset if needed.
  • clean_data: Cleans the data using your custom script.
  • split_data: Splits the data into training, validation, and testing sets.

5. Run and Monitor Your Pipeline
Access the Airflow UI to manually trigger the DAG or monitor its execution.
Check Logs for detailed information on each task.

6. Optimize and Scale
As your dataset grows or your needs change:

  • Adjust Task Parallelism: Configure Airflow to handle multiple tasks concurrently.
  • Enhance Data Cleaning: Update your cleaning and splitting scripts as needed.
  • Add More Tasks: Integrate additional data processing steps into your pipeline.

Conclusion

Automating your data workflows with Apache Airflow can transform how you manage and process datasets. From downloading and cleaning to splitting and scaling, Airflow’s orchestration capabilities streamline your data pipeline, allowing you to focus on what really matters—analyzing and deriving insights from your data.

So, set up your pipeline today, kick back, and let Airflow do the heavy lifting!

Top comments (0)