@@ -54,10 +54,14 @@ class URLFile:
5454 ```
5555 """
5656
def __init__(self, url: str, provider: dict, binary=None, encoding=None):
    """Remember where the file lives and how it has to be opened.

    :param url: location of the file as configured by the user
    :param provider: storage provider settings (credentials, host, port, ...)
    :param binary: truthy to read raw bytes (mode "rb"), otherwise text (mode "r")
    :param encoding: text encoding to decode with; only honoured in text mode
    """
    self._url = url
    self._provider = provider
    self._file = None
    # Keyword arguments that the _open* helpers unpack into smart_open.open().
    # An encoding is only meaningful for text streams: the built-in open()
    # refuses one in binary mode, and binary formats need the undecoded bytes.
    # So a user-supplied encoding is deliberately dropped when `binary` is set.
    self.args = {
        "mode": "rb" if binary else "r",
        "encoding": None if binary else encoding,
    }
6165
6266 def __enter__ (self ):
6367 return self ._file
@@ -74,29 +78,28 @@ def close(self):
7478 self ._file .close ()
7579 self ._file = None
7680
def open(self):
    """(Re)open the file and return this reader.

    ``self.close()`` is called first, then the handle produced by
    ``self._open()`` is stored on ``self._file``.

    :return: this object itself
    :raises FileNotFoundError: when ``_open`` raises
        ``google.api_core.exceptions.NotFound``
    """
    self.close()
    try:
        handle = self._open()
    except google.api_core.exceptions.NotFound as not_found:
        # Surface the provider-specific error as the standard Python one.
        raise FileNotFoundError(self.url) from not_found
    self._file = handle
    return self
8488
85- def _open (self , binary ):
86- mode = "rb" if binary else "r"
89+ def _open (self ):
8790 storage = self .storage_scheme
8891 url = self .url
8992
9093 if storage == "gs://" :
91- return self ._open_gcs_url (binary = binary )
94+ return self ._open_gcs_url ()
9295 elif storage == "s3://" :
93- return self ._open_aws_url (binary = binary )
96+ return self ._open_aws_url ()
9497 elif storage == "azure://" :
95- return self ._open_azblob_url (binary = binary )
98+ return self ._open_azblob_url ()
9699 elif storage == "webhdfs://" :
97100 host = self ._provider ["host" ]
98101 port = self ._provider ["port" ]
99- return smart_open .open (f"webhdfs://{ host } :{ port } /{ url } " , mode = mode )
102+ return smart_open .open (f"webhdfs://{ host } :{ port } /{ url } " , ** self . args )
100103 elif storage in ("ssh://" , "scp://" , "sftp://" ):
101104 user = self ._provider ["user" ]
102105 host = self ._provider ["host" ]
@@ -114,19 +117,15 @@ def _open(self, binary):
114117 uri = f"{ storage } { user } :{ password } @{ host } :{ port } /{ url } "
115118 else :
116119 uri = f"{ storage } { user } @{ host } :{ port } /{ url } "
117- return smart_open .open (uri , transport_params = transport_params , mode = mode )
120+ return smart_open .open (uri , transport_params = transport_params , ** self . args )
118121 elif storage in ("https://" , "http://" ):
119122 transport_params = None
120123 if "user_agent" in self ._provider and self ._provider ["user_agent" ]:
121124 airbyte_version = environ .get ("AIRBYTE_VERSION" , "0.0" )
122125 transport_params = {"headers" : {"Accept-Encoding" : "identity" , "User-Agent" : f"Airbyte/{ airbyte_version } " }}
123126 logger .info (f"TransportParams: { transport_params } " )
124- return smart_open .open (
125- self .full_url ,
126- mode = mode ,
127- transport_params = transport_params ,
128- )
129- return smart_open .open (self .full_url , mode = mode )
127+ return smart_open .open (self .full_url , transport_params = transport_params , ** self .args )
128+ return smart_open .open (self .full_url , ** self .args )
130129
131130 @property
132131 def url (self ) -> str :
@@ -168,8 +167,7 @@ def storage_scheme(self) -> str:
168167 logger .error (f"Unknown Storage provider in: { self .full_url } " )
169168 return ""
170169
171- def _open_gcs_url (self , binary ) -> object :
172- mode = "rb" if binary else "r"
170+ def _open_gcs_url (self ) -> object :
173171 service_account_json = self ._provider .get ("service_account_json" )
174172 credentials = None
175173 if service_account_json :
@@ -185,28 +183,27 @@ def _open_gcs_url(self, binary) -> object:
185183 client = GCSClient (credentials = credentials , project = credentials ._project_id )
186184 else :
187185 client = GCSClient .create_anonymous_client ()
188- file_to_close = smart_open .open (self .full_url , transport_params = dict ( client = client ), mode = mode )
186+ file_to_close = smart_open .open (self .full_url , transport_params = { " client" : client }, ** self . args )
189187
190188 return file_to_close
191189
def _open_aws_url(self):
    """Open the S3 object through smart_open and return the file object.

    When the provider config carries both ``aws_access_key_id`` and
    ``aws_secret_access_key``, the pair is embedded in the URL handed to
    smart_open. Otherwise the object is read through an unsigned boto3 S3
    client.
    """
    provider = self._provider
    key_id = provider.get("aws_access_key_id")
    secret_key = provider.get("aws_secret_access_key")

    if not (key_id and secret_key):
        # No complete key pair configured: build a client with request signing disabled.
        unsigned = botocore.client.Config(signature_version=botocore.UNSIGNED)
        s3_client = boto3.client("s3", config=unsigned)
        return smart_open.open(self.full_url, transport_params={"client": s3_client}, **self.args)

    authed_url = f"{self.storage_scheme}{key_id}:{secret_key}@{self.url}"
    return smart_open.open(authed_url, **self.args)
207205
208- def _open_azblob_url (self , binary ):
209- mode = "rb" if binary else "r"
206+ def _open_azblob_url (self ):
210207 storage_account = self ._provider .get ("storage_account" )
211208 storage_acc_url = f"https://{ storage_account } .blob.core.windows.net"
212209 sas_token = self ._provider .get ("sas_token" , None )
@@ -220,14 +217,15 @@ def _open_azblob_url(self, binary):
220217 # assuming anonymous public read access given no credential
221218 client = BlobServiceClient (account_url = storage_acc_url )
222219
223- result = smart_open . open ( f"{ self .storage_scheme } { self .url } " , transport_params = dict ( client = client ), mode = mode )
224- return result
220+ url = f"{ self .storage_scheme } { self .url } "
221+ return smart_open . open ( url , transport_params = dict ( client = client ), ** self . args )
225222
226223
227224class Client :
228225 """Class that manages reading and parsing data from streams"""
229226
230227 reader_class = URLFile
228+ binary_formats = {"excel" , "feather" , "parquet" , "orc" , "pickle" }
231229
232230 def __init__ (self , dataset_name : str , url : str , provider : dict , format : str = None , reader_options : str = None ):
233231 self ._dataset_name = dataset_name
@@ -243,6 +241,9 @@ def __init__(self, dataset_name: str, url: str, provider: dict, format: str = No
243241 logger .error (error_msg )
244242 raise ConfigurationError (error_msg ) from err
245243
244+ self .binary_source = self ._reader_format in self .binary_formats
245+ self .encoding = self ._reader_options .get ("encoding" )
246+
246247 @property
247248 def stream_name (self ) -> str :
248249 if self ._dataset_name :
@@ -336,17 +337,12 @@ def dtype_to_json_type(dtype) -> str:
336337
@property
def reader(self) -> reader_class:
    """Return a new ``reader_class`` instance built from this client's settings.

    The URL, provider, binary flag and encoding are passed to the constructor;
    every access constructs another instance.
    """
    return self.reader_class(
        url=self._url,
        provider=self._provider,
        binary=self.binary_source,
        encoding=self.encoding,
    )
345341
346342 def read (self , fields : Iterable = None ) -> Iterable [dict ]:
347343 """Read data from the stream"""
348- with self .reader .open (binary = self . binary_source ) as fp :
349- if self ._reader_format == "json" or self . _reader_format == "jsonl" :
344+ with self .reader .open () as fp :
345+ if self ._reader_format in [ "json" , "jsonl" ] :
350346 yield from self .load_nested_json (fp )
351347 elif self ._reader_format == "yaml" :
352348 fields = set (fields ) if fields else None
@@ -376,8 +372,8 @@ def _stream_properties(self, fp):
376372 def streams (self ) -> Iterable :
377373 """Discovers available streams"""
378374 # TODO handle discovery of directories of multiple files instead
379- with self .reader .open (binary = self . binary_source ) as fp :
380- if self ._reader_format == "json" or self . _reader_format == "jsonl" :
375+ with self .reader .open () as fp :
376+ if self ._reader_format in [ "json" , "jsonl" ] :
381377 json_schema = self .load_nested_json_schema (fp )
382378 else :
383379 json_schema = {
0 commit comments