1 change: 1 addition & 0 deletions README.md
@@ -82,6 +82,7 @@ In practice, a single generic DLT pipeline reads the Dataflowspec and uses it to
| Liquid cluster support | Bronze, Bronze Quarantine, Silver tables|
| [DLT-META CLI](https://databrickslabs.github.io/dlt-meta/getting_started/dltmeta_cli/) | ```databricks labs dlt-meta onboard```, ```databricks labs dlt-meta deploy``` |
| Bronze and Silver pipeline chaining | Deploy dlt-meta pipeline with ```layer=bronze_silver``` option using Direct publishing mode |
| [DLT Sinks](https://docs.databricks.com/aws/en/delta-live-tables/dlt-sinks) | Supported formats: external ```delta table```, ```kafka```. Supported layers: Bronze, Silver |

## Getting Started

51 changes: 51 additions & 0 deletions demo/README.md
@@ -5,6 +5,7 @@
4. [Append FLOW Eventhub Demo](#append-flow-eventhub-demo): Write to same target from multiple sources using [dlt.append_flow](https://docs.databricks.com/en/delta-live-tables/flows.html#append-flows) and adding [File metadata column](https://docs.databricks.com/en/ingestion/file-metadata-column.html)
5. [Silver Fanout Demo](#silver-fanout-demo): This demo showcases the implementation of fanout architecture in the silver layer.
6. [Apply Changes From Snapshot Demo](#Apply-changes-from-snapshot-demo): This demo showcases the implementation of ingesting from snapshots into the bronze layer.
7. [DLT Sink Demo](#dlt-sink-demo): This demo showcases writing to external sinks such as Delta tables and Kafka.

The source argument is optional for the demos.

@@ -257,3 +258,53 @@ This demo will perform the following tasks:
python demo/launch_acfs_demo.py --uc_catalog_name=<<uc catalog name>> --profile=<<DEFAULT>>
```
![acfs.png](../docs/static/images/acfs.png)

# DLT Sink Demo
This demo performs the following steps (a sketch of the underlying DLT sink API calls follows this list):
- Showcase the onboarding process for the DLT write-to-external-sink pattern
- Run onboarding for the bronze IoT events
- Publish test events to the Kafka source topic
- Run the Bronze DLT pipeline, which reads from the Kafka source topic and:
  - writes an events Delta table into Unity Catalog
  - creates a quarantine table per the data quality expectations
  - writes to an external Kafka topic
  - writes to an external DBFS location as an external Delta sink
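
Under the hood, DLT-META maps each configured sink onto the Databricks [DLT Sinks](https://docs.databricks.com/aws/en/delta-live-tables/dlt-sinks) API. The snippet below is a minimal, illustrative sketch (not part of the demo code): it assumes the `dlt.create_sink` / `dlt.append_flow` API from that doc, and the table, topic, and server names are placeholders.

```python
import dlt
from pyspark.sql.functions import col

# External Kafka sink; in the demo the bootstrap servers are resolved from a secret scope.
dlt.create_sink(
    name="bronze_customer_kafka_sink",
    format="kafka",
    options={
        "kafka.bootstrap.servers": "host:9092",   # placeholder
        "kafka.security.protocol": "PLAINTEXT",
        "topic": "iot_sink_topic",                # placeholder
    },
)

# External Delta sink writing to a storage path.
dlt.create_sink(
    name="bronze_customer_delta_sink",
    format="delta",
    options={"path": "dbfs:/mnt/dltmeta_sink/iot"},
)

# Append flow feeding the Kafka sink from the bronze streaming table,
# applying the select_exp / where_clause from the onboarding spec.
@dlt.append_flow(name="bronze_iot_to_kafka", target="bronze_customer_kafka_sink")
def bronze_iot_to_kafka():
    return (
        dlt.read_stream("bronze_iot")             # placeholder table name
        .where(col("value").isNotNull())
        .select("value")
    )
```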
### Steps:
1. Launch Command Prompt

2. Install [Databricks CLI](https://docs.databricks.com/dev-tools/cli/index.html)

3. ```commandline
git clone https://github.com/databrickslabs/dlt-meta.git
```

4. ```commandline
cd dlt-meta
```
5. Set the `PYTHONPATH` environment variable in your terminal
```commandline
dlt_meta_home=$(pwd)
```
```commandline
export PYTHONPATH=$dlt_meta_home
```

6. Optional: if you are using secrets for Kafka, create a Databricks secret scope for the source and sink Kafka servers using the commands below
```commandline
databricks secrets create-scope <<name>>
```
```commandline
databricks secrets put-secret --json '{
"scope": "<<name>>",
"key": "<<keyname>>",
"string_value": "<<value>>"
}'
```

7. Run the command
```commandline
python demo/launch_dlt_sink_demo.py --uc_catalog_name=<<uc_catalog_name>> --source=kafka --kafka_source_topic=<<kafka source topic name>> --kafka_sink_topic=<<kafka sink topic name>> --kafka_source_servers_secrets_scope_name=<<kafka source servers secret scope name>> --kafka_source_servers_secrets_scope_key=<<kafka source servers secret scope key name>> --kafka_sink_servers_secret_scope_name=<<kafka sink servers secret scope name>> --kafka_sink_servers_secret_scope_key=<<kafka sink servers secret scope key name>> --profile=<<DEFAULT>>
```
![dlt_demo_sink.png](../docs/static/images/dlt_demo_sink.png)
![dlt_delta_sink.png](../docs/static/images/dlt_delta_sink.png)
![dlt_kafka_sink.png](../docs/static/images/dlt_kafka_sink.png)
52 changes: 52 additions & 0 deletions demo/conf/kafka-sink-onboarding.template
@@ -0,0 +1,52 @@
[
    {
        "data_flow_id": "103",
        "data_flow_group": "A1",
        "source_system": "Sensor Device",
        "source_format": "kafka",
        "source_details": {
            "source_schema_path": "{uc_volume_path}/demo/resources/ddl/eventhub_iot_schema.ddl",
            "subscribe": "{kafka_source_topic}",
            "kafka.security.protocol": "PLAINTEXT",
            "kafka_source_servers_secrets_scope_name": "{kafka_source_servers_secrets_scope_name}",
            "kafka_source_servers_secrets_scope_key": "{kafka_source_servers_secrets_scope_key}"
        },
        "bronze_reader_options": {
            "startingOffsets": "earliest",
            "kafka.request.timeout.ms": "60000",
            "kafka.session.timeout.ms": "60000"
        },
        "bronze_database_demo": "{uc_catalog_name}.{bronze_schema}",
        "bronze_table": "bronze_{run_id}_iot",
        "bronze_partition_columns": "date",
        "bronze_table_path_demo": "{uc_volume_path}/data/bronze/iot",
        "bronze_data_quality_expectations_json_demo": "{uc_volume_path}/demo/conf/dqe/iot/bronze_data_quality_expectations.json",
        "bronze_database_quarantine_demo": "{uc_catalog_name}.{bronze_schema}",
        "bronze_quarantine_table": "bronze_{run_id}_iot_quarantine",
        "bronze_quarantine_table_path_demo": "{uc_volume_path}/data/bronze/iot_quarantine",
        "bronze_sinks": [
            {
                "name": "bronze_customer_kafka_sink",
                "format": "kafka",
                "options": {
                    "kafka_sink_servers_secret_scope_name": "{kafka_sink_servers_secret_scope_name}",
                    "kafka_sink_servers_secret_scope_key": "{kafka_sink_servers_secret_scope_key}",
                    "kafka.security.protocol": "PLAINTEXT",
                    "topic": "{kafka_sink_topic}"
                },
                "select_exp": ["value"],
                "where_clause": "value is not null"
            },
            {
                "name": "bronze_customer_delta_sink",
                "format": "delta",
                "options": {
                    "path": "dbfs:/mnt/dltmeta_sink/iot"
                },
                "select_exp": ["value"],
                "where_clause": "value is not null"
            }
        ]
    }
]
90 changes: 90 additions & 0 deletions demo/launch_dlt_sink_demo.py
@@ -0,0 +1,90 @@

import uuid
from src.install import WorkspaceInstaller
from integration_tests.run_integration_tests import (
    DLTMETARunner,
    DLTMetaRunnerConf,
    get_workspace_api_client,
    process_arguments
)
import traceback


class DLTMETASinkDemo(DLTMETARunner):

    def __init__(self, args, ws, base_dir):
        self.args = args
        self.ws = ws
        self.wsi = WorkspaceInstaller(ws)
        self.base_dir = base_dir

    def run(self, runner_conf: DLTMetaRunnerConf):
        """
        Runs the DLT-META Sink Demo by calling the necessary methods in the correct order.

        Parameters:
        - runner_conf: The DLTMetaRunnerConf object containing the runner configuration parameters.
        """
        try:
            self.init_dltmeta_runner_conf(runner_conf)
            self.create_bronze_silver_dlt(runner_conf)
            self.launch_workflow(runner_conf)
        except Exception as e:
            print(e)
            traceback.print_exc()
        # finally:
        #     self.clean_up(runner_conf)

    def init_runner_conf(self) -> DLTMetaRunnerConf:
        """
        Initialize the runner configuration for running integration tests.

        Returns:
        -------
        DLTMetaRunnerConf
            The initialized runner configuration.
        """
        run_id = uuid.uuid4().hex
        runner_conf = DLTMetaRunnerConf(
            run_id=run_id,
            username=self.wsi._my_username,
            uc_catalog_name=self.args["uc_catalog_name"],
            int_tests_dir="demo",
            dlt_meta_schema=f"dlt_meta_dataflowspecs_demo_{run_id}",
            bronze_schema=f"dlt_meta_bronze_demo_{run_id}",
            runners_nb_path=f"/Users/{self.wsi._my_username}/dlt_meta_demo/{run_id}",
            source="kafka",
            kafka_template="demo/conf/kafka-sink-onboarding.template",
            kafka_source_topic=self.args["kafka_source_topic"],
            kafka_source_servers_secrets_scope_name=self.args["kafka_source_servers_secrets_scope_name"],
            kafka_source_servers_secrets_scope_key=self.args["kafka_source_servers_secrets_scope_key"],
            kafka_sink_topic=self.args["kafka_sink_topic"],
            kafka_sink_servers_secret_scope_name=self.args["kafka_sink_servers_secret_scope_name"],
            kafka_sink_servers_secret_scope_key=self.args["kafka_sink_servers_secret_scope_key"],
            env="demo",
            onboarding_file_path="demo/conf/onboarding.json",
            runners_full_local_path='./demo/notebooks/dlt_sink_runners/',
            test_output_file_path=(
                f"/Users/{self.wsi._my_username}/dlt_meta_demo/"
                f"{run_id}/demo-output.csv"
            ),
        )

        return runner_conf

    def launch_workflow(self, runner_conf: DLTMetaRunnerConf):
        created_job = self.create_workflow_spec(runner_conf)
        self.open_job_url(runner_conf, created_job)


def main():
    args = process_arguments()
    workspace_client = get_workspace_api_client(args["profile"])
    dltmeta_afam_demo_runner = DLTMETASinkDemo(args, workspace_client, "demo")
    print("initializing complete")
    runner_conf = dltmeta_afam_demo_runner.init_runner_conf()
    dltmeta_afam_demo_runner.run(runner_conf)


if __name__ == "__main__":
    main()
10 changes: 10 additions & 0 deletions demo/notebooks/dlt_sink_runners/init_dlt_meta_pipeline.py
@@ -0,0 +1,10 @@
# Databricks notebook source
dlt_meta_whl = spark.conf.get("dlt_meta_whl")
%pip install $dlt_meta_whl # noqa : E999

# COMMAND ----------

layer = spark.conf.get("layer", None)

from src.dataflow_pipeline import DataflowPipeline
DataflowPipeline.invoke_dlt_pipeline(spark, layer)
46 changes: 46 additions & 0 deletions demo/notebooks/dlt_sink_runners/publish_events.py
@@ -0,0 +1,46 @@
# Databricks notebook source
# DBTITLE 1,Install kafka python lib
# MAGIC %pip install kafka-python

# COMMAND ----------

dbutils.library.restartPython()


# COMMAND ----------

# DBTITLE 1,Extract input from notebook params
dbutils.widgets.text("kafka_source_topic", "kafka_source_topic", "")
dbutils.widgets.text("kafka_source_servers_secrets_scope_name", "kafka_source_servers_secrets_scope_name", "")
dbutils.widgets.text("kafka_source_servers_secrets_scope_key", "kafka_source_servers_secrets_scope_key", "")
dbutils.widgets.text("kafka_input_data", "kafka_input_data", "")
kafka_source_topic = dbutils.widgets.get("kafka_source_topic")
kafka_source_servers_secrets_scope_name = dbutils.widgets.get("kafka_source_servers_secrets_scope_name")
kafka_source_servers_secrets_scope_key = dbutils.widgets.get("kafka_source_servers_secrets_scope_key")
kafka_input_data = dbutils.widgets.get("kafka_input_data")

print(f"kafka_source_topic: {kafka_source_topic}, kafka_source_servers_secrets_scope_name: {kafka_source_servers_secrets_scope_name}, kafka_source_servers_secrets_scope_key: {kafka_source_servers_secrets_scope_key}, kafka_input_data: {kafka_input_data}")

# COMMAND ----------

# DBTITLE 1,Initialize kafka producer
from kafka import KafkaProducer
import json
kafka_bootstrap_servers = dbutils.secrets.get(f"{kafka_source_servers_secrets_scope_name}", f"{kafka_source_servers_secrets_scope_key}")
for char in kafka_bootstrap_servers:
    print(char, end=" ")
producer = KafkaProducer(
    bootstrap_servers=f"{kafka_bootstrap_servers}",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# COMMAND ----------

# DBTITLE 1,Send Messages
with open(f"{kafka_input_data}") as f:
    data = json.load(f)

for event in data:
    producer.send(kafka_source_topic, event)

producer.close()
31 changes: 31 additions & 0 deletions demo/notebooks/dlt_sink_runners/validate.py
@@ -0,0 +1,31 @@
# Databricks notebook source
import pandas as pd

run_id = dbutils.widgets.get("run_id")
uc_catalog_name = dbutils.widgets.get("uc_catalog_name")
output_file_path = dbutils.widgets.get("output_file_path")
bronze_schema = dbutils.widgets.get("bronze_schema")
log_list = []

# Assumption is that to get to this notebook Bronze and Silver completed successfully
log_list.append("Completed Bronze Eventhub DLT Pipeline.")

TABLES = {
    f"{uc_catalog_name}.{bronze_schema}.bronze_{run_id}_iot": 20,
    f"{uc_catalog_name}.{bronze_schema}.bronze_{run_id}_iot_quarantine": 2,
}

log_list.append("Validating DLT Eventhub Bronze Table Counts...")
for table, counts in TABLES.items():
    query = spark.sql(f"SELECT count(*) as cnt FROM {table}")
    cnt = query.collect()[0].cnt

    log_list.append(f"Validating Counts for Table {table}.")
    try:
        assert int(cnt) >= counts
        log_list.append(f"Expected >= {counts} Actual: {cnt}. Passed!")
    except AssertionError:
        log_list.append(f"Expected {counts} Actual: {cnt}. Failed!")

pd_df = pd.DataFrame(log_list)
pd_df.to_csv(output_file_path)
1 change: 1 addition & 0 deletions demo/resources/ddl/eventhub_iot_schema.ddl
@@ -0,0 +1 @@
battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp BIGINT, timestamp BIGINT
1 change: 1 addition & 0 deletions docs/content/_index.md
@@ -54,6 +54,7 @@ In practice, a single generic DLT pipeline reads the Dataflowspec and uses it to
| Liquid cluster support | Bronze, Bronze Quarantine, Silver tables|
| [DLT-META CLI](https://databrickslabs.github.io/dlt-meta/getting_started/dltmeta_cli/) | ```databricks labs dlt-meta onboard```, ```databricks labs dlt-meta deploy``` |
| Bronze and Silver pipeline chaining | Deploy dlt-meta pipeline with ```layer=bronze_silver``` option using Direct publishing mode |
| [DLT Sinks](https://docs.databricks.com/aws/en/delta-live-tables/dlt-sinks) | Supported formats: external ```delta table```, ```kafka```. Supported layers: Bronze, Silver |
## How much does it cost?
DLT-META does not have any **direct cost** associated with it other than the cost to run the Databricks Delta Live Tables
on your environment. The overall cost will be determined primarily by the [Databricks Delta Live Tables Pricing](https://databricks.com/product/delta-live-tables-pricing-azure)
6 changes: 5 additions & 1 deletion docs/content/getting_started/metadatapreperation.md
@@ -38,6 +38,7 @@ The `onboarding.json` file contains links to [silver_transformations.json](https
| bronze_apply_changes_from_snapshot | Bronze apply changes from snapshot Json e.g. Mandatory fields: keys=["userId"], scd_type=`1` or `2` optional fields: track_history_column_list=`[col1]`, track_history_except_column_list=`[col2]` |
| bronze_table_path_{env} | Bronze table storage path.|
| bronze_table_properties | DLT table properties map. e.g. `{"pipelines.autoOptimize.managed": "false" , "pipelines.autoOptimize.zOrderCols": "year,month", "pipelines.reset.allowed": "false" }` |
| bronze_sink | DLT Sink API properties (a formatted example follows this table). e.g. Delta: `{"name": "bronze_sink","format": "delta","options": {"tableName": "my_catalog.my_schema.my_table"}}`, Kafka: `{"name": "bronze_sink","format": "kafka","options": {"kafka.bootstrap.servers": "host:port","topic": "my_topic"}}` |
| bronze_data_quality_expectations_json | Bronze table data quality expectations |
| bronze_database_quarantine_{env} | Bronze database for quarantine data which fails expectations. |
| bronze_quarantine_table | Bronze table for quarantine data which fails expectations |
@@ -53,9 +54,12 @@ The `onboarding.json` file contains links to [silver_transformations.json](https
| silver_cdc_apply_changes | Silver cdc apply changes Json |
| silver_table_path_{env} | Silver table storage path. |
| silver_table_properties | DLT table properties map. e.g. `{"pipelines.autoOptimize.managed": "false" , "pipelines.autoOptimize.zOrderCols": "year,month", "pipelines.reset.allowed": "false"}` |
| silver_sink | DLT Sink API properties. e.g. Delta: `{"name": "silver_sink","format": "delta","options": {"tableName": "my_catalog.my_schema.my_table"}}`, Kafka: `{"name": "silver_sink","format": "kafka","options": {"kafka.bootstrap.servers": "host:port","topic": "my_topic"}}` |
| silver_transformation_json | Silver table sql transformation json path |
| silver_data_quality_expectations_json_{env} | Silver table data quality expectations json file path |
| silver_append_flows | Silver table append flows json. e.g. `"silver_append_flows":[{"name":"customer_bronze_flow", "create_streaming_table": false,"source_format": "cloudFiles", "source_details": {"source_database": "APP","source_table":"CUSTOMERS", "source_path_dev": "tests/resources/data/customers", "source_schema_path": "tests/resources/schema/customer_schema.ddl"},"reader_options": {"cloudFiles.format": "json","cloudFiles.inferColumnTypes": "true","cloudFiles.rescuedDataColumn": "_rescued_data"},"once": true}]` |



### Data Quality Rules File Structure ([Examples](https://github.com/databrickslabs/dlt-meta/tree/main/examples/dqe))
Binary file added docs/static/images/dlt_delta_sink.png
Binary file added docs/static/images/dlt_demo_sink.png
Binary file added docs/static/images/dlt_kafka_sink.png