Building a Data Pipeline using Apache Airflow (on AWS / GCP) Yohei Onishi PyCon SG 2019, Oct. 11 2019
Presenter Profile ● Name: Yohei Onishi ● Data Engineer at a Japanese retail company ● Based in Singapore since Oct. 2018 ● Apache Airflow Contributor 2
Objective ● Expected audiences: Data engineers ○ who are working on building a data pipleline ○ who are looking for a better workflow solution ● Goal: Provide the following so they can start using Airflow ○ Airflow overview and how to author workflow ○ Airflow cluster and CI/CD pipeline ○ Data engineering services on AWS / GCP 3
Data pipeline data source collect ETL analytics data consumer micro services enterprise systems IoT devices object storage message queue micro services enterprise systems BI tool 4
Example: logistics operation monitoring factory warehouse store WH receipt / shipment store receipt inventory management system shipment order FA shipment regional logistics operators ETL KPI report 5
Airflow overview ● Open sourced by Airbnb and Apache top project ● Cloud Composer: managed Airflow cluster on GCP ● Dynamic workflow generation by Python code ● Easily extensible so you can fit it to your usecase ● Scalable by using a message queue to orchestrate arbitrary number of workers ● Workflow visualization 6
Example: Copy a file from s3 bucket to another export records as CSV Singapore region US region EU region transfer it to a regional bucket 7 local region
DEMO: UI and source code sample code: https://github.com/yohei1126/pycon-apac-2019-airflow-sample 8
Concept: Directed acyclic graph, operator, task, etc custom_param_per_dag = {'sg': { ... }, 'eu': { ... }, 'us': { ... }} for region, v in custom_param_per_dag.items(): dag = DAG('shipment_{}'.format(region), ...) export = PostgresToS3Operator(task_id='db_to_s3', ...) transfer = S3CopyObjectOperator(task_id='s3_to_s3', ...) export >> transfer globals()[dag] = dag 9
template t1 = PostgresToS3Operator( task_id='db_to_s3', sql="SELECT * FROM shipment WHERE region = '{{ params.region }}' AND ship_date = '{{ execution_date.strftime("%Y-%m-%d") }}'", bucket=default_args['source_bucket'], object_key='{{ params.region }}/{{ execution_date.strftime("%Y%m%d%H%M%S") }}.csv', params={'region':region}, dag=dag) 10
Operator class PostgresToS3Operator(BaseOperator): template_fields = ('sql', 'bucket', 'object_key') def __init__(self, ..., *args, **kwargs): super(PostgresToS3Operator, self).__init__(*args, **kwargs) ... def execute(self, context): ... 11
Building a data pipeline: AWS vs GCP 12 AWS (2 years ago) GCP (current) Workflow (Airflow cluster) EC2 (or ECS / EKS) Cloud Composer Big data processing Spark on EC2 (or EMR) Cloud Dataflow (or Dataproc) Data warehouse Hive on EC2 -> Athena (or Hive on EMR / Redshift) BigQuery CI / CD Jenkins on EC2 (or Code Build) Cloud Build
AWS: Airflow cluster executor (1..N) worker node (1) executor (1..N) worker node (2) executor (1..N) worker node (1) ... scheduler master node (1) web server master node (2) web server LB admin Airflow metadata DBCelery result backend message broker 13 http://site.clairvoyantsoft.com/setting-apache-airflow-cluster/
GCP: Airflow Cluster = Cloud Composer ● Fully managed Airflow cluster provided by GCP ○ Fully managed ○ Built in integrated with the other GCP services ● To focus on business logic, you should build Airflow cluster using GCP composer 14
GCP: Airflow Cluster = Cloud Composer 15https://cloud.google.com/composer/docs/concepts/overview ● Airflow cluster on Google Kubernetes Engine can be easilly created by CLI or Web console ● Allowed changes to the cluster: increase number of worker node or install Python modules ● You can not install Linux command to worker node.
AWS: Running Spark job in client mode https://www.cloudera.com/documentation/enterprise/latest/topics/cdh_ig_running_spark_on_yarn.html 16 Airflow worker node ● Build Spark cluster out side of Airflow cluster ● official SparkSQLOperator does not support cluster mode ● Use official SparkSubmitOperator or extend official SparkSQLOperator ● Note: if you run Spark job with client mode SparkDriver run on Airflow worker node. This will cause out of memory on driver side.
AWS: Running Spark job in cluster mode https://www.cloudera.com/documentation/enterprise/latest/topics/cdh_ig_running_spark_on_yarn.html 17 Airflow worker node ● Specifying cluster mode in SparkSubmitOperator in your Airflow DAG ● Then your Spark job runs on YARN container (Spark cluster) ● This gives enough memory to Spark driver
GCP: Big data processing = Cloud Dataflow ● Fully managed service streaming / batch data processing ● Single API for both batch and streaming data ● Develop a pipeline in Apache Beam SDK (Java, Python and Go) ● Fully integrated with GCP services ● https://cloud.google.com/dataflow/ 18
GCP: Big data processing = Cloud Dataflow 19 Airflow executor Airflow worker node (Composer) Dataflow Java (Jar) Dataflow Python Dataflow GCS Dataflow template (Java or Python) upload template in advance load template and deploy jobs (2) run template deploy Dataflow job (1) run local code
Data warehouse: Hive / Athena / BigQuery 20 Hive AWS Athena BigQuery Managed or not Nor Fully managed Fully managed Pricing model Pay for computer resource Pay for usage Pay by usage Standard SQL No (HiveQL) Yes Yes Data load Required Not required Required Partitioning Any column Any column Daily partition Scalability Depends on your cluster size Mid High (peta bytes)
AWS: Data warehouse = Athena 21 Airflow workerAthena S3 (data storage) S3 (destination) query export query result run query ● AWSAthenaOperator support query ● Explicit table partitioning is needed
GCP: Data warehouse = BigQuery 22 Composer (Airflow cluster) BigQuery GCS (data storage) GCS (destination) (1) load (3) export query result (2) run query
AWS: CI/CD pipeline AWS SNS AWS SQS Github repo raise / merge a PR Airflow worker polling run Ansible script git pull test deployment 23
GCP: CI/CD pipeline 24 Github repo Cloud Build (Test and deploy) GCS (provided from Composer) Composer (Airflow cluster) trigger build deploy automaticallyupload merge a PR
Building a data pipeline: AWS vs GCP 25 AWS (2 years ago) GCP (current) Workflow (Airflow cluster) EC2 (or ECS / EKS) Cloud Composer Big data processing Spark on EC2 (or EMR) Cloud Dataflow (or Dataproc) Data warehouse Hive on EC2 -> Athena (or Hive on EMR / Redshift) BigQuery CI / CD Jenkins on EC2 (or Code Build) Cloud Build recommended
Summary ● Data Engineers have to build reliable and scalable data pipeline to accelate data analytics activities ● Airflow is great tool to author and monitor workflow ● HA cluster is required in production ● IMHO GCP provide better managed service for data pipeline and data warehouse 26
References ● Apache Airflow ● GCP Cloud Composer ● Airflow: a workflow management platform ● ETL best practices in Airflow 1.8 ● Data Science for Startups: Data Pipelines ● Airflow: Tips, Tricks, and Pitfalls 27
Thank you! 28

