1
1
import json
2
2
import logging
3
3
import os
4
+ import time
4
5
import rich
5
6
import yaml
6
7
from dataclasses import dataclass
11
12
from dbt_artifacts_parser .parser import parse_run_results , parse_manifest
12
13
from dbt .config .renderer import ProfileRenderer
13
14
15
+ from .tracking import (
16
+ set_entrypoint_name ,
17
+ create_end_event_json ,
18
+ create_start_event_json ,
19
+ send_event_json ,
20
+ is_tracking_enabled ,
21
+ )
22
+ from .utils import run_as_daemon , truncate_error
14
23
from . import connect_to_table , diff_tables , Algorithm
15
24
16
25
RUN_RESULTS_PATH = "/target/run_results.json"
@@ -33,6 +42,7 @@ class DiffVars:
33
42
def dbt_diff (
34
43
profiles_dir_override : Optional [str ] = None , project_dir_override : Optional [str ] = None , is_cloud : bool = False
35
44
) -> None :
45
+ set_entrypoint_name ("CLI-dbt" )
36
46
dbt_parser = DbtParser (profiles_dir_override , project_dir_override , is_cloud )
37
47
models = dbt_parser .get_models ()
38
48
dbt_parser .set_project_dict ()
@@ -190,22 +200,53 @@ def _cloud_diff(diff_vars: DiffVars) -> None:
190
200
"Authorization" : f"Key { api_key } " ,
191
201
"Content-Type" : "application/json" ,
192
202
}
203
+ if is_tracking_enabled ():
204
+ event_json = create_start_event_json ({"is_cloud" : True , "datasource_id" : diff_vars .datasource_id })
205
+ run_as_daemon (send_event_json , event_json )
193
206
194
- response = requests .request ("POST" , url , headers = headers , json = payload , timeout = 30 )
195
- response .raise_for_status ()
196
- data = response .json ()
197
- diff_id = data ["id" ]
198
- # TODO in future we should support self hosted datafold
199
- diff_url = f"https://app.datafold.com/datadiffs/{ diff_id } /overview"
200
- rich .print (
201
- "[red]"
202
- + "." .join (diff_vars .dev_path )
203
- + " <> "
204
- + "." .join (diff_vars .prod_path )
205
- + "[/] \n Diff in progress: \n "
206
- + diff_url
207
- + "\n "
208
- )
207
+ start = time .monotonic ()
208
+ error = None
209
+ diff_id = None
210
+ try :
211
+ response = requests .request ("POST" , url , headers = headers , json = payload , timeout = 30 )
212
+ response .raise_for_status ()
213
+ data = response .json ()
214
+ diff_id = data ["id" ]
215
+ # TODO in future we should support self hosted datafold
216
+ diff_url = f"https://app.datafold.com/datadiffs/{ diff_id } /overview"
217
+ rich .print (
218
+ "[red]"
219
+ + "." .join (diff_vars .dev_path )
220
+ + " <> "
221
+ + "." .join (diff_vars .prod_path )
222
+ + "[/] \n Diff in progress: \n "
223
+ + diff_url
224
+ + "\n "
225
+ )
226
+ except BaseException as ex : # Catch KeyboardInterrupt too
227
+ error = ex
228
+ finally :
229
+ # we don't currently have much of this information
230
+ # but I imagine a future iteration of this _cloud method
231
+ # will poll for results
232
+ if is_tracking_enabled ():
233
+ err_message = truncate_error (repr (error ))
234
+ event_json = create_end_event_json (
235
+ is_success = error is None ,
236
+ runtime_seconds = time .monotonic () - start ,
237
+ data_source_1_type = "" ,
238
+ data_source_2_type = "" ,
239
+ table1_count = 0 ,
240
+ table2_count = 0 ,
241
+ diff_count = 0 ,
242
+ error = err_message ,
243
+ diff_id = diff_id ,
244
+ is_cloud = True ,
245
+ )
246
+ send_event_json (event_json )
247
+
248
+ if error :
249
+ raise error
209
250
210
251
211
252
class DbtParser :
@@ -230,7 +271,6 @@ def get_models(self):
230
271
231
272
dbt_version = parse_version (run_results_obj .metadata .dbt_version )
232
273
233
- # TODO 1.4 support
234
274
if dbt_version < parse_version (LOWER_DBT_V ) or dbt_version >= parse_version (UPPER_DBT_V ):
235
275
raise Exception (
236
276
f"Found dbt: v{ dbt_version } Expected the dbt project's version to be >= { LOWER_DBT_V } and < { UPPER_DBT_V } "
0 commit comments