Skip to content

Commit 3ef6fcf

Browse files
✨feat(source-sftp-bulk): adjust file record message protocol (#57514)
1 parent f8a73a1 commit 3ef6fcf

File tree

6 files changed

+212
-57
lines changed

6 files changed

+212
-57
lines changed

airbyte-integrations/connectors/source-sftp-bulk/integration_tests/integration_test.py

Lines changed: 26 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@
1414

1515
from airbyte_cdk import AirbyteTracedException, ConfiguredAirbyteCatalog
1616
from airbyte_cdk.models import (
17+
AirbyteRecordMessageFileReference,
1718
FailureType,
1819
Status,
1920
)
@@ -101,27 +102,37 @@ def test_get_files_empty_files(configured_catalog: ConfiguredAirbyteCatalog, con
101102

102103

103104
@pytest.mark.slow
104-
@pytest.mark.limit_memory("10 MB")
105+
@pytest.mark.limit_memory("11 MB")
105106
def test_get_file_csv_file_transfer(configured_catalog: ConfiguredAirbyteCatalog, config_fixture_use_file_transfer: Mapping[str, Any]):
106107
source = SourceSFTPBulk(catalog=configured_catalog, config=config_fixture_use_file_transfer, state=None)
107108
output = read(source=source, config=config_fixture_use_file_transfer, catalog=configured_catalog)
108-
expected_file_data = {
109+
110+
file_folder = "files/file_transfer"
111+
file_name = "file_transfer_1.csv"
112+
source_file_relative_path = f"{file_folder}/{file_name}"
113+
expected_file_data = AirbyteRecordMessageFileReference(
114+
file_size_bytes=46_754_266,
115+
source_file_relative_path=source_file_relative_path,
116+
staging_file_url=f"/tmp/airbyte-file-transfer/{source_file_relative_path}",
117+
)
118+
expected_record_data = {
109119
"bytes": 46_754_266,
110-
"file_relative_path": "files/file_transfer/file_transfer_1.csv",
111-
"file_url": "/tmp/airbyte-file-transfer/files/file_transfer/file_transfer_1.csv",
112-
"modified": ANY,
113-
"source_file_url": "/files/file_transfer/file_transfer_1.csv",
120+
"file_name": file_name,
121+
"folder": f"/{file_folder}",
122+
"source_uri": f"sftp://{config_fixture_use_file_transfer['username']}@{config_fixture_use_file_transfer['host']}:{config_fixture_use_file_transfer['port']}/{source_file_relative_path}",
123+
"updated_at": ANY,
114124
}
115125
assert len(output.records) == 1
116-
assert list(map(lambda record: record.record.file, output.records)) == [expected_file_data]
126+
assert list(map(lambda record: record.record.file_reference, output.records)) == [expected_file_data]
127+
assert list(map(lambda record: record.record.data, output.records)) == [expected_record_data]
117128

118129
# Additional assertion to check if the file exists at the file_url path
119-
file_path = expected_file_data["file_url"]
130+
file_path = expected_file_data.staging_file_url
120131
assert os.path.exists(file_path), f"File not found at path: {file_path}"
121132

122133

123134
@pytest.mark.slow
124-
@pytest.mark.limit_memory("10 MB")
135+
@pytest.mark.limit_memory("11 MB")
125136
def test_get_all_file_csv_file_transfer(
126137
configured_catalog: ConfiguredAirbyteCatalog, config_fixture_use_all_files_transfer: Mapping[str, Any]
127138
):
@@ -132,8 +143,8 @@ def test_get_all_file_csv_file_transfer(
132143
source = SourceSFTPBulk(catalog=configured_catalog, config=config_fixture_use_all_files_transfer, state=None)
133144
output = read(source=source, config=config_fixture_use_all_files_transfer, catalog=configured_catalog)
134145
assert len(output.records) == 5
135-
total_bytes = sum(list(map(lambda record: record.record.file["bytes"], output.records)))
136-
files_paths = list(map(lambda record: record.record.file["file_url"], output.records))
146+
total_bytes = sum(list(map(lambda record: record.record.file_reference.file_size_bytes, output.records)))
147+
files_paths = list(map(lambda record: record.record.file_reference.staging_file_url, output.records))
137148
for file_path in files_paths:
138149
assert os.path.exists(file_path), f"File not found at path: {file_path}"
139150
assert total_bytes == 233_771_330
@@ -150,8 +161,8 @@ def test_default_mirroring_paths_works_for_not_present_config_file_transfer(
150161
source = SourceSFTPBulk(catalog=configured_catalog, config=config_fixture_not_duplicates, state=None)
151162
output = read(source=source, config=config_fixture_not_duplicates, catalog=configured_catalog)
152163
assert len(output.records) == expected_uniqueness_count
153-
files_paths = set(map(lambda record: record.record.file["file_url"], output.records))
154-
files_relative_paths = set(map(lambda record: record.record.file["file_relative_path"], output.records))
164+
files_paths = set(map(lambda record: record.record.file_reference.staging_file_url, output.records))
165+
files_relative_paths = set(map(lambda record: record.record.file_reference.source_file_relative_path, output.records))
155166
assert len(files_relative_paths) == expected_uniqueness_count
156167
assert len(files_paths) == expected_uniqueness_count
157168
for file_path, files_relative_path in zip(files_paths, files_relative_paths):
@@ -170,8 +181,8 @@ def test_not_mirroring_paths_not_duplicates_file_transfer(
170181
source = SourceSFTPBulk(catalog=configured_catalog, config=config_fixture_not_mirroring_paths_not_duplicates, state=None)
171182
output = read(source=source, config=config_fixture_not_mirroring_paths_not_duplicates, catalog=configured_catalog)
172183
assert len(output.records) == expected_uniqueness_count
173-
files_paths = set(map(lambda record: record.record.file["file_url"], output.records))
174-
files_relative_paths = set(map(lambda record: record.record.file["file_relative_path"], output.records))
184+
files_paths = set(map(lambda record: record.record.file_reference.staging_file_url, output.records))
185+
files_relative_paths = set(map(lambda record: record.record.file_reference.source_file_relative_path, output.records))
175186
assert len(files_relative_paths) == expected_uniqueness_count
176187
assert len(files_paths) == expected_uniqueness_count
177188
for file_path, files_relative_path in zip(files_paths, files_relative_paths):

airbyte-integrations/connectors/source-sftp-bulk/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,7 @@ data:
77
connectorSubtype: file
88
connectorType: source
99
definitionId: 31e3242f-dee7-4cdc-a4b8-8e06c5458517
10-
dockerImageTag: 1.7.8
10+
dockerImageTag: 1.8.0
1111
dockerRepository: airbyte/source-sftp-bulk
1212
documentationUrl: https://docs.airbyte.com/integrations/sources/sftp-bulk
1313
githubIssueLabel: source-sftp-bulk

0 commit comments

Comments
 (0)