Observability (freshness/completeness), rule-based Monitoring, and pluggable Alerting, orchestrated with Apache Airflow, logged in PostgreSQL, and visualized in Metabase. JSON & SQL configs enable schema-aware validation.
A modular, config-driven Data Quality (DQ) platform implementing 3 core capabilities:
- Observability – SLA-aware freshness & completeness checks with historical "SKIPPED" logic
- Monitoring – declarative SQL rules with dynamic schema casting (JSON-typed)
- Alerting – grouped, severity-based alerts (Slack-style; easily extensible to Telegram/Email)
It aligns with DAMA dimensions (Accuracy, Completeness, Consistency, Uniqueness, Timeliness, Validity) and is production-minded (idempotent tasks, retries, environment separation).
- Tech Stack
- Features
- Architecture
- Repository Structure
- Quickstart
- Main Modules
- Schema-Aware Config (example)
- Data & DQ Schemas
- Monitoring & Dashboards
- Acknowledgements
- Support me!
## Tech Stack

- Apache Airflow (orchestration)
- PostgreSQL (ODS staging + DQ logs)
- Python (observability/monitoring/alerting logic)
- Metabase (dashboarding)
- Slack (mocked alert target; channels are pluggable)
## Features

- **Observability**
  - Freshness: SLA latency since last update (per table)
  - Completeness: row-count thresholds → PASS/FAIL/SKIPPED (SKIPPED applies to intentionally historical datasets)
- **Monitoring**
  - Rules live in `tmt_dq.dq_rule_config` (SQL metatable)
  - Validation queries are COUNT-based; results logged to `dq_violations`
  - Dynamic casting using `config/dq_schema_config.json` to avoid type/operator mismatches
- **Alerting**
  - Grouped by table, only High/Critical with `alert_sent=false`
  - Slack webhook demo; easily extend to Telegram/Email
- **Dashboard**
  - Overview metrics, SLA freshness bars, completeness table, top violated rules, violation trends, outstanding high-severity violations
- **Extensible by config**
  - Add tables/columns → update JSON typing
  - Add rules → upsert SQL rows; no code changes required
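The "dynamic schema casting" feature above can be sketched as follows. This is a minimal illustration, not the repo's actual API: `cast_expr` is a hypothetical helper that wraps a column in an explicit CAST using the types declared in `config/dq_schema_config.json`.

```python
# Sketch: build explicit CASTs from the JSON schema config so rule SQL never
# hits type/operator mismatches (e.g. TEXT compared against a number).
# cast_expr is a hypothetical helper, not the repository's real function.
import json

def cast_expr(table: str, column: str, schema: dict) -> str:
    """Return a SQL fragment casting `column` to its configured type."""
    return f"CAST({column} AS {schema[table][column]})"

# Excerpt mirroring the shape of config/dq_schema_config.json
schema = json.loads('{"customer_transactions": {"amount": "REAL"}}')
print(cast_expr("customer_transactions", "amount", schema))  # CAST(amount AS REAL)
```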
## Repository Structure

```
tech-design-test-case/
├── airflow/
│   ├── dags/
│   │   ├── dag_dq_check.py
│   │   └── dag_dq_load_source.py
│   └── plugins/
│       ├── dq_utils/
│       │   ├── alerting.py
│       │   ├── monitoring.py
│       │   └── observability.py
│       ├── load_utils.py
│       └── logging_utils.py
├── assets/
│   ├── data_framework_mario_caesar.drawio
│   ├── data_framework_mario_caesar.drawio.png
│   ├── data_monitoring_ss.png
│   ├── dq_rule_queries.sql
│   └── metabase_query.sql
├── config/
│   └── dq_schema_config.json
├── data/
│   ├── customer_transactions.csv
│   └── olist.sqlite
├── .env
├── .gitignore
├── dev_notes.txt
├── docker-compose.yml
├── Dockerfile
├── README.md
└── requirements.txt
```

## Quickstart

Prerequisites:

- Docker & Docker Compose
- Ports available: `8084` (Airflow), `5436` (Postgres), `3000` (Metabase)
Create a `.env` file (or edit the existing one):

```
POSTGRES_USER=postgres
POSTGRES_PASSWORD=postgres
POSTGRES_DB=postgres
POSTGRES_PORT=5436
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/your/dummy/webhook  # optional demo
```

Bring the stack up:

```
docker compose up -d --build
# Airflow:  http://localhost:8084
# Metabase: http://localhost:3000
```

Set variables as needed by your DAGs/utils, for example:

- `DQ_SCHEMA_CONFIG_PATH` → `/opt/airflow/config/dq_schema_config.json`
- `DB_CONN_ID` → `postgres_default` (or your custom connection)
- `FRESHNESS_SLA_MINUTES` → `4320` (3 days) – example default
Also configure Connections (Admin → Connections) for Postgres if not using defaults.
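The settings above can be resolved with sensible defaults. Inside Airflow you would typically read them via `Variable.get(name, default_var=...)`; plain environment variables keep this sketch self-contained, and `get_setting` is a hypothetical helper.

```python
# Sketch: resolve DAG settings with defaults matching the examples above.
# Environment variables stand in for Airflow Variables here.
import os

def get_setting(name: str, default: str) -> str:
    """Return the configured value for `name`, falling back to `default`."""
    return os.getenv(name, default)

sla_minutes = int(get_setting("FRESHNESS_SLA_MINUTES", "4320"))  # 3 days
config_path = get_setting("DQ_SCHEMA_CONFIG_PATH",
                          "/opt/airflow/config/dq_schema_config.json")
```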
## Main Modules

| DAG | What it does |
| --- | --- |
| `dag_dq_load_source` | Loads `olist.sqlite` and `customer_transactions.csv` into schema `tmt`, then triggers `dag_dq_check` |
| `dag_dq_check` | Runs freshness → completeness → monitoring → alerting; logs go to the `tmt_dq` schema |
**`observability.py`**
- Computes table freshness vs SLA and completeness vs expected thresholds
- Writes time-series logs:
  - `tmt_dq.dq_freshness_log(table_name, last_updated, delay_minutes, status, check_time)`
  - `tmt_dq.dq_completeness_log(table_name, row_count, expected_min_rows, status, check_time)`
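The freshness side of this check can be sketched as below: compare `last_updated` to the SLA and derive the `delay_minutes`/`status` pair written to `dq_freshness_log`. `freshness_status` is an illustrative name, not the module's real one.

```python
# Sketch: derive the freshness status logged by observability.py.
from datetime import datetime

def freshness_status(last_updated, sla_minutes, now):
    """Minutes since last update, and PASS/FAIL against the SLA."""
    delay_minutes = int((now - last_updated).total_seconds() // 60)
    status = "PASS" if delay_minutes <= sla_minutes else "FAIL"
    return delay_minutes, status

now = datetime(2024, 1, 4, 12, 0)
delay, status = freshness_status(datetime(2024, 1, 1, 12, 0), 4320, now)
print(delay, status)  # 4320 PASS  (exactly 3 days old, within the 3-day SLA)
```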
**`monitoring.py`**
- Reads `tmt_dq.dq_rule_config(rule_id, rule_name, table_name, rule_sql, severity)`
- Applies JSON-typed casting before running `rule_sql`
- Writes `tmt_dq.dq_violations(table_name, rule_id, rule_name, violation_count, severity, alert_sent, check_time)`
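The COUNT-based evaluation can be sketched like this; `evaluate_rule` and `run_scalar` are hypothetical names, and the database call is stubbed out with a lambda.

```python
# Sketch of COUNT-based rule evaluation: a non-zero count becomes a
# dq_violations row with alert_sent=False; zero means the rule passed.
def evaluate_rule(run_scalar, rule):
    """run_scalar executes rule_sql and returns the violation COUNT."""
    count = run_scalar(rule["rule_sql"])
    if count == 0:
        return None  # rule passed; nothing to log
    return {
        "table_name": rule["table_name"],
        "rule_id": rule["rule_id"],
        "rule_name": rule["rule_name"],
        "violation_count": count,
        "severity": rule["severity"],
        "alert_sent": False,
    }

# Rule row shaped like tmt_dq.dq_rule_config
rule = {
    "rule_id": 1,
    "rule_name": "amount_non_negative",
    "table_name": "customer_transactions",
    "rule_sql": "SELECT COUNT(*) FROM tmt.customer_transactions WHERE COALESCE(amount,0) < 0",
    "severity": "medium",
}
violation = evaluate_rule(lambda sql: 3, rule)  # pretend the query found 3 bad rows
```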
**`alerting.py`**
- Aggregates High/Critical where `alert_sent=false`
- Sends a grouped Slack message (pluggable) and marks the rows as sent
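The grouping step can be sketched as below: filter unsent high/critical violations and bucket them per table, one message per bucket. `group_alerts` is a hypothetical name, the `review_id_not_ull`-style rule names in the sample data are invented for illustration, and the webhook call is omitted.

```python
# Sketch: group unsent high/critical dq_violations rows per table, ready to
# be formatted into one Slack message per table. Webhook delivery is omitted.
from collections import defaultdict

def group_alerts(violations):
    """Map table_name -> list of human-readable violation summaries."""
    grouped = defaultdict(list)
    for v in violations:
        if v["severity"] in ("high", "critical") and not v["alert_sent"]:
            grouped[v["table_name"]].append(
                f"{v['rule_name']}: {v['violation_count']} rows")
    return dict(grouped)

violations = [
    {"table_name": "order_reviews", "rule_name": "review_score_1_5",
     "violation_count": 7, "severity": "high", "alert_sent": False},
    {"table_name": "order_reviews", "rule_name": "review_id_not_null",  # invented rule name
     "violation_count": 2, "severity": "critical", "alert_sent": False},
    {"table_name": "customer_transactions", "rule_name": "amount_non_negative",
     "violation_count": 1, "severity": "medium", "alert_sent": False},  # filtered out
]
print(group_alerts(violations))
```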
**`load_utils.py`**
- Idempotent DDL + fast COPY/insert flows for SQLite/CSV to Postgres
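The idempotent-DDL idea can be sketched by generating `CREATE TABLE IF NOT EXISTS` statements from the JSON schema config, so re-running the load DAG is safe. `ddl_for` is a hypothetical helper, not the module's real API.

```python
# Sketch: idempotent DDL derived from config/dq_schema_config.json entries.
# CREATE TABLE IF NOT EXISTS makes repeated DAG runs a no-op on the schema.
def ddl_for(table, columns, schema="tmt"):
    """Build a CREATE TABLE IF NOT EXISTS statement from a {col: type} map."""
    cols = ", ".join(f"{name} {pg_type}" for name, pg_type in columns.items())
    return f"CREATE TABLE IF NOT EXISTS {schema}.{table} ({cols});"

print(ddl_for("order_reviews", {"review_id": "TEXT", "review_score": "INTEGER"}))
# CREATE TABLE IF NOT EXISTS tmt.order_reviews (review_id TEXT, review_score INTEGER);
```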
## Schema-Aware Config (example)

`config/dq_schema_config.json` (excerpt):

```json
{
  "customer_transactions": {
    "transaction_id": "INTEGER",
    "customer_id": "TEXT",
    "transaction_date": "TEXT",
    "amount": "REAL",
    "category": "TEXT",
    "location": "TEXT"
  },
  "order_reviews": {
    "review_id": "TEXT",
    "order_id": "TEXT",
    "review_score": "INTEGER",
    "review_creation_date": "TEXT"
  }
}
```

Typical rule rows (see `assets/dq_rule_queries.sql`):
```sql
INSERT INTO tmt_dq.dq_rule_config (rule_id, rule_name, table_name, rule_sql, severity)
VALUES
  (1, 'amount_non_negative', 'customer_transactions',
   'SELECT COUNT(*) FROM tmt.customer_transactions WHERE COALESCE(amount,0) < 0', 'medium'),
  (2, 'review_score_1_5', 'order_reviews',
   'SELECT COUNT(*) FROM tmt.order_reviews WHERE review_score NOT BETWEEN 1 AND 5', 'high');
```

## Data & DQ Schemas

- ODS / Staging: `tmt.*` (loaded from SQLite/CSV)
- DQ Metadata & Logs: `tmt_dq.*`
  - `dq_rule_config` → rule registry (COUNT-based checks)
  - `dq_freshness_log`, `dq_completeness_log` → observability
  - `dq_violations` → per-rule violations + `alert_sent` flag
## Monitoring & Dashboards

Use `assets/metabase_query.sql` to create cards:

- Overview: total tables monitored, % pass freshness/completeness, total rules evaluated, total high-severity
- Observability: freshness SLA by table (bar), completeness last check (table)
- Monitoring: top violated rules (7d), violation trend by day, violations over time
- Alerts: outstanding high-severity (where `alert_sent=false`)
Example screenshot: see `assets/data_monitoring_ss.png`.
## Acknowledgements

- Public Olist datasets for mock relational shapes
- Community tools: Airflow, Postgres, Metabase, Slack SDKs
## Support me!

More about myself: here

