Skip to content
16 changes: 10 additions & 6 deletions demo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,17 @@ This demo will perform following tasks:

# Apply Changes From Snapshot Demo
- This demo will perform following steps
- Showcase onboarding process for apply changes from snapshot pattern
- Showcase onboarding process for apply changes from snapshot pattern([snapshot-onboarding.template](https://github.com/databrickslabs/dlt-meta/blob/main/demo/conf/snapshot-onboarding.template))
- Run onboarding for the bronze stores and products tables, which contains data snapshot data in csv files.
- Run Bronze DLT to load initial snapshot (LOAD_1.csv)
- Upload incremental snapshot LOAD_2.csv version=2 for stores and product
- Run Bronze DLT to load incremental snapshot (LOAD_2.csv). Stores is scd_type=2 so updated records will expired and added new records with version_number. Products is scd_type=1 so in case records missing for scd_type=1 will be deleted.
- Upload incremental snapshot LOAD_3.csv version=3 for stores and product
- Run Bronze DLT to load incremental snapshot (LOAD_3.csv). Stores is scd_type=2 so updated records will expired and added new records with version_number. Products is scd_type=1 so in case records missing for scd_type=1 will be deleted.
- Create source delta table for products
- Run Bronze DLT to load initial snapshot for stores(LOAD_1.csv) and products delta table
- Run Silver DLT to ingest bronze data using apply_changes_from_snapshot API
- Upload incremental snapshot LOAD_2.csv version=2 for stores and load products delta table for next snapshot
- Run Bronze DLT to load incremental snapshot (LOAD_2.csv). Products is scd_type=2 so updated records will expired and added new records with version_number. Stores is scd_type=1 so in case records missing for scd_type=1 will be deleted.
- Run Silver DLT to ingest bronze data using apply_changes_from_snapshot API
- Upload incremental snapshot LOAD_3.csv version=2 for stores and load products delta table for next snapshot
- Run Bronze DLT to load incremental snapshot (LOAD_2.csv). Products is scd_type=2 so updated records will expired and added new records with version_number. Stores is scd_type=1 so in case records missing for scd_type=1 will be deleted.
- Run Silver DLT to ingest bronze data using apply_changes_from_snapshot API
### Steps:
1. Launch Command Prompt

Expand Down
291 changes: 291 additions & 0 deletions demo/conf/dais_onboarding.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
[
{{
"data_flow_id": "100",
"data_flow_group": "A1",
"source_system": "mysql",
"source_format": "cloudFiles",
"source_details": {{
"source_database": "customers",
"source_table": "customers",
"source_path_demo": "{{uc_volume_path}}/demo/resources/data/customers",
"source_schema_path": "{{uc_volume_path}}/demo/resources/ddl/customers.ddl"
}},
"bronze_catalog_demo": "{{uc_catalog_name}}",
"bronze_database_demo": "{{bronze_schema}}",
"bronze_table": "customers",
"bronze_table_comment": "customers bronze table",
"bronze_reader_options": {{
"cloudFiles.format": "csv",
"cloudFiles.rescuedDataColumn": "_rescued_data",
"header": "true"
}},
"bronze_cluster_by":["customer_id"],
"bronze_data_quality_expectations_json_demo": "{{uc_volume_path}}/demo/conf/dqe/customers.json",
"bronze_catalog_quarantine_demo": "{{uc_catalog_name}}",
"bronze_database_quarantine_demo": "{{bronze_schema}}",
"bronze_quarantine_table": "customers_quarantine",
"bronze_quarantine_table_comment": "customers quarantine table",
"silver_catalog_demo": "{{uc_catalog_name}}",
"silver_database_demo": "{{silver_schema}}",
"silver_table": "customers",
"silver_table_comment": "customers silver table",
"silver_cdc_apply_changes": {{
"keys": [
"customer_id"
],
"sequence_by": "dmsTimestamp",
"scd_type": "2",
"apply_as_deletes": "Op = 'D'",
"except_column_list": [
"Op",
"dmsTimestamp",
"_rescued_data"
]
}},
"silver_cluster_by":["customer_id"],
"silver_transformation_json_demo": "{{uc_volume_path}}/demo/conf/silver_transformations.json",
"silver_data_quality_expectations_json_demo": "{{uc_volume_path}}/demo/conf/dqe/customers_silver_dqe.json"

}},
{{
"data_flow_id": "101",
"data_flow_group": "A1",
"source_system": "mysql",
"source_format": "cloudFiles",
"source_details": {{
"source_database": "transactions",
"source_table": "transactions",
"source_path_demo": "{{uc_volume_path}}/demo/resources/data/transactions",
"source_schema_path": "{{uc_volume_path}}/demo/resources/ddl/transactions.ddl"
}},
"bronze_catalog_demo": "{{uc_catalog_name}}",
"bronze_database_demo": "{{bronze_schema}}",
"bronze_table": "transactions",
"bronze_table_comment": "transactions bronze table",
"bronze_reader_options": {{
"cloudFiles.format": "csv",
"cloudFiles.rescuedDataColumn": "_rescued_data",
"header": "true"
}},
"bronze_data_quality_expectations_json_demo": "{{uc_volume_path}}/demo/conf/dqe/transactions.json",
"bronze_database_quarantine_demo": "{{uc_catalog_name}}.{{bronze_schema}}",
"bronze_quarantine_table": "transactions_quarantine",
"bronze_quarantine_table_comment": "transactions bronze quarantine table",
"silver_catalog_demo": "{{uc_catalog_name}}",
"silver_database_demo": "{{silver_schema}}",
"silver_table": "transactions",
"silver_table_comment": "transactions silver table",
"silver_cdc_apply_changes": {{
"keys": [
"transaction_id"
],
"sequence_by": "dmsTimestamp",
"scd_type": "2",
"apply_as_deletes": "Op = 'D'",
"except_column_list": [
"Op",
"dmsTimestamp",
"_rescued_data"
]
}},
"silver_transformation_json_demo": "{{uc_volume_path}}/demo/conf/silver_transformations.json",
"silver_data_quality_expectations_json_demo": "{{uc_volume_path}}/demo/conf/dqe/transactions_silver_dqe.json"
}},
{{
"data_flow_id": "103",
"data_flow_group": "A1",
"source_system": "mysql",
"source_format": "cloudFiles",
"source_details": {{
"source_database": "products",
"source_table": "products",
"source_path_demo": "{{uc_volume_path}}/demo/resources/data/products",
"source_schema_path": "{{uc_volume_path}}/demo/resources/ddl/products.ddl"
}},
"bronze_catalog_demo": "{{uc_catalog_name}}",
"bronze_database_demo": "{{bronze_schema}}",
"bronze_table": "products",
"bronze_table_comment": "products bronze table",
"bronze_reader_options": {{
"cloudFiles.format": "csv",
"cloudFiles.rescuedDataColumn": "_rescued_data",
"header": "true"
}},
"bronze_data_quality_expectations_json_demo": "{{uc_volume_path}}/demo/conf/dqe/products.json",
"bronze_database_quarantine_demo": "{{uc_catalog_name}}.{{bronze_schema}}",
"bronze_quarantine_table": "products_quarantine",
"bronze_quarantine_table_comment": "products quarantine bronze table",
"silver_catalog_demo": "{{uc_catalog_name}}",
"silver_database_demo": "{{silver_schema}}",
"silver_table": "products",
"silver_table_comment": "products silver table",
"silver_cdc_apply_changes": {{
"keys": [
"product_id"
],
"sequence_by": "dmsTimestamp",
"scd_type": "2",
"apply_as_deletes": "Op = 'D'",
"except_column_list": [
"Op",
"dmsTimestamp",
"_rescued_data"
]
}},
"silver_transformation_json_demo": "{{uc_volume_path}}/demo/conf/silver_transformations.json",
"silver_data_quality_expectations_json_demo": "{{uc_volume_path}}/demo/conf/dqe/products_silver_dqe.json"
}},
{{
"data_flow_id": "104",
"data_flow_group": "A1",
"source_system": "mysql",
"source_format": "cloudFiles",
"source_details": {{
"source_database": "stores",
"source_table": "stores",
"source_path_demo": "{{uc_volume_path}}/demo/resources/data/stores",
"source_schema_path": "{{uc_volume_path}}/demo/resources/ddl/stores.ddl"
}},
"bronze_catalog_demo": "{{uc_catalog_name}}",
"bronze_database_demo": "{{bronze_schema}}",
"bronze_table": "stores",
"bronze_table_comment": "stores bronze table",
"bronze_reader_options": {{
"cloudFiles.format": "csv",
"cloudFiles.rescuedDataColumn": "_rescued_data",
"header": "true"
}},
"bronze_data_quality_expectations_json_demo": "{{uc_volume_path}}/demo/conf/dqe/stores.json",
"bronze_catalog_quarantine_demo": "{{uc_catalog_name}}",
"bronze_database_quarantine_demo": "{{bronze_schema}}",
"bronze_quarantine_table": "stores_quarantine",
"bronze_quarantine_table_comment": "stores quarantine bronze table",
"silver_catalog_demo": "{{uc_catalog_name}}",
"silver_database_demo": "{{silver_schema}}",
"silver_table": "stores",
"silver_table_comment": "stores silver table",
"silver_cdc_apply_changes": {{
"keys": [
"store_id"
],
"sequence_by": "dmsTimestamp",
"scd_type": "2",
"apply_as_deletes": "Op = 'D'",
"except_column_list": [
"Op",
"dmsTimestamp",
"_rescued_data"
]
}},
"silver_transformation_json_demo": "{{uc_volume_path}}/demo/conf/silver_transformations.json",
"silver_data_quality_expectations_json_demo": "{{uc_volume_path}}/demo/conf/dqe/stores_silver_dqe.json"
}},

{{
"data_flow_id": "105",
"data_flow_group": "A1",
"source_system": "Sensor Device",
"source_format": "eventhub",
"source_details": {{
"source_schema_path": "{{uc_volume_path}}/demo/resources/data/afam/ddl/eventhub_iot_schema.ddl",
"eventhub.accessKeyName": "{{eventhub_accesskey_name}}",
"eventhub.name": "{{eventhub_name}}",
"eventhub.accessKeySecretName": "{{eventhub_accesskey_secret_name}}",
"eventhub.secretsScopeName": "{{eventhub_secrets_scope_name}}",
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"eventhub.namespace": "{{eventhub_nmspace}}",
"eventhub.port": "{{eventhub_port}}"
}},
"bronze_reader_options": {{
"maxOffsetsPerTrigger": "50000",
"startingOffsets": "earliest",
"failOnDataLoss": "false",
"kafka.request.timeout.ms": "60000",
"kafka.session.timeout.ms": "60000"
}},
"bronze_catalog_demo": "{{uc_catalog_name}}",
"bronze_database_demo": "{{bronze_schema}}",
"bronze_table": "bronze_{{run_id}}_iot",
"bronze_partition_columns": "date",
"bronze_data_quality_expectations_json_demo": "{{uc_volume_path}}/demo/conf/dqe/iot/bronze_data_quality_expectations.json",
"bronze_catalog_quarantine_demo": "{{uc_catalog_name}}",
"bronze_database_quarantine_demo": "{{bronze_schema}}",
"bronze_quarantine_table": "bronze_{{run_id}}_iot_quarantine",
"bronze_append_flows": [
{{
"name": "io_bronze_eventhub_append_flow",
"create_streaming_table": false,
"source_format": "eventhub",
"source_details": {{
"source_schema_path": "{{uc_volume_path}}/demo/resources/data/afam/ddl/eventhub_iot_schema.ddl",
"eventhub.accessKeyName": "{{eventhub_accesskey_name}}",
"eventhub.name": "{{eventhub_name_append_flow}}",
"eventhub.accessKeySecretName": "{{eventhub_accesskey_secret_name}}",
"eventhub.secretsScopeName": "{{eventhub_secrets_scope_name}}",
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"eventhub.namespace": "{{eventhub_nmspace}}",
"eventhub.port": "{{eventhub_port}}"
}},
"reader_options": {{
"maxOffsetsPerTrigger": "50000",
"startingOffsets": "earliest",
"failOnDataLoss": "false",
"kafka.request.timeout.ms": "60000",
"kafka.session.timeout.ms": "60000"
}},
"once": false
}}
]
}},
{{
"data_flow_id": "106",
"data_flow_group": "A1",
"source_system": "Sensor Device",
"source_format": "kafka",
"source_details": {{
"source_schema_path": "{{uc_volume_path}}/demo/resources/ddl/eventhub_iot_schema.ddl",
"subscribe": "{{kafka_source_topic}}",
"kafka.security.protocol": "PLAINTEXT",
"kafka_source_servers_secrets_scope_name":"{{kafka_source_servers_secrets_scope_name}}",
"kafka_source_servers_secrets_scope_key":"{{kafka_source_servers_secrets_scope_key}}"
}},
"bronze_reader_options": {{
"startingOffsets": "earliest",
"kafka.request.timeout.ms": "60000",
"kafka.session.timeout.ms": "60000"
}},
"bronze_catalog_demo": "{{uc_catalog_name}}",
"bronze_database_demo": "{{bronze_schema}}",
"bronze_table": "bronze_{{run_id}}_iot",
"bronze_partition_columns": "date",
"bronze_data_quality_expectations_json_demo": "{{uc_volume_path}}/demo/conf/dqe/iot/bronze_data_quality_expectations.json",
"bronze_catalog_quarantine_demo": "{{uc_catalog_name}}",
"bronze_database_quarantine_demo": "{{bronze_schema}}",
"bronze_quarantine_table": "bronze_{{run_id}}_iot_quarantine",
"bronze_sinks": [
{{
"name": "bronze_customer_kafka_sink",
"format": "kafka",
"options": {{
"kafka_sink_servers_secret_scope_name":"{{kafka_sink_servers_secret_scope_name}}",
"kafka_sink_servers_secret_scope_key":"{{kafka_sink_servers_secret_scope_key}}",
"kafka.security.protocol":"PLAINTEXT",
"topic":"{{kafka_sink_topic}}"
}},
"select_exp":["value"],
"where_clause":"value is not null"
}},
{{
"name": "bronze_customer_delta_sink",
"format": "delta",
"options": {{
"path":"dbfs:/mnt/dltmeta_sink/iot"
}},
"select_exp":["value"],
"where_clause":"value is not null"
}}
]
}}
]
23 changes: 23 additions & 0 deletions demo/conf/silver_transformations_snapshot.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[
{
"target_table": "products",
"select_exp": [
"product_id",
"name",
"price",
"dmsTimestamp"

],
"where_clause": [
"__END_AT IS NULL"
]
},
{
"target_table": "stores",
"select_exp": [
"store_id",
"address",
"dmsTimestamp"
]
}
]
Loading
Loading