Skip to content

Commit d4ef599

Browse files
authored
✨ source-file: prevent local file usage on cloud deployment mode (#30984)
Co-authored-by: pedroslopez <pedroslopez@users.noreply.github.com>
1 parent 04c4fea commit d4ef599

File tree

9 files changed

+330
-67
lines changed

9 files changed

+330
-67
lines changed

airbyte-integrations/connectors/source-file/acceptance-test-config.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,17 @@ acceptance_tests:
66
spec:
77
tests:
88
- spec_path: "source_file/spec.json"
9+
- spec_path: "integration_tests/cloud_spec.json"
10+
deployment_mode: "cloud"
911
connection:
1012
tests:
1113
- config_path: "integration_tests/config.json"
1214
status: "succeed"
1315
- config_path: "integration_tests/invalid_config.json"
1416
status: "failed"
17+
- config_path: "integration_tests/local_config.json"
18+
deployment_mode: "cloud"
19+
status: "failed"
1520
discovery:
1621
tests:
1722
- config_path: "integration_tests/config.json"
Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,216 @@
1+
{
2+
"documentationUrl": "https://docs.airbyte.com/integrations/sources/file",
3+
"connectionSpecification": {
4+
"$schema": "http://json-schema.org/draft-07/schema#",
5+
"title": "File Source Spec",
6+
"type": "object",
7+
"additionalProperties": true,
8+
"required": ["dataset_name", "format", "url", "provider"],
9+
"properties": {
10+
"dataset_name": {
11+
"type": "string",
12+
"title": "Dataset Name",
13+
"description": "The Name of the final table to replicate this file into (should include letters, numbers dash and underscores only)."
14+
},
15+
"format": {
16+
"type": "string",
17+
"enum": [
18+
"csv",
19+
"json",
20+
"jsonl",
21+
"excel",
22+
"excel_binary",
23+
"feather",
24+
"parquet",
25+
"yaml"
26+
],
27+
"default": "csv",
28+
"title": "File Format",
29+
"description": "The Format of the file which should be replicated (Warning: some formats may be experimental, please refer to the docs)."
30+
},
31+
"reader_options": {
32+
"type": "string",
33+
"title": "Reader Options",
34+
"description": "This should be a string in JSON format. It depends on the chosen file format to provide additional options and tune its behavior.",
35+
"examples": [
36+
"{}",
37+
"{\"sep\": \" \"}",
38+
"{\"sep\": \"\t\", \"header\": 0, \"names\": [\"column1\", \"column2\"] }"
39+
]
40+
},
41+
"url": {
42+
"type": "string",
43+
"title": "URL",
44+
"description": "The URL path to access the file which should be replicated.",
45+
"examples": [
46+
"https://storage.googleapis.com/covid19-open-data/v2/latest/epidemiology.csv",
47+
"gs://my-google-bucket/data.csv",
48+
"s3://gdelt-open-data/events/20190914.export.csv"
49+
]
50+
},
51+
"provider": {
52+
"type": "object",
53+
"title": "Storage Provider",
54+
"description": "The storage Provider or Location of the file(s) which should be replicated.",
55+
"default": "Public Web",
56+
"oneOf": [
57+
{
58+
"title": "HTTPS: Public Web",
59+
"required": ["storage"],
60+
"properties": {
61+
"storage": { "type": "string", "const": "HTTPS" },
62+
"user_agent": {
63+
"type": "boolean",
64+
"title": "User-Agent",
65+
"default": false,
66+
"description": "Add User-Agent to request"
67+
}
68+
}
69+
},
70+
{
71+
"title": "GCS: Google Cloud Storage",
72+
"required": ["storage"],
73+
"properties": {
74+
"storage": {
75+
"type": "string",
76+
"title": "Storage",
77+
"const": "GCS"
78+
},
79+
"service_account_json": {
80+
"type": "string",
81+
"title": "Service Account JSON",
82+
"airbyte_secret": true,
83+
"description": "In order to access private Buckets stored on Google Cloud, this connector would need a service account json credentials with the proper permissions as described <a href=\"https://cloud.google.com/iam/docs/service-accounts\" target=\"_blank\">here</a>. Please generate the credentials.json file and copy/paste its content to this field (expecting JSON formats). If accessing publicly available data, this field is not necessary."
84+
}
85+
}
86+
},
87+
{
88+
"title": "S3: Amazon Web Services",
89+
"required": ["storage"],
90+
"properties": {
91+
"storage": {
92+
"type": "string",
93+
"title": "Storage",
94+
"const": "S3"
95+
},
96+
"aws_access_key_id": {
97+
"type": "string",
98+
"title": "AWS Access Key ID",
99+
"description": "In order to access private Buckets stored on AWS S3, this connector would need credentials with the proper permissions. If accessing publicly available data, this field is not necessary."
100+
},
101+
"aws_secret_access_key": {
102+
"type": "string",
103+
"title": "AWS Secret Access Key",
104+
"description": "In order to access private Buckets stored on AWS S3, this connector would need credentials with the proper permissions. If accessing publicly available data, this field is not necessary.",
105+
"airbyte_secret": true
106+
}
107+
}
108+
},
109+
{
110+
"title": "AzBlob: Azure Blob Storage",
111+
"required": ["storage", "storage_account"],
112+
"properties": {
113+
"storage": {
114+
"type": "string",
115+
"title": "Storage",
116+
"const": "AzBlob"
117+
},
118+
"storage_account": {
119+
"type": "string",
120+
"title": "Storage Account",
121+
"description": "The globally unique name of the storage account that the desired blob sits within. See <a href=\"https://docs.microsoft.com/en-us/azure/storage/common/storage-account-overview\" target=\"_blank\">here</a> for more details."
122+
},
123+
"sas_token": {
124+
"type": "string",
125+
"title": "SAS Token",
126+
"description": "To access Azure Blob Storage, this connector would need credentials with the proper permissions. One option is a SAS (Shared Access Signature) token. If accessing publicly available data, this field is not necessary.",
127+
"airbyte_secret": true
128+
},
129+
"shared_key": {
130+
"type": "string",
131+
"title": "Shared Key",
132+
"description": "To access Azure Blob Storage, this connector would need credentials with the proper permissions. One option is a storage account shared key (aka account key or access key). If accessing publicly available data, this field is not necessary.",
133+
"airbyte_secret": true
134+
}
135+
}
136+
},
137+
{
138+
"title": "SSH: Secure Shell",
139+
"required": ["storage", "user", "host"],
140+
"properties": {
141+
"storage": {
142+
"type": "string",
143+
"title": "Storage",
144+
"const": "SSH"
145+
},
146+
"user": { "type": "string", "title": "User", "description": "" },
147+
"password": {
148+
"type": "string",
149+
"title": "Password",
150+
"description": "",
151+
"airbyte_secret": true
152+
},
153+
"host": { "type": "string", "title": "Host", "description": "" },
154+
"port": {
155+
"type": "string",
156+
"title": "Port",
157+
"default": "22",
158+
"description": ""
159+
}
160+
}
161+
},
162+
{
163+
"title": "SCP: Secure copy protocol",
164+
"required": ["storage", "user", "host"],
165+
"properties": {
166+
"storage": {
167+
"type": "string",
168+
"title": "Storage",
169+
"const": "SCP"
170+
},
171+
"user": { "type": "string", "title": "User", "description": "" },
172+
"password": {
173+
"type": "string",
174+
"title": "Password",
175+
"description": "",
176+
"airbyte_secret": true
177+
},
178+
"host": { "type": "string", "title": "Host", "description": "" },
179+
"port": {
180+
"type": "string",
181+
"title": "Port",
182+
"default": "22",
183+
"description": ""
184+
}
185+
}
186+
},
187+
{
188+
"title": "SFTP: Secure File Transfer Protocol",
189+
"required": ["storage", "user", "host"],
190+
"properties": {
191+
"storage": {
192+
"type": "string",
193+
"title": "Storage",
194+
"const": "SFTP"
195+
},
196+
"user": { "type": "string", "title": "User", "description": "" },
197+
"password": {
198+
"type": "string",
199+
"title": "Password",
200+
"description": "",
201+
"airbyte_secret": true
202+
},
203+
"host": { "type": "string", "title": "Host", "description": "" },
204+
"port": {
205+
"type": "string",
206+
"title": "Port",
207+
"default": "22",
208+
"description": ""
209+
}
210+
}
211+
}
212+
]
213+
}
214+
}
215+
}
216+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{
2+
"dataset_name": "test",
3+
"format": "csv",
4+
"reader_options": "{\"bla\": \",\", \"nrows\": 20}",
5+
"url": "file:///tmp/fake_file.csv",
6+
"provider": {
7+
"storage": "local"
8+
}
9+
}

airbyte-integrations/connectors/source-file/metadata.yaml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,14 @@ data:
77
connectorSubtype: file
88
connectorType: source
99
definitionId: 778daa7c-feaf-4db6-96f3-70fd645acc77
10-
dockerImageTag: 0.3.13
10+
dockerImageTag: 0.3.14
1111
dockerRepository: airbyte/source-file
1212
githubIssueLabel: source-file
1313
icon: file.svg
1414
license: MIT
1515
name: File (CSV, JSON, Excel, Feather, Parquet)
1616
registries:
1717
cloud:
18-
dockerRepository: airbyte/source-file-secure
19-
dockerImageTag: 0.3.13 # Dont forget to publish source-file-secure as well when updating this.
2018
enabled: true
2119
oss:
2220
enabled: true

airbyte-integrations/connectors/source-file/setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from setuptools import find_packages, setup
77

88
MAIN_REQUIREMENTS = [
9-
"airbyte-cdk~=0.2",
9+
"airbyte-cdk~=0.51.25",
1010
"gcsfs==2022.7.1",
1111
"genson==1.2.2",
1212
"google-cloud-storage==2.5.0",

airbyte-integrations/connectors/source-file/source_file/client.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import smart_open.ssh
2525
from airbyte_cdk.entrypoint import logger
2626
from airbyte_cdk.models import AirbyteStream, FailureType, SyncMode
27-
from airbyte_cdk.utils import AirbyteTracedException
27+
from airbyte_cdk.utils import AirbyteTracedException, is_cloud_environment
2828
from azure.storage.blob import BlobServiceClient
2929
from genson import SchemaBuilder
3030
from google.cloud.storage import Client as GCSClient
@@ -36,7 +36,7 @@
3636
from urllib3.exceptions import ProtocolError
3737
from yaml import safe_load
3838

39-
from .utils import backoff_handler
39+
from .utils import LOCAL_STORAGE_NAME, backoff_handler
4040

4141
SSH_TIMEOUT = 60
4242

@@ -252,7 +252,6 @@ class Client:
252252
"""Class that manages reading and parsing data from streams"""
253253

254254
CSV_CHUNK_SIZE = 10_000
255-
reader_class = URLFile
256255
binary_formats = {"excel", "excel_binary", "feather", "parquet", "orc", "pickle"}
257256

258257
def __init__(self, dataset_name: str, url: str, provider: dict, format: str = None, reader_options: dict = None):
@@ -264,6 +263,13 @@ def __init__(self, dataset_name: str, url: str, provider: dict, format: str = No
264263
self.binary_source = self._reader_format in self.binary_formats
265264
self.encoding = self._reader_options.get("encoding")
266265

266+
@property
def reader_class(self):
    """File-reader class to use: the secure (no local files) variant on cloud,
    the standard URLFile everywhere else."""
    return URLFileSecure if is_cloud_environment() else URLFile
272+
267273
@property
268274
def stream_name(self) -> str:
269275
if self._dataset_name:
@@ -497,3 +503,15 @@ def openpyxl_chunk_reader(self, file, **kwargs):
497503
df = pd.DataFrame(data=(next(data) for _ in range(start, min(start + step, end))), columns=cols)
498504
yield df
499505
start += step
506+
507+
508+
class URLFileSecure(URLFile):
    """URLFile variant for cloud deployments.

    Overrides construction so that local-filesystem sources are rejected:
    this connector must not read files from the machine it runs on.
    """

    def __init__(self, url: str, provider: dict, binary=None, encoding=None):
        # Reject both the explicit "local" storage provider and file:// URLs
        # before any parent-class setup happens.
        uses_local_storage = provider["storage"].lower() == LOCAL_STORAGE_NAME
        if uses_local_storage or url.startswith("file://"):
            raise RuntimeError("the local file storage is not supported by this connector.")
        super().__init__(url, provider, binary, encoding)

airbyte-integrations/connectors/source-file/source_file/source.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,16 @@
1717
AirbyteMessage,
1818
AirbyteRecordMessage,
1919
ConfiguredAirbyteCatalog,
20+
ConnectorSpecification,
2021
FailureType,
2122
Status,
2223
Type,
2324
)
2425
from airbyte_cdk.sources import Source
25-
from airbyte_cdk.utils import AirbyteTracedException
26+
from airbyte_cdk.utils import AirbyteTracedException, is_cloud_environment
2627

2728
from .client import Client
28-
from .utils import dropbox_force_download
29+
from .utils import LOCAL_STORAGE_NAME, dropbox_force_download
2930

3031

3132
class SourceFile(Source):
@@ -105,6 +106,19 @@ def _validate_and_transform(config: Mapping[str, Any]):
105106
raise AirbyteTracedException(message=message, internal_message=message, failure_type=FailureType.config_error)
106107
return config
107108

109+
def spec(self, logger: AirbyteLogger) -> ConnectorSpecification:
    """Returns the json schema for the spec.

    On cloud deployments the "local" storage provider option is stripped from
    the spec so users cannot configure local-filesystem access.
    """
    spec = super().spec(logger)

    # override cloud spec to remove local file support
    if is_cloud_environment():
        provider_schema = spec.connectionSpecification["properties"]["provider"]
        # Rebuild the oneOf list with a filter instead of pop()-ing while
        # iterating range(len(...)): popping shrinks the list under a stale
        # range, which skips the element shifted into the freed slot and
        # raises IndexError once the index passes the new length.
        # .get("const") tolerates provider entries whose storage schema
        # declares no "const" (the original indexing would raise KeyError).
        provider_schema["oneOf"] = [
            provider
            for provider in provider_schema["oneOf"]
            if provider["properties"]["storage"].get("const") != LOCAL_STORAGE_NAME
        ]

    return spec
121+
108122
def check(self, logger, config: Mapping) -> AirbyteConnectionStatus:
109123
"""
110124
Check involves verifying that the specified file is reachable with

airbyte-integrations/connectors/source-file/source_file/utils.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
# default logger
99
logger = logging.getLogger("airbyte")
1010

11+
LOCAL_STORAGE_NAME = "local"
12+
1113

1214
def dropbox_force_download(url):
1315
"""

0 commit comments

Comments
 (0)