1919from metadata_service .models .generated .ConnectorRegistryV0 import ConnectorRegistryV0
2020
2121
22-
2322GROUP_NAME = "registry"
2423
2524# ERRORS
@@ -112,7 +111,9 @@ def is_metadata_connector_type(metadata_definition: dict, connector_type: str) -
112111 return metadata_definition ["data" ]["connectorType" ] == connector_type
113112
114113
115- def construct_registry_from_metadata (legacy_registry_derived_metadata_definitions : List [MetadataDefinition ], registry_name : str ) -> ConnectorRegistryV0 :
114+ def construct_registry_from_metadata (
115+ legacy_registry_derived_metadata_definitions : List [MetadataDefinition ], registry_name : str
116+ ) -> ConnectorRegistryV0 :
116117 """Construct the registry from the metadata definitions.
117118
118119 Args:
@@ -136,6 +137,7 @@ def construct_registry_from_metadata(legacy_registry_derived_metadata_definition
136137
137138 return {"sources" : registry_sources , "destinations" : registry_destinations }
138139
140+
139141def construct_registry_with_spec_from_registry (registry : dict , cached_specs : OutputDataFrame ) -> dict :
140142 entries = [("source" , entry ) for entry in registry ["sources" ]] + [("destinations" , entry ) for entry in registry ["destinations" ]]
141143
@@ -157,7 +159,10 @@ def construct_registry_with_spec_from_registry(registry: dict, cached_specs: Out
157159 raise MissingCachedSpecError (f"No cached spec found for { entry ['dockerRegistry' ]:{entry ['dockerImageTag' ]}} " )
158160 return registry_with_specs
159161
160- def persist_registry_to_json (registry : ConnectorRegistryV0 , registry_name : str , registry_directory_manager : GCSFileManager ) -> GCSFileHandle :
162+
163+ def persist_registry_to_json (
164+ registry : ConnectorRegistryV0 , registry_name : str , registry_directory_manager : GCSFileManager
165+ ) -> GCSFileHandle :
161166 """Persist the registry to a json file.
162167
163168 Args:
@@ -173,10 +178,14 @@ def persist_registry_to_json(registry: ConnectorRegistryV0, registry_name: str,
173178 file_handle = registry_directory_manager .write_data (registry_json .encode ("utf-8" ), ext = "json" , key = registry_file_name )
174179 return file_handle
175180
181+
176182# New Registry
177183
184+
178185@asset (required_resource_keys = {"registry_directory_manager" }, group_name = GROUP_NAME )
179- def cloud_registry_from_metadata (context : OpExecutionContext , metadata_definitions : List [MetadataDefinition ], cached_specs : OutputDataFrame ) -> Output [ConnectorRegistryV0 ]:
186+ def cloud_registry_from_metadata (
187+ context : OpExecutionContext , metadata_definitions : List [MetadataDefinition ], cached_specs : OutputDataFrame
188+ ) -> Output [ConnectorRegistryV0 ]:
180189 """
181190 This asset is used to generate the cloud registry from the metadata definitions.
182191 """
@@ -185,19 +194,21 @@ def cloud_registry_from_metadata(context: OpExecutionContext, metadata_definitio
185194
186195 from_metadata = construct_registry_from_metadata (metadata_definitions , registry_name )
187196 registry_dict = construct_registry_with_spec_from_registry (from_metadata , cached_specs )
188- reigstry_model = ConnectorRegistryV0 .parse_obj (registry_dict )
197+ registry_model = ConnectorRegistryV0 .parse_obj (registry_dict )
189198
190- file_handle = persist_registry_to_json (reigstry_model , registry_name , registry_directory_manager )
199+ file_handle = persist_registry_to_json (registry_model , registry_name , registry_directory_manager )
191200
192201 metadata = {
193202 "gcs_path" : MetadataValue .url (file_handle .gcs_path ),
194203 }
195204
196- return Output (metadata = metadata , value = reigstry_model )
205+ return Output (metadata = metadata , value = registry_model )
197206
198207
199208@asset (required_resource_keys = {"registry_directory_manager" }, group_name = GROUP_NAME )
200- def oss_registry_from_metadata (context : OpExecutionContext , metadata_definitions : List [MetadataDefinition ], cached_specs : OutputDataFrame ) -> Output [ConnectorRegistryV0 ]:
209+ def oss_registry_from_metadata (
210+ context : OpExecutionContext , metadata_definitions : List [MetadataDefinition ], cached_specs : OutputDataFrame
211+ ) -> Output [ConnectorRegistryV0 ]:
201212 """
202213 This asset is used to generate the oss registry from the metadata definitions.
203214 """
@@ -206,15 +217,16 @@ def oss_registry_from_metadata(context: OpExecutionContext, metadata_definitions
206217
207218 from_metadata = construct_registry_from_metadata (metadata_definitions , registry_name )
208219 registry_dict = construct_registry_with_spec_from_registry (from_metadata , cached_specs )
209- reigstry_model = ConnectorRegistryV0 .parse_obj (registry_dict )
220+ registry_model = ConnectorRegistryV0 .parse_obj (registry_dict )
210221
211- file_handle = persist_registry_to_json (reigstry_model , registry_name , registry_directory_manager )
222+ file_handle = persist_registry_to_json (registry_model , registry_name , registry_directory_manager )
212223
213224 metadata = {
214225 "gcs_path" : MetadataValue .url (file_handle .gcs_path ),
215226 }
216227
217- return Output (metadata = metadata , value = reigstry_model )
228+ return Output (metadata = metadata , value = registry_model )
229+
218230
219231@asset (group_name = GROUP_NAME )
220232def cloud_sources_dataframe (cloud_registry_from_metadata : ConnectorRegistryV0 ) -> OutputDataFrame :
@@ -243,8 +255,10 @@ def oss_destinations_dataframe(oss_registry_from_metadata: ConnectorRegistryV0)
243255 destinations = oss_registry_from_metadata_dict ["destinations" ]
244256 return output_dataframe (pd .DataFrame (destinations ))
245257
258+
246259# Old Registry
247260
261+
248262@asset (group_name = GROUP_NAME )
249263def legacy_cloud_sources_dataframe (legacy_cloud_registry_dict : dict ) -> OutputDataFrame :
250264 sources = legacy_cloud_registry_dict ["sources" ]
@@ -278,6 +292,7 @@ def legacy_cloud_registry(legacy_cloud_registry_dict: dict) -> ConnectorRegistry
278292def legacy_oss_registry (legacy_oss_registry_dict : dict ) -> ConnectorRegistryV0 :
279293 return ConnectorRegistryV0 .parse_obj (legacy_oss_registry_dict )
280294
295+
281296@asset (required_resource_keys = {"legacy_cloud_registry_gcs_blob" }, group_name = GROUP_NAME )
282297def legacy_cloud_registry_dict (context : OpExecutionContext ) -> dict :
283298 oss_registry_file = context .resources .legacy_cloud_registry_gcs_blob
@@ -292,4 +307,3 @@ def legacy_oss_registry_dict(context: OpExecutionContext) -> dict:
292307 json_string = oss_registry_file .download_as_string ().decode ("utf-8" )
293308 oss_registry_dict = json .loads (json_string )
294309 return oss_registry_dict
295-
0 commit comments