Skip to content
77 changes: 65 additions & 12 deletions google/cloud/bigtable/data/_async/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@
from grpc import Channel

from google.cloud.bigtable.data.execute_query.values import ExecuteQueryValueType
from google.cloud.bigtable.data.execute_query.metadata import SqlType
from google.cloud.bigtable.data.execute_query.metadata import (
SqlType,
_pb_metadata_to_metadata_types,
)
from google.cloud.bigtable.data.execute_query._parameters_formatting import (
_format_execute_query_params,
_to_param_types,
)
from google.cloud.bigtable_v2.services.bigtable.transports.base import (
DEFAULT_CLIENT_INFO,
Expand All @@ -59,7 +63,7 @@
from google.cloud.bigtable.data.exceptions import FailedQueryShardError
from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup

from google.cloud.bigtable.data._helpers import TABLE_DEFAULT
from google.cloud.bigtable.data._helpers import TABLE_DEFAULT, _align_timeouts
from google.cloud.bigtable.data._helpers import _WarmedInstanceKey
from google.cloud.bigtable.data._helpers import _CONCURRENCY_LIMIT
from google.cloud.bigtable.data._helpers import _retry_exception_factory
Expand Down Expand Up @@ -542,6 +546,12 @@ async def execute_query(
ServiceUnavailable,
Aborted,
),
prepare_operation_timeout: float = 60,
prepare_attempt_timeout: float | None = 20,
prepare_retryable_errors: Sequence[type[Exception]] = (
DeadlineExceeded,
ServiceUnavailable,
),
) -> "ExecuteQueryIteratorAsync":
"""
Executes an SQL query on an instance.
Expand All @@ -550,6 +560,10 @@ async def execute_query(
Failed requests within operation_timeout will be retried based on the
retryable_errors list until operation_timeout is reached.

Note that this makes two requests, one to ``PrepareQuery`` and one to ``ExecuteQuery``.
These have separate retry configurations. ``ExecuteQuery`` is where the bulk of the
work happens.

Args:
query: Query to be run on Bigtable instance. The query can use ``@param``
placeholders to use parameter interpolation on the server. Values for all
Expand All @@ -566,16 +580,26 @@ async def execute_query(
an empty dict).
app_profile_id: The app profile to associate with requests.
https://cloud.google.com/bigtable/docs/app-profiles
operation_timeout: the time budget for the entire operation, in seconds.
operation_timeout: the time budget for the entire executeQuery operation, in seconds.
Failed requests will be retried within the budget.
Defaults to 600 seconds.
attempt_timeout: the time budget for an individual network request, in seconds.
attempt_timeout: the time budget for an individual executeQuery network request, in seconds.
If it takes longer than this time to complete, the request will be cancelled with
a DeadlineExceeded exception, and a retry will be attempted.
Defaults to 20 seconds.
If None, defaults to operation_timeout.
retryable_errors: a list of errors that will be retried if encountered.
retryable_errors: a list of errors that will be retried if encountered during executeQuery.
Defaults to 4 (DeadlineExceeded), 14 (ServiceUnavailable), and 10 (Aborted)
prepare_operation_timeout: the time budget for the entire prepareQuery operation, in seconds.
Failed requests will be retried within the budget.
Defaults to 60 seconds.
prepare_attempt_timeout: the time budget for an individual prepareQuery network request, in seconds.
If it takes longer than this time to complete, the request will be cancelled with
a DeadlineExceeded exception, and a retry will be attempted.
Defaults to 20 seconds.
If None, defaults to prepare_operation_timeout.
prepare_retryable_errors: a list of errors that will be retried if encountered during prepareQuery.
Defaults to 4 (DeadlineExceeded) and 14 (ServiceUnavailable)
Returns:
ExecuteQueryIteratorAsync: an asynchronous iterator that yields rows returned by the query
Raises:
Expand All @@ -586,30 +610,59 @@ async def execute_query(
google.cloud.bigtable.data.exceptions.ParameterTypeInferenceFailed: Raised if
a parameter is passed without an explicit type, and the type cannot be inferred
"""
warnings.warn(
"ExecuteQuery is in preview and may change in the future.",
category=RuntimeWarning,
instance_name = self._gapic_client.instance_path(self.project, instance_id)
converted_param_types = _to_param_types(parameters, parameter_types)
prepare_request = {
"instance_name": instance_name,
"query": query,
"app_profile_id": app_profile_id,
"param_types": converted_param_types,
"proto_format": {},
}
prepare_predicate = retries.if_exception_type(
*[_get_error_type(e) for e in prepare_retryable_errors]
)
prepare_operation_timeout, prepare_attempt_timeout = _align_timeouts(
prepare_operation_timeout, prepare_attempt_timeout
)
prepare_sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)

