Skip to content

Commit 7d73b08

Browse files
authored
SAT: basic read on full catalog when test_strictness_level == high (#18937)
1 parent a990d8c commit 7d73b08

File tree

11 files changed

+385
-72
lines changed

11 files changed

+385
-72
lines changed

airbyte-integrations/bases/source-acceptance-test/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# Changelog
22

3+
## 0.2.16
4+
Run `basic_read` on the discovered catalog in `high` `test_strictness_level`. [#18937](https://github.com/airbytehq/airbyte/pull/18937).
5+
36
## 0.2.15
47
Make `expect_records` mandatory in `high` `test_strictness_level`. [#18497](https://github.com/airbytehq/airbyte/pull/18497/).
58

airbyte-integrations/bases/source-acceptance-test/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./
3333
COPY source_acceptance_test ./source_acceptance_test
3434
RUN pip install .
3535

36-
LABEL io.airbyte.version=0.2.15
36+
LABEL io.airbyte.version=0.2.16
3737
LABEL io.airbyte.name=airbyte/source-acceptance-test
3838

3939
ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"]

airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/conftest.py

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,20 @@
1313
from typing import Any, List, MutableMapping, Optional, Set
1414

1515
import pytest
16-
from airbyte_cdk.models import (
17-
AirbyteRecordMessage,
18-
AirbyteStream,
19-
ConfiguredAirbyteCatalog,
20-
ConfiguredAirbyteStream,
21-
ConnectorSpecification,
22-
DestinationSyncMode,
23-
Type,
24-
)
16+
from airbyte_cdk.models import AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog, ConnectorSpecification, Type
2517
from docker import errors
2618
from source_acceptance_test.base import BaseTest
27-
from source_acceptance_test.config import Config, EmptyStreamConfiguration
19+
from source_acceptance_test.config import Config, EmptyStreamConfiguration, ExpectedRecordsConfig
2820
from source_acceptance_test.tests import TestBasicRead
29-
from source_acceptance_test.utils import ConnectorRunner, SecretDict, filter_output, load_config, load_yaml_or_json_path
21+
from source_acceptance_test.utils import (
22+
ConnectorRunner,
23+
SecretDict,
24+
build_configured_catalog_from_custom_catalog,
25+
build_configured_catalog_from_discovered_catalog_and_empty_streams,
26+
filter_output,
27+
load_config,
28+
load_yaml_or_json_path,
29+
)
3030

3131

3232
@pytest.fixture(name="acceptance_test_config", scope="session")
@@ -44,7 +44,7 @@ def base_path_fixture(pytestconfig, acceptance_test_config) -> Path:
4444

4545

4646
@pytest.fixture(name="test_strictness_level", scope="session")
47-
def test_strictness_level_fixture(acceptance_test_config: Config):
47+
def test_strictness_level_fixture(acceptance_test_config: Config) -> Config.TestStrictnessLevel:
4848
return acceptance_test_config.test_strictness_level
4949

5050

@@ -75,24 +75,17 @@ def configured_catalog_path_fixture(inputs, base_path) -> Optional[str]:
7575

7676

7777
@pytest.fixture(name="configured_catalog")
78-
def configured_catalog_fixture(configured_catalog_path, discovered_catalog) -> ConfiguredAirbyteCatalog:
79-
"""Take ConfiguredAirbyteCatalog from discover command by default"""
78+
def configured_catalog_fixture(
79+
configured_catalog_path: Optional[str],
80+
discovered_catalog: MutableMapping[str, AirbyteStream],
81+
) -> ConfiguredAirbyteCatalog:
82+
"""Build a configured catalog.
83+
If a configured catalog path is given we build a configured catalog from it, we build it from the discovered catalog otherwise.
84+
"""
8085
if configured_catalog_path:
81-
catalog = ConfiguredAirbyteCatalog.parse_file(configured_catalog_path)
82-
for configured_stream in catalog.streams:
83-
configured_stream.stream = discovered_catalog.get(configured_stream.stream.name, configured_stream.stream)
84-
return catalog
85-
streams = [
86-
ConfiguredAirbyteStream(
87-
stream=stream,
88-
sync_mode=stream.supported_sync_modes[0],
89-
destination_sync_mode=DestinationSyncMode.append,
90-
cursor_field=stream.default_cursor_field,
91-
primary_key=stream.source_defined_primary_key,
92-
)
93-
for _, stream in discovered_catalog.items()
94-
]
95-
return ConfiguredAirbyteCatalog(streams=streams)
86+
return build_configured_catalog_from_custom_catalog(configured_catalog_path, discovered_catalog)
87+
else:
88+
return build_configured_catalog_from_discovered_catalog_and_empty_streams(discovered_catalog, set())
9689

9790

9891
@pytest.fixture(name="image_tag")
@@ -178,12 +171,17 @@ def empty_streams_fixture(inputs, test_strictness_level) -> Set[EmptyStreamConfi
178171
return empty_streams
179172

180173

174+
@pytest.fixture(name="expect_records_config")
def expect_records_config_fixture(inputs):
    """Expose the ``expect_records`` entry of the test inputs as a standalone fixture.

    Args:
        inputs: The inputs of the current test; only its ``expect_records`` attribute is read.

    Returns:
        The value of ``inputs.expect_records``, unchanged.
    """
    return inputs.expect_records
177+
178+
181179
@pytest.fixture(name="expected_records_by_stream")
182180
def expected_records_by_stream_fixture(
183181
test_strictness_level: Config.TestStrictnessLevel,
184182
configured_catalog: ConfiguredAirbyteCatalog,
185183
empty_streams: Set[EmptyStreamConfiguration],
186-
inputs,
184+
expect_records_config: ExpectedRecordsConfig,
187185
base_path,
188186
) -> MutableMapping[str, List[MutableMapping]]:
189187
def enforce_high_strictness_level_rules(expect_records_config, configured_catalog, empty_streams, records_by_stream) -> Optional[str]:
@@ -197,8 +195,9 @@ def enforce_high_strictness_level_rules(expect_records_config, configured_catalo
197195
error_prefix
198196
+ f"{', '.join(not_seeded_streams)} streams are declared in the catalog but do not have expected records. Please add expected records to {expect_records_config.path} or declare these streams in empty_streams."
199197
)
200-
201-
expect_records_config = inputs.expect_records
198+
else:
199+
if not getattr(expect_records_config, "bypass_reason", None):
200+
pytest.fail(error_prefix / "A bypass reason must be filled if no path to expected records is provided.")
202201

203202
expected_records_by_stream = {}
204203
if expect_records_config:
@@ -246,7 +245,6 @@ def discovered_catalog_fixture(connector_config, docker_runner: ConnectorRunner,
246245
catalogs = [message.catalog for message in output if message.type == Type.CATALOG]
247246
for stream in catalogs[-1].streams:
248247
cached_schemas[stream.name] = stream
249-
250248
return cached_schemas
251249

252250

airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
from collections import Counter, defaultdict
99
from functools import reduce
1010
from logging import Logger
11-
from typing import Any, Dict, List, Mapping, MutableMapping, Set
11+
from typing import Any, Dict, List, Mapping, MutableMapping, Optional, Set
12+
from xmlrpc.client import Boolean
1213

1314
import dpath.util
1415
import jsonschema
@@ -27,10 +28,23 @@
2728
from docker.errors import ContainerError
2829
from jsonschema._utils import flatten
2930
from source_acceptance_test.base import BaseTest
30-
from source_acceptance_test.config import BasicReadTestConfig, ConnectionTestConfig, DiscoveryTestConfig, SpecTestConfig
31+
from source_acceptance_test.config import (
32+
BasicReadTestConfig,
33+
Config,
34+
ConnectionTestConfig,
35+
DiscoveryTestConfig,
36+
EmptyStreamConfiguration,
37+
ExpectedRecordsConfig,
38+
SpecTestConfig,
39+
)
3140
from source_acceptance_test.utils import ConnectorRunner, SecretDict, filter_output, make_hashable, verify_records_schema
3241
from source_acceptance_test.utils.backward_compatibility import CatalogDiffChecker, SpecDiffChecker, validate_previous_configs
33-
from source_acceptance_test.utils.common import find_all_values_for_key_in_schema, find_keyword_schema
42+
from source_acceptance_test.utils.common import (
43+
build_configured_catalog_from_custom_catalog,
44+
build_configured_catalog_from_discovered_catalog_and_empty_streams,
45+
find_all_values_for_key_in_schema,
46+
find_keyword_schema,
47+
)
3448
from source_acceptance_test.utils.json_schema_helper import JsonSchemaHelper, get_expected_schema_structure, get_object_structure
3549

3650

@@ -462,11 +476,58 @@ def _validate_expected_records(
462476
detailed_logger=detailed_logger,
463477
)
464478

479+
@pytest.fixture(name="should_validate_schema")
def should_validate_schema_fixture(self, inputs: BasicReadTestConfig, test_strictness_level: Config.TestStrictnessLevel):
    """Tell whether the basic read test must validate records against their schema.

    Under the high strictness level, schema validation cannot be turned off:
    a falsy ``validate_schema`` makes the fixture call ``pytest.fail``.

    Args:
        inputs (BasicReadTestConfig): The basic read test configuration.
        test_strictness_level (Config.TestStrictnessLevel): The current test strictness level.

    Returns:
        The value of ``inputs.validate_schema``.
    """
    schema_validation_enabled = inputs.validate_schema
    is_high_strictness = test_strictness_level is Config.TestStrictnessLevel.high
    if is_high_strictness and not schema_validation_enabled:
        pytest.fail("High strictness level error: validate_schema must be set to true in the basic read test configuration.")
    return schema_validation_enabled
485+
486+
@pytest.fixture(name="should_validate_data_points")
def should_validate_data_points_fixture(self, inputs: BasicReadTestConfig) -> bool:
    """Tell whether the basic read test must run the data points validation.

    Args:
        inputs (BasicReadTestConfig): The basic read test configuration.

    Returns:
        The value of ``inputs.validate_data_points``, unchanged.
    """
    # TODO: we might want to enforce this when Config.TestStrictnessLevel.high
    return inputs.validate_data_points
490+
491+
@pytest.fixture(name="configured_catalog")
def configured_catalog_fixture(
    self,
    test_strictness_level: Config.TestStrictnessLevel,
    configured_catalog_path: Optional[str],
    discovered_catalog: MutableMapping[str, AirbyteStream],
    empty_streams: Set[EmptyStreamConfiguration],
) -> ConfiguredAirbyteCatalog:
    """Build the configured catalog used by the basic read test.

    Three cases are handled:
      - No configured_catalog_path is declared: the catalog is built from the discovered catalog,
        minus the declared empty streams (whatever the strictness level is).
      - A configured_catalog_path is declared and the strictness level is high: pytest.fail is called,
        custom configured catalogs are not allowed in this mode.
      - A configured_catalog_path is declared and the strictness level is not high: the catalog is
        built from the custom configured catalog.

    Args:
        test_strictness_level (Config.TestStrictnessLevel): The current test strictness level according to the global test configuration.
        configured_catalog_path (Optional[str]): Path to a JSON file containing a custom configured catalog.
        discovered_catalog (MutableMapping[str, AirbyteStream]): The discovered catalog.
        empty_streams (Set[EmptyStreamConfiguration]): The empty streams declared in the test configuration.

    Returns:
        ConfiguredAirbyteCatalog: the configured Airbyte catalog.
    """
    # Without a custom catalog the strictness level is irrelevant: always fall back on discovery.
    if not configured_catalog_path:
        return build_configured_catalog_from_discovered_catalog_and_empty_streams(discovered_catalog, empty_streams)

    # From here on a custom catalog path is declared, which high strictness forbids.
    if test_strictness_level is Config.TestStrictnessLevel.high:
        pytest.fail(
            "High strictness level error: you can't set a custom configured catalog on the basic read test when strictness level is high."
        )

    return build_configured_catalog_from_custom_catalog(configured_catalog_path, discovered_catalog)
522+
465523
def test_read(
466524
self,
467525
connector_config,
468526
configured_catalog,
469-
inputs: BasicReadTestConfig,
527+
expect_records_config: ExpectedRecordsConfig,
528+
should_validate_schema: Boolean,
529+
should_validate_data_points: Boolean,
530+
empty_streams: Set[EmptyStreamConfiguration],
470531
expected_records_by_stream: MutableMapping[str, List[MutableMapping]],
471532
docker_runner: ConnectorRunner,
472533
detailed_logger,
@@ -476,25 +537,25 @@ def test_read(
476537

477538
assert records, "At least one record should be read using provided catalog"
478539

479-
if inputs.validate_schema:
540+
if should_validate_schema:
480541
self._validate_schema(records=records, configured_catalog=configured_catalog)
481542

482-
self._validate_empty_streams(records=records, configured_catalog=configured_catalog, allowed_empty_streams=inputs.empty_streams)
543+
self._validate_empty_streams(records=records, configured_catalog=configured_catalog, allowed_empty_streams=empty_streams)
483544
for pks, record in primary_keys_for_records(streams=configured_catalog.streams, records=records):
484545
for pk_path, pk_value in pks.items():
485546
assert (
486547
pk_value is not None
487548
), f"Primary key subkeys {repr(pk_path)} have null values or not present in {record.stream} stream records."
488549

489550
# TODO: remove this condition after https://github.com/airbytehq/airbyte/issues/8312 is done
490-
if inputs.validate_data_points:
551+
if should_validate_data_points:
491552
self._validate_field_appears_at_least_once(records=records, configured_catalog=configured_catalog)
492553

493554
if expected_records_by_stream:
494555
self._validate_expected_records(
495556
records=records,
496557
expected_records_by_stream=expected_records_by_stream,
497-
flags=inputs.expect_records,
558+
flags=expect_records_config,
498559
detailed_logger=detailed_logger,
499560
)
500561

airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/__init__.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,17 @@
11
#
2-
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
2+
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
33
#
44
from .asserts import verify_records_schema
5-
from .common import SecretDict, filter_output, full_refresh_only_catalog, incremental_only_catalog, load_config, load_yaml_or_json_path
5+
from .common import (
6+
SecretDict,
7+
build_configured_catalog_from_custom_catalog,
8+
build_configured_catalog_from_discovered_catalog_and_empty_streams,
9+
filter_output,
10+
full_refresh_only_catalog,
11+
incremental_only_catalog,
12+
load_config,
13+
load_yaml_or_json_path,
14+
)
615
from .compare import diff_dicts, make_hashable
716
from .connector_runner import ConnectorRunner
817
from .json_schema_helper import JsonSchemaHelper
@@ -19,4 +28,6 @@
1928
"diff_dicts",
2029
"make_hashable",
2130
"verify_records_schema",
31+
"build_configured_catalog_from_custom_catalog",
32+
"build_configured_catalog_from_discovered_catalog_and_empty_streams",
2233
]

airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/common.py

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
#
44

55
import json
6+
import logging
67
from collections import UserDict
78
from pathlib import Path
8-
from typing import Iterable, List, Union
9+
from typing import Iterable, List, MutableMapping, Set, Union
910

1011
import pytest
1112
from yaml import load
@@ -15,8 +16,15 @@
1516
except ImportError:
1617
from yaml import Loader
1718

18-
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, SyncMode
19-
from source_acceptance_test.config import Config
19+
from airbyte_cdk.models import (
20+
AirbyteMessage,
21+
AirbyteStream,
22+
ConfiguredAirbyteCatalog,
23+
ConfiguredAirbyteStream,
24+
DestinationSyncMode,
25+
SyncMode,
26+
)
27+
from source_acceptance_test.config import Config, EmptyStreamConfiguration
2028

2129

2230
def load_config(path: str) -> Config:
@@ -111,3 +119,58 @@ def find_all_values_for_key_in_schema(schema: dict, searched_key: str):
111119
yield value
112120
if isinstance(value, dict) or isinstance(value, list):
113121
yield from find_all_values_for_key_in_schema(value, searched_key)
122+
123+
124+
def build_configured_catalog_from_discovered_catalog_and_empty_streams(
    discovered_catalog: MutableMapping[str, AirbyteStream], empty_streams: Set[EmptyStreamConfiguration]
) -> ConfiguredAirbyteCatalog:
    """Build a configured catalog from the discovered catalog with empty streams removed.

    Every discovered stream whose name is not declared as empty is configured with its first
    supported sync mode, the append destination sync mode, its default cursor field and its
    source defined primary key.

    A warning listing the declared empty stream names is logged when at least one empty stream
    is declared; an info message is logged otherwise.

    Args:
        discovered_catalog (MutableMapping[str, AirbyteStream]): The discovered catalog.
        empty_streams (Set[EmptyStreamConfiguration]): The set of empty streams declared in the test configuration.

    Returns:
        ConfiguredAirbyteCatalog: a configured Airbyte catalog.
    """
    empty_stream_names = [empty_stream.name for empty_stream in empty_streams]
    # Constant-time membership tests in the comprehension below; the list is kept
    # so the log message lists the names in the same order as before.
    excluded_names = frozenset(empty_stream_names)
    streams = [
        ConfiguredAirbyteStream(
            stream=stream,
            sync_mode=stream.supported_sync_modes[0],
            destination_sync_mode=DestinationSyncMode.append,
            cursor_field=stream.default_cursor_field,
            primary_key=stream.source_defined_primary_key,
        )
        # Only the stream objects are needed, the mapping keys are not used.
        for stream in discovered_catalog.values()
        if stream.name not in excluded_names
    ]
    if empty_stream_names:
        logging.warning(
            f"The configured catalog was built with the discovered catalog from which the following empty streams were removed: {', '.join(empty_stream_names)}."
        )
    else:
        logging.info("The configured catalog is built from a fully discovered catalog.")
    return ConfiguredAirbyteCatalog(streams=streams)
155+
156+
157+
def build_configured_catalog_from_custom_catalog(configured_catalog_path: str, discovered_catalog: MutableMapping[str, AirbyteStream]):
    """Load a custom configured catalog from a local file and align it with the discovered catalog.

    The stream definition of every configured stream is replaced by the discovered stream
    bearing the same name. If a configured stream name is missing from the discovered catalog,
    pytest.fail is called.

    Args:
        configured_catalog_path (str): Local path to a custom configured catalog file
        discovered_catalog (MutableMapping[str, AirbyteStream]): The discovered catalog

    Returns:
        ConfiguredAirbyteCatalog: a configured Airbyte catalog
    """
    custom_catalog = ConfiguredAirbyteCatalog.parse_file(configured_catalog_path)
    for entry in custom_catalog.streams:
        stream_name = entry.stream.name
        try:
            discovered_stream = discovered_catalog[stream_name]
        except KeyError:
            pytest.fail(f"The {stream_name} stream you have set in {configured_catalog_path} is not part of the discovered_catalog")
        else:
            # Use the discovered definition rather than the one stored in the custom catalog file.
            entry.stream = discovered_stream
    logging.info("The configured catalog is built from a custom configured catalog.")
    return custom_catalog

0 commit comments

Comments
 (0)