Skip to content

Commit c1d7736

Browse files
etsybaevedgao
andauthored
🎉Updated normalization to handle new datatypes (#19721)
* Updated normalization simple stream processing to handle new datatypes * Updated normalization nested stream processing to handle new datatypes * Updated normalization nested stream processing to handle new datatypes * Updated normalization drop_scd_catalog processing to handle new datatypes * Updated normalization ephemeral test processing to handle new datatypes * fixed more tests for normalization * fixed more tests for normalization * fixed more tests for normalization * fixed more tests for normalization * fixed more issues * fixed more issues (clickhouse) * fixed more issues * fixed more issues * fixed more issues * added binary type processing for some DBs * cleared commented code and moved some hardcodes to processing as macro * fixed codestyle and cleared commented code * minor refactor * minor refactor * minor refactor * fixed bool cast error * fixed dict->str cast error * fixed is_combining_node cast py check * removed commented code * removed commented code * committed autogenerated normalization_test_output files * committed autogenerated normalization_test_output files (new files) * refactored utils.py * Updated utils.py to use Callable functions and get rid of property_type in is_number and is_bool functions * committed autogenerated normalization_test_output files (new files) * fixed typo in TIMESTAMP_WITH_TIMEZONE_TYPE * updated stream_processor to handle string type first as a wider type * fixed arrays normalization by updating is_simple_property method as per new approaches * format Co-authored-by: Edward Gao <edward.gao@airbyte.io>
1 parent ab52888 commit c1d7736

File tree

234 files changed

+4175
-1043
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

234 files changed

+4175
-1043
lines changed

airbyte-integrations/bases/base-normalization/dbt-project-template/macros/cross_db_utils/datatypes.sql

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,14 @@
88
string
99
{% endmacro %}
1010

11+
{%- macro type_binary() -%}
12+
{{ adapter.dispatch('type_binary')() }}
13+
{%- endmacro -%}
14+
15+
{%- macro default__type_binary() -%}
16+
binary
17+
{%- endmacro -%}
18+
1119
{%- macro redshift__type_json() -%}
1220
{%- if redshift_super_type() -%}
1321
super
@@ -72,6 +80,28 @@
7280
char(1000)
7381
{%- endmacro -%}
7482

83+
{# binary data ------------------------------------------------- #}
84+
85+
{%- macro postgres__type_binary() -%}
86+
bytea
87+
{%- endmacro -%}
88+
89+
{%- macro bigquery__type_binary() -%}
90+
bytes
91+
{%- endmacro -%}
92+
93+
{%- macro mssql__type_binary() -%}
94+
VARBINARY(MAX)
95+
{%- endmacro -%}
96+
97+
{%- macro snowflake__type_binary() -%}
98+
VARBINARY
99+
{%- endmacro -%}
100+
101+
{%- macro clickhouse__type_binary() -%}
102+
VARBINARY
103+
{%- endmacro -%}
104+
75105
{# float ------------------------------------------------- #}
76106
{% macro mysql__type_float() %}
77107
float

airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_nested_streams/models/generated/airbyte_incremental/scd/test_normalization/nested_stream_with_complex_columns_resulting_into_long_names_scd.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,12 @@
3434
from (
3535
select distinct _airbyte_unique_key as unique_key
3636
from {{ this }}
37-
where 1=1 {{ incremental_clause('_airbyte_normalized_at', this.schema + '.' + adapter.quote('nested_stream_with_complex_columns_resulting_into_long_names')) }}
37+
where 1=1 {{ incremental_clause('_airbyte_normalized_at', adapter.quote(this.schema) + '.' + adapter.quote('nested_stream_with_complex_columns_resulting_into_long_names')) }}
3838
) recent_records
3939
left join (
4040
select _airbyte_unique_key as unique_key, count(_airbyte_unique_key) as active_count
4141
from {{ this }}
42-
where _airbyte_active_row = 1 {{ incremental_clause('_airbyte_normalized_at', this.schema + '.' + adapter.quote('nested_stream_with_complex_columns_resulting_into_long_names')) }}
42+
where _airbyte_active_row = 1 {{ incremental_clause('_airbyte_normalized_at', adapter.quote(this.schema) + '.' + adapter.quote('nested_stream_with_complex_columns_resulting_into_long_names')) }}
4343
group by _airbyte_unique_key
4444
) active_counts
4545
on recent_records.unique_key = active_counts.unique_key

airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/first_output/airbyte_tables/test_normalization/exchange_rate.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ select
2424
json_extract_scalar(_airbyte_data, "$['datetime_no_tz']") as datetime_no_tz,
2525
json_extract_scalar(_airbyte_data, "$['time_tz']") as time_tz,
2626
json_extract_scalar(_airbyte_data, "$['time_no_tz']") as time_no_tz,
27+
json_extract_scalar(_airbyte_data, "$['property_binary_data']") as property_binary_data,
2728
_airbyte_ab_id,
2829
_airbyte_emitted_at,
2930
CURRENT_TIMESTAMP() as _airbyte_normalized_at
@@ -74,6 +75,7 @@ select
7475
cast(nullif(time_no_tz, '') as
7576
time
7677
) as time_no_tz,
78+
cast(FROM_BASE64(property_binary_data) as bytes) as property_binary_data,
7779
_airbyte_ab_id,
7880
_airbyte_emitted_at,
7981
CURRENT_TIMESTAMP() as _airbyte_normalized_at
@@ -111,6 +113,8 @@ select
111113
string
112114
), ''), '-', coalesce(cast(time_no_tz as
113115
string
116+
), ''), '-', coalesce(cast(property_binary_data as
117+
string
114118
), '')) as
115119
string
116120
))) as _airbyte_exchange_rate_hashid,
@@ -134,6 +138,7 @@ select
134138
datetime_no_tz,
135139
time_tz,
136140
time_no_tz,
141+
property_binary_data,
137142
_airbyte_ab_id,
138143
_airbyte_emitted_at,
139144
CURRENT_TIMESTAMP() as _airbyte_normalized_at,

airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,12 @@
3434
from (
3535
select distinct _airbyte_unique_key as unique_key
3636
from {{ this }}
37-
where 1=1 {{ incremental_clause('_airbyte_normalized_at', this.schema + '.' + adapter.quote('dedup_exchange_rate')) }}
37+
where 1=1 {{ incremental_clause('_airbyte_normalized_at', adapter.quote(this.schema) + '.' + adapter.quote('dedup_exchange_rate')) }}
3838
) recent_records
3939
left join (
4040
select _airbyte_unique_key as unique_key, count(_airbyte_unique_key) as active_count
4141
from {{ this }}
42-
where _airbyte_active_row = 1 {{ incremental_clause('_airbyte_normalized_at', this.schema + '.' + adapter.quote('dedup_exchange_rate')) }}
42+
where _airbyte_active_row = 1 {{ incremental_clause('_airbyte_normalized_at', adapter.quote(this.schema) + '.' + adapter.quote('dedup_exchange_rate')) }}
4343
group by _airbyte_unique_key
4444
) active_counts
4545
on recent_records.unique_key = active_counts.unique_key

airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/models/generated/airbyte_tables/test_normalization/exchange_rate.sql

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,22 @@
33
partition_by = {"field": "_airbyte_emitted_at", "data_type": "timestamp", "granularity": "day"},
44
unique_key = '_airbyte_ab_id',
55
schema = "test_normalization",
6+
post_hook = ["
7+
{%
8+
set scd_table_relation = adapter.get_relation(
9+
database=this.database,
10+
schema=this.schema,
11+
identifier='exchange_rate_scd'
12+
)
13+
%}
14+
{%
15+
if scd_table_relation is not none
16+
%}
17+
{%
18+
do adapter.drop_relation(scd_table_relation)
19+
%}
20+
{% endif %}
21+
"],
622
tags = [ "top-level" ]
723
) }}
824
-- Final base SQL model
@@ -21,6 +37,7 @@ select
2137
datetime_no_tz,
2238
time_tz,
2339
time_no_tz,
40+
property_binary_data,
2441
_airbyte_ab_id,
2542
_airbyte_emitted_at,
2643
{{ current_timestamp() }} as _airbyte_normalized_at,

airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/modified_models/generated/airbyte_incremental/scd/test_normalization/dedup_exchange_rate_scd.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,12 @@
3434
from (
3535
select distinct _airbyte_unique_key as unique_key
3636
from {{ this }}
37-
where 1=1 {{ incremental_clause('_airbyte_normalized_at', this.schema + '.' + adapter.quote('dedup_exchange_rate')) }}
37+
where 1=1 {{ incremental_clause('_airbyte_normalized_at', adapter.quote(this.schema) + '.' + adapter.quote('dedup_exchange_rate')) }}
3838
) recent_records
3939
left join (
4040
select _airbyte_unique_key as unique_key, count(_airbyte_unique_key) as active_count
4141
from {{ this }}
42-
where _airbyte_active_row = 1 {{ incremental_clause('_airbyte_normalized_at', this.schema + '.' + adapter.quote('dedup_exchange_rate')) }}
42+
where _airbyte_active_row = 1 {{ incremental_clause('_airbyte_normalized_at', adapter.quote(this.schema) + '.' + adapter.quote('dedup_exchange_rate')) }}
4343
group by _airbyte_unique_key
4444
) active_counts
4545
on recent_records.unique_key = active_counts.unique_key

airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/modified_models/generated/airbyte_tables/test_normalization/exchange_rate.sql

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,22 @@
33
partition_by = {"field": "_airbyte_emitted_at", "data_type": "timestamp", "granularity": "day"},
44
unique_key = '_airbyte_ab_id',
55
schema = "test_normalization",
6+
post_hook = ["
7+
{%
8+
set scd_table_relation = adapter.get_relation(
9+
database=this.database,
10+
schema=this.schema,
11+
identifier='exchange_rate_scd'
12+
)
13+
%}
14+
{%
15+
if scd_table_relation is not none
16+
%}
17+
{%
18+
do adapter.drop_relation(scd_table_relation)
19+
%}
20+
{% endif %}
21+
"],
622
tags = [ "top-level" ]
723
) }}
824
-- Final base SQL model

airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/bigquery/test_simple_streams/second_output/airbyte_tables/test_normalization/exchange_rate.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ select
2424
json_extract_scalar(_airbyte_data, "$['datetime_no_tz']") as datetime_no_tz,
2525
json_extract_scalar(_airbyte_data, "$['time_tz']") as time_tz,
2626
json_extract_scalar(_airbyte_data, "$['time_no_tz']") as time_no_tz,
27+
json_extract_scalar(_airbyte_data, "$['property_binary_data']") as property_binary_data,
2728
_airbyte_ab_id,
2829
_airbyte_emitted_at,
2930
CURRENT_TIMESTAMP() as _airbyte_normalized_at
@@ -74,6 +75,7 @@ select
7475
cast(nullif(time_no_tz, '') as
7576
time
7677
) as time_no_tz,
78+
cast(FROM_BASE64(property_binary_data) as bytes) as property_binary_data,
7779
_airbyte_ab_id,
7880
_airbyte_emitted_at,
7981
CURRENT_TIMESTAMP() as _airbyte_normalized_at
@@ -111,6 +113,8 @@ select
111113
string
112114
), ''), '-', coalesce(cast(time_no_tz as
113115
string
116+
), ''), '-', coalesce(cast(property_binary_data as
117+
string
114118
), '')) as
115119
string
116120
))) as _airbyte_exchange_rate_hashid,
@@ -134,6 +138,7 @@ select
134138
datetime_no_tz,
135139
time_tz,
136140
time_no_tz,
141+
property_binary_data,
137142
_airbyte_ab_id,
138143
_airbyte_emitted_at,
139144
CURRENT_TIMESTAMP() as _airbyte_normalized_at,

airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_ctes/test_normalization/dedup_exchange_rate_ab1.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11

22

3-
create view _airbyte_test_normalization.dedup_exchange_rate_ab1__dbt_tmp
3+
create view _airbyte_test_normalization.dedup_exchange_rate_ab1
44

55
as (
66

airbyte-integrations/bases/base-normalization/integration_tests/normalization_test_output/clickhouse/test_simple_streams/first_output/airbyte_ctes/test_normalization/dedup_exchange_rate_ab2.sql

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,26 @@
11

22

3-
create view _airbyte_test_normalization.dedup_exchange_rate_ab2__dbt_tmp
3+
create view _airbyte_test_normalization.dedup_exchange_rate_ab2
44

55
as (
66

77
-- SQL model to cast each column to its adequate SQL type converted from the JSON schema type
88
-- depends_on: _airbyte_test_normalization.dedup_exchange_rate_ab1
99
select
10-
accurateCastOrNull(id, '
10+
accurateCastOrNull(trim(BOTH '"' from id), '
1111
BIGINT
1212
') as id,
1313
nullif(accurateCastOrNull(trim(BOTH '"' from currency), 'String'), 'null') as currency,
1414
toDate(parseDateTimeBestEffortOrNull(trim(BOTH '"' from nullif(date, '')))) as date,
1515
parseDateTime64BestEffortOrNull(trim(BOTH '"' from nullif(timestamp_col, ''))) as timestamp_col,
16-
accurateCastOrNull("HKD@spéçiäl & characters", '
16+
accurateCastOrNull(trim(BOTH '"' from "HKD@spéçiäl & characters"), '
1717
Float64
1818
') as "HKD@spéçiäl & characters",
1919
nullif(accurateCastOrNull(trim(BOTH '"' from HKD_special___characters), 'String'), 'null') as HKD_special___characters,
20-
accurateCastOrNull(NZD, '
20+
accurateCastOrNull(trim(BOTH '"' from NZD), '
2121
Float64
2222
') as NZD,
23-
accurateCastOrNull(USD, '
23+
accurateCastOrNull(trim(BOTH '"' from USD), '
2424
Float64
2525
') as USD,
2626
_airbyte_ab_id,

0 commit comments

Comments
 (0)