Building a Data Pipeline using Apache Airflow (on AWS / GCP)

  • 1.
    Building a DataPipeline using Apache Airflow (on AWS / GCP) Yohei Onishi PyCon SG 2019, Oct. 11 2019
  • 2.
    Presenter Profile ● Name:Yohei Onishi ● Data Engineer at a Japanese retail company ● Based in Singapore since Oct. 2018 ● Apache Airflow Contributor 2
  • 3.
    Objective ● Expected audiences:Data engineers ○ who are working on building a data pipleline ○ who are looking for a better workflow solution ● Goal: Provide the following so they can start using Airflow ○ Airflow overview and how to author workflow ○ Airflow cluster and CI/CD pipeline ○ Data engineering services on AWS / GCP 3
  • 4.
    Data pipeline data sourcecollect ETL analytics data consumer micro services enterprise systems IoT devices object storage message queue micro services enterprise systems BI tool 4
  • 5.
    Example: logistics operationmonitoring factory warehouse store WH receipt / shipment store receipt inventory management system shipment order FA shipment regional logistics operators ETL KPI report 5
  • 6.
    Airflow overview ● Opensourced by Airbnb and Apache top project ● Cloud Composer: managed Airflow cluster on GCP ● Dynamic workflow generation by Python code ● Easily extensible so you can fit it to your usecase ● Scalable by using a message queue to orchestrate arbitrary number of workers ● Workflow visualization 6
  • 7.
    Example: Copy afile from s3 bucket to another export records as CSV Singapore region US region EU region transfer it to a regional bucket 7 local region
  • 8.
    DEMO: UI andsource code sample code: https://github.com/yohei1126/pycon-apac-2019-airflow-sample 8
  • 9.
    Concept: Directed acyclicgraph, operator, task, etc custom_param_per_dag = {'sg': { ... }, 'eu': { ... }, 'us': { ... }} for region, v in custom_param_per_dag.items(): dag = DAG('shipment_{}'.format(region), ...) export = PostgresToS3Operator(task_id='db_to_s3', ...) transfer = S3CopyObjectOperator(task_id='s3_to_s3', ...) export >> transfer globals()[dag] = dag 9
  • 10.
    template t1 = PostgresToS3Operator( task_id='db_to_s3', sql="SELECT* FROM shipment WHERE region = '{{ params.region }}' AND ship_date = '{{ execution_date.strftime("%Y-%m-%d") }}'", bucket=default_args['source_bucket'], object_key='{{ params.region }}/{{ execution_date.strftime("%Y%m%d%H%M%S") }}.csv', params={'region':region}, dag=dag) 10
  • 11.
    Operator class PostgresToS3Operator(BaseOperator): template_fields =('sql', 'bucket', 'object_key') def __init__(self, ..., *args, **kwargs): super(PostgresToS3Operator, self).__init__(*args, **kwargs) ... def execute(self, context): ... 11
  • 12.
    Building a datapipeline: AWS vs GCP 12 AWS (2 years ago) GCP (current) Workflow (Airflow cluster) EC2 (or ECS / EKS) Cloud Composer Big data processing Spark on EC2 (or EMR) Cloud Dataflow (or Dataproc) Data warehouse Hive on EC2 -> Athena (or Hive on EMR / Redshift) BigQuery CI / CD Jenkins on EC2 (or Code Build) Cloud Build
  • 13.
    AWS: Airflow cluster executor (1..N) workernode (1) executor (1..N) worker node (2) executor (1..N) worker node (1) ... scheduler master node (1) web server master node (2) web server LB admin Airflow metadata DBCelery result backend message broker 13 http://site.clairvoyantsoft.com/setting-apache-airflow-cluster/
  • 14.
    GCP: Airflow Cluster= Cloud Composer ● Fully managed Airflow cluster provided by GCP ○ Fully managed ○ Built in integrated with the other GCP services ● To focus on business logic, you should build Airflow cluster using GCP composer 14
  • 15.
    GCP: Airflow Cluster= Cloud Composer 15https://cloud.google.com/composer/docs/concepts/overview ● Airflow cluster on Google Kubernetes Engine can be easilly created by CLI or Web console ● Allowed changes to the cluster: increase number of worker node or install Python modules ● You can not install Linux command to worker node.
  • 16.
    AWS: Running Sparkjob in client mode https://www.cloudera.com/documentation/enterprise/latest/topics/cdh_ig_running_spark_on_yarn.html 16 Airflow worker node ● Build Spark cluster out side of Airflow cluster ● official SparkSQLOperator does not support cluster mode ● Use official SparkSubmitOperator or extend official SparkSQLOperator ● Note: if you run Spark job with client mode SparkDriver run on Airflow worker node. This will cause out of memory on driver side.
  • 17.
    AWS: Running Sparkjob in cluster mode https://www.cloudera.com/documentation/enterprise/latest/topics/cdh_ig_running_spark_on_yarn.html 17 Airflow worker node ● Specifying cluster mode in SparkSubmitOperator in your Airflow DAG ● Then your Spark job runs on YARN container (Spark cluster) ● This gives enough memory to Spark driver
  • 18.
    GCP: Big dataprocessing = Cloud Dataflow ● Fully managed service streaming / batch data processing ● Single API for both batch and streaming data ● Develop a pipeline in Apache Beam SDK (Java, Python and Go) ● Fully integrated with GCP services ● https://cloud.google.com/dataflow/ 18
  • 19.
    GCP: Big dataprocessing = Cloud Dataflow 19 Airflow executor Airflow worker node (Composer) Dataflow Java (Jar) Dataflow Python Dataflow GCS Dataflow template (Java or Python) upload template in advance load template and deploy jobs (2) run template deploy Dataflow job (1) run local code
  • 20.
    Data warehouse: Hive/ Athena / BigQuery 20 Hive AWS Athena BigQuery Managed or not Nor Fully managed Fully managed Pricing model Pay for computer resource Pay for usage Pay by usage Standard SQL No (HiveQL) Yes Yes Data load Required Not required Required Partitioning Any column Any column Daily partition Scalability Depends on your cluster size Mid High (peta bytes)
  • 21.
    AWS: Data warehouse= Athena 21 Airflow workerAthena S3 (data storage) S3 (destination) query export query result run query ● AWSAthenaOperator support query ● Explicit table partitioning is needed
  • 22.
    GCP: Data warehouse= BigQuery 22 Composer (Airflow cluster) BigQuery GCS (data storage) GCS (destination) (1) load (3) export query result (2) run query
  • 23.
    AWS: CI/CD pipeline AWSSNS AWS SQS Github repo raise / merge a PR Airflow worker polling run Ansible script git pull test deployment 23
  • 24.
    GCP: CI/CD pipeline 24 Githubrepo Cloud Build (Test and deploy) GCS (provided from Composer) Composer (Airflow cluster) trigger build deploy automaticallyupload merge a PR
  • 25.
    Building a datapipeline: AWS vs GCP 25 AWS (2 years ago) GCP (current) Workflow (Airflow cluster) EC2 (or ECS / EKS) Cloud Composer Big data processing Spark on EC2 (or EMR) Cloud Dataflow (or Dataproc) Data warehouse Hive on EC2 -> Athena (or Hive on EMR / Redshift) BigQuery CI / CD Jenkins on EC2 (or Code Build) Cloud Build recommended
  • 26.
    Summary ● Data Engineershave to build reliable and scalable data pipeline to accelate data analytics activities ● Airflow is great tool to author and monitor workflow ● HA cluster is required in production ● IMHO GCP provide better managed service for data pipeline and data warehouse 26
  • 27.
    References ● Apache Airflow ●GCP Cloud Composer ● Airflow: a workflow management platform ● ETL best practices in Airflow 1.8 ● Data Science for Startups: Data Pipelines ● Airflow: Tips, Tricks, and Pitfalls 27
  • 28.