Skip to content

Commit aab0377

Browse files
authored
🐛 Source SalesForce: better detect API type (#16086)
Signed-off-by: Sergey Chvalyuk <grubberr@gmail.com>
1 parent 20a5b98 commit aab0377

File tree

6 files changed

+34
-14
lines changed

6 files changed

+34
-14
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -884,7 +884,7 @@
884884
- name: Salesforce
885885
sourceDefinitionId: b117307c-14b6-41aa-9422-947e34922962
886886
dockerRepository: airbyte/source-salesforce
887-
dockerImageTag: 1.0.14
887+
dockerImageTag: 1.0.15
888888
documentationUrl: https://docs.airbyte.io/integrations/sources/salesforce
889889
icon: salesforce.svg
890890
sourceType: api

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8654,7 +8654,7 @@
86548654
supportsNormalization: false
86558655
supportsDBT: false
86568656
supported_destination_sync_modes: []
8657-
- dockerImage: "airbyte/source-salesforce:1.0.14"
8657+
- dockerImage: "airbyte/source-salesforce:1.0.15"
86588658
spec:
86598659
documentationUrl: "https://docs.airbyte.com/integrations/sources/salesforce"
86608660
connectionSpecification:

airbyte-integrations/connectors/source-salesforce/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,5 @@ RUN pip install .
1313

1414
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
1515

16-
LABEL io.airbyte.version=1.0.14
16+
LABEL io.airbyte.version=1.0.15
1717
LABEL io.airbyte.name=airbyte/source-salesforce

airbyte-integrations/connectors/source-salesforce/source_salesforce/api.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
#
44

55
import concurrent.futures
6+
import logging
67
from typing import Any, List, Mapping, Optional, Tuple
78

89
import requests # type: ignore[import]
9-
from airbyte_cdk import AirbyteLogger
1010
from airbyte_cdk.models import ConfiguredAirbyteCatalog
1111
from requests import adapters as request_adapters
1212
from requests.exceptions import HTTPError, RequestException # type: ignore[import]
@@ -178,9 +178,12 @@
178178

179179

180180
class Salesforce:
181-
logger = AirbyteLogger()
181+
logger = logging.getLogger("airbyte")
182182
version = "v52.0"
183183
parallel_tasks_size = 100
184+
# https://developer.salesforce.com/docs/atlas.en-us.salesforce_app_limits_cheatsheet.meta/salesforce_app_limits_cheatsheet/salesforce_app_limits_platform_api.htm
185+
# Request Size Limits
186+
REQUEST_SIZE_LIMITS = 16_384
184187

185188
def __init__(
186189
self,

airbyte-integrations/connectors/source-salesforce/source_salesforce/source.py

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,26 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
3636

3737
return True, None
3838

39+
@classmethod
40+
def _get_api_type(cls, stream_name, properties, stream_state):
41+
# Salesforce BULK API currently does not support loading fields with data type base64 and compound data
42+
properties_not_supported_by_bulk = {
43+
key: value for key, value in properties.items() if value.get("format") == "base64" or "object" in value["type"]
44+
}
45+
properties_length = len(",".join(p for p in properties))
46+
47+
rest_required = stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS or properties_not_supported_by_bulk
48+
# If we have a lot of properties we can overcome REST API URL length and get an error: "reason: URI Too Long".
49+
# For such cases connector tries to use BULK API because it uses POST request and passes properties in the request body.
50+
bulk_required = properties_length + 2000 > Salesforce.REQUEST_SIZE_LIMITS
51+
52+
if bulk_required and not rest_required:
53+
return "bulk"
54+
elif rest_required and not bulk_required:
55+
return "rest"
56+
elif not bulk_required and not rest_required:
57+
return "rest" if stream_state else "bulk"
58+
3959
@classmethod
4060
def generate_streams(
4161
cls,
@@ -51,19 +71,15 @@ def generate_streams(
5171
for stream_name, sobject_options in stream_objects.items():
5272
streams_kwargs = {"sobject_options": sobject_options}
5373
stream_state = state.get(stream_name, {}) if state else {}
54-
5574
selected_properties = stream_properties.get(stream_name, {}).get("properties", {})
56-
# Salesforce BULK API currently does not support loading fields with data type base64 and compound data
57-
properties_not_supported_by_bulk = {
58-
key: value for key, value in selected_properties.items() if value.get("format") == "base64" or "object" in value["type"]
59-
}
6075

61-
if stream_state or stream_name in UNSUPPORTED_BULK_API_SALESFORCE_OBJECTS or properties_not_supported_by_bulk:
62-
# Use REST API
76+
api_type = cls._get_api_type(stream_name, selected_properties, stream_state)
77+
if api_type == "rest":
6378
full_refresh, incremental = SalesforceStream, IncrementalSalesforceStream
64-
else:
65-
# Use BULK API
79+
elif api_type == "bulk":
6680
full_refresh, incremental = BulkSalesforceStream, BulkIncrementalSalesforceStream
81+
else:
82+
raise Exception(f"Stream {stream_name} cannot be processed by REST or BULK API.")
6783

6884
json_schema = stream_properties.get(stream_name, {})
6985
pk, replication_key = sf_object.get_pk_and_replication_key(json_schema)

docs/integrations/sources/salesforce.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ Now that you have set up the Salesforce source connector, check out the followin
119119

120120
| Version | Date | Pull Request | Subject |
121121
|:--------|:-----------|:-------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------|
122+
| 1.0.15 | 2022-08-30 | [16086](https://github.com/airbytehq/airbyte/pull/16086) | Improve API type detection |
122123
| 1.0.14 | 2022-08-29 | [16119](https://github.com/airbytehq/airbyte/pull/16119) | Exclude `KnowledgeArticleVersion` from using bulk API |
123124
| 1.0.13 | 2022-08-23 | [15901](https://github.com/airbytehq/airbyte/pull/15901) | Exclude `KnowledgeArticle` from using bulk API |
124125
| 1.0.12 | 2022-08-09 | [15444](https://github.com/airbytehq/airbyte/pull/15444) | Fixed bug when `Bulk Job` was timeout by the connector, but remained running on the server |

0 commit comments

Comments
 (0)