Skip to content

Commit 7584440

Browse files
authored
CDK: private configuration option _limit and _page_size (#5617)
* CDK: private configuration option _limit and _page_size
1 parent 3aabd92 commit 7584440

File tree

9 files changed

+162
-5
lines changed

9 files changed

+162
-5
lines changed

airbyte-cdk/python/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# Changelog
22

3+
## 0.1.15
4+
Add \_limit and \_page_size as internal config parameters for SAT
5+
36
## 0.1.14
47
If the input config file does not comply with spec schema, raise an exception instead of `system.exit`.
58

airbyte-cdk/python/airbyte_cdk/entrypoint.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
from airbyte_cdk.logger import AirbyteLogger
3434
from airbyte_cdk.models import AirbyteMessage, Status, Type
3535
from airbyte_cdk.sources import Source
36-
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit
36+
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config
3737

3838
logger = AirbyteLogger()
3939

@@ -90,7 +90,12 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
9090
else:
9191
raw_config = self.source.read_config(parsed_args.config)
9292
config = self.source.configure(raw_config, temp_dir)
93+
# Remove internal flags from config before validating so
94+
# jsonschema's additionalProperties flag wont fail the validation
95+
config, internal_config = split_config(config)
9396
check_config_against_spec_or_exit(config, source_spec, logger)
97+
# Put internal flags back to config dict
98+
config.update(internal_config.dict())
9499

95100
if cmd == "check":
96101
check_result = self.source.check(logger, config)

airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
from airbyte_cdk.models import Type as MessageType
4444
from airbyte_cdk.sources.source import Source
4545
from airbyte_cdk.sources.streams import Stream
46+
from airbyte_cdk.sources.streams.http.http import HttpStream
47+
from airbyte_cdk.sources.utils.schema_helpers import InternalConfig, split_config
4648

4749

4850
class AbstractSource(Source, ABC):
@@ -95,6 +97,7 @@ def read(
9597
"""Implements the Read operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
9698
connector_state = copy.deepcopy(state or {})
9799
logger.info(f"Starting syncing {self.name}")
100+
config, internal_config = split_config(config)
98101
# TODO assert all streams exist in the connector
99102
# get the streams once in case the connector needs to make any queries to generate them
100103
stream_instances = {s.name: s for s in self.streams(config)}
@@ -107,7 +110,11 @@ def read(
107110

108111
try:
109112
yield from self._read_stream(
110-
logger=logger, stream_instance=stream_instance, configured_stream=configured_stream, connector_state=connector_state
113+
logger=logger,
114+
stream_instance=stream_instance,
115+
configured_stream=configured_stream,
116+
connector_state=connector_state,
117+
internal_config=internal_config,
111118
)
112119
except Exception as e:
113120
logger.exception(f"Encountered an exception while reading stream {self.name}")
@@ -121,8 +128,13 @@ def _read_stream(
121128
stream_instance: Stream,
122129
configured_stream: ConfiguredAirbyteStream,
123130
connector_state: MutableMapping[str, Any],
131+
internal_config: InternalConfig,
124132
) -> Iterator[AirbyteMessage]:
125133

134+
if internal_config.page_size and isinstance(stream_instance, HttpStream):
135+
logger.info(f"Setting page size for {stream_instance.name} to {internal_config.page_size}")
136+
stream_instance.page_size = internal_config.page_size
137+
126138
use_incremental = configured_stream.sync_mode == SyncMode.incremental and stream_instance.supports_incremental
127139
if use_incremental:
128140
record_iterator = self._read_incremental(logger, stream_instance, configured_stream, connector_state)
@@ -135,6 +147,9 @@ def _read_stream(
135147
for record in record_iterator:
136148
if record.type == MessageType.RECORD:
137149
record_counter += 1
150+
if internal_config.limit and record_counter > internal_config.limit:
151+
logger.info(f"Reached limit defined by internal config ({internal_config.limit}), stop reading")
152+
break
138153
yield record
139154

140155
logger.info(f"Read {record_counter} records from {stream_name} stream")

airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class HttpStream(Stream, ABC):
4444
"""
4545

4646
source_defined_cursor = True # Most HTTP streams use a source defined cursor (i.e: the user can't configure it like on a SQL table)
47+
page_size = None # Use this variable to define page size for API http requests with pagination support
4748

4849
def __init__(self, authenticator: HttpAuthenticator = NoAuth()):
4950
self._authenticator = authenticator

airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,14 @@
2626
import json
2727
import os
2828
import pkgutil
29-
from typing import Any, Dict, Mapping
29+
from typing import Any, ClassVar, Dict, Mapping, Tuple
3030

3131
import pkg_resources
3232
from airbyte_cdk.logger import AirbyteLogger
3333
from airbyte_cdk.models import ConnectorSpecification
3434
from jsonschema import RefResolver, validate
3535
from jsonschema.exceptions import ValidationError
36+
from pydantic import BaseModel, Field
3637

3738

3839
class JsonSchemaResolver:
@@ -142,3 +143,31 @@ def check_config_against_spec_or_exit(config: Mapping[str, Any], spec: Connector
142143
validate(instance=config, schema=spec_schema)
143144
except ValidationError as validation_error:
144145
raise Exception("Config validation error: " + validation_error.message)
146+
147+
148+
class InternalConfig(BaseModel):
149+
KEYWORDS: ClassVar[set] = {"_limit", "_page_size"}
150+
limit: int = Field(None, alias="_limit")
151+
page_size: int = Field(None, alias="_page_size")
152+
153+
def dict(self):
154+
return super().dict(by_alias=True, exclude_unset=True)
155+
156+
157+
def split_config(config: Mapping[str, Any]) -> Tuple[dict, InternalConfig]:
158+
"""
159+
Break config map object into 2 instances: first is a dict with user defined
160+
configuration and second is internal config that contains private keys for
161+
acceptance test configuration.
162+
:param config - Dict object that has been loaded from config file.
163+
:return tuple of user defined config dict with filtered out internal
164+
parameters and SAT internal config object.
165+
"""
166+
main_config = {}
167+
internal_config = {}
168+
for k, v in config.items():
169+
if k in InternalConfig.KEYWORDS:
170+
internal_config[k] = v
171+
else:
172+
main_config[k] = v
173+
return main_config, InternalConfig.parse_obj(internal_config)

airbyte-cdk/python/docs/tutorials/cdk-tutorial-python-http/3-define-inputs.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ Given that we'll pulling currency data for our example source, we'll define the
3535
}
3636
}
3737
```
38+
Beside regular parameter there is intenal CDK config that started with '_' character and used mainly for testing purposes:
39+
40+
* _limit - set maximum number of records being read for each stream
41+
* _page_size - for http based streams set number of records for each page. Depends on stream implementation.
42+
3843

3944
In addition to metadata, we define two inputs:
4045

airbyte-cdk/python/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535

3636
setup(
3737
name="airbyte-cdk",
38-
version="0.1.14",
38+
version="0.1.15",
3939
description="A framework for writing Airbyte Connectors.",
4040
long_description=README,
4141
long_description_content_type="text/markdown",

airbyte-cdk/python/unit_tests/sources/test_source.py

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,14 @@
2626
import json
2727
import tempfile
2828
from typing import Any, Mapping, MutableMapping
29+
from unittest.mock import MagicMock
2930

3031
import pytest
3132
from airbyte_cdk.logger import AirbyteLogger
3233
from airbyte_cdk.models import ConfiguredAirbyteCatalog
33-
from airbyte_cdk.sources import Source
34+
from airbyte_cdk.sources import AbstractSource, Source
35+
from airbyte_cdk.sources.streams.core import Stream
36+
from airbyte_cdk.sources.streams.http.http import HttpStream
3437

3538

3639
class MockSource(Source):
@@ -51,6 +54,40 @@ def source():
5154
return MockSource()
5255

5356

57+
@pytest.fixture
58+
def abstract_source(mocker):
59+
mocker.patch.multiple(HttpStream, __abstractmethods__=set())
60+
mocker.patch.multiple(Stream, __abstractmethods__=set())
61+
62+
class MockHttpStream(MagicMock, HttpStream):
63+
url_base = "http://example.com"
64+
path = "/dummy/path"
65+
66+
def __init__(self, *args, **kvargs):
67+
MagicMock.__init__(self)
68+
HttpStream.__init__(self, *args, kvargs)
69+
self.read_records = MagicMock()
70+
71+
class MockStream(MagicMock, Stream):
72+
page_size = None
73+
74+
def __init__(self, *args, **kvargs):
75+
MagicMock.__init__(self)
76+
self.read_records = MagicMock()
77+
78+
streams = [MockHttpStream(), MockStream()]
79+
80+
class MockAbstractSource(AbstractSource):
81+
def check_connection(self):
82+
return True, None
83+
84+
def streams(self, config):
85+
self.streams_config = config
86+
return streams
87+
88+
return MockAbstractSource()
89+
90+
5491
def test_read_state(source):
5592
state = {"updated_at": "yesterday"}
5693

@@ -81,3 +118,60 @@ def test_read_catalog(source):
81118
catalog_file.flush()
82119
actual = source.read_catalog(catalog_file.name)
83120
assert actual == expected
121+
122+
123+
def test_internal_config(abstract_source):
124+
configured_catalog = {
125+
"streams": [
126+
{
127+
"stream": {"name": "mock_http_stream", "json_schema": {}},
128+
"destination_sync_mode": "overwrite",
129+
"sync_mode": "full_refresh",
130+
},
131+
{
132+
"stream": {"name": "mock_stream", "json_schema": {}},
133+
"destination_sync_mode": "overwrite",
134+
"sync_mode": "full_refresh",
135+
},
136+
]
137+
}
138+
catalog = ConfiguredAirbyteCatalog.parse_obj(configured_catalog)
139+
streams = abstract_source.streams(None)
140+
assert len(streams) == 2
141+
http_stream = streams[0]
142+
non_http_stream = streams[1]
143+
assert isinstance(http_stream, HttpStream)
144+
assert not isinstance(non_http_stream, HttpStream)
145+
http_stream.read_records.return_value = [{}] * 3
146+
non_http_stream.read_records.return_value = [{}] * 3
147+
148+
# Test with empty config
149+
records = [r for r in abstract_source.read(logger=MagicMock(), config={}, catalog=catalog, state={})]
150+
# 3 for http stream and 3 for non http stream
151+
assert len(records) == 3 + 3
152+
assert http_stream.read_records.called
153+
assert non_http_stream.read_records.called
154+
# Make sure page_size havent been set
155+
assert not http_stream.page_size
156+
assert not non_http_stream.page_size
157+
# Test with records limit set to 1
158+
internal_config = {"some_config": 100, "_limit": 1}
159+
records = [r for r in abstract_source.read(logger=MagicMock(), config=internal_config, catalog=catalog, state={})]
160+
# 1 from http stream + 1 from non http stream
161+
assert len(records) == 1 + 1
162+
assert "_limit" not in abstract_source.streams_config
163+
assert "some_config" in abstract_source.streams_config
164+
# Test with records limit set to number that exceeds expceted records
165+
internal_config = {"some_config": 100, "_limit": 20}
166+
records = [r for r in abstract_source.read(logger=MagicMock(), config=internal_config, catalog=catalog, state={})]
167+
assert len(records) == 3 + 3
168+
169+
# Check if page_size paramter is set to http instance only
170+
internal_config = {"some_config": 100, "_page_size": 2}
171+
records = [r for r in abstract_source.read(logger=MagicMock(), config=internal_config, catalog=catalog, state={})]
172+
assert "_page_size" not in abstract_source.streams_config
173+
assert "some_config" in abstract_source.streams_config
174+
assert len(records) == 3 + 3
175+
assert http_stream.page_size == 2
176+
# Make sure page_size havent been set for non http streams
177+
assert not non_http_stream.page_size

airbyte-cdk/python/unit_tests/test_entrypoint.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,11 @@ def config_mock(mocker, request):
144144
({"username": "fake"}, {"type": "object", "properties": {"name": {"type": "string"}}, "additionalProperties": False}, False),
145145
({"username": "fake"}, {"type": "object", "properties": {"username": {"type": "string"}}, "additionalProperties": False}, True),
146146
({"username": "fake"}, {"type": "object", "properties": {"user": {"type": "string"}}}, True),
147+
(
148+
{"username": "fake", "_limit": 22},
149+
{"type": "object", "properties": {"username": {"type": "string"}}, "additionalProperties": False},
150+
True,
151+
),
147152
],
148153
indirect=["config_mock"],
149154
)

0 commit comments

Comments
 (0)