From Configuration to Orchestration: Building an ETL Workflow with AWS Is No Longer a Struggle

A step-by-step guide to leveraging AWS services for efficient data pipeline automation

Image by Greg Rakozy, via Unsplash

AWS continues to lead the cloud industry with a whopping 32% share, thanks to its early market entry, robust technology and comprehensive service offerings. However, many users find AWS challenging to navigate, and this discontent has led more companies and organisations to prefer its competitors, Microsoft Azure and Google Cloud Platform.

Despite its steeper learning curve and less intuitive interface, AWS remains the top cloud service due to its reliability, hybrid cloud capabilities and the breadth of its service options. More importantly, choosing the right strategies can significantly reduce configuration complexity, streamline workflows, and boost performance.

In this article, I’ll introduce an efficient way to set up a complete ETL pipeline with orchestration on AWS, based on my own experience. It should also give you a fresh perspective on building data products with AWS, or make the configuration less of a struggle if this is your first time using AWS for these tasks.

Strategy for Designing an Efficient Data Pipeline

AWS has the most comprehensive ecosystem, with a vast range of services. Building a production-ready data warehouse on AWS requires at least the following services:

  • IAM – Although this service doesn’t appear in any step of the workflow itself, it’s the foundation for accessing all the other services.
  • AWS S3 – Data Lake storage
  • AWS Glue – ETL processing
  • Amazon Redshift – Data Warehouse
  • CloudWatch – Monitoring and logging

You’ll also need access to Airflow if you have to schedule more complex dependencies or need advanced retries for error handling, although Redshift itself can handle basic cron-style scheduling.

To make your work easier, I highly recommend installing an IDE (Visual Studio Code or PyCharm, or whichever you prefer). An IDE dramatically improves your efficiency with complex Python code, local testing and debugging, version-control integration and team collaboration. In the next section, I’ll provide step-by-step configurations.

Initial Setup

Here are the steps of initial configurations:

  • Launch a virtual environment in your IDE
  • Install dependencies – install the libraries that will be used later on:
pip install apache-airflow==2.7.0 boto3 pandas pyspark sqlalchemy
  • Install AWS CLI – this step allows you to write scripts that automate various AWS operations and makes managing AWS resources more efficient.
  • AWS Configuration – run aws configure and enter these IAM user credentials when prompted (see the sketch after this list):
    • AWS Access Key ID: From your IAM user.
    • AWS Secret Access Key: From your IAM user.
    • Default region: us-east-1 (or your preferred region)
    • Default output format: json.
  • Integrate Airflow – here are the steps:
    • Initialize Airflow
    • Create DAG files in Airflow
    • Run the web server at http://localhost:8080 (login: admin/admin)
    • Open another terminal tab and start the scheduler
# Initialize Airflow
export AIRFLOW_HOME=$(pwd)/airflow
airflow db init
airflow users create \
    --username admin \
    --password admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email [email protected]

# Run the webserver
airflow webserver --port 8080

# Start the scheduler (in a separate terminal)
airflow scheduler
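For the AWS CLI configuration step above, the four prompts come from a single command. Here is a minimal sketch of the virtual environment and CLI setup, assuming a Unix-like shell; the environment name is arbitrary:

# Create and activate a virtual environment (the name .venv is arbitrary)
python -m venv .venv
source .venv/bin/activate

# Configure AWS credentials; you'll be prompted for the four values listed above
aws configure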

Development Workflow: COVID-19 Data Case Study

I’m using JHU’s public COVID-19 dataset (CC BY 4.0 licensed) for demonstration purposes. You can refer to the data here.

The chart below shows the workflow in the development environment, from data ingestion through to loading the data into Redshift tables.

Development workflow created by author

Data Ingestion

In the first step, data ingestion into AWS S3, I processed the data by melting it to long format and converting the date column. I saved the data in Parquet format to improve storage efficiency, enhance query performance and reduce storage costs. The code for this step is below:

import pandas as pd
from datetime import datetime
import os
import boto3
import sys

def process_covid_data():
    try:
        # Load raw data
        url = "https://github.com/CSSEGISandData/COVID-19/raw/master/archived_data/archived_time_series/time_series_19-covid-Confirmed_archived_0325.csv"
        df = pd.read_csv(url)

        # --- Data Processing ---
        # 1. Melt to long format
        df = df.melt(
            id_vars=['Province/State', 'Country/Region', 'Lat', 'Long'],
            var_name='date_str',
            value_name='confirmed_cases'
        )

        # 2. Convert dates (JHU format: MM/DD/YY) and drop rows that fail to parse
        df['date'] = pd.to_datetime(
            df['date_str'],
            format='%m/%d/%y',
            errors='coerce'
        )
        df = df.dropna(subset=['date'])

        # 3. Save as partitioned Parquet
        output_dir = "covid_processed"
        df.to_parquet(
            output_dir,
            engine='pyarrow',
            compression='snappy',
            partition_cols=['date']
        )

        # 4. Upload to S3
        s3 = boto3.client('s3')
        total_files = 0
        for root, _, files in os.walk(output_dir):
            for file in files:
                local_path = os.path.join(root, file)
                s3_path = os.path.join(
                    'raw/covid/',
                    os.path.relpath(local_path, output_dir)
                )
                s3.upload_file(
                    Filename=local_path,
                    Bucket='my-dev-bucket',
                    Key=s3_path
                )
            total_files += len(files)

        print(f"Successfully processed and uploaded {total_files} Parquet files")
        print(f"Data covers from {df['date'].min()} to {df['date'].max()}")
        return True

    except Exception as e:
        print(f"Error: {str(e)}", file=sys.stderr)
        return False

if __name__ == "__main__":
    process_covid_data()

After running the Python code, you should be able to see the Parquet files in your S3 bucket, under the ‘raw/covid/’ prefix.
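If you prefer to check from the terminal rather than the console, a quick AWS CLI listing (assuming the same my-dev-bucket name used in the code above) should show the partitioned files:

# List the uploaded Parquet files under the raw/covid/ prefix
aws s3 ls s3://my-dev-bucket/raw/covid/ --recursive --human-readable --summarize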

Screenshot by author

ETL Pipeline Development

AWS Glue is mainly used for ETL pipeline development. Although it can also ingest data that hasn’t yet landed in S3, its strength lies in processing data once it’s in S3 for data-warehousing purposes. Here’s the PySpark script for the data transformation:

# transform_covid.py
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql.functions import *

glueContext = GlueContext(SparkContext.getOrCreate())

df = glueContext.create_dynamic_frame.from_options(
    "s3",
    {"paths": ["s3://my-dev-bucket/raw/covid/"]},
    format="parquet"
).toDF()

# Add transformations here
df_transformed = df.withColumn("load_date", current_date())

# Write to processed zone
df_transformed.write.parquet(
    "s3://my-dev-bucket/processed/covid/",
    mode="overwrite"
)
Screenshot by author
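If you’d rather script the Glue job setup than click through the console, a rough sketch with the AWS CLI could look like this; the job name, IAM role and script location are placeholders, so adjust them to your own environment:

# Upload the script and register a Glue job (name, role and paths are placeholders)
aws s3 cp transform_covid.py s3://my-dev-bucket/scripts/transform_covid.py

aws glue create-job \
    --name dev_covid_transformation \
    --role AWSGlueServiceRole-dev \
    --glue-version "4.0" \
    --command '{"Name": "glueetl", "ScriptLocation": "s3://my-dev-bucket/scripts/transform_covid.py", "PythonVersion": "3"}'

# Trigger a run of the job
aws glue start-job-run --job-name dev_covid_transformation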

The next step is to load the data into Redshift. In the Redshift console, click “Query editor v2” on the left-hand side, where you can edit your SQL code and run the Redshift COPY.

-- Create a table covid_data in the dev schema
CREATE TABLE dev.covid_data (
    "Province/State" VARCHAR(100),
    "Country/Region" VARCHAR(100),
    "Lat" FLOAT8,
    "Long" FLOAT8,
    date_str VARCHAR(100),
    confirmed_cases FLOAT8
)
DISTKEY("Country/Region")
SORTKEY(date_str);

-- COPY data to Redshift
COPY dev.covid_data (
    "Province/State",
    "Country/Region",
    "Lat",
    "Long",
    date_str,
    confirmed_cases
)
FROM 's3://my-dev-bucket/processed/covid/'
IAM_ROLE 'arn:aws:iam::your-account-id:role/RedshiftLoadRole'
REGION 'your-region'
FORMAT PARQUET;

Then you’ll see the data successfully loaded into the data warehouse.
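A quick sanity check in the query editor, for example, confirms that the rows arrived:

-- Verify the load: row count and number of countries
SELECT COUNT(*) AS total_rows,
       COUNT(DISTINCT "Country/Region") AS countries
FROM dev.covid_data;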

Screenshot by author

Pipeline Automation

The easiest way to automate the data pipeline is to schedule jobs in the Redshift query editor v2 by creating a stored procedure (for a more detailed introduction to SQL stored procedures, you can refer to this article).

CREATE OR REPLACE PROCEDURE dev.run_covid_etl()
AS $$
BEGIN
    TRUNCATE TABLE dev.covid_data;
    COPY dev.covid_data
    FROM 's3://my-dev-bucket/raw/covid'
    IAM_ROLE 'arn:aws:iam::your-account-id:role/RedshiftLoadRole'
    REGION 'your-region'
    FORMAT PARQUET;
END;
$$ LANGUAGE plpgsql;
Screenshot by author
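Once the procedure exists, the scheduled query itself only needs to invoke it; in query editor v2 you save the statement below and attach a schedule to it through the editor’s UI:

-- Statement to schedule in query editor v2
CALL dev.run_covid_etl();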

Alternatively, you can run Airflow for scheduled jobs.

from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 2
}

with DAG(
    'redshift_etl_dev',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
) as dag:
    run_etl = RedshiftSQLOperator(
        task_id='run_covid_etl',
        redshift_conn_id='redshift_dev',
        sql='CALL dev.run_covid_etl()',
    )
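Note that the DAG above references a redshift_dev connection, which must exist in Airflow before the operator can run. One way to create it is via the Airflow CLI; the host and credentials below are placeholders:

# Create the Redshift connection referenced by redshift_conn_id (values are placeholders)
airflow connections add 'redshift_dev' \
    --conn-type 'redshift' \
    --conn-host 'your-cluster.xxxxxx.us-east-1.redshift.amazonaws.com' \
    --conn-port 5439 \
    --conn-schema 'dev' \
    --conn-login 'awsuser' \
    --conn-password 'your-password'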

Production Workflow

An Airflow DAG is a powerful way to orchestrate your entire ETL pipeline when there are many dependencies, and it’s also good practice in a production environment.

After developing and testing your ETL pipeline, you can automate these tasks in the production environment using Airflow.

Production workflow created by author

Here is a checklist of key preparation steps for a successful deployment in Airflow (a sketch of the corresponding commands follows the list):

  • Create S3 bucket my-prod-bucket 
  • Create Glue job prod_covid_transformation in AWS Console
  • Create Redshift Stored Procedure prod.load_covid_data()
  • Configure Airflow
  • Configure SMTP for emails in airflow.cfg
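As a rough sketch, the AWS-side preparation could be scripted like this; the region, IAM role and script location are placeholders:

# Create the production bucket
aws s3 mb s3://my-prod-bucket --region us-east-1

# Register the production Glue job (role and script location are placeholders)
aws glue create-job \
    --name prod_covid_transformation \
    --role AWSGlueServiceRole-prod \
    --command '{"Name": "glueetl", "ScriptLocation": "s3://my-prod-bucket/scripts/transform_covid.py", "PythonVersion": "3"}'

For email notifications, the [smtp] section of airflow.cfg needs your mail server details; the values below are placeholders:

# airflow.cfg – [smtp] section (placeholder values)
[smtp]
smtp_host = smtp.example.com
smtp_starttls = True
smtp_port = 587
smtp_mail_from = your-email-address
smtp_user = your-smtp-user
smtp_password = your-smtp-password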

The deployment of the data pipeline in Airflow then looks like this:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
from airflow.operators.email import EmailOperator

# 1. DAG CONFIGURATION
default_args = {
    'owner': 'data_team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2023, 1, 1)
}

# 2. DATA INGESTION FUNCTION
def load_covid_data():
    import pandas as pd
    import boto3

    url = "https://github.com/CSSEGISandData/COVID-19/raw/master/archived_data/archived_time_series/time_series_19-covid-Confirmed_archived_0325.csv"
    df = pd.read_csv(url)

    df = df.melt(
        id_vars=['Province/State', 'Country/Region', 'Lat', 'Long'],
        var_name='date_str',
        value_name='confirmed_cases'
    )
    df['date'] = pd.to_datetime(df['date_str'], format='%m/%d/%y')

    df.to_parquet(
        's3://my-prod-bucket/raw/covid/',
        engine='pyarrow',
        partition_cols=['date']
    )

# 3. DAG DEFINITION
with DAG(
    'covid_etl',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
) as dag:

    # Task 1: Ingest Data
    ingest = PythonOperator(
        task_id='ingest_data',
        python_callable=load_covid_data
    )

    # Task 2: Transform with Glue
    transform = GlueJobOperator(
        task_id='transform_data',
        job_name='prod_covid_transformation',
        script_args={
            '--input_path': 's3://my-prod-bucket/raw/covid/',
            '--output_path': 's3://my-prod-bucket/processed/covid/'
        }
    )

    # Task 3: Load to Redshift
    load = RedshiftSQLOperator(
        task_id='load_data',
        sql="CALL prod.load_covid_data()"
    )

    # Task 4: Notifications
    notify = EmailOperator(
        task_id='send_email',
        to='you-email-address',
        subject='ETL Status: {{ ds }}',
        html_content='ETL job completed: <a href="{{ ti.log_url }}">View Logs</a>'
    )

    # Define task order so each step waits for the previous one
    ingest >> transform >> load >> notify

My Final Thoughts

Although some users, especially those who are new to the cloud and looking for simple solutions, tend to be daunted by AWS’s high barrier to entry and overwhelmed by its massive choice of services, it’s worth the time and effort, and here’s why:

  • The process of configuring, designing, building and testing data pipelines gives you a deep understanding of a typical data engineering workflow. These skills will benefit you even if you build your projects on other cloud platforms, such as Azure, GCP or Alibaba Cloud.
  • AWS’s mature ecosystem and vast array of services let users customise their data architecture strategies and enjoy more flexibility and scalability in their projects.

Thank you for reading! I hope this article helps you build your cloud-based data pipeline!

