Skip to content

Commit 35e0349

Browse files
juan-raeltseaver
authored andcommitted
Add 'Table.mutation_timeout', allowing override of config timeouts. (#7424)
1 parent 411feea commit 35e0349

File tree

4 files changed

+24
-60
lines changed

4 files changed

+24
-60
lines changed

bigtable/google/cloud/bigtable/batcher.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@ class MutationsBatcher(object):
5656
(5 MB).
5757
"""
5858

59-
def __init__(self, table, flush_count=FLUSH_COUNT, max_row_bytes=MAX_ROW_BYTES):
59+
def __init__(
60+
self, table, flush_count=FLUSH_COUNT, max_row_bytes=MAX_ROW_BYTES
61+
):
6062
self.rows = []
6163
self.total_mutation_count = 0
6264
self.total_size = 0

bigtable/google/cloud/bigtable/instance.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ def list_clusters(self):
538538
clusters = [Cluster.from_pb(cluster, self) for cluster in resp.clusters]
539539
return clusters, resp.failed_locations
540540

541-
def table(self, table_id, app_profile_id=None):
541+
def table(self, table_id, mutation_timeout=None, app_profile_id=None):
542542
"""Factory to create a table associated with this instance.
543543
544544
For example:
@@ -556,7 +556,12 @@ def table(self, table_id, app_profile_id=None):
556556
:rtype: :class:`Table <google.cloud.bigtable.table.Table>`
557557
:returns: The table owned by this instance.
558558
"""
559-
return Table(table_id, self, app_profile_id=app_profile_id)
559+
return Table(
560+
table_id,
561+
self,
562+
app_profile_id=app_profile_id,
563+
mutation_timeout=mutation_timeout,
564+
)
560565

561566
def list_tables(self):
562567
"""List the tables in this instance.

bigtable/google/cloud/bigtable/table.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
from grpc import StatusCode
1919

20+
from google.api_core import timeout
2021
from google.api_core.exceptions import RetryError
2122
from google.api_core.exceptions import NotFound
2223
from google.api_core.retry import if_exception_type
@@ -100,10 +101,11 @@ class Table(object):
100101
:param app_profile_id: (Optional) The unique name of the AppProfile.
101102
"""
102103

103-
def __init__(self, table_id, instance, app_profile_id=None):
104+
def __init__(self, table_id, instance, mutation_timeout=None, app_profile_id=None):
104105
self.table_id = table_id
105106
self._instance = instance
106107
self._app_profile_id = app_profile_id
108+
self.mutation_timeout = mutation_timeout
107109

108110
@property
109111
def name(self):
@@ -503,7 +505,11 @@ def mutate_rows(self, rows, retry=DEFAULT_RETRY):
503505
sent. These will be in the same order as the `rows`.
504506
"""
505507
retryable_mutate_rows = _RetryableMutateRowsWorker(
506-
self._instance._client, self.name, rows, app_profile_id=self._app_profile_id
508+
self._instance._client,
509+
self.name,
510+
rows,
511+
app_profile_id=self._app_profile_id,
512+
timeout=self.mutation_timeout,
507513
)
508514
return retryable_mutate_rows(retry=retry)
509515

@@ -658,12 +664,13 @@ class _RetryableMutateRowsWorker(object):
658664
)
659665
# pylint: enable=unsubscriptable-object
660666

661-
def __init__(self, client, table_name, rows, app_profile_id=None):
667+
def __init__(self, client, table_name, rows, app_profile_id=None, timeout=None):
662668
self.client = client
663669
self.table_name = table_name
664670
self.rows = rows
665671
self.app_profile_id = app_profile_id
666672
self.responses_statuses = [None] * len(self.rows)
673+
self.timeout = timeout
667674

668675
def __call__(self, retry=DEFAULT_RETRY):
669676
"""Attempt to mutate all rows and retry rows with transient errors.
@@ -729,7 +736,10 @@ def _do_mutate_retryable_rows(self):
729736
inner_api_calls = data_client._inner_api_calls
730737
if "mutate_rows" not in inner_api_calls:
731738
default_retry = (data_client._method_configs["MutateRows"].retry,)
732-
default_timeout = data_client._method_configs["MutateRows"].timeout
739+
if self.timeout is None:
740+
default_timeout = data_client._method_configs["MutateRows"].timeout
741+
else:
742+
default_timeout = timeout.ExponentialTimeout(deadline=self.timeout)
733743
data_client._inner_api_calls["mutate_rows"] = wrap_method(
734744
data_client.transport.mutate_rows,
735745
default_retry=default_retry,

bigtable/tests/unit/test_table.py

Lines changed: 0 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1205,59 +1205,6 @@ def test_callable_retry(self):
12051205
)
12061206
self.assertEqual(result, expected_result)
12071207

1208-
def test_callable_retry_timeout(self):
1209-
from google.cloud.bigtable.row import DirectRow
1210-
from google.cloud.bigtable.table import DEFAULT_RETRY
1211-
from google.cloud.bigtable_v2.gapic import bigtable_client
1212-
from google.cloud.bigtable_admin_v2.gapic import bigtable_table_admin_client
1213-
1214-
# Setup:
1215-
# - Mutate 2 rows.
1216-
# Action:
1217-
# - Initial attempt will mutate all 2 rows.
1218-
# Expectation:
1219-
# - Both rows always return retryable errors.
1220-
# - google.api_core.Retry should keep retrying.
1221-
# - Check MutateRows is called multiple times.
1222-
# - By the time deadline is reached, statuses should be
1223-
# [retryable, retryable]
1224-
1225-
data_api = bigtable_client.BigtableClient(mock.Mock())
1226-
table_api = bigtable_table_admin_client.BigtableTableAdminClient(mock.Mock())
1227-
credentials = _make_credentials()
1228-
client = self._make_client(
1229-
project="project-id", credentials=credentials, admin=True
1230-
)
1231-
client._table_data_client = data_api
1232-
client._table_admin_client = table_api
1233-
instance = client.instance(instance_id=self.INSTANCE_ID)
1234-
table = self._make_table(self.TABLE_ID, instance)
1235-
1236-
row_1 = DirectRow(row_key=b"row_key", table=table)
1237-
row_1.set_cell("cf", b"col", b"value1")
1238-
row_2 = DirectRow(row_key=b"row_key_2", table=table)
1239-
row_2.set_cell("cf", b"col", b"value2")
1240-
1241-
response = self._make_responses([self.RETRYABLE_1, self.RETRYABLE_1])
1242-
1243-
# Patch the stub used by the API method.
1244-
inner_api_calls = client._table_data_client._inner_api_calls
1245-
inner_api_calls["mutate_rows"] = mock.Mock(return_value=[response])
1246-
1247-
retry = DEFAULT_RETRY.with_delay(
1248-
initial=0.1, maximum=0.2, multiplier=2.0
1249-
).with_deadline(0.5)
1250-
worker = self._make_worker(client, table.name, [row_1, row_2])
1251-
statuses = worker(retry=retry)
1252-
1253-
result = [status.code for status in statuses]
1254-
expected_result = [self.RETRYABLE_1, self.RETRYABLE_1]
1255-
1256-
self.assertTrue(
1257-
client._table_data_client._inner_api_calls["mutate_rows"].call_count > 1
1258-
)
1259-
self.assertEqual(result, expected_result)
1260-
12611208
def test_do_mutate_retryable_rows_empty_rows(self):
12621209
from google.cloud.bigtable_admin_v2.gapic import bigtable_table_admin_client
12631210

0 commit comments

Comments
 (0)