Skip to content
Merged
13 changes: 12 additions & 1 deletion integration_tests/conf/cloudfiles-onboarding.template
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@
"pipelines.autoOptimize.zOrderCols": "id, email"
},
"silver_data_quality_expectations_json_it": "{uc_volume_path}/integration_tests/conf/dqe/customers/silver_data_quality_expectations.json",
"silver_catalog_quarantine_it":"{uc_catalog_name}",
"silver_database_quarantine_it":"{silver_schema}",
"silver_quarantine_table":"customers_quarantine",
"silver_quarantine_table_properties": {
"pipelines.reset.allowed": "false",
"pipelines.autoOptimize.zOrderCols": "id, email"
},
"silver_append_flows": [
{
"name": "customers_silver_flow",
Expand Down Expand Up @@ -164,6 +171,10 @@
"silver_table_properties": {
"pipelines.reset.allowed": "false",
"pipelines.autoOptimize.zOrderCols": "id, customer_id"
}
},
"silver_catalog_quarantine_it":"{uc_catalog_name}",
"silver_database_quarantine_it":"{silver_schema}",
"silver_quarantine_table":"transactions_quarantine",
"silver_quarantine_cluster_by":["id","customer_id"]
}
]
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
{
"expect_all_or_drop": {
"valid_customer_id": "id IS NOT NULL"
},
"expect_or_quarantine": {
"valid_operation": "operation IN ('DELETE')"
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
{
"expect_all_or_drop": {
"valid_customer_id": "id IS NOT NULL"
},
"expect_or_quarantine": {
"valid_operation": "operation IN ('DELETE')"
}
}
5 changes: 2 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
with open("README.md", "r") as fh:
long_description = fh.read()

INSTALL_REQUIRES = ["setuptools", "databricks-sdk", "pyspark==3.5.5"]
INSTALL_REQUIRES = ["setuptools", "databricks-sdk"]

DEV_REQUIREMENTS = [
"flake8==6.0",
"delta-spark==3.2.1",
"pyspark==3.5.5"
"delta-spark==3.2.1"
]

IT_REQUIREMENTS = ["typer[all]==0.6.1"]
Expand Down
414 changes: 267 additions & 147 deletions src/dataflow_pipeline.py

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions src/dataflow_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ class SilverDataflowSpec:
cdcApplyChanges: str
applyChangesFromSnapshot: str
dataQualityExpectations: str
quarantineTargetDetails: map
quarantineTableProperties: map
quarantineClusterBy: list
appendFlows: str
appendFlowsSchemas: map
version: str
Expand Down Expand Up @@ -188,6 +191,9 @@ class DataflowSpecUtils:
]
additional_silver_df_columns = [
"dataQualityExpectations",
"quarantineTargetDetails",
"quarantineTableProperties",
"quarantineClusterBy",
"appendFlows",
"appendFlowsSchemas",
"clusterBy",
Expand Down
62 changes: 41 additions & 21 deletions src/onboard_dataflowspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ def __get_bronze_dataflow_spec_dataframe(self, onboarding_df, env):
)
if onboarding_row["bronze_quarantine_table"]:
quarantine_target_details, quarantine_table_properties = self.__get_quarantine_details(
env, onboarding_row
env, "bronze", onboarding_row
)

append_flows, append_flows_schemas = self.get_append_flows_json(
Expand Down Expand Up @@ -675,50 +675,50 @@ def __get_cluster_by_properties(self, onboarding_row, table_properties, cluster_
cluster_by = onboarding_row[cluster_key]
return cluster_by

def __get_quarantine_details(self, env, layer, onboarding_row):
    """Build the quarantine target details and table properties for a layer.

    Reads the ``{layer}_quarantine_*`` keys (e.g. ``bronze_quarantine_table``,
    ``silver_database_quarantine_{env}``) from the onboarding row and assembles:

    - ``quarantine_target_details``: dict with ``database``, ``table``,
      ``partition_columns`` and ``cluster_by``; optionally ``catalog``,
      ``comment`` and (when Unity Catalog is disabled) ``path``.
      Empty dict when no ``{layer}_database_quarantine_{env}`` key is set.
    - ``quarantine_table_properties``: dict of table properties with
      None-valued entries removed; empty dict when not configured.

    :param env: environment suffix used in env-specific keys (e.g. ``dev``, ``it``).
    :param layer: medallion layer prefix of the keys — ``"bronze"`` or ``"silver"``.
    :param onboarding_row: onboarding spec row supporting ``in`` and ``[]`` access.
    :return: tuple ``(quarantine_target_details, quarantine_table_properties)``.
    """
    quarantine_table_partition_columns = ""
    quarantine_target_details = {}
    quarantine_table_properties = {}
    quarantine_table_cluster_by = None
    if (
        f"{layer}_quarantine_table_partitions" in onboarding_row
        and onboarding_row[f"{layer}_quarantine_table_partitions"]
    ):
        # Split if this is a list separated by commas
        if "," in onboarding_row[f"{layer}_quarantine_table_partitions"]:
            quarantine_table_partition_columns = onboarding_row[f"{layer}_quarantine_table_partitions"].split(",")
        else:
            quarantine_table_partition_columns = onboarding_row[f"{layer}_quarantine_table_partitions"]
    if (
        f"{layer}_quarantine_table_properties" in onboarding_row
        and onboarding_row[f"{layer}_quarantine_table_properties"]
    ):
        # Drop None-valued entries so they are not written as table properties.
        quarantine_table_properties = self.__delete_none(
            onboarding_row[f"{layer}_quarantine_table_properties"].asDict()
        )

    # Raises if both zOrder and cluster_by are configured (validated by helper).
    quarantine_table_cluster_by = self.__get_cluster_by_properties(onboarding_row, quarantine_table_properties,
                                                                   f"{layer}_quarantine_table_cluster_by")
    if (
        f"{layer}_database_quarantine_{env}" in onboarding_row
        and onboarding_row[f"{layer}_database_quarantine_{env}"]
    ):
        quarantine_target_details = {"database": onboarding_row[f"{layer}_database_quarantine_{env}"],
                                     "table": onboarding_row[f"{layer}_quarantine_table"],
                                     "partition_columns": quarantine_table_partition_columns,
                                     "cluster_by": quarantine_table_cluster_by
                                     }
        quarantine_catalog = (
            onboarding_row[f"{layer}_catalog_quarantine_{env}"]
            if f"{layer}_catalog_quarantine_{env}" in onboarding_row
            else None
        )
        if quarantine_catalog:
            quarantine_target_details["catalog"] = quarantine_catalog
        # FIX: key must be an f-string; the literal "{layer}_quarantine_table_comment"
        # never matches, so the comment was silently dropped for every layer.
        if f"{layer}_quarantine_table_comment" in onboarding_row:
            quarantine_target_details["comment"] = onboarding_row[f"{layer}_quarantine_table_comment"]
        # Explicit storage path only applies outside Unity Catalog.
        if not self.uc_enabled and f"{layer}_quarantine_table_path_{env}" in onboarding_row:
            quarantine_target_details["path"] = onboarding_row[f"{layer}_quarantine_table_path_{env}"]

    return quarantine_target_details, quarantine_table_properties

Expand Down Expand Up @@ -1009,6 +1009,9 @@ def __get_silver_dataflow_spec_dataframe(self, onboarding_df, env):
"cdcApplyChanges",
"applyChangesFromSnapshot",
"dataQualityExpectations",
"quarantineTargetDetails",
"quarantineTableProperties",
"quarantineClusterBy",
"appendFlows",
"appendFlowsSchemas",
"clusterBy",
Expand Down Expand Up @@ -1038,6 +1041,9 @@ def __get_silver_dataflow_spec_dataframe(self, onboarding_df, env):
StructField("cdcApplyChanges", StringType(), True),
StructField("applyChangesFromSnapshot", StringType(), True),
StructField("dataQualityExpectations", StringType(), True),
StructField("quarantineTargetDetails", MapType(StringType(), StringType(), True), True),
StructField("quarantineTableProperties", MapType(StringType(), StringType(), True), True),
StructField("quarantineClusterBy", ArrayType(StringType(), True), True),
StructField("appendFlows", StringType(), True),
StructField("appendFlowsSchemas", MapType(StringType(), StringType(), True), True),
StructField("clusterBy", ArrayType(StringType(), True), True),
Expand Down Expand Up @@ -1147,6 +1153,9 @@ def __get_silver_dataflow_spec_dataframe(self, onboarding_df, env):
self.__delete_none(silver_cdc_apply_changes_row.asDict())
)
data_quality_expectations = None
silver_quarantine_target_details = None
silver_quarantine_table_properties = None
silver_quarantine_cluster_by = None
if f"silver_data_quality_expectations_json_{env}" in onboarding_row:
silver_data_quality_expectations_json = onboarding_row[
f"silver_data_quality_expectations_json_{env}"
Expand All @@ -1155,6 +1164,14 @@ def __get_silver_dataflow_spec_dataframe(self, onboarding_df, env):
data_quality_expectations = self.__get_data_quality_expecations(
silver_data_quality_expectations_json
)
silver_quarantine_target_details, silver_quarantine_table_properties = self.__get_quarantine_details(
env, "silver", onboarding_row
)
silver_quarantine_cluster_by = self.__get_cluster_by_properties(
onboarding_row,
silver_quarantine_table_properties,
"silver_quarantine_cluster_by"
)
append_flows, append_flow_schemas = self.get_append_flows_json(
onboarding_row, layer="silver", env=env
)
Expand All @@ -1180,6 +1197,9 @@ def __get_silver_dataflow_spec_dataframe(self, onboarding_df, env):
silver_cdc_apply_changes,
apply_changes_from_snapshot,
data_quality_expectations,
silver_quarantine_target_details,
silver_quarantine_table_properties,
silver_quarantine_cluster_by,
append_flows,
append_flow_schemas,
silver_cluster_by,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,8 @@
"no_rescued_data":"_rescued_data IS NULL",
"valid_id":"id IS NOT NULL",
"valid_operation":"operation IN ('APPEND', 'DELETE', 'UPDATE')"
}
},
"expect_or_quarantine": {
"valid_operation": "operation IN ('DELETE')"
}
}
30 changes: 27 additions & 3 deletions tests/resources/onboarding.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,15 @@
"pipelines.autoOptimize.zOrderCols": "id,email"
},
"silver_transformation_json_dev": "tests/resources/silver_transformations.json",
"silver_data_quality_expectations_json_dev": "tests/resources/dqe/customers/silver_data_quality_expectations.json"
"silver_data_quality_expectations_json_dev": "tests/resources/dqe/customers/silver_data_quality_expectations.json",
"silver_catalog_quarantine_dev": "silver",
"silver_catalog_quarantine_staging": "silver",
"silver_catalog_quarantine_prd": "silver",
"silver_database_quarantine_dev": "silver",
"silver_database_quarantine_staging": "silver",
"silver_database_quarantine_prd": "silver",
"silver_quarantine_table": "customers_cdc_quarantine",
"silver_quarantine_table_path_dev": "tests/resources/data/silver/customers_quarantine"
},
{
"data_flow_id": "101",
Expand Down Expand Up @@ -145,7 +153,15 @@
"pipelines.reset.allowed": "false",
"pipelines.autoOptimize.zOrderCols": "id, customer_id"
},
"silver_data_quality_expectations_json_dev": "tests/resources/dqe/transactions/silver_data_quality_expectations.json"
"silver_data_quality_expectations_json_dev": "tests/resources/dqe/transactions/silver_data_quality_expectations.json",
"silver_catalog_quarantine_dev": "silver",
"silver_catalog_quarantine_staging": "silver",
"silver_catalog_quarantine_prd": "silver",
"silver_database_quarantine_dev": "silver",
"silver_database_quarantine_staging": "silver",
"silver_database_quarantine_prd": "silver",
"silver_quarantine_table": "transactions_cdc_quarantine",
"silver_quarantine_table_path_dev": "tests/resources/data/silver/transactions_quarantine"
},
{
"data_flow_id": "103",
Expand Down Expand Up @@ -197,6 +213,14 @@
},
"silver_table_path_dev": "tests/resources/data/silver/iot_cdc",
"silver_transformation_json_dev": "tests/resources/silver_transformations.json",
"silver_data_quality_expectations_json_dev": "tests/resources/dqe/iot_cdc/silver_data_quality_expectations.json"
"silver_data_quality_expectations_json_dev": "tests/resources/dqe/iot_cdc/silver_data_quality_expectations.json",
"silver_catalog_quarantine_dev": "silver",
"silver_catalog_quarantine_staging": "silver",
"silver_catalog_quarantine_prd": "silver",
"silver_database_quarantine_dev": "silver",
"silver_database_quarantine_staging": "silver",
"silver_database_quarantine_prd": "silver",
"silver_quarantine_table": "iot_cdc_quarantine",
"silver_quarantine_table_path_dev": "tests/resources/data/silver/iot_quarantine"
}
]
18 changes: 12 additions & 6 deletions tests/test_dataflow_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ class DataflowPipelineTests(DLTFrameworkTestCase):
"valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"
}
}""",
"quarantineTargetDetails": {},
"quarantineTableProperties": {},
"quarantineClusterBy": [""],
"appendFlows": [],
"appendFlowsSchemas": {},
"sinks": {},
Expand Down Expand Up @@ -193,6 +196,9 @@ class DataflowPipelineTests(DLTFrameworkTestCase):
"valid_operation": "operation IN ('APPEND', 'DELETE', 'UPDATE')"
}
}""",
"quarantineTargetDetails": {},
"quarantineTableProperties": {},
"quarantineClusterBy": [""],
"appendFlows": [],
"appendFlowsSchemas": {},
"sinks": {},
Expand Down Expand Up @@ -445,9 +451,9 @@ def test_read_silver_with_where(self, get_silver_schema):
silver_df = dlt_data_flow.read_silver()
self.assertIsNotNone(silver_df)

