Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from copy import deepcopy
from enum import Enum
from pathlib import Path
from typing import Generic, List, Mapping, Optional, Set, TypeVar, Union
from typing import Generic, List, Mapping, Optional, Set, TypeVar

from pydantic import BaseModel, Field, root_validator, validator
from pydantic.generics import GenericModel
Expand All @@ -19,6 +19,7 @@
)
configured_catalog_path: Optional[str] = Field(default=None, description="Path to configured catalog")
timeout_seconds: int = Field(default=None, description="Test execution timeout_seconds", ge=0)
deployment_mode: Optional[str] = Field(default=None, description="Deployment mode to run the test in", regex=r"^(cloud|oss)$")

SEMVER_REGEX = r"(0|(?:[1-9]\d*))(?:\.(0|(?:[1-9]\d*))(?:\.(0|(?:[1-9]\d*)))?(?:\-([\w][\w\.\-_]*))?)?"
ALLOW_LEGACY_CONFIG = True
Expand All @@ -45,6 +46,7 @@ class SpecTestConfig(BaseConfig):
spec_path: str = spec_path
config_path: str = config_path
timeout_seconds: int = timeout_seconds
deployment_mode: Optional[str] = deployment_mode
backward_compatibility_tests_config: BackwardCompatibilityTestsConfig = Field(
description="Configuration for the backward compatibility tests.", default=BackwardCompatibilityTestsConfig()
)
Expand All @@ -59,11 +61,13 @@ class Status(Enum):
config_path: str = config_path
status: Status = Field(Status.Succeed, description="Indicate if connection check should succeed with provided config")
timeout_seconds: int = timeout_seconds
deployment_mode: Optional[str] = deployment_mode


class DiscoveryTestConfig(BaseConfig):
config_path: str = config_path
timeout_seconds: int = timeout_seconds
deployment_mode: Optional[str] = deployment_mode
backward_compatibility_tests_config: BackwardCompatibilityTestsConfig = Field(
description="Configuration for the backward compatibility tests.", default=BackwardCompatibilityTestsConfig()
)
Expand Down Expand Up @@ -122,6 +126,7 @@ class IgnoredFieldsConfiguration(BaseConfig):

class BasicReadTestConfig(BaseConfig):
config_path: str = config_path
deployment_mode: Optional[str] = deployment_mode
configured_catalog_path: Optional[str] = configured_catalog_path
empty_streams: Set[EmptyStreamConfiguration] = Field(
default_factory=set, description="We validate that all streams has records. These are exceptions"
Expand All @@ -148,6 +153,7 @@ class FullRefreshConfig(BaseConfig):
config_path: str = config_path
configured_catalog_path: Optional[str] = configured_catalog_path
timeout_seconds: int = timeout_seconds
deployment_mode: Optional[str] = deployment_mode
ignored_fields: Optional[Mapping[str, List[IgnoredFieldsConfiguration]]] = ignored_fields


Expand All @@ -162,6 +168,7 @@ class IncrementalConfig(BaseConfig):
configured_catalog_path: Optional[str] = configured_catalog_path
future_state: Optional[FutureStateConfig] = Field(description="Configuration for the future state.")
timeout_seconds: int = timeout_seconds
deployment_mode: Optional[str] = deployment_mode
skip_comprehensive_incremental_tests: Optional[bool] = Field(
description="Determines whether to skip more granular testing for incremental syncs", default=False
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ def custom_environment_variables_fixture(acceptance_test_config: Config) -> Mapp
return acceptance_test_config.custom_environment_variables


@pytest.fixture(name="deployment_mode")
def deployment_mode_fixture(inputs) -> Optional[str]:
return getattr(inputs, "deployment_mode", None)


@pytest.fixture(name="connector_config_path")
def connector_config_path_fixture(inputs, base_path) -> Path:
"""Fixture with connector's config path. The path to the latest updated configurations will be returned if any."""
Expand Down Expand Up @@ -162,12 +167,15 @@ async def dagger_client(anyio_backend):


@pytest.fixture(name="docker_runner", autouse=True)
async def docker_runner_fixture(image_tag, connector_config_path, custom_environment_variables, dagger_client) -> ConnectorRunner:
async def docker_runner_fixture(
image_tag, connector_config_path, custom_environment_variables, dagger_client, deployment_mode
) -> ConnectorRunner:
runner = ConnectorRunner(
image_tag,
dagger_client,
connector_configuration_path=connector_config_path,
custom_environment_variables=custom_environment_variables,
deployment_mode=deployment_mode,
)
await runner.load_container()
return runner
Expand All @@ -180,12 +188,12 @@ def previous_connector_image_name_fixture(image_tag, inputs) -> str:


@pytest.fixture(name="previous_connector_docker_runner")
async def previous_connector_docker_runner_fixture(previous_connector_image_name, dagger_client) -> ConnectorRunner:
async def previous_connector_docker_runner_fixture(previous_connector_image_name, dagger_client, deployment_mode) -> ConnectorRunner:
"""Fixture to create a connector runner with the previous connector docker image.
Returns None if the latest image was not found, to skip downstream tests if the current connector is not yet published to the docker registry.
Raise not found error if the previous connector image is not latest and expected to be published.
"""
runner = ConnectorRunner(previous_connector_image_name, dagger_client)
runner = ConnectorRunner(previous_connector_image_name, dagger_client, deployment_mode=deployment_mode)
await runner.load_container()
return runner

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ def __init__(
dagger_client: dagger.Client,
connector_configuration_path: Optional[Path] = None,
custom_environment_variables: Optional[Mapping] = {},
deployment_mode: Optional[str] = None,
):
self._check_connector_under_test()
self.image_tag = image_tag
self.dagger_client = dagger_client
self._connector_configuration_path = connector_configuration_path
self._custom_environment_variables = custom_environment_variables
self._deployment_mode = deployment_mode
connector_image_tarball_path = self._get_connector_image_tarball_path()
self._connector_under_test_container = self._get_connector_container(connector_image_tarball_path)

Expand Down Expand Up @@ -145,6 +147,8 @@ def _get_connector_container(self, connector_image_tarball_path: Optional[Path])
container = container.with_env_variable("CACHEBUSTER", cachebuster_value)
for key, value in self._custom_environment_variables.items():
container = container.with_env_variable(key, str(value))
if self._deployment_mode:
container = container.with_env_variable("DEPLOYMENT_MODE", self._deployment_mode.upper())
Comment on lines +150 to +151
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meaning that if there is no _deployment_mode, the ENV isn't set 👍

return container

async def _run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ async def test_get_container_env_variable_value(self, dagger_client, dev_image_n
runner = connector_runner.ConnectorRunner(dev_image_name, dagger_client, custom_environment_variables={"FOO": "BAR"})
assert await runner.get_container_env_variable_value("FOO") == "BAR"

@pytest.mark.parametrize("deployment_mode", ["oss", "cloud"])
async def test_set_deployment_mode_env(self, dagger_client, dev_image_name, local_tar_image, deployment_mode):
runner = connector_runner.ConnectorRunner(dev_image_name, dagger_client, deployment_mode=deployment_mode)
assert await runner.get_container_env_variable_value("DEPLOYMENT_MODE") == deployment_mode.upper()

def test_parse_airbyte_messages_from_command_output(self, mocker, tmp_path):
old_configuration_path = tmp_path / "config.json"
new_configuration = {"field_a": "new_value_a"}
Expand Down