Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
a3df273
source-sftp-bulk: not mirroring paths initial changes
aldogonzalez8 Jan 7, 2025
477f28a
source-sftp-bulk: ruff format
aldogonzalez8 Jan 7, 2025
cbec0b3
source-sftp-bulk: update release information
aldogonzalez8 Jan 7, 2025
455b633
source-sftp-bulk: test delivery option toggle
aldogonzalez8 Jan 8, 2025
8be0fa6
source-sftp-bulk: move toggle to root
aldogonzalez8 Jan 8, 2025
cb1c430
source-sftp-bulk: ruff format
aldogonzalez8 Jan 8, 2025
03e71ed
source-sftp-bulk: move preserve_subdirectories_directories to deliver…
aldogonzalez8 Jan 8, 2025
91f6b35
source-sftp-bulk: ruff format
aldogonzalez8 Jan 8, 2025
a375a93
source-sftp-bulk: bump pre-dev cdk version
aldogonzalez8 Jan 10, 2025
8707e96
source-sftp-bulk: temporarily use diff runner
aldogonzalez8 Jan 10, 2025
5284474
source-sftp-bulk: temporarily use diff runner 22.04
aldogonzalez8 Jan 10, 2025
3157608
source-sftp-bulk: temporarily use diff runner 24.04
aldogonzalez8 Jan 10, 2025
770ca69
Merge branch 'master' into aldogonzalez8/source-sftp-bulk/not-mirrori…
aldogonzalez8 Jan 13, 2025
4f01f38
source-sftp: bump to latest pre-dev cdk for testing
aldogonzalez8 Jan 13, 2025
4b68397
source-sftp-bulk: bump pre-dev cdk library
aldogonzalez8 Jan 15, 2025
76c9c83
source-sftp-bulk: bump cdk and update mirroring flag as it has change…
aldogonzalez8 Jan 15, 2025
db95641
source-sftp-bulk: bump cdk and update spec
aldogonzalez8 Jan 15, 2025
18c6645
source-sftp-bulk: bump cdk to latest prod version and update docs
aldogonzalez8 Jan 15, 2025
bd0c9a0
Merge branch 'master' into aldogonzalez8/source-sftp-bulk/not-mirrori…
aldogonzalez8 Jan 15, 2025
e7fdbe0
source-sftp: enable progressive roll out configuration
aldogonzalez8 Jan 16, 2025
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
{
  "delivery_method": {
    "delivery_type": "use_file_transfer"
  },
  "host": "localhost",
  "port": 2222,
  "username": "foo",
  "credentials": { "auth_type": "password", "password": "pass" },
  "file_type": "json",
  "start_date": "2021-01-01T00:00:00.000000Z",
  "folder_path": "/files",
  "streams": [
    {
      "name": "test_stream",
      "file_type": "csv",
      "globs": [
        "**/monthly-kickoff-202401.mpeg",
        "**/monthly-kickoff-202402.mpeg",
        "**/monthly-kickoff-202403.mpeg"
      ],
      "legacy_prefix": "",
      "validation_policy": "Emit Record",
      "format": {
        "filetype": "csv",
        "delimiter": ",",
        "quote_char": "\"",
        "double_quote": true,
        "null_values": ["", "#N/A", "#N/A N/A", "#NA", "-1.#IND", "-1.#QNAN", "-NaN", "-nan", "1.#IND", "1.#QNAN", "N/A", "NA", "NULL", "NaN", "n/a", "nan", "null"],
        "true_values": ["1", "True", "TRUE", "true"],
        "false_values": ["0", "False", "FALSE", "false"],
        "inference_type": "Primitive Types Only",
        "strings_can_be_null": false,
        "encoding": "utf8",
        "header_definition": { "header_definition_type": "From CSV" }
      }
    }
  ]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
{
  "delivery_method": {
    "delivery_type": "use_file_transfer",
    "preserve_directory_structure": false
  },
  "host": "localhost",
  "port": 2222,
  "username": "foo",
  "credentials": { "auth_type": "password", "password": "pass" },
  "file_type": "json",
  "start_date": "2021-01-01T00:00:00.000000Z",
  "folder_path": "/files",
  "streams": [
    {
      "name": "test_stream",
      "file_type": "csv",
      "globs": [
        "**/monthly-kickoff-202401.mpeg",
        "**/monthly-kickoff-202402.mpeg",
        "**/monthly-kickoff-202403.mpeg"
      ],
      "legacy_prefix": "",
      "validation_policy": "Emit Record",
      "format": {
        "filetype": "csv",
        "delimiter": ",",
        "quote_char": "\"",
        "double_quote": true,
        "null_values": ["", "#N/A", "#N/A N/A", "#NA", "-1.#IND", "-1.#QNAN", "-NaN", "-nan", "1.#IND", "1.#QNAN", "N/A", "NA", "NULL", "NaN", "n/a", "nan", "null"],
        "true_values": ["1", "True", "TRUE", "true"],
        "false_values": ["0", "False", "FALSE", "false"],
        "inference_type": "Primitive Types Only",
        "strings_can_be_null": false,
        "encoding": "utf8",
        "header_definition": { "header_definition_type": "From CSV" }
      }
    }
  ]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
{
  "delivery_method": {
    "delivery_type": "use_file_transfer",
    "preserve_directory_structure": false
  },
  "host": "localhost",
  "port": 2222,
  "username": "foo",
  "credentials": { "auth_type": "password", "password": "pass" },
  "file_type": "json",
  "start_date": "2021-01-01T00:00:00.000000Z",
  "folder_path": "/files",
  "streams": [
    {
      "name": "test_stream",
      "file_type": "csv",
      "globs": ["**/monthly-kickoff.mpeg"],
      "legacy_prefix": "",
      "validation_policy": "Emit Record",
      "format": {
        "filetype": "csv",
        "delimiter": ",",
        "quote_char": "\"",
        "double_quote": true,
        "null_values": ["", "#N/A", "#N/A N/A", "#NA", "-1.#IND", "-1.#QNAN", "-NaN", "-nan", "1.#IND", "1.#QNAN", "N/A", "NA", "NULL", "NaN", "n/a", "nan", "null"],
        "true_values": ["1", "True", "TRUE", "true"],
        "false_values": ["0", "False", "FALSE", "false"],
        "inference_type": "Primitive Types Only",
        "strings_can_be_null": false,
        "encoding": "utf8",
        "header_definition": { "header_definition_type": "From CSV" }
      }
    }
  ]
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,27 @@ def config_fixture_use_all_files_transfer(docker_client) -> Mapping[str, Any]:
yield config


@pytest.fixture(name="config_fixture_not_duplicates", scope="session")
def config_fixture_not_duplicates(docker_client) -> Mapping[str, Any]:
    """Config with unique file names and no delivery options set (defaults apply)."""
    cfg = load_config("config_not_duplicates.json")
    cfg["host"] = get_docker_ip()
    yield cfg


@pytest.fixture(name="config_fixture_not_mirroring_paths_not_duplicates", scope="session")
def config_fixture_not_mirroring_paths_not_duplicates(docker_client) -> Mapping[str, Any]:
    """Config with preserve_directory_structure disabled and unique file names."""
    cfg = load_config("config_not_preserve_subdirectories_not_duplicates.json")
    cfg["host"] = get_docker_ip()
    yield cfg


@pytest.fixture(name="config_fixture_not_mirroring_paths_with_duplicates", scope="session")
def config_fixture_not_mirroring_paths_with_duplicates(docker_client) -> Mapping[str, Any]:
    """Config with preserve_directory_structure disabled and duplicated file names."""
    cfg = load_config("config_not_preserve_subdirectories_with_duplicates.json")
    cfg["host"] = get_docker_ip()
    yield cfg


@pytest.fixture(name="config_private_key", scope="session")
def config_fixture_private_key(docker_client) -> Mapping[str, Any]:
config = load_config("config_private_key.json") | {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
import pytest
from source_sftp_bulk import SourceSFTPBulk

# Import each name exactly once; FailureType and Status both live in
# airbyte_cdk.models (the declarative-models path duplicated FailureType).
from airbyte_cdk import AirbyteTracedException, ConfiguredAirbyteCatalog
from airbyte_cdk.models import FailureType, Status
from airbyte_cdk.test.entrypoint_wrapper import read

Expand Down Expand Up @@ -133,3 +137,62 @@ def test_get_all_file_csv_file_transfer(
for file_path in files_paths:
assert os.path.exists(file_path), f"File not found at path: {file_path}"
assert total_bytes == 233_771_330


def test_default_mirroring_paths_works_for_not_present_config_file_transfer(
    configured_catalog: ConfiguredAirbyteCatalog, config_fixture_not_duplicates: Mapping[str, Any]
):
    """
    If delivery options are not provided in the config, the connector falls back to
    preserving the source directory structure (mirrored paths), so every synced file
    path must still contain the source subdirectory.
    """
    expected_directory_path = "files/not_duplicates/data/"
    expected_uniqueness_count = 3
    source = SourceSFTPBulk(catalog=configured_catalog, config=config_fixture_not_duplicates, state=None)
    output = read(source=source, config=config_fixture_not_duplicates, catalog=configured_catalog)
    assert len(output.records) == expected_uniqueness_count
    files_paths = {record.record.file["file_url"] for record in output.records}
    files_relative_paths = {record.record.file["file_relative_path"] for record in output.records}
    assert len(files_paths) == expected_uniqueness_count
    assert len(files_relative_paths) == expected_uniqueness_count
    # zip() over two sets would pair arbitrary, unrelated elements (and silently
    # truncate on a size mismatch); each collection is checked independently instead.
    for file_path in files_paths:
        assert expected_directory_path in file_path, f"File not found at path: {file_path}"
    for files_relative_path in files_relative_paths:
        assert expected_directory_path in files_relative_path, f"File not found at path: {files_relative_path}"


def test_not_mirroring_paths_not_duplicates_file_transfer(
    configured_catalog: ConfiguredAirbyteCatalog, config_fixture_not_mirroring_paths_not_duplicates: Mapping[str, Any]
):
    """
    Delivery options are present with preserve_directory_structure set to False, so the
    connector must NOT mirror source paths: no synced file path may contain the
    source subdirectory.
    """
    source_directory_path = "files/not_duplicates/data/"
    expected_uniqueness_count = 3
    source = SourceSFTPBulk(catalog=configured_catalog, config=config_fixture_not_mirroring_paths_not_duplicates, state=None)
    output = read(source=source, config=config_fixture_not_mirroring_paths_not_duplicates, catalog=configured_catalog)
    assert len(output.records) == expected_uniqueness_count
    files_paths = {record.record.file["file_url"] for record in output.records}
    files_relative_paths = {record.record.file["file_relative_path"] for record in output.records}
    assert len(files_paths) == expected_uniqueness_count
    assert len(files_relative_paths) == expected_uniqueness_count
    # zip() over two sets would pair arbitrary, unrelated elements (and silently
    # truncate on a size mismatch); each collection is checked independently instead.
    for file_path in files_paths:
        assert source_directory_path not in file_path, f"Source path found but mirroring is off: {file_path}"
    for files_relative_path in files_relative_paths:
        assert source_directory_path not in files_relative_path, f"Source path found but mirroring is off: {files_relative_path}"


def test_not_mirroring_paths_with_duplicates_file_transfer_fails_sync(
    configured_catalog: ConfiguredAirbyteCatalog, config_fixture_not_mirroring_paths_with_duplicates: Mapping[str, Any]
):
    """
    preserve_directory_structure is False, so files would be delivered by name only.
    The source contains three files with the same name in different subdirectories,
    so the sync must fail with a duplicate-name error and emit no records.
    """
    source = SourceSFTPBulk(catalog=configured_catalog, config=config_fixture_not_mirroring_paths_with_duplicates, state=None)
    output = read(source=source, config=config_fixture_not_mirroring_paths_with_duplicates, catalog=configured_catalog)

    # The last trace error should name the duplicated file and every colliding path.
    error_message = output.errors[-1].trace.error.message
    assert "3 duplicates found for file name monthly-kickoff.mpeg" in error_message
    for duplicated_path in (
        "/files/duplicates/data/feb/monthly-kickoff.mpeg",
        "/files/duplicates/data/jan/monthly-kickoff.mpeg",
        "/files/duplicates/data/mar/monthly-kickoff.mpeg",
    ):
        assert duplicated_path in error_message
    assert not output.records
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,12 @@
"const": "use_file_transfer",
"enum": ["use_file_transfer"],
"type": "string"
},
"preserve_directory_structure": {
"default": true,
"description": "If enabled, sends subdirectory folder structure along with source file names to the destination. Otherwise, files will be synced by their names only. This option is ignored when file-based replication is not enabled.",
"title": "Preserve Sub-Directories in File Paths",
"type": "boolean"
}
},
"description": "Copy raw files without parsing their contents. Bits are copied into the destination exactly as they appeared in the source. Recommended for use with unstructured text data, non-text and compressed files.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 31e3242f-dee7-4cdc-a4b8-8e06c5458517
dockerImageTag: 1.6.0
dockerImageTag: 1.7.0-rc.1
dockerRepository: airbyte/source-sftp-bulk
documentationUrl: https://docs.airbyte.com/integrations/sources/sftp-bulk
githubIssueLabel: source-sftp-bulk
Expand All @@ -25,6 +25,8 @@ data:
enabled: true
releaseStage: alpha
releases:
rolloutConfiguration:
enableProgressiveRollout: true
breakingChanges:
1.0.0:
message: "This upgrade migrates the SFTP Bulk source to the Airbyte file-based CDK. This is the first necessary step of transitioning a file connector from community to Airbyte maintained."
Expand Down
Loading
Loading