@patch.object(DataflowPipeline, "write_bronze_with_dqe", return_value={"called"})
@patch.object(DataflowPipeline, "write_layer_with_dqe", return_value={"called"})
@patch.object(dlt, "expect_all_or_drop", return_value={"called"})
def test_broze_write_dqe(self, expect_all_or_drop, write_bronze_with_dqe):
def test_broze_write_dqe(self, expect_all_or_drop, write_layer_with_dqe):
bronze_dataflow_spec = BronzeDataflowSpec(**DataflowPipelineTests.bronze_dataflow_spec_map)
dlt_data_flow = DataflowPipeline(
self.spark,
Expand All @@ -456,7 +462,7 @@ def test_broze_write_dqe(self, expect_all_or_drop, write_bronze_with_dqe):
f"{bronze_dataflow_spec.targetDetails['table']}_inputQView",
)
dlt_data_flow.write_bronze()
assert write_bronze_with_dqe.called
assert write_layer_with_dqe.called

@patch.object(DataflowPipeline, "cdc_apply_changes", return_value={"called"})
@patch.object(dlt, "expect_all_or_drop", return_value={"called"})
Expand Down Expand Up @@ -1287,8 +1293,8 @@ def test_write_bronze_snapshot(self, mock_apply_changes_from_snapshot):
pipeline.write_bronze()
assert mock_apply_changes_from_snapshot.called

