Merged
3 changes: 1 addition & 2 deletions .gitignore
@@ -150,14 +150,13 @@ deployment-merged.yaml
#IDE
.idea/
.vscode/

.databricks
.databricks-login.json
demo/conf/onboarding.json
integration_tests/conf/onboarding*.json
demo/conf/onboarding*.json
databricks.yaml
integration_test_output*.csv
databricks.yml

.databricks
databricks.yaml
23 changes: 13 additions & 10 deletions demo/README.md
@@ -1,11 +1,12 @@
# [DLT-META](https://github.com/databrickslabs/dlt-meta) DEMO's
# [DLT-META](https://github.com/databrickslabs/dlt-meta) DEMOs
1. [DAIS 2023 DEMO](#dais-2023-demo): Showcases DLT-META automatically creating Bronze and Silver DLT pipelines in both initial and incremental modes.
2. [Databricks Techsummit Demo](#databricks-tech-summit-fy2024-demo): Automatically ingests hundreds of data sources into bronze and silver DLT pipelines.
3. [Append FLOW Autoloader Demo](#append-flow-autoloader-file-metadata-demo): Writes to the same target from multiple sources using [dlt.append_flow](https://docs.databricks.com/en/delta-live-tables/flows.html#append-flows) and adds the [file metadata column](https://docs.databricks.com/en/ingestion/file-metadata-column.html).
4. [Append FLOW Eventhub Demo](#append-flow-eventhub-demo): Writes to the same target from multiple sources using [dlt.append_flow](https://docs.databricks.com/en/delta-live-tables/flows.html#append-flows) and adds the [file metadata column](https://docs.databricks.com/en/ingestion/file-metadata-column.html).
5. [Silver Fanout Demo](#silver-fanout-demo): Showcases a fanout architecture in the silver layer.
6. [Apply Changes From Snapshot Demo](#Apply-changes-from-snapshot-demo): Showcases ingestion from snapshots into the bronze layer.

The source argument is optional for the demos.


# DAIS 2023 DEMO
@@ -38,7 +39,7 @@ This Demo launches Bronze and Silver DLT pipelines with following activities:
```

6. ```commandline
python demo/launch_dais_demo.py --uc_catalog_name=<<uc catalog name>>
python demo/launch_dais_demo.py --uc_catalog_name=<<uc catalog name>> --profile=<<DEFAULT>>
```
- uc_catalog_name : Unity Catalog name
- you can provide `--profile=<databricks_profile_name>` if the Databricks CLI is already configured; otherwise the prompt will ask for the workspace host and token.
@@ -70,7 +71,7 @@ This demo will launch auto generated tables(100s) inside single bronze and silver target dlt pipeline
```

6. ```commandline
python demo/launch_techsummit_demo.py --uc_catalog_name=<<uc catalog name>>
python demo/launch_techsummit_demo.py --uc_catalog_name=<<uc catalog name>> --profile=<<DEFAULT>>
```
- uc_catalog_name : Unity Catalog name
- you can provide `--profile=<databricks_profile_name>` if the Databricks CLI is already configured; otherwise the prompt will ask for the workspace host and token.
@@ -106,10 +107,9 @@ This demo will perform following tasks:
```

6. ```commandline
python demo/launch_af_cloudfiles_demo.py --uc_catalog_name=<<uc catalog name>> --source=cloudfiles --cloud_provider_name=aws --profile=<<DEFAULT>>
python demo/launch_af_cloudfiles_demo.py --uc_catalog_name=<<uc catalog name>> --source=cloudfiles --profile=<<DEFAULT>>
```
- uc_catalog_name : Unity Catalog name
- cloud_provier_name : Which cloud you are using, either AWS, Azure, or GCP
- you can provide `--profile=<databricks_profile_name>` if the Databricks CLI is already configured; otherwise the prompt will ask for the workspace host and token.

![af_am_demo.png](../docs/static/images/af_am_demo.png)
@@ -163,7 +163,7 @@ This demo will perform following tasks:
- eventhub_port: Eventhub port

7. ```commandline
python3 demo/launch_af_eventhub_demo.py --uc_catalog_name=<<uc catalog name>> --eventhub_name=dltmeta_demo --eventhub_name_append_flow=dltmeta_demo_af --eventhub_secrets_scope_name=dltmeta_eventhub_creds --eventhub_namespace=dltmeta --eventhub_port=9093 --eventhub_producer_accesskey_name=RootManageSharedAccessKey --eventhub_consumer_accesskey_name=RootManageSharedAccessKey --eventhub_accesskey_secret_name=RootManageSharedAccessKey
python3 demo/launch_af_eventhub_demo.py --uc_catalog_name=<<uc catalog name>> --eventhub_name=dltmeta_demo --eventhub_name_append_flow=dltmeta_demo_af --eventhub_secrets_scope_name=dltmeta_eventhub_creds --eventhub_namespace=dltmeta --eventhub_port=9093 --eventhub_producer_accesskey_name=RootManageSharedAccessKey --eventhub_consumer_accesskey_name=RootManageSharedAccessKey --eventhub_accesskey_secret_name=RootManageSharedAccessKey --profile=<<DEFAULT>>
```

![af_eh_demo.png](../docs/static/images/af_eh_demo.png)
@@ -194,10 +194,13 @@ This demo will perform following tasks:
```
```commandline
export PYTHONPATH=$dlt_meta_home
```

6. Run the command
```commandline
python demo/launch_silver_fanout_demo.py --source=cloudfiles --uc_catalog_name=<<uc catalog name>>
```

6. Run the command ```python demo/launch_silver_fanout_demo.py --source=cloudfiles --uc_catalog_name=<<uc catalog name>> --dbr_version=15.3.x-scala2.12 --dbfs_path=dbfs:/dais-dlt-meta-silver-fanout```
- db_version : Databricks Runtime Version
- dbfs_path : Path on your Databricks workspace where demo will be copied for launching DLT-META Pipelines
- you can provide `--profile=<databricks_profile_name>` if the Databricks CLI is already configured; otherwise the prompt will ask for the workspace host and token.

- - 6a. Databricks Workspace URL:
@@ -251,6 +254,6 @@ This demo will perform following tasks:

6. Run the command
```commandline
python demo/launch_acfs_demo.py --uc_catalog_name=<<>>
python demo/launch_acfs_demo.py --uc_catalog_name=<<uc catalog name>> --profile=<<DEFAULT>>
```
![acfs.png](../docs/static/images/acfs.png)
Binary file removed demo/dbc/silver_fout_runners.dbc
31 changes: 15 additions & 16 deletions demo/launch_acfs_demo.py
@@ -1,5 +1,6 @@

import uuid
import traceback
from src.install import WorkspaceInstaller
from integration_tests.run_integration_tests import (
DLTMETARunner,
@@ -15,7 +16,7 @@ def __init__(self, args, ws, base_dir):
self.ws = ws
self.wsi = WorkspaceInstaller(ws)
self.base_dir = base_dir

def run(self, runner_conf: DLTMetaRunnerConf):
"""
Runs the DLT-META Apply Changes from Snapshot Demo by calling the necessary methods in the correct order.
@@ -29,42 +30,40 @@ def run(self, runner_conf: DLTMetaRunnerConf):
self.launch_workflow(runner_conf)
except Exception as e:
print(e)
traceback.print_exc()
# finally:
# self.clean_up(runner_conf)

def init_runner_conf(self) -> DLTMetaRunnerConf:
run_id = uuid.uuid4().hex
runner_conf = DLTMetaRunnerConf(
run_id=run_id,
username=self.wsi._my_username,
int_tests_dir="file:./demo",
int_tests_dir="demo",
dlt_meta_schema=f"dlt_meta_dataflowspecs_demo_{run_id}",
bronze_schema=f"dlt_meta_bronze_demo_{run_id}",
runners_nb_path=f"/Users/{self.wsi._my_username}/dlt_meta_demo/{run_id}",
runners_full_local_path="integration_tests/notebooks/snapshot_runners",
test_output_file_path=(
f"/Users/{self.wsi._my_username}/dlt_meta_demo/"
f"{run_id}/dlt_meta_acfs_demo-output.csv"
),
source="snapshot",
snapshot_template="demo/conf/snapshot-onboarding.template",
onboarding_file_path="demo/conf/onboarding.json",
env="demo"
)
runner_conf.uc_catalog_name = self.args.__dict__['uc_catalog_name']
runner_conf.runners_full_local_path = './demo/dbc/snapshot_runners.dbc'
runner_conf.uc_catalog_name = self.args['uc_catalog_name']
return runner_conf

def launch_workflow(self, runner_conf: DLTMetaRunnerConf):
created_job = self.create_snapshot_workflow_spec(runner_conf)
created_job = self.create_workflow_spec(runner_conf)
self.open_job_url(runner_conf, created_job)


acfs_args_map = {
"--profile": "provide databricks cli profile name, if not provide databricks_host and token",
"--uc_catalog_name": "provide databricks uc_catalog name, this is required to create volume, schema, table"
}

afam_mandatory_args = [
"uc_catalog_name"]


def main():
args = process_arguments(acfs_args_map, afam_mandatory_args)
workspace_client = get_workspace_api_client(args.profile)
args = process_arguments()
workspace_client = get_workspace_api_client(args["profile"])
dltmeta_afam_demo_runner = ApplyChangesFromSnapshotDemo(args, workspace_client, "demo")
print("initializing complete")
runner_conf = dltmeta_afam_demo_runner.init_runner_conf()
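The change above (mirrored in the other launchers) drops the per-demo `args_map`/mandatory-args lists and switches from attribute-style access such as `self.args.__dict__['uc_catalog_name']` to plain dictionary lookups on whatever `process_arguments()` returns. A minimal sketch of that contract — hypothetical argument set and help text, not the repository's actual helper — could look like this:

```python
# Hypothetical sketch of an argument helper that returns a plain dict,
# matching how the launchers above index args["profile"] and args["uc_catalog_name"].
import argparse


def process_arguments() -> dict:
    parser = argparse.ArgumentParser(description="DLT-META demo launcher (sketch)")
    parser.add_argument("--profile", default=None,
                        help="Databricks CLI profile; host and token are prompted if omitted")
    parser.add_argument("--uc_catalog_name", required=True,
                        help="Unity Catalog used to create the demo volume, schemas and tables")
    # vars() turns the argparse Namespace into the dict the launchers expect
    return vars(parser.parse_args())


if __name__ == "__main__":
    args = process_arguments()
    print(args["uc_catalog_name"], args["profile"])
```

Keeping the return type a plain dict means `get_workspace_api_client(args['profile'])` and the runner constructors stay identical across every demo launcher.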
2 changes: 2 additions & 0 deletions demo/launch_af_cloudfiles_demo.py
@@ -32,6 +32,8 @@ def run(self, runner_conf: DLTMetaRunnerConf):
except Exception as e:
print(e)
traceback.print_exc()
# finally:
# self.clean_up(runner_conf)

def init_runner_conf(self) -> DLTMetaRunnerConf:
"""
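Several launchers in this PR gain the same `traceback.print_exc()` call while the cleanup step stays commented out. A rough sketch of that shared shape — placeholder method bodies, names following the runners above — is:

```python
# Sketch of the error-handling shape the demo runners converge on: print the
# exception plus its full traceback, and leave cleanup commented out so a failed
# run keeps its workspace objects around for inspection.
import traceback


class DemoRunnerSketch:
    def run(self, runner_conf):
        try:
            self.init_dltmeta_runner_conf(runner_conf)
            self.launch_workflow(runner_conf)
        except Exception as e:
            print(e)
            traceback.print_exc()
        # finally:
        #     self.clean_up(runner_conf)

    def init_dltmeta_runner_conf(self, runner_conf):
        raise NotImplementedError  # provided by the real runner

    def launch_workflow(self, runner_conf):
        raise NotImplementedError  # provided by the real runner
```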
52 changes: 24 additions & 28 deletions demo/launch_af_eventhub_demo.py
@@ -1,5 +1,6 @@

import uuid
import traceback
from src.install import WorkspaceInstaller
from integration_tests.run_integration_tests import (
DLTMETARunner,
@@ -30,6 +31,9 @@ def run(self, runner_conf: DLTMetaRunnerConf):
self.launch_workflow(runner_conf)
except Exception as e:
print(e)
traceback.print_exc()
# finally:
# self.clean_up(runner_conf)

def init_runner_conf(self) -> DLTMetaRunnerConf:
"""
@@ -44,49 +48,41 @@ def init_runner_conf(self) -> DLTMetaRunnerConf:
runner_conf = DLTMetaRunnerConf(
run_id=run_id,
username=self.wsi._my_username,
int_tests_dir="file:./demo",
int_tests_dir="demo",
dlt_meta_schema=f"dlt_meta_dataflowspecs_demo_{run_id}",
bronze_schema=f"dlt_meta_bronze_demo_{run_id}",
runners_nb_path=f"/Users/{self.wsi._my_username}/dlt_meta_demo/{run_id}",
source="eventhub",
eventhub_template="demo/conf/eventhub-onboarding.template",
onboarding_file_path="demo/conf/onboarding.json",
env="demo"
env="demo",
# eventhub provided args
eventhub_name=self.args["eventhub_name"],
eventhub_name_append_flow=self.args["eventhub_name_append_flow"],
eventhub_producer_accesskey_name=self.args[
"eventhub_consumer_accesskey_name"
],
eventhub_consumer_accesskey_name=self.args[
"eventhub_consumer_accesskey_name"
],
eventhub_accesskey_secret_name=self.args["eventhub_accesskey_secret_name"],
eventhub_secrets_scope_name=self.args["eventhub_secrets_scope_name"],
eventhub_namespace=self.args["eventhub_namespace"],
eventhub_port=self.args["eventhub_port"]
)
runner_conf.uc_catalog_name = self.args.__dict__['uc_catalog_name']
runner_conf.runners_full_local_path = './demo/dbc/afam_eventhub_runners.dbc'
runner_conf.uc_catalog_name = self.args['uc_catalog_name']
runner_conf.runners_full_local_path = 'demo/notebooks/afam_eventhub_runners'
return runner_conf

def launch_workflow(self, runner_conf: DLTMetaRunnerConf):
created_job = self.create_eventhub_workflow_spec(runner_conf)
created_job = self.create_workflow_spec(runner_conf)
self.open_job_url(runner_conf, created_job)
return created_job


afam_args_map = {
"--profile": "provide databricks cli profile name, if not provide databricks_host and token",
"--uc_catalog_name": "provide databricks uc_catalog name, this is required to create volume, schema, table",
"--eventhub_name": "Provide eventhub_name e.g --eventhub_name=iot",
"--eventhub_name_append_flow": "Provide eventhub_name_append_flow e.g --eventhub_name_append_flow=iot_af",
"--eventhub_producer_accesskey_name": "Provide access key that has write permission on the eventhub",
"--eventhub_consumer_accesskey_name": "Provide access key that has read permission on the eventhub",
"--eventhub_secrets_scope_name": "Provide eventhub_secrets_scope_name e.g \
--eventhub_secrets_scope_name=eventhubs_creds",
"--eventhub_accesskey_secret_name": "Provide eventhub_accesskey_secret_name e.g \
-eventhub_accesskey_secret_name=RootManageSharedAccessKey",
"--eventhub_namespace": "Provide eventhub_namespace e.g --eventhub_namespace=topic-standard",
"--eventhub_port": "Provide eventhub_port e.g --eventhub_port=9093",
}

afeh_mandatory_args = ["uc_catalog_name", "eventhub_name",
"eventhub_name_append_flow", "eventhub_producer_accesskey_name",
"eventhub_consumer_accesskey_name", "eventhub_secrets_scope_name",
"eventhub_namespace", "eventhub_port"]


def main():
args = process_arguments(afam_args_map, afeh_mandatory_args)
workspace_client = get_workspace_api_client(args.profile)
args = process_arguments()
workspace_client = get_workspace_api_client(args['profile'])
dltmeta_afam_demo_runner = DLTMETAFEHDemo(args, workspace_client, "demo")
print("initializing complete")
runner_conf = dltmeta_afam_demo_runner.init_runner_conf()
48 changes: 20 additions & 28 deletions demo/launch_dais_demo.py
@@ -1,13 +1,13 @@
import uuid
import traceback
from databricks.sdk.service import jobs, compute
from src.install import WorkspaceInstaller
from src.__about__ import __version__
from integration_tests.run_integration_tests import (
DLTMETARunner,
DLTMetaRunnerConf,
get_workspace_api_client,
process_arguments,
cloud_node_type_id_dict
process_arguments
)


@@ -37,21 +37,22 @@ def init_runner_conf(self) -> DLTMetaRunnerConf:
runner_conf = DLTMetaRunnerConf(
run_id=run_id,
username=self._my_username(self.ws),
int_tests_dir="file:./demo",
int_tests_dir="demo",
dlt_meta_schema=f"dlt_meta_dataflowspecs_demo_{run_id}",
bronze_schema=f"dlt_meta_bronze_dais_demo_{run_id}",
silver_schema=f"dlt_meta_silver_dais_demo_{run_id}",
runners_nb_path=f"/Users/{self._my_username(self.ws)}/dlt_meta_dais_demo/{run_id}",
runners_nb_path=f"/Users/{self.wsi._my_username}/dlt_meta_dais_demo/{run_id}",
runners_full_local_path="demo/notebooks/dais_runners",
# node_type_id=cloud_node_type_id_dict[self.args.__dict__['cloud_provider_name']],
# dbr_version=self.args.__dict__['dbr_version'],
cloudfiles_template="demo/conf/onboarding.template",
env="prod",
source="cloudFiles",
runners_full_local_path='./demo/dbc/dais_dlt_meta_runners.dbc',
source="cloudfiles",
# runners_full_local_path='./demo/dbc/dais_dlt_meta_runners.dbc',
onboarding_file_path='demo/conf/onboarding.json'
)
if self.args.__dict__['uc_catalog_name']:
runner_conf.uc_catalog_name = self.args.__dict__['uc_catalog_name']
if self.args['uc_catalog_name']:
runner_conf.uc_catalog_name = self.args['uc_catalog_name']
runner_conf.uc_volume_name = f"{runner_conf.uc_catalog_name}_dais_demo_{run_id}"

return runner_conf
@@ -69,6 +70,7 @@ def run(self, runner_conf: DLTMetaRunnerConf):
self.launch_workflow(runner_conf)
except Exception as e:
print(e)
traceback.print_exc()
# finally:
# self.clean_up(runner_conf)

@@ -92,14 +94,13 @@ def create_daisdemo_workflow(self, runner_conf: DLTMetaRunnerConf):
Returns:
- created_job: created job object
"""
database, dlt_lib = self.init_db_dltlib(runner_conf)
dltmeta_environments = [
jobs.JobEnvironment(
environment_key="dlt_meta_dais_demo_env",
spec=compute.Environment(client=f"dlt_meta_int_test_{__version__}",
# dependencies=[f"dlt_meta=={__version__}"],
dependencies=["dlt_meta==0.0.8"],
)
environment_key="dl_meta_int_env",
spec=compute.Environment(
client=f"dlt_meta_int_test_{__version__}",
dependencies=[runner_conf.remote_whl_path],
),
)
]
return self.ws.jobs.create(
Expand All @@ -109,14 +110,14 @@ def create_daisdemo_workflow(self, runner_conf: DLTMetaRunnerConf):
jobs.Task(
task_key="setup_dlt_meta_pipeline_spec",
description="test",
environment_key="dlt_meta_dais_demo_env",
environment_key="dl_meta_int_env",
timeout_seconds=0,
python_wheel_task=jobs.PythonWheelTask(
package_name="dlt_meta",
entry_point="run",
named_parameters={
"onboard_layer": "bronze_silver",
"database": database,
"database": f"{runner_conf.uc_catalog_name}.{runner_conf.dlt_meta_schema}",
"onboarding_file_path": f"{runner_conf.uc_volume_path}/demo/conf/onboarding.json",
"silver_dataflowspec_table": "silver_dataflowspec_cdc",
"silver_dataflowspec_path": (
@@ -153,7 +154,7 @@ def create_daisdemo_workflow(self, runner_conf: DLTMetaRunnerConf):
description="Load Incremental Data",
depends_on=[jobs.TaskDependency(task_key="silver_initial_run")],
notebook_task=jobs.NotebookTask(
notebook_path=f"{runner_conf.runners_nb_path}/runners/load_incremental_data",
notebook_path=f"{runner_conf.runners_nb_path}/runners/load_incremental_data.py",
base_parameters={
"dbfs_tmp_path": runner_conf.uc_volume_path
}
@@ -178,19 +179,10 @@ def create_daisdemo_workflow(self, runner_conf: DLTMetaRunnerConf):
)


dais_args_map = {"--profile": "provide databricks cli profile name, if not provide databricks_host and token",
"--uc_catalog_name": "provide databricks uc_catalog name, \
this is required to create volume, schema, table",
"--cloud_provider_name": "provide cloud provider name. Supported values are aws , azure , gcp"
}

dais_mandatory_args = ["uc_catalog_name", "cloud_provider_name"]


def main():
"""Entry method to run integration tests."""
args = process_arguments(dais_args_map, dais_mandatory_args)
workspace_client = get_workspace_api_client(args.profile)
args = process_arguments()
workspace_client = get_workspace_api_client(args['profile'])
dltmeta_dais_demo_runner = DLTMETADAISDemo(args, workspace_client, "demo")
runner_conf = dltmeta_dais_demo_runner.init_runner_conf()
dltmeta_dais_demo_runner.run(runner_conf)
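For readers less familiar with the serverless job spec that `create_daisdemo_workflow` builds, here is a condensed sketch of the same databricks-sdk pattern — the profile, wheel path, catalog and schema names below are placeholders, not the demo's actual values:

```python
# Condensed sketch of the pattern above: one serverless job environment that
# installs the dlt_meta wheel, plus one Python-wheel task that runs onboarding.
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import compute, jobs

ws = WorkspaceClient(profile="DEFAULT")  # assumes a configured Databricks CLI profile

environment = jobs.JobEnvironment(
    environment_key="dl_meta_int_env",
    spec=compute.Environment(
        client="dlt_meta_demo",  # client label, mirroring the snippet above
        dependencies=["/Volumes/main/dlt_meta/files/dlt_meta-0.0.9-py3-none-any.whl"],  # placeholder wheel
    ),
)

onboarding_task = jobs.Task(
    task_key="setup_dlt_meta_pipeline_spec",
    environment_key="dl_meta_int_env",
    python_wheel_task=jobs.PythonWheelTask(
        package_name="dlt_meta",
        entry_point="run",
        named_parameters={
            "onboard_layer": "bronze_silver",
            "database": "my_catalog.dlt_meta_schema",  # placeholder <uc_catalog>.<dlt_meta_schema>
            "onboarding_file_path": "/Volumes/main/dlt_meta/files/demo/conf/onboarding.json",
        },
    ),
)

created_job = ws.jobs.create(
    name="dlt_meta_dais_demo_sketch",
    environments=[environment],
    tasks=[onboarding_task],
)
print(f"Created job {created_job.job_id}")
```

Declaring the environment once and referencing it by `environment_key` is what lets every task in the workflow reuse the same wheel dependency.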