
Conversation

@AGPapa (Contributor) commented Jul 3, 2024

What

For several columns in the subscriptions stream, the type is incorrectly "date-time" when it should be "date". This causes syncs involving subscriptions to fail with this message: invalid type; expected datetime, string, bytes, int or float (type=type_error)

This has been brought up in at least two issues:

#15547
#14186

How

I switched the types from 'date-time' to 'date' and it works! I also added a missing column to the subscriptions stream: billing_period_end_date.

I tested this by running a connection from Braintree's sandbox to a local CSV file. It failed before the change and works after it.

I also switched the connector from the Dockerfile approach to Poetry (as directed by the test suite).
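
For illustration, this is the shape of the type change (a minimal sketch using plain pydantic v1, which the connector pins; the real models subclass the connector's CatalogModel, and this class name is illustrative):

from datetime import date
from typing import Optional

from pydantic import BaseModel


class SubscriptionDetails(BaseModel):
    # "date-time" -> "date": Braintree returns plain calendar dates for
    # these fields, not timestamps.
    billing_period_start_date: Optional[date] = None
    billing_period_end_date: Optional[date] = None  # the previously missing column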

Review guide

User Impact

Users with subscription data will be able to sync it using Airbyte. Users without subscription data should be unaffected, although there might be a datatype change in their empty subscription tables.

Can this PR be safely reverted and rolled back?

  • YES 💚
  • NO ❌
vercel bot commented Jul 3, 2024

The latest updates on your projects:

1 Skipped Deployment
airbyte-docs: Ignored, updated Jul 3, 2024 11:40pm (UTC)
CLAassistant commented Jul 3, 2024

CLA assistant check
All committers have signed the CLA.

@natikgadzhi (Contributor) left a comment:

Thank you for the PR, and for moving to Poetry and base image too!

@ChristoGrab, let's take a look at this. Let's see what CI tells us, run a regression test, and review the report. Perhaps that helps us move the source forward; it's a pretty high-usage connector.

@AGPapa, would you be willing to guide us through logs / screenshots of the resulting records to verify it works? I'm afraid we don't have a sandbox account seeded with the records to verify this easily.



class Subscription(CatalogModel):
    add_ons: List[AddOn]

@natikgadzhi (Contributor) commented on these lines:

Pretty wild to see Python schemas — we switched away from them quite some time ago it seems. But hey, if this works, I'm ok.

I'm worried that the side effect of this could be that we lose some data (the time portion?), but if the API changed and the responses only have dates, that's ok.
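
For context on the failure mode: pydantic v1 rejects a datetime.date value for a datetime-typed field outright, which matches the error in the logs. A minimal repro, with an illustrative model name (the Braintree SDK presumably hands back plain date objects for these fields):

from datetime import date

from pydantic import BaseModel  # pydantic v1, as pinned by the connector


class Details(BaseModel):
    billing_period_start_date: date  # was typed as a datetime before the fix


# With the field typed `datetime`, passing date(2024, 6, 30) raises:
#   invalid type; expected datetime, string, bytes, int or float (type=type_error)
# With the field typed `date`, the same value validates cleanly:
print(Details(billing_period_start_date=date(2024, 6, 30)).dict())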

@AGPapa (Contributor, Author) replied:

That first Airbyte issue was posted in June 2022, so if the API did change from a datetime to a date then it was at least two years ago.

@AGPapa (Contributor, Author) commented Jul 3, 2024

@AGPapa, would you be willing to guide us through logs / screenshots of the resulting records to verify it works? I'm afraid we don't have a sandbox account seeded with the records to verify this easily.

Absolutely, let me know what you need.

Here are the error logs from the current version of the connector.

This is the original error log:

2024-07-02 14:09:59 source > Syncing stream: transaction_stream
2024-07-02 14:09:59 platform > Stream status TRACE received of status: STARTED for stream transaction_stream
2024-07-02 14:09:59 platform > Sending update for transaction_stream - null -> RUNNING
2024-07-02 14:09:59 platform > Stream Status Update Received: transaction_stream - RUNNING
2024-07-02 14:09:59 platform > Creating status: transaction_stream - RUNNING
2024-07-02 14:10:00 source > Encountered an exception while reading stream transaction_stream
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 122, in read
    yield from self._read_stream(
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 197, in _read_stream
    for record in record_iterator:
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 328, in _read_full_refresh
    for record_data_or_message in record_data_or_messages:
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/declarative_stream.py", line 104, in read_records
    yield from self.retriever.read_records(stream_slice)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 304, in read_records
    for stream_data in self._read_pages(self._parse_records, self.state, stream_slice):
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 283, in _read_pages
    yield from records_generator_fn(response, stream_state, stream_slice)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 366, in _parse_records
    yield from self._parse_response(response, stream_slice=stream_slice, stream_state=stream_state)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 227, in _parse_response
    records = self.record_selector.select_records(
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/extractors/record_selector.py", line 44, in select_records
    all_data = self.extractor.extract_records(response)
  File "/airbyte/integration_code/source_braintree/source.py", line 126, in extract_records
    return [
  File "/airbyte/integration_code/source_braintree/source.py", line 127, in <listcomp>
    Transaction(**self._get_json_from_resource(BTransaction(None, transaction))).dict(exclude_unset=True)
  File "/usr/local/lib/python3.9/site-packages/pydantic/main.py", line 341, in __init__
    raise validation_error
pydantic.error_wrappers.ValidationError: 2 validation errors for Transaction
subscription_details -> billing_period_end_date
  invalid type; expected datetime, string, bytes, int or float (type=type_error)
subscription_details -> billing_period_start_date
  invalid type; expected datetime, string, bytes, int or float (type=type_error)
2024-07-02 14:10:01 platform > Stream status TRACE received of status: INCOMPLETE for stream transaction_stream
2024-07-02 14:10:01 platform > Sending update for transaction_stream - RUNNING -> INCOMPLETE
2024-07-02 14:10:01 platform > Stream Status Update Received: transaction_stream - INCOMPLETE
2024-07-02 14:10:01 platform > Updating status: transaction_stream - INCOMPLETE
2024-07-02 14:10:01 platform > readFromSource: source exception
io.airbyte.workers.internal.exception.SourceException: Source process exited with non-zero exit code 1
    at io.airbyte.workers.general.BufferedReplicationWorker.readFromSource(BufferedReplicationWorker.java:378) ~[io.airbyte-airbyte-commons-worker-0.63.3.jar:?]
    at io.airbyte.workers.general.BufferedReplicationWorker.lambda$runAsyncWithHeartbeatCheck$3(BufferedReplicationWorker.java:242) ~[io.airbyte-airbyte-commons-worker-0.63.3.jar:?]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
    at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]

And this is the error log after fixing the billing start/end dates, but not the paid-through date:

2024-07-01 22:25:56 platform > Updating status: transaction_stream - INCOMPLETE
2024-07-01 22:25:56 platform > Stream status TRACE received of status: STARTED for stream subscription_stream
2024-07-01 22:25:56 platform > Sending update for subscription_stream - null -> RUNNING
2024-07-01 22:25:56 platform > Stream Status Update Received: subscription_stream - RUNNING
2024-07-01 22:25:56 platform > Creating status: subscription_stream - RUNNING
2024-07-01 22:25:56 source > Encountered an exception while reading stream subscription_stream
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 135, in read
    yield from self._read_stream(
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py", line 230, in _read_stream
    for record_data_or_message in record_iterator:
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/core.py", line 169, in read
    for record_data_or_message in records:
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/declarative_stream.py", line 126, in read_records
    yield from self.retriever.read_records(self.get_json_schema(), stream_slice)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 327, in read_records
    for stream_data in self._read_pages(record_generator, self.state, _slice):
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 292, in _read_pages
    yield from records_generator_fn(response)
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 397, in _parse_records
    yield from self._parse_response(
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py", line 238, in _parse_response
    for record in record_generator:
  File "/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/extractors/record_selector.py", line 63, in select_records
    all_data: Iterable[Mapping[str, Any]] = self.extractor.extract_records(response)
  File "/airbyte/integration_code/source_braintree/source.py", line 145, in extract_records
    return [
  File "/airbyte/integration_code/source_braintree/source.py", line 146, in <listcomp>
    Subscription(**self._get_json_from_resource(BSubscription(None, subscription))).dict(exclude_unset=True)
  File "/usr/local/lib/python3.9/site-packages/pydantic/main.py", line 341, in __init__
    raise validation_error
pydantic.error_wrappers.ValidationError: 1 validation error for Subscription
paid_through_date
  invalid type; expected datetime, string, bytes, int or float (type=type_error)
2024-07-01 22:25:56 source > Marking stream subscription_stream as STOPPED
2024-07-01 22:25:56 platform > Stream status TRACE received of status: INCOMPLETE for stream subscription_stream
2024-07-01 22:25:56 source > Finished syncing subscription_stream
2024-07-01 22:25:56 platform > Sending update for subscription_stream - RUNNING -> INCOMPLETE
2024-07-01 22:25:56 platform > Stream Status Update Received: subscription_stream - INCOMPLETE
2024-07-01 22:25:56 platform > Updating status: subscription_stream - INCOMPLETE

And here's a screenshot of it all working with both changes.

[Screenshot 2024-07-03 at 8.11.27 AM: the sync succeeding with both changes applied]

If you need any other screenshots or logs then let me know and I will attach them. I can make adjustments to this PR too if we need it!

@ChristoGrab (Contributor) commented:

@AGPapa Thanks for the solid contribution, and for sharing the logs so thoroughly! There were a couple of failures in our pipeline that were unrelated to your changes. I merged the latest changes from master, which should resolve the issue.

@natikgadzhi All LGTM. I also had a chance to peek through the Braintree docs to verify; they're a bit sneaky, as the fields themselves are listed as "Time" fields, but the actual descriptions do clearly specify the values simply as dates 🙄 The only thing I'm not sure about is how to approve the deployment to community-CI to unblock the last couple of pipeline steps.

@AGPapa (Contributor, Author) commented Jul 3, 2024

In other testing I did this morning, I ran across a problem when I tried an incremental load (instead of a full refresh). The problem is similar to one mentioned in this issue: #31061

The error I had was: No format in ['%Y-%m-%d %H:%M:%S'] matching 2024-06-30T13:03:33.

I think that this PR fixes everything for full refreshes but might still have an unrelated issue for incremental loads.

I should be able to fix that error too over the next day or so; I think I just need to tweak the datetime_format in the yml (see the sketch below). Should I include the change in this same PR? Should we hold off on merging until we can figure that out?
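
For reference, the declarative CDK's DatetimeBasedCursor accepts a cursor_datetime_formats list for exactly this case, where stored state and API records use different formats. A sketch of the kind of tweak involved (the component type and field names are real low-code CDK concepts, but this is not the connector's exact yml):

incremental_sync:
  type: DatetimeBasedCursor
  cursor_field: created_at
  datetime_format: "%Y-%m-%d %H:%M:%S"
  # Try each format in turn when parsing cursor values, so ISO-8601 records
  # like "2024-06-30T13:03:33" no longer raise "No format in [...] matching ...".
  cursor_datetime_formats:
    - "%Y-%m-%d %H:%M:%S"
    - "%Y-%m-%dT%H:%M:%S"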

@natikgadzhi (Contributor) commented:

Okay, I see two errors in CI, let's fix them up real quick.

@AGPapa one trick you can do to have a nice report is to run airbyte-ci connectors --name source-braintree test. This will build the image and run tests similar to CI. I currently see two errors:

  • PyAirbyte installation failed with some obscure crap (will paste the log)
  • Incremental failure that you're calling out in the integration tests.

We should fix both; it's a high-usage connector.

PyAirbyte failure

Processing /airbyte-integrations/connectors/source-braintree
Installing build dependencies: started
Installing build dependencies: finished with status 'done'
Getting requirements to build wheel: started
Getting requirements to build wheel: finished with status 'done'
Preparing metadata (pyproject.toml): started
Preparing metadata (pyproject.toml): finished with status 'done'
Collecting airbyte-cdk<1,>=0  Downloading airbyte_cdk-0.90.0-py3-none-any.whl (422 kB)
Collecting braintree<5.0.0,>=4.28.0  Downloading braintree-4.28.0-py2.py3-none-any.whl (141 kB)
Collecting isodate<0.7.0,>=0.6.1  Using cached isodate-0.6.1-py2.py3-none-any.whl (41 kB)
Collecting backoff  Using cached backoff-2.2.1-py3-none-any.whl (15 kB)
Collecting pydantic<2.0.0,>=1.10.8  Using cached pydantic-1.10.17-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (3.1 MB)
Collecting Deprecated<1.3,>=1.2  Using cached Deprecated-1.2.14-py2.py3-none-any.whl (9.6 kB)
Collecting wcmatch==8.4  Using cached wcmatch-8.4-py3-none-any.whl (40 kB)
Collecting pyjwt<3.0.0,>=2.8.0  Using cached PyJWT-2.8.0-py3-none-any.whl (22 kB)
Collecting python-dateutil  Using cached python_dateutil-2.9.0.post0-py2.py3-none-any.whl (229 kB)
Collecting genson==1.2.2  Using cached genson-1.2.2-py2.py3-none-any.whl
Collecting pytz==2024.1  Using cached pytz-2024.1-py2.py3-none-any.whl (505 kB)
Collecting langchain_core==0.1.42  Using cached langchain_core-0.1.42-py3-none-any.whl (287 kB)
Collecting cachetools  Using cached cachetools-5.3.3-py3-none-any.whl (9.3 kB)
Collecting jsonref<0.3,>=0.2  Using cached jsonref-0.2-py3-none-any.whl (9.3 kB)
Collecting dpath<2.1.0,>=2.0.1  Using cached dpath-2.0.8-py3-none-any.whl (15 kB)
Collecting pyrate-limiter<3.2.0,>=3.1.0  Using cached pyrate_limiter-3.1.1-py3-none-any.whl (23 kB)
Collecting jsonschema<3.3.0,>=3.2.0  Using cached jsonschema-3.2.0-py2.py3-none-any.whl (56 kB)
Collecting cryptography<43.0.0,>=42.0.5  Using cached cryptography-42.0.8-cp39-abi3-manylinux_2_28_x86_64.whl (3.9 MB)
Collecting pendulum<3.0.0  Using cached pendulum-2.1.2-cp310-cp310-manylinux_2_36_x86_64.whl
Collecting airbyte-protocol-models<1.0,>=0.9.0  Downloading airbyte_protocol_models-0.12.2-py3-none-any.whl (10 kB)
Collecting PyYAML<7.0.0,>=6.0.1  Using cached PyYAML-6.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (705 kB)
Collecting Jinja2<3.2.0,>=3.1.2  Using cached jinja2-3.1.4-py3-none-any.whl (133 kB)
Collecting requests  Using cached requests-2.32.3-py3-none-any.whl (64 kB)
Collecting requests_cache  Using cached requests_cache-1.2.1-py3-none-any.whl (61 kB)
Collecting jsonpatch<2.0,>=1.33  Using cached jsonpatch-1.33-py2.py3-none-any.whl (12 kB)
Collecting tenacity<9.0.0,>=8.1.0  Using cached tenacity-8.4.2-py3-none-any.whl (28 kB)
Collecting packaging<24.0,>=23.2  Using cached packaging-23.2-py3-none-any.whl (53 kB)
Collecting langsmith<0.2.0,>=0.1.0  Using cached langsmith-0.1.83-py3-none-any.whl (127 kB)
Collecting bracex>=2.1.1  Using cached bracex-2.4-py3-none-any.whl (11 kB)
Collecting cffi>=1.12  Using cached cffi-1.16.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (443 kB)
Collecting wrapt<2,>=1.10  Using cached wrapt-1.16.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (80 kB)
Collecting six  Using cached six-1.16.0-py2.py3-none-any.whl (11 kB)
Collecting MarkupSafe>=2.0  Using cached MarkupSafe-2.1.5-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (25 kB)
Collecting attrs>=17.4.0  Using cached attrs-23.2.0-py3-none-any.whl (60 kB)
Requirement already satisfied: setuptools in ./.venv-source-braintree/lib/python3.10/site-packages (from jsonschema<3.3.0,>=3.2.0->airbyte-cdk<1,>=0->source-braintree==0.3.1) (65.5.0)
Collecting pyrsistent>=0.14.0  Using cached pyrsistent-0.20.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (117 kB)
Collecting pytzdata>=2020.1  Using cached pytzdata-2020.1-py2.py3-none-any.whl (489 kB)
Collecting typing-extensions>=4.2.0  Using cached typing_extensions-4.12.2-py3-none-any.whl (37 kB)
Collecting certifi>=2017.4.17  Using cached certifi-2024.6.2-py3-none-any.whl (164 kB)
Collecting urllib3<3,>=1.21.1  Using cached urllib3-2.2.2-py3-none-any.whl (121 kB)
Collecting idna<4,>=2.5  Using cached idna-3.7-py3-none-any.whl (66 kB)
Collecting charset-normalizer<4,>=2  Using cached charset_normalizer-3.3.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (142 kB)
Collecting url-normalize>=1.4  Using cached url_normalize-1.4.3-py2.py3-none-any.whl (6.8 kB)
Collecting cattrs>=22.2  Using cached cattrs-23.2.3-py3-none-any.whl (57 kB)
Collecting platformdirs>=2.5  Using cached platformdirs-4.2.2-py3-none-any.whl (18 kB)
Collecting exceptiongroup>=1.1.1  Using cached exceptiongroup-1.2.1-py3-none-any.whl (16 kB)
Collecting pycparser  Using cached pycparser-2.22-py3-none-any.whl (117 kB)
Collecting jsonpointer>=1.9  Using cached jsonpointer-3.0.0-py2.py3-none-any.whl (7.6 kB)
Collecting orjson<4.0.0,>=3.9.14  Downloading orjson-3.10.6-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (141 kB)
Building wheels for collected packages: source-braintree
Building wheel for source-braintree (pyproject.toml): started
Building wheel for source-braintree (pyproject.toml): finished with status 'done'
Created wheel for source-braintree: filename=source_braintree-0.3.1-py3-none-any.whl size=32123 sha256=f5fd0a8ff7bdf001126f09780961c2b845e735960519bf7d44d3c0719f095e2a
Stored in directory: /root/.cache/pip/wheels/68/00/af/ac1fa1074b12614fcef4e3d6822ff7a54e0cd042552edca6e1
Successfully built source-braintree
Installing collected packages: pytz, jsonref, genson, wrapt, urllib3, typing-extensions, tenacity, six, PyYAML, pytzdata, pyrsistent, pyrate-limiter, pyjwt, pycparser, platformdirs, packaging, orjson, MarkupSafe, jsonpointer, idna, exceptiongroup, dpath, charset-normalizer, certifi, cachetools, bracex, backoff, attrs, wcmatch, url-normalize, requests, python-dateutil, pydantic, jsonschema, jsonpatch, Jinja2, isodate, Deprecated, cffi, cattrs, requests_cache, pendulum, langsmith, cryptography, braintree, airbyte-protocol-models, langchain_core, airbyte-cdk, source-braintree
Successfully installed Deprecated-1.2.14 Jinja2-3.1.4 MarkupSafe-2.1.5 PyYAML-6.0.1 airbyte-cdk-0.90.0 airbyte-protocol-models-0.12.2 attrs-23.2.0 backoff-2.2.1 bracex-2.4 braintree-4.28.0 cachetools-5.3.3 cattrs-23.2.3 certifi-2024.6.2 cffi-1.16.0 charset-normalizer-3.3.2 cryptography-42.0.8 dpath-2.0.8 exceptiongroup-1.2.1 genson-1.2.2 idna-3.7 isodate-0.6.1 jsonpatch-1.33 jsonpointer-3.0.0 jsonref-0.2 jsonschema-3.2.0 langchain_core-0.1.42 langsmith-0.1.83 orjson-3.10.6 packaging-23.2 pendulum-2.1.2 platformdirs-4.2.2 pycparser-2.22 pydantic-1.10.17 pyjwt-2.8.0 pyrate-limiter-3.1.1 pyrsistent-0.20.0 python-dateutil-2.9.0.post0 pytz-2024.1 pytzdata-2020.1 requests-2.32.3 requests_cache-1.2.1 six-1.16.0 source-braintree-0.3.1 tenacity-8.4.2 typing-extensions-4.12.2 url-normalize-1.4.3 urllib3-2.2.2 wcmatch-8.4 wrapt-1.16.0
Thank you for using PyAirbyte!
Anonymous usage reporting is currently enabled. For more information, please see https://docs.airbyte.com/telemetry
Creating source and validating spec is returned successfully...
Connector executable not found within the virtual environment at /airbyte-integrations/connectors/source-braintree/.venv-source-braintree/bin/source-braintree. Reinstalling...
Installing 'source-braintree' into virtual environment '/airbyte-integrations/connectors/source-braintree/.venv-source-braintree'. Running 'pip install airbyte-{connector_name}'...

Acceptance test failure in incrementals

=================================== FAILURES ===================================
_______ TestIncremental.test_state_with_abnormally_large_values[inputs0] _______

self = <connector_acceptance_test.tests.test_incremental.TestIncremental object at 0x7fb8aac92d10>
inputs = IncrementalConfig(config_path='secrets/config.json', configured_catalog_path='integration_tests/configured_catalog_inc..., timeout_seconds=None, deployment_mode=None, skip_comprehensive_incremental_tests=False, client_container_config=None)
connector_config = SecretDict(******)
configured_catalog = ConfiguredAirbyteCatalog(streams=[ConfiguredAirbyteStream(stream=AirbyteStream(name='customer_stream', json_schema={'$...cremental'>, cursor_field=None, destination_sync_mode=<DestinationSyncMode.overwrite: 'overwrite'>, primary_key=None)])
future_state = {'customer_stream': {'created_at': '2222-01-01 00:00:00'}, 'dispute_stream': {'received_date': '2222-01-01 00:00:00'},...bscription_stream': {'created_at': '2222-01-01 00:00:00'}, 'transaction_stream': {'created_at': '2222-01-01 00:00:00'}}
docker_runner = <connector_acceptance_test.utils.connector_runner.ConnectorRunner object at 0x7fb8aa99b460>

    async def test_state_with_abnormally_large_values(
        self, inputs: IncrementalConfig, connector_config, configured_catalog, future_state, docker_runner: ConnectorRunner
    ):
        configured_catalog = incremental_only_catalog(configured_catalog)
        output = await docker_runner.call_read_with_state(config=connector_config, catalog=configured_catalog, state=future_state)
        records = filter_output(output, type_=Type.RECORD)
        states = filter_output(output, type_=Type.STATE)
        assert (
            not records
        ), f"The sync should produce no records when run with the state with abnormally large values {records[0].record.stream}"
        assert states, "The sync should produce at least one STATE message"
        if states and is_global_state(states[0]):
            # TODO: DB sources to fill out this case. Also, can we assume all states will be global if the first one is?
            pass  # TODO:
        else:
            cursor_fields_per_stream = {
                stream.stream.name: self._get_cursor_field(stream)
                for stream in configured_catalog.streams
                if stream.sync_mode == SyncMode.incremental
            }
            actual_state_cursor_values_per_stream = {
                state.state.stream.stream_descriptor.name: self._get_cursor_values_from_states_by_cursor(
                    state.state.stream.stream_state.dict(), cursor_fields_per_stream[state.state.stream.stream_descriptor.name]
                )
                for state in states
            }
>           future_state_cursor_values_per_stream = {
                state["stream"]["stream_descriptor"]["name"]: self._get_cursor_values_from_states_by_cursor(
                    state["stream"]["stream_state"], cursor_fields_per_stream[state["stream"]["stream_descriptor"]["name"]]
                )
                for state in future_state
                if state["stream"]["stream_descriptor"]["name"] in cursor_fields_per_stream
            }

/app/connector_acceptance_test/tests/test_incremental.py:313:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

.0 = <dict_keyiterator object at 0x7fb8aa822b10>

    future_state_cursor_values_per_stream = {
        state["stream"]["stream_descriptor"]["name"]: self._get_cursor_values_from_states_by_cursor(
            state["stream"]["stream_state"], cursor_fields_per_stream[state["stream"]["stream_descriptor"]["name"]]
        )
        for state in future_state
>       if state["stream"]["stream_descriptor"]["name"] in cursor_fields_per_stream
    }
E   TypeError: string indices must be integers

/app/connector_acceptance_test/tests/test_incremental.py:318: TypeError
----------------------------- Captured stderr call -----------------------------
136: mkfile /data/state.json DONE
135: mkdir /data DONE
134: mkfile /data/catalog.json DONE
133: exec python /airbyte/integration_code/main.py read --config /data/config.json --catalog /data/catalog.json --state /data/state.json
133: [3.34s] {"type": "LOG", "log": {"level": "INFO", "message": "Starting syncing SourceBraintree"}}
133: [3.36s] {"type": "LOG", "log": {"level": "INFO", "message": "Marking stream customer_stream as STARTED"}}
133: [3.36s] {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "emitted_at": 1720031471872.63, "stream_status": {"stream_descriptor": {"name": "customer_stream", "namespace": null}, "status": "STARTED"}}}
133: [3.37s] {"type": "LOG", "log": {"level": "INFO", "message": "Setting state of SourceBraintree stream to {'created_at': '2222-01-01 00:00:00'}"}}
133: [3.37s] {"type": "LOG", "log": {"level": "INFO", "message": "Syncing stream: customer_stream "}}
133: [3.37s] {"type": "STATE", "state": {"type": "STREAM", "stream": {"stream_descriptor": {"name": "customer_stream", "namespace": null}, "stream_state": {"created_at": "2222-01-01 00:00:00"}}, "sourceStats": {"recordCount": 0.0}}}
133: [3.37s] {"type": "LOG", "log": {"level": "INFO", "message": "Read 0 records from customer_stream stream"}}
133: [3.37s] {"type": "LOG", "log": {"level": "INFO", "message": "Marking stream customer_stream as STOPPED"}}
133: [3.38s] {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "emitted_at": 1720031471883.5571, "stream_status": {"stream_descriptor": {"name": "customer_stream", "namespace": null}, "status": "COMPLETE"}}}
133: [3.38s] {"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing customer_stream"}}
133: [3.38s] {"type": "LOG", "log": {"level": "INFO", "message": "SourceBraintree runtimes:\nSyncing stream customer_stream 0:00:00.011363"}}
133: [3.38s] {"type": "LOG", "log": {"level": "INFO", "message": "Marking stream dispute_stream as STARTED"}}
133: [3.38s] {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "emitted_at": 1720031471884.592, "stream_status": {"stream_descriptor": {"name": "dispute_stream", "namespace": null}, "status": "STARTED"}}}
133: [3.38s] {"type": "LOG", "log": {"level": "INFO", "message": "Setting state of SourceBraintree stream to {'received_date': '2222-01-01 00:00:00'}"}}
133: [3.38s] {"type": "LOG", "log": {"level": "INFO", "message": "Syncing stream: dispute_stream "}}
133: [3.38s] {"type": "LOG", "log": {"level": "ERROR", "message": "Encountered an exception while reading stream dispute_stream\nTraceback (most recent call last):\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 135, in read\n yield from self._read_stream(\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 230, in _read_stream\n for record_data_or_message in record_iterator:\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/core.py\", line 154, in read\n checkpoint_reader = self._get_checkpoint_reader(\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/core.py\", line 398, in _get_checkpoint_reader\n slices = self.stream_slices(\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/declarative_stream.py\", line 148, in stream_slices\n return self.retriever.stream_slices()\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py\", line 378, in stream_slices\n return self.stream_slicer.stream_slices()\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py\", line 168, in stream_slices\n start_datetime = self._calculate_earliest_possible_value(self._select_best_end_datetime())\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py\", line 174, in _calculate_earliest_possible_value\n cursor_datetime = self._calculate_cursor_datetime_from_state(self.get_stream_state())\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py\", line 185, in _calculate_cursor_datetime_from_state\n return self.parse_date(stream_state[self._cursor_field.eval(self.config)])\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py\", line 234, in parse_date\n raise ValueError(f\"No format in {self.cursor_datetime_formats} matching {date}\")\nValueError: No format in ['%Y-%m-%d'] matching 2222-01-01 00:00:00"}}
133: [3.38s] {"type": "LOG", "log": {"level": "INFO", "message": "Marking stream dispute_stream as STOPPED"}}
133: [3.38s] {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "emitted_at": 1720031471891.748, "stream_status": {"stream_descriptor": {"name": "dispute_stream", "namespace": null}, "status": "INCOMPLETE"}}}
133: [3.38s] {"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": 1720031471891.9321, "error": {"message": "Something went wrong in the connector. See the logs for more details.", "internal_message": "No format in ['%Y-%m-%d'] matching 2222-01-01 00:00:00", "stack_trace": "Traceback (most recent call last):\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 135, in read\n yield from self._read_stream(\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 230, in _read_stream\n for record_data_or_message in record_iterator:\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/core.py\", line 154, in read\n checkpoint_reader = self._get_checkpoint_reader(\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/streams/core.py\", line 398, in _get_checkpoint_reader\n slices = self.stream_slices(\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/declarative_stream.py\", line 148, in stream_slices\n return self.retriever.stream_slices()\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py\", line 378, in stream_slices\n return self.stream_slicer.stream_slices()\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py\", line 168, in stream_slices\n start_datetime = self._calculate_earliest_possible_value(self._select_best_end_datetime())\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py\", line 174, in _calculate_earliest_possible_value\n cursor_datetime = self._calculate_cursor_datetime_from_state(self.get_stream_state())\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py\", line 185, in _calculate_cursor_datetime_from_state\n return self.parse_date(stream_state[self._cursor_field.eval(self.config)])\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py\", line 234, in parse_date\n raise ValueError(f\"No format in {self.cursor_datetime_formats} matching {date}\")\nValueError: No format in ['%Y-%m-%d'] matching 2222-01-01 00:00:00\n", "failure_type": "system_error", "stream_descriptor": {"name": "dispute_stream"}}}}
133: [3.38s] {"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing dispute_stream"}}
133: [3.38s] {"type": "LOG", "log": {"level": "INFO", "message": "SourceBraintree runtimes:\nSyncing stream customer_stream 0:00:00.011363\nSyncing stream dispute_stream 0:00:00.008134"}}
133: [3.38s] {"type": "LOG", "log": {"level": "INFO", "message": "Marking stream transaction_stream as STARTED"}}
133: [3.38s] {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "emitted_at": 1720031471893.12, "stream_status": {"stream_descriptor": {"name": "transaction_stream", "namespace": null}, "status": "STARTED"}}}
133: [3.39s] {"type": "LOG", "log": {"level": "INFO", "message": "Setting state of SourceBraintree stream to {'created_at': '2222-01-01 00:00:00'}"}}
133: [3.39s] {"type": "LOG", "log": {"level": "INFO", "message": "Syncing stream: transaction_stream "}}
133: [3.39s] {"type": "STATE", "state": {"type": "STREAM", "stream": {"stream_descriptor": {"name": "transaction_stream", "namespace": null}, "stream_state": {"created_at": "2222-01-01 00:00:00"}}, "sourceStats": {"recordCount": 0.0}}}
133: [3.39s] {"type": "LOG", "log": {"level": "INFO", "message": "Read 0 records from transaction_stream stream"}}
133: [3.39s] {"type": "LOG", "log": {"level": "INFO", "message": "Marking stream transaction_stream as STOPPED"}}
133: [3.39s] {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "emitted_at": 1720031471900.3389, "stream_status": {"stream_descriptor": {"name": "transaction_stream", "namespace": null}, "status": "COMPLETE"}}}
133: [3.39s] {"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing transaction_stream"}}
133: [3.39s] {"type": "LOG", "log": {"level": "INFO", "message": "SourceBraintree runtimes:\nSyncing stream customer_stream 0:00:00.011363\nSyncing stream dispute_stream 0:00:00.008134\nSyncing stream transaction_stream 0:00:00.007680"}}
133: [3.39s] {"type": "LOG", "log": {"level": "INFO", "message": "Marking stream subscription_stream as STARTED"}}
133: [3.39s] {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "emitted_at": 1720031471901.4539, "stream_status": {"stream_descriptor": {"name": "subscription_stream", "namespace": null}, "status": "STARTED"}}}
133: [3.39s] {"type": "LOG", "log": {"level": "INFO", "message": "Setting state of SourceBraintree stream to {'created_at': '2222-01-01 00:00:00'}"}}
133: [3.39s] {"type": "LOG", "log": {"level": "INFO", "message": "Syncing stream: subscription_stream "}}
133: [3.40s] {"type": "STATE", "state": {"type": "STREAM", "stream": {"stream_descriptor": {"name": "subscription_stream", "namespace": null}, "stream_state": {"created_at": "2222-01-01 00:00:00"}}, "sourceStats": {"recordCount": 0.0}}}
133: [3.40s] {"type": "LOG", "log": {"level": "INFO", "message": "Read 0 records from subscription_stream stream"}}
133: [3.40s] {"type": "LOG", "log": {"level": "INFO", "message": "Marking stream subscription_stream as STOPPED"}}
133: [3.40s] {"type": "TRACE", "trace": {"type": "STREAM_STATUS", "emitted_at": 1720031471908.2551, "stream_status": {"stream_descriptor": {"name": "subscription_stream", "namespace": null}, "status": "COMPLETE"}}}
133: [3.40s] {"type": "LOG", "log": {"level": "INFO", "message": "Finished syncing subscription_stream"}}
133: [3.40s] {"type": "LOG", "log": {"level": "INFO", "message": "SourceBraintree runtimes:\nSyncing stream customer_stream 0:00:00.011363\nSyncing stream dispute_stream 0:00:00.008134\nSyncing stream subscription_stream 0:00:00.007327\nSyncing stream transaction_stream 0:00:00.007680"}}
133: [3.40s] {"type": "LOG", "log": {"level": "INFO", "message": "During the sync, the following streams did not sync successfully: dispute_stream: AirbyteTracedException(\"No format in ['%Y-%m-%d'] matching 2222-01-01 00:00:00\")"}}
133: [3.40s] {"type": "LOG", "log": {"level": "FATAL", "message": "None\nTraceback (most recent call last):\n File \"/airbyte/integration_code/main.py\", line 8, in <module>\n run()\n File \"/airbyte/integration_code/source_braintree/run.py\", line 14, in run\n launch(source, sys.argv[1:])\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py\", line 235, in launch\n for message in source_entrypoint.run(parsed_args):\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py\", line 122, in run\n yield from map(AirbyteEntrypoint.airbyte_message_to_string, self.read(source_spec, config, config_catalog, state))\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py\", line 166, in read\n for message in self.source.read(self.logger, config, catalog, state):\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/manifest_declarative_source.py\", line 167, in read\n yield from super().read(logger, config, catalog, state)\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 184, in read\n raise AirbyteTracedException(message=error_message, failure_type=FailureType.config_error)\nairbyte_cdk.utils.traced_exception.AirbyteTracedException: None"}}
133: [3.40s] {"type": "TRACE", "trace": {"type": "ERROR", "emitted_at": 1720031471910.394, "error": {"message": "During the sync, the following streams did not sync successfully: dispute_stream: AirbyteTracedException(\"No format in ['%Y-%m-%d'] matching 2222-01-01 00:00:00\")", "internal_message": null, "stack_trace": "Traceback (most recent call last):\n File \"/airbyte/integration_code/main.py\", line 8, in <module>\n run()\n File \"/airbyte/integration_code/source_braintree/run.py\", line 14, in run\n launch(source, sys.argv[1:])\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py\", line 235, in launch\n for message in source_entrypoint.run(parsed_args):\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py\", line 122, in run\n yield from map(AirbyteEntrypoint.airbyte_message_to_string, self.read(source_spec, config, config_catalog, state))\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/entrypoint.py\", line 166, in read\n for message in self.source.read(self.logger, config, catalog, state):\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/declarative/manifest_declarative_source.py\", line 167, in read\n yield from super().read(logger, config, catalog, state)\n File \"/usr/local/lib/python3.9/site-packages/airbyte_cdk/sources/abstract_source.py\", line 184, in read\n raise AirbyteTracedException(message=error_message, failure_type=FailureType.config_error)\nairbyte_cdk.utils.traced_exception.AirbyteTracedException: None\n", "failure_type": "config_error", "stream_descriptor": null}}}
133: exec python /airbyte/integration_code/main.py read --config /data/config.json --catalog /data/catalog.json --state /data/state.json
ERROR: process "python /airbyte/integration_code/main.py read --config /data/config.json --catalog /data/catalog.json --state /data/state.json" did not complete successfully: exit code: 1
--------------- generated report log file: /tmp/report_log.jsonl ---------------
============================= slowest 3 durations ==============================
26.90s call test_incremental.py::TestIncremental::test_read_sequential_slices[inputs0]
11.80s call test_full_refresh.py::TestFullRefresh::test_sequential_reads[inputs0]
9.32s call test_incremental.py::TestIncremental::test_two_sequential_reads[inputs0]
=========================== short test summary info ============================
SKIPPED [1] ../app/connector_acceptance_test/plugin.py:63: Skipping TestConnectorAttributes.test_streams_define_primary_key: not found in the config.
SKIPPED [1] ../app/connector_acceptance_test/plugin.py:63: Skipping TestConnectorDocumentation.test_prerequisites_content: not found in the config.
SKIPPED [1] ../app/connector_acceptance_test/tests/test_core.py:547: Source does not have OAuth method.
SKIPPED [1] ../app/connector_acceptance_test/tests/test_core.py:120: The previous and actual specifications are identical.
SKIPPED [1] ../app/connector_acceptance_test/tests/test_core.py:682: The previous and actual discovered catalogs are identical.
SKIPPED [1] ../app/connector_acceptance_test/tests/test_core.py:789: This tests currently leads to too much failures. We need to fix the connectors at scale first.
SKIPPED [1] ../app/connector_acceptance_test/tests/test_core.py:1252: Skipping the test for supported file types as it is only applicable for certified file-based connectors.
FAILED test_incremental.py::TestIncremental::test_state_with_abnormally_large_values[inputs0]
======== 1 failed, 40 passed, 7 skipped, 8 warnings in 68.61s (0:01:08) ========
@natikgadzhi (Contributor) commented:

Oh, I think I know what's up with PyAirbyte. Braintree is still on Dockerfile + setup.py.

We're moving all connectors to a unified base image and poetry, and this is one of the last outliers.
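
For anyone following along, that migration roughly means dropping setup.py in favor of a poetry-managed pyproject.toml. A sketch assembled from the versions visible in the install log above (not the PR's actual file):

[tool.poetry]
name = "source-braintree"
version = "0.3.1"
description = "Source implementation for Braintree."

[tool.poetry.dependencies]
python = "^3.9"
airbyte-cdk = ">=0,<1"
braintree = ">=4.28.0,<5.0.0"

[tool.poetry.scripts]
# Provides the `source-braintree` executable that PyAirbyte looks for in the venv.
source-braintree = "source_braintree.run:run"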

@natikgadzhi (Contributor) commented:

@AGPapa mind adding me to your fork so I can push up a few commits to this PR?

Courtesy of Claude:
Add the person as a collaborator to your fork:

  • Go to your fork's repository page on GitHub.
  • Click on "Settings" > "Collaborators".
  • Click "Add people" and enter the username of the person you want to add.
  • Select their profile and choose the appropriate permission level (usually "Write" is sufficient).
  • GitHub will send an invitation to the collaborator.
@natikgadzhi (Contributor) commented:

Oh my lordy I see now. Braintree used Airbyte CDK 0.1. It's a bit older than I am. I guess I'll pin to the latest 0.1 available to see if it helps with tests.

@AGPapa (Contributor, Author) commented Jul 3, 2024

I've added a commit to add cursor_datetime_formats - that resolved the error I had when running incremental syncs.

Running airbyte-ci connectors --name source-braintree test still fails the PyAirbyte validation tests and the Acceptance tests.

Hopefully pinning that version will fix the PyAirbyte validation tests. Do you think it will fix the Acceptance tests as well? Or is there something else we need to update? This is the error I'm seeing:

FileNotFoundError: [Errno 2] No such file or directory: '/test_input/secrets/config.json' 

I'm thinking that these tests are an older style and need to be updated? Not sure.

@natikgadzhi (Contributor) commented:

The failure that you see with '/test_input/secrets/config.json' just means that to run acceptance tests it wants to see a valid config. Config here is the secrets: a file in the format of the connector spec that contains your API tokens. We're using GCP secrets manager, and CI dynamically fetches our sandbox accounts. Locally, you'd need your own file with secrets.
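
Concretely, that's a file like the one below at secrets/config.json (field names follow the Braintree connector spec as I understand it; treat them as illustrative):

{
  "merchant_id": "<your sandbox merchant id>",
  "public_key": "<your public key>",
  "private_key": "<your private key>",
  "environment": "Sandbox",
  "start_date": "2020-01-01T00:00:00Z"
}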

I think I've fixed the PyAirbyte and integration test problems in #40722; let's see.

@AGPapa (Contributor, Author) commented Jul 5, 2024

Closing this PR since #40722 was merged in its place.

@AGPapa closed this Jul 5, 2024
@natikgadzhi (Contributor) commented:

@AGPapa thank you for fixing up Source Braintree! <3 Great work.

Is there anything else that you're looking to improve / clean up? What's the best way for me and the team to support you in using Airbyte?

@AGPapa (Contributor, Author) commented Jul 8, 2024

@AGPapa thank you for fixing up Source Braintree! <3 Great work.

Is there anything else that you're looking to improve / clean up? What's the best way for me and the team to support you in using Airbyte?

I see there's also this issue about the Braintree connector only loading 50 records at a time since it was updated to a "low code" connector. Perhaps this is because of pagination in the API?

My sandbox environment doesn't have 50 records, so I'll need to make some more to be able to test it. If I can reproduce the issue, what's the best way to fix it? Is there another connector I can look at to see how it handles this pagination? This data type issue was simple enough, but I'm not sure where to start with that other one.

@natikgadzhi (Contributor) commented:

It definitely looks like a broken paginator, but I am not certain why — haven't looked at the code. I don't think we have a sandbox account, but we can perform a regression test with one of our customers using Braintree.

If you're interested in investigating the difference between how 0.1.5 approached pagination and how 0.2.0 does it, and putting together a PR, I'll be happy to review, @AGPapa.
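
For whoever picks this up: in a declarative manifest, pagination lives on the stream's retriever, so a hard 50-record cap usually means the paginator is absent or its strategy never requests the next page. A hypothetical shape to compare against 0.1.5's behavior (the component types are real low-code CDK building blocks; the request parameter names for Braintree would need verifying):

retriever:
  type: SimpleRetriever
  paginator:
    type: DefaultPaginator
    pagination_strategy:
      type: PageIncrement
      page_size: 50
    page_token_option:
      type: RequestOption
      inject_into: request_parameter
      field_name: page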


Labels

area/connectors, area/documentation, community, connectors/source/braintree

5 participants