@patch.object(DataflowPipeline, 'write_bronze_with_dqe', return_value=None)
def test_write_bronze_with_dqe(self, mock_write_bronze_with_dqe):
@patch.object(DataflowPipeline, 'write_layer_with_dqe', return_value=None)
def test_write_bronze_with_dqe(self, mock_write_layer_with_dqe):
"""Test write_bronze with data quality expectations."""
bronze_dataflow_spec = BronzeDataflowSpec(**self.bronze_dataflow_spec_map)
bronze_dataflow_spec.dataQualityExpectations = json.dumps({
Expand All @@ -1301,7 +1307,7 @@ def test_write_bronze_with_dqe(self, mock_write_bronze_with_dqe):
view_name = f"{bronze_dataflow_spec.targetDetails['table']}_inputview"
pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, view_name, None)
pipeline.write_bronze()
assert mock_write_bronze_with_dqe.called
assert mock_write_layer_with_dqe.called

@patch.object(DataflowPipeline, 'cdc_apply_changes', return_value=None)
def test_write_bronze_cdc_apply_changes(self, mock_cdc_apply_changes):
Expand Down
10 changes: 5 additions & 5 deletions tests/test_onboard_dataflowspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ def test_get_quarantine_details_with_partitions_and_properties(self):
onboardDataFlowSpecs = OnboardDataflowspec(self.spark, self.onboarding_bronze_silver_params_map)
quarantine_target_details, quarantine_table_properties = (
onboardDataFlowSpecs._OnboardDataflowspec__get_quarantine_details(
"it", onboarding_row)
"it", "bronze", onboarding_row)
)
self.assertEqual(quarantine_target_details["database"], "quarantine_db")
self.assertEqual(quarantine_target_details["table"], "quarantine_table")
Expand All @@ -523,7 +523,7 @@ def test_get_quarantine_details_without_partitions_and_properties(self):
onboardDataFlowSpecs = OnboardDataflowspec(self.spark, self.onboarding_bronze_silver_params_map)
quarantine_target_details, quarantine_table_properties = (
onboardDataFlowSpecs._OnboardDataflowspec__get_quarantine_details(
"it", onboarding_row
"it", "bronze", onboarding_row
)
)
self.assertEqual(quarantine_target_details["path"], "quarantine_path")
Expand All @@ -543,7 +543,7 @@ def test_get_quarantine_details_with_uc_enabled(self):
)
quarantine_target_details, quarantine_table_properties = (
onboardDataFlowSpecs._OnboardDataflowspec__get_quarantine_details(
"it", onboarding_row
"it", "bronze", onboarding_row
)
)
self.assertEqual(quarantine_target_details["database"], "quarantine_db")
Expand All @@ -565,7 +565,7 @@ def test_get_quarantine_details_with_cluster_by_and_properties(self):
onboardDataFlowSpecs = OnboardDataflowspec(self.spark, self.onboarding_bronze_silver_params_map)
quarantine_target_details, quarantine_table_properties = (
onboardDataFlowSpecs._OnboardDataflowspec__get_quarantine_details(
"it", onboarding_row)
"it", "bronze", onboarding_row)
)
self.assertEqual(quarantine_target_details["database"], "quarantine_db")
self.assertEqual(quarantine_target_details["table"], "quarantine_table")
Expand All @@ -587,7 +587,7 @@ def test_set_quarantine_details_with_cluster_by_and_zOrder_properties(self):

with self.assertRaises(Exception) as context:
onboardDataFlowSpecs._OnboardDataflowspec__get_quarantine_details(
"it", onboarding_row)
"it", "bronze", onboarding_row)
print(str(context.exception))
self.assertEqual(str(context.exception),
"Can not support zOrder and cluster_by together at bronze_quarantine_table_cluster_by")
Expand Down
Loading