target = partial(
self._gapic_client.prepare_query,
request=prepare_request,
timeout=prepare_attempt_timeout,
retry=None,
)
prepare_result = await CrossSync.retry_target(
target,
prepare_predicate,
prepare_sleep_generator,
prepare_operation_timeout,
exception_factory=_retry_exception_factory,
)

prepare_metadata = _pb_metadata_to_metadata_types(prepare_result.metadata)

retryable_excs = [_get_error_type(e) for e in retryable_errors]

pb_params = _format_execute_query_params(parameters, parameter_types)

instance_name = self._gapic_client.instance_path(self.project, instance_id)

request_body = {
"instance_name": instance_name,
"app_profile_id": app_profile_id,
"query": query,
"prepared_query": prepare_result.prepared_query,
"params": pb_params,
"proto_format": {},
}
operation_timeout, attempt_timeout = _align_timeouts(
operation_timeout, attempt_timeout
)

return CrossSync.ExecuteQueryIterator(
self,
instance_id,
app_profile_id,
request_body,
prepare_metadata,
attempt_timeout,
operation_timeout,
retryable_excs=retryable_excs,
Expand Down
28 changes: 23 additions & 5 deletions google/cloud/bigtable/data/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def _get_timeouts(
attempt: The timeout value to use for each attempt, in seconds.
table: The table to use for default values.
Returns:
typle[float, float]: A tuple of (operation_timeout, attempt_timeout)
tuple[float, float]: A tuple of (operation_timeout, attempt_timeout)
"""
# load table defaults if necessary
if operation == TABLE_DEFAULT.DEFAULT:
Expand All @@ -154,15 +154,33 @@ def _get_timeouts(
elif attempt == TABLE_DEFAULT.MUTATE_ROWS:
attempt = table.default_mutate_rows_attempt_timeout

return _align_timeouts(final_operation, attempt)


def _align_timeouts(operation: float, attempt: float | None) -> tuple[float, float]:
    """
    Convert passed in timeout values to floats.

    attempt will use operation value if None, or if larger than operation.

    Will call _validate_timeouts on the outputs, and raise ValueError if the
    resulting timeouts are invalid.

    Args:
        operation: The timeout value to use for the entire operation, in seconds.
        attempt: The timeout value to use for each attempt, in seconds.
    Returns:
        tuple[float, float]: A tuple of (operation_timeout, attempt_timeout)
    Raises:
        ValueError: If the resulting timeouts fail validation.
    """
    if attempt is None:
        # no attempt timeout specified, use operation timeout for both
        final_attempt = operation
    else:
        # cap attempt timeout at operation timeout; a falsy (0/None) operation
        # timeout leaves the attempt timeout unchanged
        final_attempt = min(attempt, operation) if operation else attempt

    _validate_timeouts(operation, final_attempt, allow_none=False)
    return operation, final_attempt


def _validate_timeouts(
Expand Down
74 changes: 63 additions & 11 deletions google/cloud/bigtable/data/_sync_autogen/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,13 @@
from functools import partial
from grpc import Channel
from google.cloud.bigtable.data.execute_query.values import ExecuteQueryValueType
from google.cloud.bigtable.data.execute_query.metadata import SqlType
from google.cloud.bigtable.data.execute_query.metadata import (
SqlType,
_pb_metadata_to_metadata_types,
)
from google.cloud.bigtable.data.execute_query._parameters_formatting import (
_format_execute_query_params,
_to_param_types,
)
from google.cloud.bigtable_v2.services.bigtable.transports.base import (
DEFAULT_CLIENT_INFO,
Expand All @@ -48,7 +52,7 @@
from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery
from google.cloud.bigtable.data.exceptions import FailedQueryShardError
from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup
from google.cloud.bigtable.data._helpers import TABLE_DEFAULT
from google.cloud.bigtable.data._helpers import TABLE_DEFAULT, _align_timeouts
from google.cloud.bigtable.data._helpers import _WarmedInstanceKey
from google.cloud.bigtable.data._helpers import _CONCURRENCY_LIMIT
from google.cloud.bigtable.data._helpers import _retry_exception_factory
Expand Down Expand Up @@ -404,13 +408,23 @@ def execute_query(
ServiceUnavailable,
Aborted,
),
prepare_operation_timeout: float = 60,
prepare_attempt_timeout: float | None = 20,
prepare_retryable_errors: Sequence[type[Exception]] = (
DeadlineExceeded,
ServiceUnavailable,
),
) -> "ExecuteQueryIterator":
"""Executes an SQL query on an instance.
Returns an iterator to asynchronously stream back columns from selected rows.

Failed requests within operation_timeout will be retried based on the
retryable_errors list until operation_timeout is reached.

Note that this makes two requests, one to ``PrepareQuery`` and one to ``ExecuteQuery``.
These have separate retry configurations. ``ExecuteQuery`` is where the bulk of the
work happens.

Args:
query: Query to be run on Bigtable instance. The query can use ``@param``
placeholders to use parameter interpolation on the server. Values for all
Expand All @@ -427,16 +441,26 @@ def execute_query(
an empty dict).
app_profile_id: The app profile to associate with requests.
https://cloud.google.com/bigtable/docs/app-profiles
operation_timeout: the time budget for the entire operation, in seconds.
operation_timeout: the time budget for the entire executeQuery operation, in seconds.
Failed requests will be retried within the budget.
Defaults to 600 seconds.
attempt_timeout: the time budget for an individual network request, in seconds.
attempt_timeout: the time budget for an individual executeQuery network request, in seconds.
If it takes longer than this time to complete, the request will be cancelled with
a DeadlineExceeded exception, and a retry will be attempted.
Defaults to 20 seconds.
If None, defaults to operation_timeout.
retryable_errors: a list of errors that will be retried if encountered.
retryable_errors: a list of errors that will be retried if encountered during executeQuery.
Defaults to 4 (DeadlineExceeded), 14 (ServiceUnavailable), and 10 (Aborted)
prepare_operation_timeout: the time budget for the entire prepareQuery operation, in seconds.
Failed requests will be retried within the budget.
Defaults to 60 seconds.
prepare_attempt_timeout: the time budget for an individual prepareQuery network request, in seconds.
If it takes longer than this time to complete, the request will be cancelled with
a DeadlineExceeded exception, and a retry will be attempted.
Defaults to 20 seconds.
If None, defaults to prepare_operation_timeout.
prepare_retryable_errors: a list of errors that will be retried if encountered during prepareQuery.
Defaults to 4 (DeadlineExceeded) and 14 (ServiceUnavailable)
Returns:
ExecuteQueryIterator: an iterator that yields rows returned by the query
Raises:
Expand All @@ -447,25 +471,53 @@ def execute_query(
google.cloud.bigtable.data.exceptions.ParameterTypeInferenceFailed: Raised if
a parameter is passed without an explicit type, and the type cannot be inferred
"""
warnings.warn(
"ExecuteQuery is in preview and may change in the future.",
category=RuntimeWarning,
instance_name = self._gapic_client.instance_path(self.project, instance_id)
converted_param_types = _to_param_types(parameters, parameter_types)
prepare_request = {
"instance_name": instance_name,
"query": query,
"app_profile_id": app_profile_id,
"param_types": converted_param_types,
"proto_format": {},
}
prepare_predicate = retries.if_exception_type(
*[_get_error_type(e) for e in prepare_retryable_errors]
)
(prepare_operation_timeout, prepare_attempt_timeout) = _align_timeouts(
prepare_operation_timeout, prepare_attempt_timeout
)
prepare_sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
target = partial(
self._gapic_client.prepare_query,
request=prepare_request,
timeout=prepare_attempt_timeout,
retry=None,
)
prepare_result = CrossSync._Sync_Impl.retry_target(
target,
prepare_predicate,
prepare_sleep_generator,
prepare_operation_timeout,
exception_factory=_retry_exception_factory,
)
prepare_metadata = _pb_metadata_to_metadata_types(prepare_result.metadata)
retryable_excs = [_get_error_type(e) for e in retryable_errors]
pb_params = _format_execute_query_params(parameters, parameter_types)
instance_name = self._gapic_client.instance_path(self.project, instance_id)
request_body = {
"instance_name": instance_name,
"app_profile_id": app_profile_id,
"query": query,
"prepared_query": prepare_result.prepared_query,
"params": pb_params,
"proto_format": {},
}
(operation_timeout, attempt_timeout) = _align_timeouts(
operation_timeout, attempt_timeout
)
return CrossSync._Sync_Impl.ExecuteQueryIterator(
self,
instance_id,
app_profile_id,
request_body,
prepare_metadata,
attempt_timeout,
operation_timeout,
retryable_excs=retryable_excs,
Expand Down
4 changes: 4 additions & 0 deletions google/cloud/bigtable/data/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,3 +334,7 @@ class InvalidExecuteQueryResponse(core_exceptions.GoogleAPICallError):

class ParameterTypeInferenceFailed(ValueError):
    """Raised when a query parameter's type was not given explicitly and could not be inferred automatically."""


class EarlyMetadataCallError(RuntimeError):
    """Exception raised when metadata is requested from an ExecuteQueryIterator before the first row has been read, or before the query has completed"""
2 changes: 0 additions & 2 deletions google/cloud/bigtable/data/execute_query/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
)
from google.cloud.bigtable.data.execute_query.metadata import (
Metadata,
ProtoMetadata,
SqlType,
)
from google.cloud.bigtable.data.execute_query.values import (
Expand All @@ -39,7 +38,6 @@
"QueryResultRow",
"Struct",
"Metadata",
"ProtoMetadata",
"ExecuteQueryIteratorAsync",
"ExecuteQueryIterator",
]
Loading
Loading