Skip to content

Commit 1bb90da

Browse files
Source Marketo: 90% unit test coverage (#15817)
* #15516 source marketo: 90% unit test coverage * #15516 source marketo: upd changelog * auto-bump connector version [ci skip] Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
1 parent 09cddff commit 1bb90da

File tree

9 files changed

+273
-38
lines changed

9 files changed

+273
-38
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,7 @@
583583
- name: Marketo
584584
sourceDefinitionId: 9e0556f4-69df-4522-a3fb-03264d36b348
585585
dockerRepository: airbyte/source-marketo
586-
dockerImageTag: 0.1.6
586+
dockerImageTag: 0.1.7
587587
documentationUrl: https://docs.airbyte.io/integrations/sources/marketo
588588
icon: marketo.svg
589589
sourceType: api

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5182,7 +5182,7 @@
51825182
supportsNormalization: false
51835183
supportsDBT: false
51845184
supported_destination_sync_modes: []
5185-
- dockerImage: "airbyte/source-marketo:0.1.6"
5185+
- dockerImage: "airbyte/source-marketo:0.1.7"
51865186
spec:
51875187
documentationUrl: "https://docs.airbyte.io/integrations/sources/marketo"
51885188
connectionSpecification:

airbyte-integrations/connectors/source-marketo/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,5 @@ COPY source_marketo ./source_marketo
3434
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
3535
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
3636

37-
LABEL io.airbyte.version=0.1.6
37+
LABEL io.airbyte.version=0.1.7
3838
LABEL io.airbyte.name=airbyte/source-marketo

airbyte-integrations/connectors/source-marketo/source_marketo/source.py

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -63,17 +63,6 @@ def parse_response(self, response: requests.Response, stream_state: Mapping[str,
6363
for record in data:
6464
yield record
6565

66-
def normalize_datetime(self, dt: str, format="%Y-%m-%dT%H:%M:%SZ%z"):
67-
"""
68-
Convert '2018-09-07T17:37:18Z+0000' -> '2018-09-07T17:37:18Z'
69-
"""
70-
try:
71-
res = datetime.datetime.strptime(dt, format)
72-
except ValueError:
73-
self.logger.warning("date-time field in unexpected format: '%s'", dt)
74-
return dt
75-
return to_datetime_str(res)
76-
7766

7867
class IncrementalMarketoStream(MarketoStream):
7968
cursor_field = "createdAt"
@@ -92,7 +81,7 @@ def filter_by_state(self, stream_state: Mapping[str, Any] = None, record: Mappin
9281
if record[self.cursor_field] >= (stream_state or {}).get(self.cursor_field, self.start_date):
9382
yield record
9483

95-
def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
84+
def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[MutableMapping]:
9685
json_response = response.json().get(self.data_field) or []
9786

9887
for record in json_response:
@@ -471,7 +460,18 @@ def request_params(
471460

472461
return params
473462

474-
def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
463+
def normalize_datetime(self, dt: str, format="%Y-%m-%dT%H:%M:%SZ%z"):
464+
"""
465+
Convert '2018-09-07T17:37:18Z+0000' -> '2018-09-07T17:37:18Z'
466+
"""
467+
try:
468+
res = datetime.datetime.strptime(dt, format)
469+
except ValueError:
470+
self.logger.warning("date-time field in unexpected format: '%s'", dt)
471+
return dt
472+
return to_datetime_str(res)
473+
474+
def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[MutableMapping]:
475475
for record in super().parse_response(response, stream_state, **kwargs):
476476
# delete +00:00 part from the end of createdAt and updatedAt
477477
record["updatedAt"] = self.normalize_datetime(record["updatedAt"])
@@ -544,7 +544,7 @@ def check_connection(self, logger, config) -> Tuple[bool, any]:
544544

545545
return True, None
546546
except requests.exceptions.RequestException as e:
547-
return False, e
547+
return False, repr(e)
548548

549549
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
550550
config["authenticator"] = MarketoAuthenticator(config)

airbyte-integrations/connectors/source-marketo/unit_tests/conftest.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ def config():
3939

4040

4141
@pytest.fixture
42-
def send_email_stream(config):
43-
activity = {
42+
def activity():
43+
return {
4444
"id": 6,
4545
"name": "send_email",
4646
"description": "Send Marketo Email to a person",
@@ -53,6 +53,10 @@ def send_email_stream(config):
5353
{"name": "Test Variant", "dataType": "integer"},
5454
],
5555
}
56+
57+
58+
@pytest.fixture
59+
def send_email_stream(config, activity):
5660
stream_name = f"activities_{activity['name']}"
5761
cls = type(stream_name, (Activities,), {"activity": activity})
5862
return cls(config)
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
#
2+
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
import logging
6+
from unittest.mock import ANY, Mock, patch
7+
8+
import pytest
9+
from airbyte_cdk.models.airbyte_protocol import SyncMode
10+
from source_marketo.source import Activities, Campaigns, MarketoStream, Programs, SourceMarketo
11+
12+
13+
def test_create_export_job(mocker, send_email_stream, caplog):
14+
mocker.patch("time.sleep")
15+
caplog.set_level(logging.WARNING)
16+
slices = list(send_email_stream.stream_slices(sync_mode=SyncMode.incremental))
17+
assert slices == [
18+
{"endAt": ANY, "id": "2c09ce6d", "startAt": ANY},
19+
{"endAt": ANY, "id": "cd465f55", "startAt": ANY},
20+
{"endAt": ANY, "id": "232aafb4", "startAt": ANY},
21+
]
22+
assert "Failed to create export job! Status is failed!" in caplog.records[-1].message
23+
24+
25+
@pytest.mark.parametrize(
26+
"activity, expected_schema",
27+
(
28+
(
29+
{
30+
"id": 1,
31+
"name": "daily_meeting",
32+
"description": "Connect to a daily meeting",
33+
"primaryAttribute": {"name": "Meeting ID", "dataType": "integer"},
34+
"attributes": [
35+
{"name": "Priority", "dataType": "number"},
36+
{"name": "Speakers", "dataType": "integer"},
37+
{"name": "Phone number", "dataType": "phone"},
38+
{"name": "Cost per person", "dataType": "currency"},
39+
{"name": "Is mandatory", "dataType": "boolean"},
40+
{"name": "Participants", "dataType": "array"},
41+
{"name": "Date", "dataType": "date"},
42+
{"name": "Time spent per person", "dataType": "double"},
43+
],
44+
},
45+
{
46+
"$schema": "http://json-schema.org/draft-07/schema#",
47+
"additionalProperties": True,
48+
"properties": {
49+
"activityDate": {"format": "date-time", "type": ["null", "string"]},
50+
"activityTypeId": {"type": ["null", "integer"]},
51+
"campaignId": {"type": ["null", "integer"]},
52+
"costperperson": {"type": ["number", "null"]},
53+
"date": {"format": "date-time", "type": ["string", "null"]},
54+
"ismandatory": {"type": ["boolean", "null"]},
55+
"leadId": {"type": ["null", "integer"]},
56+
"marketoGUID": {"type": ["null", "string"]},
57+
"participants": {"items": {"type": ["integer", "number", "string", "null"]}, "type": ["array", "null"]},
58+
"phonenumber": {"type": ["string", "null"]},
59+
"primaryAttributeValue": {"type": ["null", "string"]},
60+
"primaryAttributeValueId": {"type": ["null", "string"]},
61+
"priority": {"type": ["string", "null"]},
62+
"speakers": {"type": ["integer", "null"]},
63+
"timespentperperson": {"type": ["string", "null"]},
64+
},
65+
"type": ["null", "object"],
66+
},
67+
),
68+
(
69+
{
70+
"id": 1,
71+
"name": "daily_meeting",
72+
"description": "Connect to a daily meeting",
73+
"primaryAttribute": {"name": "Meeting ID", "dataType": "integer"},
74+
},
75+
{
76+
"$schema": "http://json-schema.org/draft-07/schema#",
77+
"additionalProperties": True,
78+
"properties": {
79+
"activityDate": {"format": "date-time", "type": ["null", "string"]},
80+
"activityTypeId": {"type": ["null", "integer"]},
81+
"campaignId": {"type": ["null", "integer"]},
82+
"leadId": {"type": ["null", "integer"]},
83+
"marketoGUID": {"type": ["null", "string"]},
84+
"primaryAttributeValue": {"type": ["null", "string"]},
85+
"primaryAttributeValueId": {"type": ["null", "string"]},
86+
},
87+
"type": ["null", "object"],
88+
},
89+
),
90+
),
91+
)
92+
def test_activities_schema(activity, expected_schema, config):
93+
cls = type(activity["name"], (Activities,), {"activity": activity})
94+
assert cls(config).get_json_schema() == expected_schema
95+
96+
97+
@pytest.mark.parametrize(
98+
"response_text, expected_records",
99+
(
100+
(
101+
"""Campaign Run ID,Choice Number,Has Predictive,Step ID,Test Variant,attributes
102+
1,3,true,10,15,{"spam": "true"}
103+
2,3,false,11,16,{"spam": "false"}""",
104+
[
105+
{
106+
"Campaign Run ID": "1",
107+
"Choice Number": "3",
108+
"Has Predictive": "true",
109+
"Step ID": "10",
110+
"Test Variant": "15",
111+
"spam": "true",
112+
},
113+
{
114+
"Campaign Run ID": "2",
115+
"Choice Number": "3",
116+
"Has Predictive": "false",
117+
"Step ID": "11",
118+
"Test Variant": "16",
119+
"spam": "false",
120+
},
121+
],
122+
),
123+
),
124+
)
125+
def test_export_parse_response(send_email_stream, response_text, expected_records):
126+
assert list(send_email_stream.parse_response(Mock(text=response_text))) == expected_records
127+
128+
129+
@pytest.mark.parametrize(
130+
"job_statuses",
131+
(
132+
(("Created",), ("Completed",)),
133+
(
134+
("Created",),
135+
("Cancelled",),
136+
),
137+
),
138+
)
139+
def test_export_sleep(send_email_stream, job_statuses):
140+
def tuple_to_generator(tuple_):
141+
yield from tuple_
142+
143+
job_statuses_side_effect = [tuple_to_generator(tuple_) for tuple_ in job_statuses]
144+
stream_slice = {"startAt": "2020-08-01", "endAt": "2020-08-02", "id": "1"}
145+
with patch("source_marketo.source.MarketoExportStart.read_records", return_value=iter([Mock()])) as export_start:
146+
with patch("source_marketo.source.MarketoExportStatus.read_records", side_effect=job_statuses_side_effect) as export_status:
147+
with patch("source_marketo.source.sleep") as sleep:
148+
if job_statuses[-1] == ("Cancelled",):
149+
with pytest.raises(Exception):
150+
send_email_stream.sleep_till_export_completed(stream_slice)
151+
else:
152+
assert send_email_stream.sleep_till_export_completed(stream_slice) is True
153+
export_start.assert_called()
154+
export_status.assert_called()
155+
sleep.assert_called()
156+
157+
158+
def test_programs_request_params(config):
159+
stream = Programs(config)
160+
params = stream.request_params(
161+
stream_slice={"startAt": "2020-08-01", "endAt": "2020-08-02"}, next_page_token={"nextPageToken": 2}, stream_state={}
162+
)
163+
assert params == {
164+
"batchSize": 200,
165+
"maxReturn": 200,
166+
"earliestUpdatedAt": "2020-08-01",
167+
"latestUpdatedAt": "2020-08-02",
168+
"nextPageToken": 2,
169+
}
170+
171+
172+
@pytest.mark.parametrize(
173+
"next_page_token",
174+
(
175+
{"nextPageToken": 2},
176+
{},
177+
),
178+
)
179+
def test_next_page_token(mocker, config, next_page_token):
180+
stream = MarketoStream(config)
181+
token = stream.next_page_token(Mock(json=Mock(return_value=next_page_token)))
182+
assert token == (next_page_token or None)
183+
184+
185+
@pytest.mark.parametrize(
186+
"response, state, expected_records",
187+
(
188+
(
189+
{"result": [{"id": "1", "createdAt": "2020-07-01T00:00:00Z"}, {"id": "2", "createdAt": "2020-08-02T00:00:00Z"}]},
190+
{"createdAt": "2020-08-01T20:20:00Z"},
191+
[{"id": "2", "createdAt": "2020-08-02T00:00:00Z"}],
192+
),
193+
),
194+
)
195+
def test_parse_response_incremental(config, response, state, expected_records):
196+
stream = Campaigns(config)
197+
records = stream.parse_response(Mock(json=Mock(return_value=response)), stream_state=state)
198+
assert list(records) == expected_records
199+
200+
201+
def test_source_streams(config, activity):
202+
source = SourceMarketo()
203+
with patch("source_marketo.source.ActivityTypes.read_records", Mock(return_value=[activity])):
204+
streams = source.streams(config)
205+
assert len(streams) == 6
206+
assert all(isinstance(stream, MarketoStream) for stream in streams)
207+
208+
209+
@pytest.mark.parametrize(
210+
"status_code, response, is_connection_successful, error_msg",
211+
(
212+
(200, "", True, None),
213+
(
214+
400,
215+
"Bad request",
216+
False,
217+
"HTTPError('400 Client Error: None for url: https://602-euo-598.mktorest.com/rest/v1/leads/describe')",
218+
),
219+
(
220+
403,
221+
"Forbidden",
222+
False,
223+
"HTTPError('403 Client Error: None for url: https://602-euo-598.mktorest.com/rest/v1/leads/describe')",
224+
),
225+
),
226+
)
227+
def test_check_connection(config, requests_mock, status_code, response, is_connection_successful, error_msg):
228+
requests_mock.register_uri("GET", "https://602-euo-598.mktorest.com/rest/v1/leads/describe", status_code=status_code)
229+
source = SourceMarketo()
230+
success, error = source.check_connection(logger=None, config=config)
231+
assert success is is_connection_successful
232+
assert error == error_msg
233+
234+
235+
@pytest.mark.parametrize(
236+
"input, format, expected_result",
237+
(
238+
("2020-08-01T20:20:21Z", "%Y-%m-%dT%H:%M:%SZ%z", "2020-08-01T20:20:21Z"),
239+
("2020-08-01 20:20", "%Y-%m-%d %H:%M", "2020-08-01T20:20:00Z"),
240+
("2020-08-01", "%Y-%m-%dT%H:%M:%SZ%z", "2020-08-01"),
241+
),
242+
)
243+
def test_normalize_datetime(config, input, format, expected_result):
244+
stream = Programs(config)
245+
assert stream.normalize_datetime(input, format) == expected_result

airbyte-integrations/connectors/source-marketo/unit_tests/test_stream_slices.py

Lines changed: 0 additions & 19 deletions
This file was deleted.

airbyte-integrations/connectors/source-marketo/unit_tests/unit_test.py renamed to airbyte-integrations/connectors/source-marketo/unit_tests/test_utils.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@
1111
("string", {"type": "string"}, str),
1212
(True, {"type": ["boolean", "null"]}, bool),
1313
(1, {"type": ["number", "null"]}, float),
14+
("", {"type": ["number", "null"]}, type(None)),
15+
("1.5", {"type": "integer"}, int),
16+
("15", {"type": "integer"}, int),
17+
("true", {"type": "boolean"}, bool),
1418
]
1519

1620

docs/integrations/sources/marketo.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ If the 50,000 limit is too stringent, contact Marketo support for a quota increa
102102

103103
| Version | Date | Pull Request | Subject |
104104
|:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------|
105+
| `0.1.7` | 2022-08-23 | [15817](https://github.com/airbytehq/airbyte/pull/15817) | Improved unit test coverage |
105106
| `0.1.6` | 2022-08-21 | [15824](https://github.com/airbytehq/airbyte/pull/15824) | Fix semi incremental streams: do not ignore start date, make one api call instead of multiple |
106107
| `0.1.5` | 2022-08-16 | [15683](https://github.com/airbytehq/airbyte/pull/15683) | Retry failed creation of a job instead of skipping it |
107108
| `0.1.4` | 2022-06-20 | [13930](https://github.com/airbytehq/airbyte/pull/13930) | Process failing creation of export jobs |

0 commit comments

Comments
 (0)