
Conversation

ravi-databricks
Contributor

@ravi-databricks ravi-databricks commented Jun 25, 2024

Need to provide the ability to add file metadata to the dataframe.

e.g.

```python
import dlt

@dlt.table
def bronze():
    return (
        spark.readStream.format("cloudFiles")
        # Define the schema for the ~6 common columns across files. All other input fields will be
        # "rescued" into a JSON string column that can be queried via dot notation. _file_path is a
        # hidden auto-field but shows up in the rescued-data column JSON with this method, so the
        # same column is spoofed in the input schema here and dropped later.
        .schema("Common1 string, Common2 string, _file_path string")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaEvolutionMode", "rescue")
        # Override the default _rescued_data column name with whatever you want to call this column.
        .option("cloudFiles.rescuedDataColumn", "extraFields")
        .option("header", "true")
        .load("/Volumes/vol/data/*.txt")
        # Add file metadata information to the output.
        .select("*", "_metadata")
        # Discard the dummy input column to keep _file_path out of the extraFields rescued-data column.
        .drop("_file_path")
    )
```

Introduced `select_metadata_cols` inside `source_details` in the onboarding file:

 "source_metadata": { "include_autoloader_metadata_column": "True", "autoloader_metadata_col_name": "source_metadata", "select_metadata_cols": { "input_file_name": "_metadata.file_name", "input_file_path": "_metadata.file_path" } 

This will be utilized to add metadata columns to target tables.
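For illustration, here is a minimal sketch of how such a `select_metadata_cols` mapping could be projected onto a dataframe. The `add_metadata_cols` helper and its signature are hypothetical (not part of the dlt-meta API); the column names are taken from the config above.

```python
from pyspark.sql import DataFrame
from pyspark.sql.functions import col

def add_metadata_cols(input_df: DataFrame, select_metadata_cols: dict) -> DataFrame:
    # For every target column in the mapping, project the corresponding field
    # out of the autoloader _metadata struct (e.g. _metadata.file_name).
    for target_col, metadata_field in select_metadata_cols.items():
        input_df = input_df.withColumn(target_col, col(metadata_field))
    return input_df

# Using the mapping from the onboarding snippet above:
# df = add_metadata_cols(df, {"input_file_name": "_metadata.file_name",
#                             "input_file_path": "_metadata.file_path"})
```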

Introducing `custom_transform_func` so that customers can bring their own transformations.
e.g.
You have a custom transformation defined as Python code, as shown below, which takes a dataframe as input and returns a transformed dataframe:

```python
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit

def custom_transform_func_test(input_df: DataFrame) -> DataFrame:
    return input_df.withColumn('custom_col', lit('test'))
```

You should be able to pass this function to the dlt-meta invoke function as below:

```python
layer = spark.conf.get("layer", None)

from src.dataflow_pipeline import DataflowPipeline
DataflowPipeline.invoke_dlt_pipeline(spark, layer, custom_transform_func=custom_transform_func_test)
```
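As a further illustration, here is a sketch of a slightly more involved custom transformation. The column names and logic below are made up for the example, but any function that takes a DataFrame and returns a DataFrame should fit the same hook.

```python
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, current_timestamp, upper

def my_custom_transform(input_df: DataFrame) -> DataFrame:
    # Stamp each record with its ingestion time.
    transformed_df = input_df.withColumn("ingestion_ts", current_timestamp())
    # Normalize a (hypothetical) country_code column if the source happens to have one.
    if "country_code" in transformed_df.columns:
        transformed_df = transformed_df.withColumn("country_code", upper(col("country_code")))
    return transformed_df

# Passed to dlt-meta the same way as above:
# DataflowPipeline.invoke_dlt_pipeline(spark, layer, custom_transform_func=my_custom_transform)
```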

codecov bot commented Jun 25, 2024

Codecov Report

Attention: Patch coverage is 56.25000% with 14 lines in your changes missing coverage. Please review.

Please upload report for BASE (feature/v0.0.8@3555aaa). Learn more about missing BASE report.

| Files | Patch % | Lines |
| --- | --- | --- |
| src/pipeline_readers.py | 33.33% | 13 Missing and 1 partial ⚠️ |
Additional details and impacted files
```
@@            Coverage Diff             @@
##           feature/v0.0.8      #56   +/-   ##
=================================================
  Coverage            ?   88.57%
=================================================
  Files               ?        8
  Lines               ?      858
  Branches            ?      168
=================================================
  Hits                ?      760
  Misses              ?       46
  Partials            ?       52
```
| Flag | Coverage Δ |
| --- | --- |
| unittests | 88.57% <56.25%> (?) |

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

1. Custom transformations for the bronze layer
2. Unit tests
3. Increased version to v0.0.8
@ravi-databricks ravi-databricks added this to the v0.0.8 milestone Jul 3, 2024
@ravi-databricks ravi-databricks added the enhancement New feature or request label Jul 3, 2024
| data_flow_group | This is the group identifier for launching multiple pipelines under a single DLT pipeline |
| source_format | Source format e.g `cloudFiles`, `eventhub`, `kafka`, `delta` |
| source_details | This map Type captures all source details for cloudfiles = `source_schema_path`, `source_path_{env}`, `source_database` and for eventhub= `source_schema_path` , `eventhub.accessKeyName`, `eventhub.accessKeySecretName`, `eventhub.name` , `eventhub.secretsScopeName` , `kafka.sasl.mechanism`, `kafka.security.protocol`, `eventhub.namespace`, `eventhub.port`. For Source schema file spark DDL schema format parsing is supported <br> In case of custom schema format then write schema parsing function `bronze_schema_mapper(schema_file_path, spark):Schema` and provide to `OnboardDataflowspec` initialization <br> .e.g `onboardDataFlowSpecs = OnboardDataflowspec(spark, dict_obj,bronze_schema_mapper).onboardDataFlowSpecs()` |
| source_details | This map Type captures all source details for cloudfiles = `source_schema_path`, `source_path_{env}`, `source_database`, `source_metadata` For eventhub= `source_schema_path` , `eventhub.accessKeyName`, `eventhub.accessKeySecretName`, `eventhub.name` , `eventhub.secretsScopeName` , `kafka.sasl.mechanism`, `kafka.security.protocol`, `eventhub.namespace`, `eventhub.port`. For Source schema file spark DDL schema format parsing is supported <br> In case of custom schema format then write schema parsing function `bronze_schema_mapper(schema_file_path, spark):Schema` and provide to `OnboardDataflowspec` initialization <br> .e.g `onboardDataFlowSpecs = OnboardDataflowspec(spark, dict_obj,bronze_schema_mapper).onboardDataFlowSpecs()` |
Contributor


Need a period before "For eventhub="

Contributor Author


done!

