Skip to content

Commit 19bd566

Browse files
authored
Handling of trailing undetected empty columns (#625)
* Addition of tralining undetected empty columns * Google sheets tests adjsuted as per docs * Improved test * Pipedrive test skips and fixes
1 parent 1443421 commit 19bd566

File tree

4 files changed

+99
-7
lines changed

4 files changed

+99
-7
lines changed

sources/google_sheets/helpers/data_processing.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,9 @@ def process_range(
251251
# empty row; skip
252252
if not row:
253253
continue
254+
# align trailing empty columns
255+
data_types += [None] * (len(headers) - len(row))
256+
row += [""] * (len(headers) - len(row))
254257
table_dict = {}
255258
# process both rows and check for differences to spot dates
256259
for val, header, data_type in zip(row, headers, data_types):

sources/pipedrive/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ def parsed_mapping(
174174

175175

176176
@dlt.resource(primary_key="id", write_disposition="merge")
177-
def leads(
177+
def leads(
178178
pipedrive_api_key: str = dlt.secrets.value,
179179
update_time: dlt.sources.incremental[str] = dlt.sources.incremental(
180180
"update_time", "1970-01-01 00:00:00"

tests/google_sheets/test_google_sheets_source.py

Lines changed: 88 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
"inconsistent_types",
2626
"more_data",
2727
"more_headers_than_data",
28-
"NamedRange1",
29-
"NamedRange2",
3028
"only_data",
3129
"only_headers",
3230
"Sheet 1",
@@ -36,13 +34,15 @@
3634
"two_tables",
3735
"hidden_columns_merged_cells",
3836
"Blank Columns",
37+
"trailing_empty_cols_1",
38+
"trailing_empty_cols_2",
39+
"trailing_empty_cols_3",
3940
}
4041

4142
SKIPPED_RANGES = {
4243
"empty",
4344
"only_data",
4445
"only_headers",
45-
"NamedRange2",
4646
}
4747

4848
NAMED_RANGES = {
@@ -62,7 +62,6 @@
6262
"inconsistent_types",
6363
"more_data",
6464
"more_headers_than_data",
65-
"named_range1",
6665
"sheet_1",
6766
"sheet2",
6867
"sheet3",
@@ -71,6 +70,9 @@
7170
"two_tables",
7271
"hidden_columns_merged_cells",
7372
"blank_columns",
73+
"trailing_empty_cols_1",
74+
"trailing_empty_cols_2",
75+
"trailing_empty_cols_3",
7476
}
7577

7678

@@ -98,15 +100,20 @@ def test_full_load(destination_name: str) -> None:
98100
"""
99101

100102
info, pipeline = _run_pipeline(
101-
destination_name=destination_name, dataset_name="test_full_load"
103+
destination_name=destination_name,
104+
dataset_name="test_full_load",
105+
get_sheets=True,
106+
get_named_ranges=False,
107+
range_names=[],
102108
)
103109
assert_load_info(info)
104110

105111
# The schema should contain all listed tables
106112
# ALL_TABLES is missing spreadsheet info table - table being tested here
107113
schema = pipeline.default_schema
108114
user_tables = schema.data_tables()
109-
assert set([t["name"] for t in user_tables]) == ALL_TABLES_LOADED
115+
user_table_names = set([t["name"] for t in user_tables])
116+
assert user_table_names == ALL_TABLES_LOADED
110117

111118
# check load metadata
112119
with pipeline.sql_client() as c:
@@ -635,6 +642,7 @@ def test_no_ranges():
635642
info, pipeline = _run_pipeline(
636643
destination_name="duckdb",
637644
dataset_name="test_table_in_middle",
645+
range_names=[],
638646
get_sheets=False,
639647
get_named_ranges=False,
640648
)
@@ -681,6 +689,80 @@ def test_table_not_A1():
681689
)
682690

683691

692+
def test_trailing_empty_cols() -> None:
693+
info, pipeline = _run_pipeline(
694+
destination_name="duckdb",
695+
dataset_name="test_trailing_empty_cols",
696+
range_names=[
697+
"trailing_empty_cols_1",
698+
"trailing_empty_cols_2",
699+
"trailing_empty_cols_3",
700+
],
701+
get_sheets=False,
702+
get_named_ranges=False,
703+
)
704+
assert_load_info(info)
705+
706+
assert "trailing_empty_cols_1" in pipeline.default_schema.tables
707+
assert "trailing_empty_cols_2" in pipeline.default_schema.tables
708+
assert "trailing_empty_cols_3" in pipeline.default_schema.tables
709+
710+
assert set(
711+
pipeline.default_schema.get_table_columns("trailing_empty_cols_1").keys()
712+
) == {"col0", "col1", "col2", "_dlt_id", "_dlt_load_id"}
713+
assert set(
714+
pipeline.default_schema.get_table_columns("trailing_empty_cols_2").keys()
715+
) == {
716+
"col0",
717+
"col1",
718+
"col2",
719+
"col3",
720+
"col3__v_text",
721+
"col4",
722+
"_dlt_id",
723+
"_dlt_load_id",
724+
}
725+
assert set(
726+
pipeline.default_schema.get_table_columns("trailing_empty_cols_3").keys()
727+
) == {
728+
"col0",
729+
"col1",
730+
"col2",
731+
"col3",
732+
"col3__v_text",
733+
"col4",
734+
"col5",
735+
"_dlt_id",
736+
"_dlt_load_id",
737+
}
738+
739+
expected_rows = [
740+
(322, None, None, 2, None, None, 123456),
741+
(43, "dsa", "dd", None, "w", 2, None),
742+
(432, "scds", "ddd", None, "e", 3, None),
743+
(None, "dsfdf", "dddd", None, "r", 4, None),
744+
]
745+
746+
with pipeline.sql_client() as c:
747+
sql_query = "SELECT col0, col1, col2 FROM trailing_empty_cols_1;"
748+
with c.execute_query(sql_query) as cur:
749+
rows = list(cur.fetchall())
750+
assert len(rows) == 4
751+
assert rows == [row[:3] for row in expected_rows]
752+
753+
sql_query = "SELECT col0, col1, col2, col3, col3__v_text, col4 FROM trailing_empty_cols_2;"
754+
with c.execute_query(sql_query) as cur:
755+
rows = list(cur.fetchall())
756+
assert len(rows) == 4
757+
assert rows == [row[:6] for row in expected_rows]
758+
759+
sql_query = "SELECT col0, col1, col2, col3, col3__v_text, col4, col5 FROM trailing_empty_cols_3;"
760+
with c.execute_query(sql_query) as cur:
761+
rows = list(cur.fetchall())
762+
assert len(rows) == 4
763+
assert rows == expected_rows
764+
765+
684766
def _row_helper(row, destination_name):
685767
"""
686768
Helper, unpacks the rows from different databases (Bigquery, Postgres, Redshift) to a tuple

tests/pipedrive/test_pipedrive_source.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
"stages",
4242
"users",
4343
"leads",
44+
"tasks",
45+
"projects",
4446
}
4547

4648
# we have no data in our test account (only leads)
@@ -64,6 +66,7 @@
6466

6567
@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
6668
def test_all_resources(destination_name: str) -> None:
69+
pytest.skip("Unskip after setting up credentials.")
6770
# mind the dev_mode flag - it makes sure that data is loaded to unique dataset. this allows you to run the tests on the same database in parallel
6871
# configure the pipeline with your destination details
6972
pipeline = dlt.pipeline(
@@ -85,6 +88,7 @@ def test_all_resources(destination_name: str) -> None:
8588

8689
@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
8790
def test_leads_resource_incremental(destination_name: str) -> None:
91+
pytest.skip("Unskip after setting up credentials.")
8892
pipeline = dlt.pipeline(
8993
pipeline_name="pipedrive",
9094
destination=destination_name,
@@ -249,6 +253,7 @@ def test_custom_fields_munger(destination_name: str) -> None:
249253

250254

251255
def test_since_timestamp() -> None:
256+
pytest.skip("Unskip after setting up credentials.")
252257
"""since_timestamp is coerced correctly to UTC implicit ISO timestamp and passed to endpoint function"""
253258
with mock.patch(
254259
"sources.pipedrive.helpers.pages.get_pages",
@@ -292,6 +297,7 @@ def test_since_timestamp() -> None:
292297

293298
@pytest.mark.parametrize("destination_name", ALL_DESTINATIONS)
294299
def test_incremental(destination_name: str) -> None:
300+
pytest.skip("Unskip after setting up credentials.")
295301
pipeline = dlt.pipeline(
296302
pipeline_name="pipedrive",
297303
destination=destination_name,
@@ -434,6 +440,7 @@ def test_rename_fields_with_set() -> None:
434440

435441

436442
def test_recents_none_data_items_from_recents() -> None:
443+
pytest.skip("Unskip after setting up credentials.")
437444
"""Pages from /recents sometimes contain `None` data items which cause errors.
438445
Reproduces this with a mocked response. Simply verify that extract runs without exceptions, meaning nones are filtered out.
439446
"""

0 commit comments

Comments
 (0)