DEV Community

Cover image for Apache Airflow 初體驗
HyperRedStart
HyperRedStart

Posted on

Apache Airflow 初體驗

Airflow 是由 Airbnb 所貢獻的 Apache 頂級專案,用於流程控制,建立有向無環的工作流程(DAGs) ,以事件排程的方式部屬我們的 DAG Schedule 並可以部屬到各個雲端平台上, GCP / AWS / Azure,可透過網頁 UI 調配部屬排程,主打彈性、高擴充性、搭配Jinja Template Engine 優雅的設計 Workflow 工作流程!

1.建置 Airflow 環境 - 使用 Docker

下載官方提供 docker-compose 文件

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.2.4/docker-compose.yaml' 
Enter fullscreen mode Exit fullscreen mode

產生相依目錄並設定 Airflow 使用者權限

mkdir -p ./dags ./logs ./plugins echo -e "AIRFLOW_UID=$(id -u)" > .env 
Enter fullscreen mode Exit fullscreen mode

2.資料庫初始化

docker-compose up airflow-init 
Enter fullscreen mode Exit fullscreen mode

3.啟動 Airflow 服務

docker-compose up -d 
Enter fullscreen mode Exit fullscreen mode

airflow service

確認我們的 Container 是否都健康!
dockerps

4.下載 airflow 命令工具

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.1.1/airflow.sh' chmod +x airflow.sh 
Enter fullscreen mode Exit fullscreen mode

5.撰寫 Demo 程式

將我們第一隻 DAGs 程式放至於 Container掛載目錄 dags 下
dags/test_app_v1.py

import time from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator default_args = { 'owner' : '', 'start_date' : datetime(2022, 1, 1, 0, 0), 'schedule_interval': '@daily', 'retries':2, 'retry_delay': timedelta(minutes = 1) } def fn_one(): print('execute jobs') with DAG('test_app_v1' ,default_args=default_args) as dag: tesk = PythonOperator( task_id = 'one', python_callable=fn_one ) 
Enter fullscreen mode Exit fullscreen mode

6.透過 airflow.sh 將執行我們的 DAG 程式

# 建立運行 python container  ./airflow.sh bash # 執行 python 檔案 python dags/test_app_v1.py 
Enter fullscreen mode Exit fullscreen mode

7.網頁中查看新建立的 test_app_v1 DAG

Airflow Web Server
http://localhost:8080
airflow/airflow

在執行py腳本後我們就可以在 Airflow 首頁中查找到我們的 DAG 排程 !

dag

查看執行狀態,使否有異常失敗等情況
status

8.REST API

使用 Airflow Rest Api 可以取代我們在 WebUI 上的工作,進行 DAG 的抓取執行刪除等工作。

https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

curl -X GET 'http://10.1.200.67:8080/api/v1/dags' --user "airflow:airflow" 
Enter fullscreen mode Exit fullscreen mode
{ "dag_id": "test_app_v1", "description": null, "file_token": "", "fileloc": "/opt/airflow/dags/test_app_v1.py", "is_active": true, "is_paused": false, "is_subdag": false, "owners": [], "root_dag_id": null, "schedule_interval": { "__type": "TimeDelta", "days": 1, "microseconds": 0, "seconds": 0 }, "tags": [] } 
Enter fullscreen mode Exit fullscreen mode

Conclusion

很多需要進行排定的工作項目都可以在 Airflow 的幫助下進行運行,搭配 python的語法可以很快速的撰寫爬蟲、資料轉換、事件觸發等工作,Airflow 提供WebUI操作畫面可以讓我們更便利的去對我們的 DAGs 進行監控,解決了我們在設計排程上的諸多困難!

Top comments (0)