Merged

25 commits
6593ee9 adds a new requested_stream function and docstring (chalmerlowe, Oct 3, 2024)
58f3f27 Adds new function to compare preserve_order and max_stream_count (chalmerlowe, Oct 4, 2024)
f4e0d6e Updates type hints to resolve mypy errors (chalmerlowe, Oct 4, 2024)
91190d7 corrects some mypy, linting errors (chalmerlowe, Oct 4, 2024)
8f1f090 updates several type hints (chalmerlowe, Oct 4, 2024)
0e9c1b0 updates pandas type hint because of unit-noextras (chalmerlowe, Oct 4, 2024)
6144379 adds validation and unit tests (chalmerlowe, Oct 7, 2024)
1d635ac 🦉 Updates from OwlBot post-processor (gcf-owl-bot[bot], Oct 7, 2024)
0a0c71c updates precedence rules and tests (chalmerlowe, Oct 8, 2024)
67e7f2b 🦉 Updates from OwlBot post-processor (gcf-owl-bot[bot], Oct 8, 2024)
2a8bb7f updates docstring in _download_table_bqstorage (chalmerlowe, Oct 8, 2024)
820e4ee 🦉 Updates from OwlBot post-processor (gcf-owl-bot[bot], Oct 8, 2024)
656b4f7 Update google/cloud/bigquery/_pandas_helpers.py (chalmerlowe, Oct 10, 2024)
fe64501 Update google/cloud/bigquery/_pandas_helpers.py (chalmerlowe, Oct 10, 2024)
9de98fe Update google/cloud/bigquery/_pandas_helpers.py (chalmerlowe, Oct 10, 2024)
bd2fea0 Merge branch 'main' into update-maxstreams (chalmerlowe, Oct 10, 2024)
8d67a75 🦉 Updates from OwlBot post-processor (gcf-owl-bot[bot], Oct 10, 2024)
2251dd9 Merge branch 'update-maxstreams' of https://github.com/googleapis/pyt… (gcf-owl-bot[bot], Oct 10, 2024)
e537613 🦉 Updates from OwlBot post-processor (gcf-owl-bot[bot], Oct 10, 2024)
6be29d6 Merge branch 'update-maxstreams' of https://github.com/googleapis/pyt… (gcf-owl-bot[bot], Oct 10, 2024)
a3160ee 🦉 Updates from OwlBot post-processor (gcf-owl-bot[bot], Oct 10, 2024)
94a7c28 Merge branch 'update-maxstreams' of https://github.com/googleapis/pyt… (gcf-owl-bot[bot], Oct 10, 2024)
6f12c0c 🦉 Updates from OwlBot post-processor (gcf-owl-bot[bot], Oct 10, 2024)
71f2799 Merge branch 'update-maxstreams' of https://github.com/googleapis/pyt… (gcf-owl-bot[bot], Oct 10, 2024)
4da6bf6 Update _pandas_helpers.py (chalmerlowe, Oct 10, 2024)
118 changes: 99 additions & 19 deletions google/cloud/bigquery/_pandas_helpers.py
@@ -21,13 +21,14 @@
import logging
import queue
import warnings
from typing import Any, Union
from typing import Any, Union, Optional, Callable, Generator, List


from google.cloud.bigquery import _pyarrow_helpers
from google.cloud.bigquery import _versions_helpers
from google.cloud.bigquery import schema


try:
import pandas # type: ignore

@@ -75,7 +76,7 @@ def _to_wkb(v):
_to_wkb = _to_wkb()

try:
from google.cloud.bigquery_storage import ArrowSerializationOptions
from google.cloud.bigquery_storage_v1.types import ArrowSerializationOptions
except ImportError:
_ARROW_COMPRESSION_SUPPORT = False
else:
@@ -816,18 +817,54 @@ def _nowait(futures):


def _download_table_bqstorage(
project_id,
table,
bqstorage_client,
preserve_order=False,
selected_fields=None,
page_to_item=None,
max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
):
"""Use (faster, but billable) BQ Storage API to construct DataFrame."""
project_id: str,
table: Any,
bqstorage_client: Any,
preserve_order: bool = False,
selected_fields: Optional[List[Any]] = None,
page_to_item: Optional[Callable] = None,
max_queue_size: Any = _MAX_QUEUE_SIZE_DEFAULT,
max_stream_count: Optional[int] = None,
) -> Generator[Any, None, None]:
"""Downloads a BigQuery table using the BigQuery Storage API.

This method uses the faster, but potentially more expensive, BigQuery
Storage API to download a table as a Pandas DataFrame. It supports
parallel downloads and optional data transformations.

Args:
project_id (str): The ID of the Google Cloud project containing
the table.
table (Any): The BigQuery table to download.
bqstorage_client (Any): An authenticated BigQuery Storage API client.
preserve_order (bool, optional): Whether to preserve the order
of the rows as they are read from BigQuery. If True, this limits
the number of streams to one and overrides `max_stream_count`.
Defaults to False.
selected_fields (Optional[List[SchemaField]]):
A list of BigQuery schema fields to select for download. If None,
all fields are downloaded. Defaults to None.
page_to_item (Optional[Callable]): An optional callable that takes
a page of data from the BigQuery Storage API and converts it into
the item to yield (for example, a pyarrow RecordBatch or a pandas
DataFrame). Defaults to None.
max_queue_size (Any): The maximum number of result pages to buffer
in the internal worker queue. If None, the queue is unbounded; by
default, it is sized relative to the number of streams. Defaults
to _MAX_QUEUE_SIZE_DEFAULT.
max_stream_count (Optional[int]): The maximum number of
concurrent streams to use for downloading data. If `preserve_order`
is True, the requested streams are limited to 1 regardless of the
`max_stream_count` value. If 0 or None, then the number of
requested streams will be unbounded. Defaults to None.

Yields:
pandas.DataFrame: Pandas DataFrames, one for each chunk of data
downloaded from BigQuery.

Raises:
ValueError: If attempting to read from a specific partition or snapshot.

Note:
This method requires the `google-cloud-bigquery-storage` library
to be installed.
"""

# Passing a BQ Storage client in implies that the BigQuery Storage library
# is available and can be imported.
from google.cloud import bigquery_storage

if "$" in table.table_id:
@@ -837,18 +874,20 @@ def _download_table_bqstorage(
if "@" in table.table_id:
raise ValueError("Reading from a specific snapshot is not currently supported.")

requested_streams = 1 if preserve_order else 0
requested_streams = determine_requested_streams(preserve_order, max_stream_count)

requested_session = bigquery_storage.types.ReadSession(
table=table.to_bqstorage(), data_format=bigquery_storage.types.DataFormat.ARROW
requested_session = bigquery_storage.types.stream.ReadSession(
table=table.to_bqstorage(),
data_format=bigquery_storage.types.stream.DataFormat.ARROW,
)
if selected_fields is not None:
for field in selected_fields:
requested_session.read_options.selected_fields.append(field.name)

if _ARROW_COMPRESSION_SUPPORT:
requested_session.read_options.arrow_serialization_options.buffer_compression = (
ArrowSerializationOptions.CompressionCodec.LZ4_FRAME
# CompressionCodec(1) -> LZ4_FRAME
ArrowSerializationOptions.CompressionCodec(1)
)

session = bqstorage_client.create_read_session(
@@ -884,7 +923,7 @@ def _download_table_bqstorage(
elif max_queue_size is None:
max_queue_size = 0 # unbounded

worker_queue = queue.Queue(maxsize=max_queue_size)
worker_queue: queue.Queue[int] = queue.Queue(maxsize=max_queue_size)

with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool:
try:
@@ -910,7 +949,7 @@ def _download_table_bqstorage(
# we want to block on the queue's get method, instead. This
# prevents the queue from filling up, because the main thread
# has smaller gaps in time between calls to the queue's get
# method. For a detailed explaination, see:
# method. For a detailed explanation, see:
# https://friendliness.dev/2019/06/18/python-nowait/
done, not_done = _nowait(not_done)
for future in done:
@@ -949,6 +988,7 @@ def download_arrow_bqstorage(
preserve_order=False,
selected_fields=None,
max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
max_stream_count=None,
):
return _download_table_bqstorage(
project_id,
@@ -958,6 +998,7 @@ def download_arrow_bqstorage(
selected_fields=selected_fields,
page_to_item=_bqstorage_page_to_arrow,
max_queue_size=max_queue_size,
max_stream_count=max_stream_count,
)


Expand All @@ -970,6 +1011,7 @@ def download_dataframe_bqstorage(
preserve_order=False,
selected_fields=None,
max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
max_stream_count=None,
):
page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes)
return _download_table_bqstorage(
@@ -980,6 +1022,7 @@ def download_dataframe_bqstorage(
selected_fields=selected_fields,
page_to_item=page_to_item,
max_queue_size=max_queue_size,
max_stream_count=max_stream_count,
)


@@ -1024,3 +1067,40 @@ def verify_pandas_imports():
raise ValueError(_NO_PANDAS_ERROR) from pandas_import_exception
if db_dtypes is None:
raise ValueError(_NO_DB_TYPES_ERROR) from db_dtypes_import_exception


def determine_requested_streams(
preserve_order: bool,
max_stream_count: Union[int, None],
) -> int:
"""Determines the value of requested_streams based on the values of
`preserve_order` and `max_stream_count`.

Args:
preserve_order (bool): Whether to preserve the order of streams. If True,
this limits the number of streams to one. `preserve_order` takes
precedence over `max_stream_count`.
max_stream_count (Union[int, None]): The maximum number of streams
allowed. Must be a non-negative number or None, where None indicates
the value is unset. NOTE: `preserve_order` takes precedence over
`max_stream_count`, so for `max_stream_count` to be honored,
`preserve_order` must be False.

Returns:
(int) The appropriate value for requested_streams.
"""

if preserve_order:
# If preserve_order is set, it takes precedence.
# Limit the requested streams to 1 to ensure that order is preserved.
return 1

elif max_stream_count is not None:
# Only when preserve_order is not set does max_stream_count apply.
if max_stream_count <= -1:
raise ValueError("max_stream_count must be non-negative OR None")
return max_stream_count

# Default to zero requested streams (unbounded).
return 0
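
Taken together, the precedence rules above reduce to three cases. Below is a minimal sketch (not part of the diff, assuming a checkout with these changes on the path) that exercises determine_requested_streams directly; the expected values mirror the parametrized unit tests added below:

# Import the helper the same way the new unit tests do.
from google.cloud.bigquery._pandas_helpers import determine_requested_streams

# preserve_order wins: order can only be guaranteed with a single stream.
assert determine_requested_streams(preserve_order=True, max_stream_count=10) == 1
assert determine_requested_streams(preserve_order=True, max_stream_count=None) == 1

# Otherwise max_stream_count is honored as-is.
assert determine_requested_streams(preserve_order=False, max_stream_count=10) == 10

# Both unset -> 0, i.e. an unbounded number of requested streams.
assert determine_requested_streams(preserve_order=False, max_stream_count=None) == 0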
31 changes: 31 additions & 0 deletions tests/unit/test__pandas_helpers.py
@@ -18,6 +18,7 @@
import functools
import operator
import queue
from typing import Union
from unittest import mock
import warnings

@@ -46,6 +47,7 @@
from google.cloud.bigquery import _pyarrow_helpers
from google.cloud.bigquery import _versions_helpers
from google.cloud.bigquery import schema
from google.cloud.bigquery._pandas_helpers import determine_requested_streams

pyarrow = _versions_helpers.PYARROW_VERSIONS.try_import()

@@ -2053,3 +2055,32 @@ def test_verify_pandas_imports_no_db_dtypes(module_under_test, monkeypatch):
monkeypatch.setattr(module_under_test, "db_dtypes", None)
with pytest.raises(ValueError, match="Please install the 'db-dtypes' package"):
module_under_test.verify_pandas_imports()


@pytest.mark.parametrize(
"preserve_order, max_stream_count, expected_requested_streams",
[
# If preserve_order is set/True, it takes precedence:
(True, 10, 1), # use 1
(True, None, 1), # use 1
# If preserve_order is not set check max_stream_count:
(False, 10, 10), # max_stream_count takes effect
(False, None, 0), # Unbounded (0) when both are unset
],
)
def test_determine_requested_streams(
preserve_order: bool,
max_stream_count: Union[int, None],
expected_requested_streams: int,
):
"""Tests various combinations of preserve_order and max_stream_count."""
actual_requested_streams = determine_requested_streams(
preserve_order, max_stream_count
)
assert actual_requested_streams == expected_requested_streams


def test_determine_requested_streams_invalid_max_stream_count():
"""Tests that a ValueError is raised if max_stream_count is negative."""
with pytest.raises(ValueError):
determine_requested_streams(preserve_order=False, max_stream_count=-1)
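
To close the loop on how the new parameter reaches the backend, here is a condensed, illustrative sketch of the stream-count handling inside _download_table_bqstorage. The open_read_session wrapper is an invented name for illustration only; create_read_session and its max_stream_count argument are the actual BigQuery Storage client API, and the server may return fewer streams than requested.

from google.cloud import bigquery_storage
from google.cloud.bigquery._pandas_helpers import determine_requested_streams


def open_read_session(bqstorage_client, table, project_id,
                      preserve_order=False, max_stream_count=None):
    # Illustrative only: mirrors the handling added to _download_table_bqstorage.
    # preserve_order=True forces a single stream; otherwise max_stream_count is
    # passed through, with 0/None meaning unbounded.
    requested_streams = determine_requested_streams(preserve_order, max_stream_count)
    requested_session = bigquery_storage.types.stream.ReadSession(
        table=table.to_bqstorage(),
        data_format=bigquery_storage.types.stream.DataFormat.ARROW,
    )
    return bqstorage_client.create_read_session(
        parent="projects/{}".format(project_id),
        read_session=requested_session,
        max_stream_count=requested_streams,
    )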