caesarmario/dq-platform-template-airflow-postgres


🔧 Data Quality Platform Template — Airflow × Postgres

Observability (freshness/completeness), rule-based Monitoring, and pluggable Alerting — orchestrated with Apache Airflow, logged in PostgreSQL, and visualized in Metabase. JSON & SQL configs for schema-aware validation.




A modular, config-driven Data Quality (DQ) platform implementing 3 core capabilities:

  • Observability — SLA-aware freshness & completeness checks with historical “SKIPPED” logic
  • Monitoring — declarative SQL rules with dynamic schema casting (JSON-typed)
  • Alerting — grouped, severity-based alerts (Slack-style; easily extensible to Telegram/Email)

It aligns with DAMA dimensions (Accuracy, Completeness, Consistency, Uniqueness, Timeliness, Validity) and is production-minded (idempotent tasks, retries, environment separation).




🔧 Tech Stack

  • Apache Airflow (orchestration)
  • PostgreSQL (ODS staging + DQ logs)
  • Python (observability/monitoring/alerting logic)
  • Metabase (dashboarding)
  • Slack (mocked alert target; channels are pluggable)

✅ Features

  • Observability

    • Freshness: SLA latency since last update (per table)
    • Completeness: row-count thresholds
    • PASS / FAIL / SKIPPED (for intentionally historical datasets)
  • Monitoring

    • Rules live in tmt_dq.dq_rule_config (SQL metatable)
    • Validation queries are COUNT-based; results logged to dq_violations
    • Dynamic casting using config/dq_schema_config.json to avoid type/operator mismatches
  • Alerting

    • Grouped by table, only High/Critical with alert_sent=false
    • Slack webhook demo; easily extend to Telegram/Email
  • Dashboard

    • Overview metrics, SLA freshness bars, completeness table, top violated rules, violation trends, outstanding high-severity
  • Extensible by config

    • Add tables/columns → update JSON typing
    • Add rules → upsert SQL rows; no code changes required
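The PASS / FAIL / SKIPPED logic described above can be sketched in a few lines. This is a hypothetical illustration, not the repository's actual `observability.py` implementation; the function name, the `is_historical` flag, and the minute-based comparison are assumptions based on the README's description.

```python
from datetime import datetime, timedelta
from typing import Optional

def freshness_status(last_updated: datetime, sla_minutes: int,
                     is_historical: bool = False,
                     now: Optional[datetime] = None) -> str:
    """Return PASS / FAIL / SKIPPED for a table freshness check.

    Sketch only: tables flagged as intentionally historical are SKIPPED
    instead of failing the SLA; everything else compares the delay since
    the last update against the SLA in minutes.
    """
    if is_historical:
        return "SKIPPED"
    now = now or datetime.utcnow()
    delay_minutes = (now - last_updated) / timedelta(minutes=1)
    return "PASS" if delay_minutes <= sla_minutes else "FAIL"
```

With a 4320-minute (3-day) SLA, a table updated an hour ago passes, while one untouched for weeks fails unless it is marked historical.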

🗺 Architecture

Architecture diagram: assets/data_framework_mario_caesar.drawio.png


📂 Repository Structure

```
tech-design-test-case/
├─ airflow/
│  ├─ dags/
│  │  ├─ dag_dq_check.py
│  │  └─ dag_dq_load_source.py
│  └─ plugins/
│     ├─ dq_utils/
│     │  ├─ alerting.py
│     │  ├─ monitoring.py
│     │  └─ observability.py
│     ├─ load_utils.py
│     └─ logging_utils.py
├─ assets/
│  ├─ data_framework_mario_caesar.drawio
│  ├─ data_framework_mario_caesar.drawio.png
│  ├─ data_monitoring_ss.png
│  ├─ dq_rule_queries.sql
│  └─ metabase_query.sql
├─ config/
│  └─ dq_schema_config.json
├─ data/
│  ├─ customer_transactions.csv
│  └─ olist.sqlite
├─ .env
├─ .gitignore
├─ dev_notes.txt
├─ docker-compose.yml
├─ Dockerfile
├─ README.md
└─ requirements.txt
```


🚀 Quickstart

1) Prerequisites

  • Docker & Docker Compose
  • Ports available: 8084 (Airflow), 5436 (Postgres), 3000 (Metabase)

2) Environment

Create a .env file (or edit existing):

```
POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres
POSTGRES_DB=postgres
POSTGRES_PORT=5436
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/your/dummy/webhook  # optional demo
```

3) Bring up the stack

```
docker compose up -d --build
# Airflow:  http://localhost:8084
# Metabase: http://localhost:3000
```

4) Airflow Variables (UI → Admin → Variables)

Set variables as needed by your DAGs/utils, for example:

  • DQ_SCHEMA_CONFIG_PATH → /opt/airflow/config/dq_schema_config.json
  • DB_CONN_ID → postgres_default (or your custom connection)
  • FRESHNESS_SLA_MINUTES → 4320 (3 days) — example default

Also configure Connections (Admin → Connections) for Postgres if you are not using the defaults.
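A utility module can resolve these variables so it still imports cleanly outside a running Airflow instance. The sketch below is an assumption about how you might wire this up, relying on Airflow's documented `AIRFLOW_VAR_<NAME>` environment-variable convention as a fallback; the helper name is hypothetical.

```python
import os
from typing import Optional

def get_airflow_variable(name: str, default: Optional[str] = None) -> Optional[str]:
    """Resolve an Airflow Variable with graceful fallbacks.

    Tries Variable.get inside a live Airflow deployment; outside one
    (or if the metadata DB is unreachable) it falls back to the
    AIRFLOW_VAR_<NAME> environment variable, then to the default.
    """
    try:
        from airflow.models import Variable  # available in the Airflow containers
        return Variable.get(name, default_var=default)
    except Exception:  # Airflow not installed or not initialised
        return os.environ.get(f"AIRFLOW_VAR_{name.upper()}", default)

# Example: the variables suggested above
schema_config_path = get_airflow_variable(
    "DQ_SCHEMA_CONFIG_PATH",
    default="/opt/airflow/config/dq_schema_config.json",
)
```

This keeps the DQ utilities testable on a laptop while behaving identically inside the Dockerized Airflow stack.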

5) Run DAGs (order)

  1. dag_dq_load_source — loads olist.sqlite and customer_transactions.csv into schema tmt, then triggers:
  2. dag_dq_check — runs freshness → completeness → monitoring → alerting. Logs go to the tmt_dq schema.

🔩 Main Modules

  • observability.py
    • Computes table freshness vs SLA, completeness vs expected thresholds
    • Writes time-series logs:
      • tmt_dq.dq_freshness_log(table_name, last_updated, delay_minutes, status, check_time)
      • tmt_dq.dq_completeness_log(table_name, row_count, expected_min_rows, status, check_time)
  • monitoring.py
    • Reads tmt_dq.dq_rule_config(rule_id, rule_name, table_name, rule_sql, severity)
    • Applies JSON-typed casting before running rule_sql
    • Writes tmt_dq.dq_violations(table_name, rule_id, rule_name, violation_count, severity, alert_sent, check_time)
  • alerting.py
    • Aggregates High/Critical where alert_sent=false
    • Sends a grouped Slack message (pluggable) and marks as sent
  • load_utils.py
    • Idempotent DDL + fast COPY/insert flows for SQLite/CSV to Postgres
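The alerting module's grouping step can be sketched as follows. The field names mirror the `tmt_dq.dq_violations` columns listed above, but the function name and the Slack message layout are illustrative assumptions, not the repository's actual `alerting.py`.

```python
from collections import defaultdict
from typing import Dict, List

def build_alert_messages(violations: List[dict]) -> Dict[str, str]:
    """Group unsent High/Critical violations by table and format one
    Slack-style message per table.

    Sketch only: the real module would post each message to the webhook
    and then flip alert_sent to true for the rows it covered.
    """
    grouped = defaultdict(list)
    for v in violations:
        if v["severity"].lower() in ("high", "critical") and not v["alert_sent"]:
            grouped[v["table_name"]].append(v)
    messages = {}
    for table, rows in grouped.items():
        lines = [f":rotating_light: DQ violations in `{table}`"]
        lines += [f"- [{r['severity'].upper()}] {r['rule_name']}: "
                  f"{r['violation_count']} rows" for r in rows]
        messages[table] = "\n".join(lines)
    return messages
```

Because grouping happens per table, a single noisy table produces one consolidated alert instead of one message per rule.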

🧬 Schema-Aware Config (example)

config/dq_schema_config.json (excerpt):

```json
{
  "customer_transactions": {
    "transaction_id": "INTEGER",
    "customer_id": "TEXT",
    "transaction_date": "TEXT",
    "amount": "REAL",
    "category": "TEXT",
    "location": "TEXT"
  },
  "order_reviews": {
    "review_id": "TEXT",
    "order_id": "TEXT",
    "review_score": "INTEGER",
    "review_creation_date": "TEXT"
  }
}
```
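One way to use this type map is to wrap columns in explicit casts before they reach a rule query, avoiding type/operator mismatches. The helper below is a hypothetical sketch of that idea; the repository's casting logic in `monitoring.py` may differ.

```python
import json

def cast_expression(table: str, column: str, schema: dict) -> str:
    """Build a cast expression for a column from the JSON type map,
    e.g. CAST(amount AS REAL); columns missing from the map pass through
    unchanged. Sketch only, not the repository's actual helper."""
    declared = schema.get(table, {}).get(column)
    return f"CAST({column} AS {declared})" if declared else column

# Excerpt of config/dq_schema_config.json
schema = json.loads('{"customer_transactions": {"amount": "REAL"}}')
expr = cast_expression("customer_transactions", "amount", schema)
# expr can then be interpolated into a COUNT-based rule query
```

Adding a new table or column then only requires extending the JSON file; no Python changes are needed.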

Typical rule rows (see assets/dq_rule_queries.sql):

```sql
INSERT INTO tmt_dq.dq_rule_config (rule_id, rule_name, table_name, rule_sql, severity)
VALUES
  (1, 'amount_non_negative', 'customer_transactions',
   'SELECT COUNT(*) FROM tmt.customer_transactions WHERE COALESCE(amount, 0) < 0', 'medium'),
  (2, 'review_score_1_5', 'order_reviews',
   'SELECT COUNT(*) FROM tmt.order_reviews WHERE review_score NOT BETWEEN 1 AND 5', 'high');
```
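Evaluating such COUNT-based rules is a simple loop: run each `rule_sql`, and record a violation row whenever the count is non-zero. The sketch below uses sqlite3 purely as a stand-in for Postgres, and the `run_rules` name and output shape are illustrative assumptions modeled on the `tmt_dq.dq_violations` columns described above.

```python
import sqlite3
from datetime import datetime

def run_rules(conn, rules):
    """Evaluate COUNT-based rule_sql strings and return rows shaped like
    tmt_dq.dq_violations. Sketch only: sqlite3 stands in for Postgres,
    and the real plugin applies JSON-typed casting before execution."""
    cur = conn.cursor()
    violations = []
    for rule_id, rule_name, table_name, rule_sql, severity in rules:
        cur.execute(rule_sql)
        count = cur.fetchone()[0]
        if count > 0:
            violations.append({
                "table_name": table_name, "rule_id": rule_id,
                "rule_name": rule_name, "violation_count": count,
                "severity": severity, "alert_sent": False,
                "check_time": datetime.utcnow().isoformat(),
            })
    return violations

# Demo against an in-memory stand-in for tmt.customer_transactions
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customer_transactions (amount REAL)")
conn.executemany("INSERT INTO customer_transactions VALUES (?)", [(10.0,), (-5.0,)])
demo_rules = [(1, "amount_non_negative", "customer_transactions",
               "SELECT COUNT(*) FROM customer_transactions WHERE COALESCE(amount, 0) < 0",
               "medium")]
found = run_rules(conn, demo_rules)
```

Because rules live in a metatable, adding a check is a SQL upsert rather than a code deployment.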

🗄 Data & DQ Schemas

  • ODS / Staging: tmt.* (loaded from SQLite/CSV)
  • DQ Metadata & Logs: tmt_dq.*
    • dq_rule_config — rule registry (COUNT-based checks)
    • dq_freshness_log, dq_completeness_log — observability logs
    • dq_violations — per-rule violations plus the alert_sent flag

📊 Monitoring & Dashboards

Use assets/metabase_query.sql to create cards:

  • Overview: total tables monitored, % pass freshness/completeness, total rules evaluated, total high-severity
  • Observability: freshness SLA by table (bar), completeness last check (table)
  • Monitoring: top violated rules (7d), violation trend by day, violations over time
  • Alerts: outstanding high-severity (where alert_sent=false)

Example screenshot:

DQ dashboard screenshot: assets/data_monitoring_ss.png


πŸ™ Acknowledgements

  • Public Olist datasets for mock relational shapes
  • Community tools: Airflow, Postgres, Metabase, Slack SDKs

🙌 Support me!

👉 If you find this project useful, please ⭐ this repository 😆!

👉 More about myself: here