This repository was archived by the owner on Sep 12, 2025. It is now read-only.
Merged
65 changes: 60 additions & 5 deletions google/cloud/bigquery_storage_v1/reader.py
@@ -287,7 +287,13 @@ def to_arrow(self):
record_batches = []
for page in self.pages:
record_batches.append(page.to_arrow())
return pyarrow.Table.from_batches(record_batches)

if record_batches:
return pyarrow.Table.from_batches(record_batches)

# No data, return an empty Table.
self._stream_parser._parse_arrow_schema()
return pyarrow.Table.from_batches([], schema=self._stream_parser._schema)

def to_dataframe(self, dtypes=None):
"""Create a :class:`pandas.DataFrame` of all rows in the stream.
@@ -323,17 +329,66 @@ def to_dataframe(self, dtypes=None):
# rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
# usually no-copy.
schema_type = self._read_session.WhichOneof("schema")

if schema_type == "arrow_schema":
record_batch = self.to_arrow()
df = record_batch.to_pandas()
for column in dtypes:
df[column] = pandas.Series(df[column], dtype=dtypes[column])
return df

frames = []
for page in self.pages:
frames.append(page.to_dataframe(dtypes=dtypes))
return pandas.concat(frames)
frames = [page.to_dataframe(dtypes=dtypes) for page in self.pages]

if frames:
return pandas.concat(frames)

# No data, construct an empty dataframe with columns matching the schema.
# The result should be consistent with what an empty ARROW stream would produce.
self._stream_parser._parse_avro_schema()
schema = self._stream_parser._avro_schema_json

column_dtypes = self._dtypes_from_avro(schema["fields"])
column_dtypes.update(dtypes)

df = pandas.DataFrame(columns=column_dtypes.keys())
for column in df:
df[column] = pandas.Series([], dtype=column_dtypes[column])

return df

def _dtypes_from_avro(self, avro_fields):
"""Determine Pandas dtypes for columns in Avro schema.

Args:
avro_fields (Iterable[Mapping[str, Any]]):
Avro fields' metadata.

Returns:
collections.OrderedDict[str, str]:
Column names with their corresponding Pandas dtypes.
"""
result = collections.OrderedDict()

type_map = {"long": "int64", "double": "float64", "boolean": "bool"}
Contributor:
I think this is fine for this change, but I wonder if we should consider consolidating all our various type/schema conversion code in the storage library and bigquery. Is there demand outside of our own usages (e.g. in other storage APIs) that we should consider moving this into a more central dependency?

Contributor Author:
Maybe in some libs working closely with the BigQuery API? cc: @tswast

Contributor Author:
I'll try to have a look at it in the near future. It might be worth waiting for all the pending fixes and dependency version updates, though, to see how much of that "lipstick" logic for types is still needed.


for field_info in avro_fields:
# If a type is a union of multiple types, pick the first type
# that is not "null".
if isinstance(field_info["type"], list):
    type_info = next(item for item in field_info["type"] if item != "null")
else:
    type_info = field_info["type"]

if isinstance(type_info, six.string_types):
field_dtype = type_map.get(type_info, "object")
else:
logical_type = type_info.get("logicalType")
if logical_type == "timestamp-micros":
field_dtype = "datetime64[ns, UTC]"
else:
field_dtype = "object"

result[field_info["name"]] = field_dtype

return result


class ReadRowsPage(object):
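For context, here is a minimal caller-facing sketch of what this change enables; it is illustrative only and not part of the diff. The client, stream name, and session objects are assumptions taken from the usual read-session flow, and the behavior shown simply restates the code above: a stream that yields no rows now produces an empty but correctly typed result instead of failing inside pyarrow.Table.from_batches([]) or pandas.concat([]).

# Illustrative sketch, not part of this PR; `stream_name` and `session` are
# assumed to come from an existing read session over an empty table.
from google.cloud import bigquery_storage_v1

client = bigquery_storage_v1.BigQueryReadClient()
reader = client.read_rows(stream_name)

# An empty stream now returns an empty, typed result.
df = reader.to_dataframe(session)
assert len(df) == 0
print(df.dtypes)  # column dtypes still follow the session schema

For the Avro path, the dtypes come from _dtypes_from_avro above: for example, a nullable ["null", "long"] field maps to "int64", a {"type": "long", "logicalType": "timestamp-micros"} field maps to "datetime64[ns, UTC]", and anything unrecognized falls back to "object".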
86 changes: 86 additions & 0 deletions tests/unit/test_reader_v1.py
@@ -645,6 +645,92 @@ def test_to_dataframe_w_dtypes_arrow(class_under_test):
)


def test_to_dataframe_empty_w_scalars_avro(class_under_test):
avro_schema = _bq_to_avro_schema(SCALAR_COLUMNS)
read_session = _generate_avro_read_session(avro_schema)
avro_blocks = _bq_to_avro_blocks([], avro_schema)
reader = class_under_test(avro_blocks, mock_client, "", 0, {})

got = reader.to_dataframe(read_session)

expected = pandas.DataFrame(columns=SCALAR_COLUMN_NAMES)
expected["int_col"] = expected["int_col"].astype("int64")
expected["float_col"] = expected["float_col"].astype("float64")
expected["bool_col"] = expected["bool_col"].astype("bool")
expected["ts_col"] = expected["ts_col"].astype("datetime64[ns, UTC]")

pandas.testing.assert_frame_equal(
got.reset_index(drop=True), # reset_index to ignore row labels
expected.reset_index(drop=True),
)


def test_to_dataframe_empty_w_scalars_arrow(class_under_test):
arrow_schema = _bq_to_arrow_schema(SCALAR_COLUMNS)
read_session = _generate_arrow_read_session(arrow_schema)
arrow_batches = _bq_to_arrow_batches([], arrow_schema)
reader = class_under_test(arrow_batches, mock_client, "", 0, {})

got = reader.to_dataframe(read_session)

expected = pandas.DataFrame([], columns=SCALAR_COLUMN_NAMES)
expected["int_col"] = expected["int_col"].astype("int64")
expected["float_col"] = expected["float_col"].astype("float64")
expected["bool_col"] = expected["bool_col"].astype("bool")
expected["ts_col"] = expected["ts_col"].astype("datetime64[ns, UTC]")

pandas.testing.assert_frame_equal(
got.reset_index(drop=True), # reset_index to ignore row labels
expected.reset_index(drop=True),
)


def test_to_dataframe_empty_w_dtypes_avro(class_under_test, mock_client):
avro_schema = _bq_to_avro_schema(
[
{"name": "bigfloat", "type": "float64"},
{"name": "lilfloat", "type": "float64"},
]
)
read_session = _generate_avro_read_session(avro_schema)
avro_blocks = _bq_to_avro_blocks([], avro_schema)
reader = class_under_test(avro_blocks, mock_client, "", 0, {})

got = reader.to_dataframe(read_session, dtypes={"lilfloat": "float16"})

expected = pandas.DataFrame([], columns=["bigfloat", "lilfloat"])
expected["bigfloat"] = expected["bigfloat"].astype("float64")
expected["lilfloat"] = expected["lilfloat"].astype("float16")

pandas.testing.assert_frame_equal(
got.reset_index(drop=True), # reset_index to ignore row labels
expected.reset_index(drop=True),
)


def test_to_dataframe_empty_w_dtypes_arrow(class_under_test, mock_client):
arrow_schema = _bq_to_arrow_schema(
[
{"name": "bigfloat", "type": "float64"},
{"name": "lilfloat", "type": "float64"},
]
)
read_session = _generate_arrow_read_session(arrow_schema)
arrow_batches = _bq_to_arrow_batches([], arrow_schema)
reader = class_under_test(arrow_batches, mock_client, "", 0, {})

got = reader.to_dataframe(read_session, dtypes={"lilfloat": "float16"})

expected = pandas.DataFrame([], columns=["bigfloat", "lilfloat"])
expected["bigfloat"] = expected["bigfloat"].astype("float64")
expected["lilfloat"] = expected["lilfloat"].astype("float16")

pandas.testing.assert_frame_equal(
got.reset_index(drop=True), # reset_index to ignore row labels
expected.reset_index(drop=True),
)


def test_to_dataframe_by_page(class_under_test, mock_client):
bq_columns = [
{"name": "int_col", "type": "int64"},
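To exercise just the new empty-stream tests locally, one option (assuming the repository's standard pytest setup; the -k expression is only a name filter) is:

pytest tests/unit/test_reader_v1.py -k empty -q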