Skip to content

Commit f04bbaa

Browse files
huiserdarthbear
authored and committed
Add support for MinIO (chimpler#3)
1 parent 84d0f98 commit f04bbaa

File tree

1 file changed

+97
-2
lines changed

1 file changed

+97
-2
lines changed

aws_s3--0.0.1.sql

Lines changed: 97 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,72 @@ AS $$
9191
else:
9292
s3.download_fileobj(bucket, file_path, fd)
9393
fd.flush()
94-
res = plpy.execute("COPY {table_name} {column_list} FROM {filename} {options};".format(
94+
res = plpy.execute("COPY {table_name} ({column_list}) FROM {filename} {options};".format(
95+
table_name=table_name,
96+
filename=plpy.quote_literal(fd.name),
97+
column_list=column_list,
98+
options=options
99+
)
100+
)
101+
return res.nrows()
102+
$$;
103+
104+
CREATE OR REPLACE FUNCTION aws_s3.table_import_from_s3 (
105+
table_name text,
106+
column_list text,
107+
options text,
108+
bucket text,
109+
file_path text,
110+
region text,
111+
access_key text,
112+
secret_key text,
113+
session_token text,
114+
endpoint_url text
115+
) RETURNS int
116+
LANGUAGE plpythonu
117+
AS $$
118+
def cache_import(module_name):
119+
module_cache = SD.get('__modules__', {})
120+
if module_name in module_cache:
121+
return module_cache[module_name]
122+
else:
123+
import importlib
124+
_module = importlib.import_module(module_name)
125+
if not module_cache:
126+
SD['__modules__'] = module_cache
127+
module_cache[module_name] = _module
128+
return _module
129+
130+
boto3 = cache_import('boto3')
131+
tempfile = cache_import('tempfile')
132+
gzip = cache_import('gzip')
133+
shutil = cache_import('shutil')
134+
135+
plan = plpy.prepare('select current_setting($1, true)::int', ['TEXT'])
136+
137+
s3 = boto3.client(
138+
's3',
139+
endpoint_url=endpoint_url,
140+
aws_access_key_id=access_key,
141+
aws_secret_access_key=secret_key,
142+
aws_session_token=session_token,
143+
region_name=region
144+
)
145+
146+
response = s3.head_object(Bucket=bucket, Key=file_path)
147+
content_encoding = response.get('ContentEncoding')
148+
149+
with tempfile.NamedTemporaryFile() as fd:
150+
if content_encoding and content_encoding.lower() == 'gzip':
151+
with tempfile.NamedTemporaryFile() as gzfd:
152+
s3.download_fileobj(bucket, file_path, gzfd)
153+
gzfd.flush()
154+
gzfd.seek(0)
155+
shutil.copyfileobj(gzip.GzipFile(fileobj=gzfd, mode='rb'), fd)
156+
else:
157+
s3.download_fileobj(bucket, file_path, fd)
158+
fd.flush()
159+
res = plpy.execute("COPY {table_name} ({column_list}) FROM {filename} {options};".format(
95160
table_name=table_name,
96161
filename=plpy.quote_literal(fd.name),
97162
column_list=column_list,
@@ -131,4 +196,34 @@ AS $$
131196
credentials['session_token']
132197
]
133198
)[0]['num_rows']
134-
$$;
199+
$$;
200+
201+
-- Convenience overload of aws_s3.table_import_from_s3 that unpacks the
-- composite aws_commons records (s3_info, credentials) and delegates to the
-- 10-argument text overload, forwarding endpoint_url (MinIO support).
-- Returns the number of rows imported.
CREATE OR REPLACE FUNCTION aws_s3.table_import_from_s3(
   table_name text,
   column_list text,
   options text,
   s3_info aws_commons._s3_uri_1,
   credentials aws_commons._aws_credentials_1,
   endpoint_url text
) RETURNS INT
LANGUAGE plpythonu
AS $$
    # BUG FIX: the prepared statement previously listed only $1..$9 while
    # both the type list and the execute() argument list contain ten entries,
    # so endpoint_url was never bound. Added $10 so the endpoint is forwarded.
    plan = plpy.prepare(
        'SELECT aws_s3.table_import_from_s3($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) AS num_rows',
        ['TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT', 'TEXT']
    )
    # Unpack the composite-type fields and pass everything positionally to the
    # text-argument overload; it performs the actual S3 download and COPY.
    return plan.execute(
        [
            table_name,
            column_list,
            options,
            s3_info['bucket'],
            s3_info['file_path'],
            s3_info['region'],
            credentials['access_key'],
            credentials['secret_key'],
            credentials['session_token'],
            endpoint_url
        ]
    )[0]['num_rows']
$$;

0 commit comments

Comments
 (0)