1 change: 1 addition & 0 deletions awswrangler/cloudwatchlogs.py
@@ -32,6 +32,7 @@ def start_query(self,
:param limit: The maximum number of log events to return in the query.
:return: Query ID
"""
logger.debug(f"log_group_names: {log_group_names}")
start_timestamp = int(1000 * start_time.timestamp())
end_timestamp = int(1000 * end_time.timestamp())
logger.debug(f"start_timestamp: {start_timestamp}")
35 changes: 20 additions & 15 deletions awswrangler/glue.py
@@ -109,7 +109,8 @@ def metadata_to_glue(self,
partition_cols=None,
preserve_index=True,
mode="append",
cast_columns=None):
cast_columns=None,
extra_args=None):
schema = Glue._build_schema(dataframe=dataframe,
partition_cols=partition_cols,
preserve_index=preserve_index,
@@ -120,14 +121,13 @@ def metadata_to_glue(self,
self.delete_table_if_exists(database=database, table=table)
exists = self.does_table_exists(database=database, table=table)
if not exists:
self.create_table(
database=database,
table=table,
schema=schema,
partition_cols=partition_cols,
path=path,
file_format=file_format,
)
self.create_table(database=database,
table=table,
schema=schema,
partition_cols=partition_cols,
path=path,
file_format=file_format,
extra_args=extra_args)
if partition_cols:
partitions_tuples = Glue._parse_partitions_tuples(
objects_paths=objects_paths, partition_cols=partition_cols)
@@ -157,13 +157,17 @@ def create_table(self,
schema,
path,
file_format,
partition_cols=None):
partition_cols=None,
extra_args=None):
if file_format == "parquet":
table_input = Glue.parquet_table_definition(
table, partition_cols, schema, path)
elif file_format == "csv":
table_input = Glue.csv_table_definition(table, partition_cols,
schema, path)
table_input = Glue.csv_table_definition(table,
partition_cols,
schema,
path,
extra_args=extra_args)
else:
raise UnsupportedFileFormat(file_format)
self._client_glue.create_table(DatabaseName=database,
@@ -229,7 +233,8 @@ def _parse_table_name(path):
return path.rpartition("/")[2]

@staticmethod
def csv_table_definition(table, partition_cols, schema, path):
def csv_table_definition(table, partition_cols, schema, path, extra_args):
sep = extra_args["sep"] if "sep" in extra_args else ","
if not partition_cols:
partition_cols = []
return {
@@ -245,7 +250,7 @@ def csv_table_definition(table, partition_cols, schema, path):
"classification": "csv",
"compressionType": "none",
"typeOfData": "file",
"delimiter": ",",
"delimiter": sep,
"columnsOrdered": "true",
"areColumnsQuoted": "false",
},
@@ -262,7 +267,7 @@ def csv_table_definition(table, partition_cols, schema, path):
"NumberOfBuckets": -1,
"SerdeInfo": {
"Parameters": {
"field.delim": ","
"field.delim": sep
},
"SerializationLibrary":
"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
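Taken together, the glue.py changes thread an optional `extra_args` dict from `metadata_to_glue` through `create_table` into `csv_table_definition`, which now reads the CSV delimiter from it. Below is a minimal sketch of that lookup as a hypothetical standalone helper; the diff itself inlines the expression and assumes `extra_args` is a dict whenever the file format is CSV, so the `None` guard here is an addition for illustration only.

```python
def resolve_csv_delimiter(extra_args):
    # Hypothetical helper mirroring the lookup added to csv_table_definition:
    # use extra_args["sep"] when present, otherwise keep the previous "," default.
    extra_args = extra_args or {}  # guard added here; the diff assumes a dict for CSV
    return extra_args.get("sep", ",")


assert resolve_csv_delimiter(None) == ","
assert resolve_csv_delimiter({"sep": "|"}) == "|"
```

Whatever value is resolved lands both in the table parameters ("delimiter") and in the SerDe parameters ("field.delim"), so the Glue catalog entry and downstream readers such as Athena see the same separator.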
116 changes: 71 additions & 45 deletions awswrangler/pandas.py
@@ -433,6 +433,7 @@ def to_csv(
self,
dataframe,
path,
sep=",",
database=None,
table=None,
partition_cols=None,
@@ -447,6 +448,7 @@ def to_csv(

:param dataframe: Pandas Dataframe
:param path: AWS S3 path (E.g. s3://bucket-name/folder_name/
:param sep: Same as pandas.to_csv()
:param database: AWS Glue Database name
:param table: AWS Glue table name
:param partition_cols: List of columns names that will be partitions on S3
@@ -456,18 +458,18 @@ def to_csv(
:param procs_io_bound: Number of cores used for I/O bound tasks
:return: List of objects written on S3
"""
return self.to_s3(
dataframe=dataframe,
path=path,
file_format="csv",
database=database,
table=table,
partition_cols=partition_cols,
preserve_index=preserve_index,
mode=mode,
procs_cpu_bound=procs_cpu_bound,
procs_io_bound=procs_io_bound,
)
extra_args = {"sep": sep}
return self.to_s3(dataframe=dataframe,
path=path,
file_format="csv",
database=database,
table=table,
partition_cols=partition_cols,
preserve_index=preserve_index,
mode=mode,
procs_cpu_bound=procs_cpu_bound,
procs_io_bound=procs_io_bound,
extra_args=extra_args)

def to_parquet(self,
dataframe,
@@ -519,7 +521,8 @@ def to_s3(self,
mode="append",
procs_cpu_bound=None,
procs_io_bound=None,
cast_columns=None):
cast_columns=None,
extra_args=None):
"""
Write a Pandas Dataframe on S3
Optionally writes metadata on AWS Glue.
@@ -535,6 +538,7 @@ def to_s3(self,
:param procs_cpu_bound: Number of cores used for CPU bound tasks
:param procs_io_bound: Number of cores used for I/O bound tasks
:param cast_columns: Dictionary of columns indexes and Arrow types to be casted. (E.g. {2: "int64", 5: "int32"}) (Only for "parquet" file_format)
:param extra_args: Extra arguments specific for each file formats (E.g. "sep" for CSV)
:return: List of objects written on S3
"""
if dataframe.empty:
@@ -554,7 +558,8 @@ def to_s3(self,
mode=mode,
procs_cpu_bound=procs_cpu_bound,
procs_io_bound=procs_io_bound,
cast_columns=cast_columns)
cast_columns=cast_columns,
extra_args=extra_args)
if database:
self._session.glue.metadata_to_glue(dataframe=dataframe,
path=path,
@@ -565,7 +570,8 @@ def to_s3(self,
preserve_index=preserve_index,
file_format=file_format,
mode=mode,
cast_columns=cast_columns)
cast_columns=cast_columns,
extra_args=extra_args)
return objects_paths

def data_to_s3(self,
@@ -577,7 +583,8 @@ def data_to_s3(self,
mode="append",
procs_cpu_bound=None,
procs_io_bound=None,
cast_columns=None):
cast_columns=None,
extra_args=None):
if not procs_cpu_bound:
procs_cpu_bound = self._session.procs_cpu_bound
if not procs_io_bound:
@@ -601,7 +608,8 @@ def data_to_s3(self,
target=self._data_to_s3_dataset_writer_remote,
args=(send_pipe, dataframe.iloc[bounder[0]:bounder[1], :],
path, partition_cols, preserve_index,
self._session.primitives, file_format, cast_columns),
self._session.primitives, file_format, cast_columns,
extra_args),
)
proc.daemon = False
proc.start()
@@ -619,7 +627,8 @@ def data_to_s3(self,
preserve_index=preserve_index,
session_primitives=self._session.primitives,
file_format=file_format,
cast_columns=cast_columns)
cast_columns=cast_columns,
extra_args=extra_args)
if mode == "overwrite_partitions" and partition_cols:
if procs_io_bound > procs_cpu_bound:
num_procs = floor(
@@ -639,7 +648,8 @@ def _data_to_s3_dataset_writer(dataframe,
preserve_index,
session_primitives,
file_format,
cast_columns=None):
cast_columns=None,
extra_args=None):
objects_paths = []
if not partition_cols:
object_path = Pandas._data_to_s3_object_writer(
@@ -648,7 +658,8 @@ def _data_to_s3_dataset_writer(dataframe,
preserve_index=preserve_index,
session_primitives=session_primitives,
file_format=file_format,
cast_columns=cast_columns)
cast_columns=cast_columns,
extra_args=extra_args)
objects_paths.append(object_path)
else:
for keys, subgroup in dataframe.groupby(partition_cols):
@@ -665,21 +676,21 @@ def _data_to_s3_dataset_writer(dataframe,
preserve_index=preserve_index,
session_primitives=session_primitives,
file_format=file_format,
cast_columns=cast_columns)
cast_columns=cast_columns,
extra_args=extra_args)
objects_paths.append(object_path)
return objects_paths

@staticmethod
def _data_to_s3_dataset_writer_remote(
send_pipe,
dataframe,
path,
partition_cols,
preserve_index,
session_primitives,
file_format,
cast_columns=None,
):
def _data_to_s3_dataset_writer_remote(send_pipe,
dataframe,
path,
partition_cols,
preserve_index,
session_primitives,
file_format,
cast_columns=None,
extra_args=None):
send_pipe.send(
Pandas._data_to_s3_dataset_writer(
dataframe=dataframe,
@@ -688,7 +699,8 @@ def _data_to_s3_dataset_writer_remote(
preserve_index=preserve_index,
session_primitives=session_primitives,
file_format=file_format,
cast_columns=cast_columns))
cast_columns=cast_columns,
extra_args=extra_args))
send_pipe.close()

@staticmethod
@@ -697,7 +709,8 @@ def _data_to_s3_object_writer(dataframe,
preserve_index,
session_primitives,
file_format,
cast_columns=None):
cast_columns=None,
extra_args=None):
fs = s3.get_fs(session_primitives=session_primitives)
fs = pyarrow.filesystem._ensure_filesystem(fs)
s3.mkdir_if_not_exists(fs, path)
@@ -713,27 +726,40 @@ def _data_to_s3_object_writer(dataframe,
path=object_path,
preserve_index=preserve_index,
fs=fs,
cast_columns=cast_columns)
cast_columns=cast_columns,
extra_args=extra_args)
elif file_format == "csv":
Pandas.write_csv_dataframe(
dataframe=dataframe,
path=object_path,
preserve_index=preserve_index,
fs=fs,
)
Pandas.write_csv_dataframe(dataframe=dataframe,
path=object_path,
preserve_index=preserve_index,
fs=fs,
extra_args=extra_args)
return object_path

@staticmethod
def write_csv_dataframe(dataframe, path, preserve_index, fs):
def write_csv_dataframe(dataframe,
path,
preserve_index,
fs,
extra_args=None):
csv_extra_args = {}
if "sep" in extra_args:
csv_extra_args["sep"] = extra_args["sep"]
csv_buffer = bytes(
dataframe.to_csv(None, header=False, index=preserve_index),
"utf-8")
dataframe.to_csv(None,
header=False,
index=preserve_index,
**csv_extra_args), "utf-8")
with fs.open(path, "wb") as f:
f.write(csv_buffer)

@staticmethod
def write_parquet_dataframe(dataframe, path, preserve_index, fs,
cast_columns):
def write_parquet_dataframe(dataframe,
path,
preserve_index,
fs,
cast_columns,
extra_args=None):
if not cast_columns:
cast_columns = {}
casted_in_pandas = []
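For reference, a hedged usage sketch of the new `sep` parameter on `Pandas.to_csv`, assuming the session-based API these modules belong to; the bucket, database and table names below are placeholders, and the call otherwise follows the signature shown in this diff.

```python
import pandas as pd
import awswrangler

df = pd.DataFrame({"id": [1, 2], "name": ["foo", "boo"]})

session = awswrangler.Session()
session.pandas.to_csv(
    dataframe=df,
    path="s3://my-example-bucket/my_table/",  # placeholder bucket/prefix
    sep="|",                                  # forwarded as extra_args={"sep": "|"}
    database="my_database",                   # placeholder Glue database
    table="my_table",                         # placeholder Glue table
    preserve_index=False,
    mode="overwrite",
)
```

With `database` set, the same `extra_args` reach `Glue.metadata_to_glue`, so the catalog entry is registered with `|` as its delimiter instead of the previously hard-coded comma.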
1 change: 1 addition & 0 deletions building/build-docs.sh
@@ -1,4 +1,5 @@
#!/bin/bash
set -e

cd ..
sphinx-apidoc --separate -f -H "API Reference" -o docs/source/api awswrangler/
1 change: 1 addition & 0 deletions building/build-glue-egg.sh
@@ -1,4 +1,5 @@
#!/bin/bash
set -e

cd ..
rm -rf *.egg-info build dist/*.egg
1 change: 1 addition & 0 deletions building/build-image.sh
@@ -1,4 +1,5 @@
#!/bin/bash
set -e

cp ../requirements.txt .
cp ../requirements-dev.txt .
1 change: 1 addition & 0 deletions building/build-lambda-layer.sh
@@ -1,4 +1,5 @@
#!/bin/bash
set -e

# Go to home
cd ~
1 change: 1 addition & 0 deletions building/deploy-source.sh
@@ -1,4 +1,5 @@
#!/bin/bash
set -e

cd ..
rm -rf *.egg-info dist/*.tar.gz
1 change: 1 addition & 0 deletions building/publish.sh
@@ -1,4 +1,5 @@
#!/bin/bash
set -e

cd ..
rm -fr build dist .egg awswrangler.egg-info
2 changes: 2 additions & 0 deletions testing/run-tests.sh
@@ -1,5 +1,7 @@
#!/bin/bash

set -e

cd ..
rm -rf *.pytest_cache
yapf --in-place --recursive setup.py awswrangler testing/test_awswrangler
2 changes: 1 addition & 1 deletion testing/test_awswrangler/test_athena.py
@@ -40,7 +40,7 @@ def database(cloudformation_outputs):
def test_query_cancelled(session, database):
client_athena = boto3.client("athena")
query_execution_id = session.athena.run_query(query="""
SELECT
SELECT
rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(),
rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(),
rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(),
2 changes: 2 additions & 0 deletions testing/test_awswrangler/test_cloudwatchlogs.py
@@ -1,5 +1,6 @@
import logging
from datetime import datetime
from time import sleep

import pytest
import boto3
@@ -63,6 +64,7 @@ def logstream(cloudformation_outputs, loggroup):
if token:
args["sequenceToken"] = token
client.put_log_events(**args)
sleep(120)
yield logstream

