Introduction
This article walks through building an automated ETL (Extract, Transform, Load) pipeline that retrieves daily Bitcoin price data from the Polygon.io API, applies the necessary transformations, and loads the result into a PostgreSQL database. The workflow is orchestrated with Apache Airflow to ensure reliable daily execution.
This project demonstrates several key data engineering concepts:
- API data extraction
- Data transformation using pandas
- Database integration with PostgreSQL
- Workflow orchestration with Apache Airflow
- Deployment to a cloud environment
System Architecture
The pipeline consists of the following components:
- Data Source: Polygon.io API providing cryptocurrency price data
- ETL Script: Python script that handles extraction, transformation, and loading
- Database: PostgreSQL for data storage
- Orchestration: Apache Airflow for scheduling and monitoring
- Infrastructure: Cloud VM for hosting the pipeline
The system flows in a linear fashion: Airflow triggers the ETL script daily, which extracts the latest BTC prices, transforms the data into a suitable format, and loads it into the PostgreSQL database.
Detailed Implementation
Step 1: Creating the ETL Script
The first component is btc_prices.py, which handles the core ETL functionality:
import requests
import os
from sqlalchemy import create_engine
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv

# Define API endpoint
url = 'https://api.polygon.io/v1/open-close/crypto/BTC/USD/2025-03-31?adjusted=true&apiKey=YOUR_API_KEY'

response = requests.get(url)

if response.status_code == 200:
    data = response.json()
    open_price = data.get('open')
    close_price = data.get('close')
    date = data.get('day')
    symbol = data.get('symbol')
else:
    print(f"Failed to retrieve data: {response.status_code}")
    exit()

# Prepare data for insertion
data_df = {
    'symbol': symbol,
    'open_price': open_price,
    'close_price': close_price,
    'date': date
}
df = pd.DataFrame(data_df, index=[0])
df['date'] = pd.to_datetime(df['date']).dt.strftime('%Y-%m-%d')

# Load environment variables
load_dotenv()
dbname = os.getenv('dbname')
user = os.getenv('user')
password = os.getenv('password')
host = os.getenv('host')
port = os.getenv('port')

# Create database connection
engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{dbname}')
df.to_sql("crypto_prices", con=engine, if_exists="append", index=False, schema="dataengineering")

print(f"Successfully loaded crypto data for {df['date'][0]}")
This script:
- Extracts Bitcoin price data from the Polygon.io API
- Transforms and structures the data using pandas
- Loads the data into PostgreSQL
- Uses environment variables for secure database connection management
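The URL above pins a single date and embeds the API key directly in the string. For a pipeline that actually runs daily, both can be built at runtime instead. A minimal sketch, assuming the key is kept in the same .env file under a hypothetical POLYGON_API_KEY entry:

import os
from datetime import date, timedelta

import requests
from dotenv import load_dotenv

load_dotenv()

# POLYGON_API_KEY is an assumed .env entry, not part of the original script
api_key = os.getenv('POLYGON_API_KEY')

# Query the previous day, since the current day's close is not yet final
target_date = (date.today() - timedelta(days=1)).isoformat()
url = (
    f'https://api.polygon.io/v1/open-close/crypto/BTC/USD/{target_date}'
    f'?adjusted=true&apiKey={api_key}'
)

response = requests.get(url)
response.raise_for_status()  # fail loudly rather than loading partial data
data = response.json()

The rest of the script (building the DataFrame and writing to PostgreSQL) would proceed exactly as shown above.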
Step 2: Creating the Airflow DAG
Next, btc_dag.py defines the Airflow DAG (Directed Acyclic Graph) that orchestrates the workflow:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# DAG default arguments
default_args = {
    "owner": "data_engineer",
    "depends_on_past": False,
    "start_date": datetime(2025, 3, 31),
    "email_on_failure": False,
    "email_on_retry": True,
    "retries": 2,
    "retry_delay": timedelta(minutes=2)
}

with DAG(
    'polygon_btc_data',
    default_args=default_args,
    schedule_interval='@daily',
) as dag:

    activate_venv = BashOperator(
        task_id='activate_virtual_env',
        bash_command='source /home/user/project/venv/bin/activate',
    )

    execute_file = BashOperator(
        task_id='execute_python_file',
        bash_command='python /home/user/project/btc_prices.py',
    )

    activate_venv >> execute_file
This DAG:
- Defines the execution schedule
- Activates the virtual environment
- Executes the ETL script
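One caveat with the two-task layout: each BashOperator runs its bash_command in its own subshell, so activating the virtual environment in the first task has no effect on the second. A common workaround is to activate and execute in a single task (or to point bash_command at the venv's own python binary). A minimal sketch using the same paths as above; note that on Airflow 2.x the operator is imported from airflow.operators.bash rather than the deprecated airflow.operators.bash_operator path:

run_etl = BashOperator(
    task_id='run_btc_etl',
    # Activate and run in the same shell so the venv's interpreter and
    # installed packages are the ones actually used
    bash_command=(
        'source /home/user/project/venv/bin/activate && '
        'python /home/user/project/btc_prices.py'
    ),
)

This single task would replace the activate_venv >> execute_file pair inside the DAG context.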
Step 3: Setting Up the Environment
- Creating a Virtual Environment:
python -m venv venv
source venv/bin/activate
- Installing Dependencies:
pip install requests pandas sqlalchemy python-dotenv psycopg2-binary apache-airflow
- Setting Up Environment Variables:
echo "dbname=your_database_name" >> .env echo "user=your_database_user" >> .env echo "password=your_database_password" >> .env echo "host=your_database_host" >> .env echo "port=your_database_port" >> .env
Step 4: Server Deployment
- SSH into the cloud VM:
ssh user@your_server_ip
- Create necessary directories:
mkdir -p ~/crypto_price
mkdir -p ~/airflow/dags
- Transfer scripts to the server:
scp btc_prices.py user@your_server_ip:~/crypto_price/
scp btc_dag.py user@your_server_ip:~/airflow/dags/
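The steps above copy the files but do not start Airflow itself. A minimal sketch for a single-machine setup, assuming Airflow 2.x is installed in a virtual environment on the server (production deployments usually run the scheduler and webserver as separate managed services):

source ~/crypto_price/venv/bin/activate   # assumed venv location on the server
export AIRFLOW_HOME=~/airflow             # so the dags/ folder above is picked up
airflow db init                           # initialise Airflow's metadata database
airflow standalone                        # starts the scheduler, webserver, and a default user

Once the webserver is up, the polygon_btc_data DAG can be enabled and monitored from the Airflow UI.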
Step 5: PostgreSQL Configuration
- Creating Database Schema:
CREATE SCHEMA IF NOT EXISTS dataengineering;

CREATE TABLE IF NOT EXISTS dataengineering.crypto_prices (
    id SERIAL PRIMARY KEY,
    symbol VARCHAR(10) NOT NULL,
    open_price NUMERIC(20, 8) NOT NULL,
    close_price NUMERIC(20, 8) NOT NULL,
    date DATE NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
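After the first successful DAG run, a quick query confirms that rows are landing where expected (a sanity check, not part of the original setup):

SELECT symbol, open_price, close_price, date, created_at
FROM dataengineering.crypto_prices
ORDER BY date DESC
LIMIT 5;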
Conclusion
The architecture follows best practices for data engineering:
- Separation of extraction, transformation, and loading concerns
- Secure credential management
- Error handling with automatic task retries
- Automated scheduling
- Cloud-based deployment
The combination of Python, Airflow, and PostgreSQL provides a powerful foundation for financial data analysis, enabling timely insights into cryptocurrency market trends.