Skip to content

Commit 374e9ac

Browse files
author
Juan Rael
committed
Custom Timeout in Table Mutation
1 parent cf545e7 commit 374e9ac

File tree

5 files changed

+98
-118
lines changed

5 files changed

+98
-118
lines changed

bigtable/docs/snippets_table.py

Lines changed: 73 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -39,22 +39,24 @@
3939
from google.cloud.bigtable import column_family
4040

4141

42-
INSTANCE_ID = "snippet-" + unique_resource_id('-')
43-
CLUSTER_ID = "clus-1-" + unique_resource_id('-')
44-
TABLE_ID = "tabl-1-" + unique_resource_id('-')
45-
COLUMN_FAMILY_ID = "col_fam_id-" + unique_resource_id('-')
46-
LOCATION_ID = 'us-central1-f'
47-
ALT_LOCATION_ID = 'us-central1-a'
42+
INSTANCE_ID = "snippet-" + unique_resource_id("-")
43+
CLUSTER_ID = "clus-1-" + unique_resource_id("-")
44+
TABLE_ID = "tabl-1-" + unique_resource_id("-")
45+
COLUMN_FAMILY_ID = "col_fam_id-" + unique_resource_id("-")
46+
LOCATION_ID = "us-central1-f"
47+
ALT_LOCATION_ID = "us-central1-a"
4848
PRODUCTION = enums.Instance.Type.PRODUCTION
4949
SERVER_NODES = 3
5050
STORAGE_TYPE = enums.StorageType.SSD
51-
LABEL_KEY = u'python-snippet'
52-
LABEL_STAMP = datetime.datetime.utcnow() \
53-
.replace(microsecond=0, tzinfo=UTC,) \
54-
.strftime("%Y-%m-%dt%H-%M-%S")
51+
LABEL_KEY = u"python-snippet"
52+
LABEL_STAMP = (
53+
datetime.datetime.utcnow()
54+
.replace(microsecond=0, tzinfo=UTC)
55+
.strftime("%Y-%m-%dt%H-%M-%S")
56+
)
5557
LABELS = {LABEL_KEY: str(LABEL_STAMP)}
56-
COL_NAME1 = b'col-name1'
57-
CELL_VAL1 = b'cell-val'
58+
COL_NAME1 = b"col-name1"
59+
CELL_VAL1 = b"cell-val"
5860

5961

6062
class Config(object):
@@ -63,28 +65,30 @@ class Config(object):
6365
This is a mutable stand-in to allow test set-up to modify
6466
global state.
6567
"""
68+
6669
CLIENT = None
6770
INSTANCE = None
6871
TABLE = None
6972

7073

7174
def setup_module():
7275
client = Config.CLIENT = Client(admin=True)
73-
Config.INSTANCE = client.instance(INSTANCE_ID,
74-
instance_type=PRODUCTION,
75-
labels=LABELS)
76-
cluster = Config.INSTANCE.cluster(CLUSTER_ID,
77-
location_id=LOCATION_ID,
78-
serve_nodes=SERVER_NODES,
79-
default_storage_type=STORAGE_TYPE)
76+
Config.INSTANCE = client.instance(
77+
INSTANCE_ID, instance_type=PRODUCTION, labels=LABELS
78+
)
79+
cluster = Config.INSTANCE.cluster(
80+
CLUSTER_ID,
81+
location_id=LOCATION_ID,
82+
serve_nodes=SERVER_NODES,
83+
default_storage_type=STORAGE_TYPE,
84+
)
8085
operation = Config.INSTANCE.create(clusters=[cluster])
8186
# We want to make sure the operation completes.
8287
operation.result(timeout=100)
8388
Config.TABLE = Config.INSTANCE.table(TABLE_ID)
8489
Config.TABLE.create()
8590
gc_rule = column_family.MaxVersionsGCRule(2)
86-
column_family1 = Config.TABLE.column_family(COLUMN_FAMILY_ID,
87-
gc_rule=gc_rule)
91+
column_family1 = Config.TABLE.column_family(COLUMN_FAMILY_ID, gc_rule=gc_rule)
8892
column_family1.create()
8993

9094

@@ -108,7 +112,7 @@ def test_bigtable_create_table():
108112
table2 = instance.table("table_id2")
109113
# Define the GC policy to retain only the most recent 2 versions.
110114
max_versions_rule = column_family.MaxVersionsGCRule(2)
111-
table2.create(column_families={'cf1': max_versions_rule})
115+
table2.create(column_families={"cf1": max_versions_rule})
112116

113117
# [END bigtable_create_table]
114118
assert table1.exists()
@@ -126,14 +130,13 @@ def test_bigtable_sample_row_keys():
126130

127131
table = instance.table("table_id1_samplerow")
128132
# [END bigtable_sample_row_keys]
129-
initial_split_keys = [b'split_key_1', b'split_key_10',
130-
b'split_key_20']
133+
initial_split_keys = [b"split_key_1", b"split_key_10", b"split_key_20"]
131134
table.create(initial_split_keys=initial_split_keys)
132135
# [START bigtable_sample_row_keys]
133136
data = table.sample_row_keys()
134137
actual_keys, offset = zip(*[(rk.row_key, rk.offset_bytes) for rk in data])
135138
# [END bigtable_sample_row_keys]
136-
initial_split_keys.append(b'')
139+
initial_split_keys.append(b"")
137140
assert list(actual_keys) == initial_split_keys
138141
table.delete()
139142

@@ -145,23 +148,29 @@ def test_bigtable_write_read_drop_truncate():
145148
client = Client(admin=True)
146149
instance = client.instance(INSTANCE_ID)
147150
table = instance.table(TABLE_ID)
148-
row_keys = [b'row_key_1', b'row_key_2', b'row_key_3', b'row_key_4',
149-
b'row_key_20', b'row_key_22', b'row_key_200']
150-
col_name = b'col-name1'
151+
row_keys = [
152+
b"row_key_1",
153+
b"row_key_2",
154+
b"row_key_3",
155+
b"row_key_4",
156+
b"row_key_20",
157+
b"row_key_22",
158+
b"row_key_200",
159+
]
160+
col_name = b"col-name1"
151161
rows = []
152162
for i, row_key in enumerate(row_keys):
153-
value = 'value_{}'.format(i).encode()
163+
value = "value_{}".format(i).encode()
154164
row = table.row(row_key)
155-
row.set_cell(COLUMN_FAMILY_ID,
156-
col_name,
157-
value,
158-
timestamp=datetime.datetime.utcnow())
165+
row.set_cell(
166+
COLUMN_FAMILY_ID, col_name, value, timestamp=datetime.datetime.utcnow()
167+
)
159168
rows.append(row)
160169
response = table.mutate_rows(rows)
161170
# validate that all rows written successfully
162171
for i, status in enumerate(response):
163172
if status.code is not 0:
164-
print('Row number {} failed to write'.format(i))
173+
print("Row number {} failed to write".format(i))
165174
# [END bigtable_mutate_rows]
166175
assert len(response) == len(rows)
167176
# [START bigtable_read_row]
@@ -170,10 +179,10 @@ def test_bigtable_write_read_drop_truncate():
170179
client = Client(admin=True)
171180
instance = client.instance(INSTANCE_ID)
172181
table = instance.table(TABLE_ID)
173-
row_key = 'row_key_1'
182+
row_key = "row_key_1"
174183
row = table.read_row(row_key)
175184
# [END bigtable_read_row]
176-
assert row.row_key.decode('utf-8') == row_key
185+
assert row.row_key.decode("utf-8") == row_key
177186
# [START bigtable_read_rows]
178187
from google.cloud.bigtable import Client
179188

@@ -192,13 +201,12 @@ def test_bigtable_write_read_drop_truncate():
192201
client = Client(admin=True)
193202
instance = client.instance(INSTANCE_ID)
194203
table = instance.table(TABLE_ID)
195-
row_key_prefix = b'row_key_2'
204+
row_key_prefix = b"row_key_2"
196205
table.drop_by_prefix(row_key_prefix, timeout=200)
197206
# [END bigtable_drop_by_prefix]
198-
dropped_row_keys = [b'row_key_2', b'row_key_20',
199-
b'row_key_22', b'row_key_200']
207+
dropped_row_keys = [b"row_key_2", b"row_key_20", b"row_key_22", b"row_key_200"]
200208
for row in table.read_rows():
201-
assert row.row_key.decode('utf-8') not in dropped_row_keys
209+
assert row.row_key.decode("utf-8") not in dropped_row_keys
202210

203211
# [START bigtable_truncate_table]
204212
from google.cloud.bigtable import Client
@@ -226,26 +234,31 @@ def test_bigtable_mutations_batcher():
226234

227235
# Below code will be used while creating batcher.py snippets.
228236
# So not removing this code as of now.
229-
row_keys = [b'row_key_1', b'row_key_2', b'row_key_3', b'row_key_4',
230-
b'row_key_20', b'row_key_22', b'row_key_200']
231-
column_name = 'column_name'.encode()
237+
row_keys = [
238+
b"row_key_1",
239+
b"row_key_2",
240+
b"row_key_3",
241+
b"row_key_4",
242+
b"row_key_20",
243+
b"row_key_22",
244+
b"row_key_200",
245+
]
246+
column_name = "column_name".encode()
232247
# Add a single row
233248
row_key = row_keys[0]
234249
row = table.row(row_key)
235-
row.set_cell(COLUMN_FAMILY_ID,
236-
column_name,
237-
'value-0',
238-
timestamp=datetime.datetime.utcnow())
250+
row.set_cell(
251+
COLUMN_FAMILY_ID, column_name, "value-0", timestamp=datetime.datetime.utcnow()
252+
)
239253
batcher.mutate(row)
240254
# Add a collections of rows
241255
rows = []
242256
for i in range(1, len(row_keys)):
243257
row = table.row(row_keys[i])
244-
value = 'value_{}'.format(i).encode()
245-
row.set_cell(COLUMN_FAMILY_ID,
246-
column_name,
247-
value,
248-
timestamp=datetime.datetime.utcnow())
258+
value = "value_{}".format(i).encode()
259+
row.set_cell(
260+
COLUMN_FAMILY_ID, column_name, value, timestamp=datetime.datetime.utcnow()
261+
)
249262
rows.append(row)
250263
batcher.mutate_rows(rows)
251264
# batcher will flush current batch if it
@@ -287,6 +300,7 @@ def test_bigtable_list_tables():
287300

288301
def test_bigtable_table_name():
289302
import re
303+
290304
# [START bigtable_table_name]
291305
from google.cloud.bigtable import Client
292306

@@ -296,9 +310,11 @@ def test_bigtable_table_name():
296310
table = instance.table(TABLE_ID)
297311
table_name = table.name
298312
# [END bigtable_table_name]
299-
_table_name_re = re.compile(r'^projects/(?P<project>[^/]+)/'
300-
r'instances/(?P<instance>[^/]+)/tables/'
301-
r'(?P<table_id>[_a-zA-Z0-9][-_.a-zA-Z0-9]*)$')
313+
_table_name_re = re.compile(
314+
r"^projects/(?P<project>[^/]+)/"
315+
r"instances/(?P<instance>[^/]+)/tables/"
316+
r"(?P<table_id>[_a-zA-Z0-9][-_.a-zA-Z0-9]*)$"
317+
)
302318
assert _table_name_re.match(table_name)
303319

304320

@@ -368,7 +384,7 @@ def test_bigtable_table_row():
368384
instance = client.instance(INSTANCE_ID)
369385
table = instance.table(TABLE_ID)
370386

371-
row_keys = [b'row_key_1', b'row_key_2']
387+
row_keys = [b"row_key_1", b"row_key_2"]
372388
row1_obj = table.row(row_keys[0])
373389
row2_obj = table.row(row_keys[1])
374390
# [END bigtable_table_row]
@@ -387,5 +403,5 @@ def test_bigtable_table_row():
387403
table.truncate(timeout=300)
388404

389405

390-
if __name__ == '__main__':
406+
if __name__ == "__main__":
391407
pytest.main()

bigtable/google/cloud/bigtable/batcher.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@ class MutationsBatcher(object):
5656
(5 MB).
5757
"""
5858

59-
def __init__(self, table, flush_count=FLUSH_COUNT, max_row_bytes=MAX_ROW_BYTES):
59+
def __init__(
60+
self, table, flush_count=FLUSH_COUNT, max_row_bytes=MAX_ROW_BYTES
61+
):
6062
self.rows = []
6163
self.total_mutation_count = 0
6264
self.total_size = 0
@@ -144,7 +146,7 @@ def mutate_rows(self, rows):
144146

145147
def flush(self):
146148
""" Sends the current. batch to Cloud Bigtable. """
147-
if len(self.rows) is not 0:
149+
if len(self.rows) != 0:
148150
self.table.mutate_rows(self.rows)
149151
self.total_mutation_count = 0
150152
self.total_size = 0

bigtable/google/cloud/bigtable/instance.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ def list_clusters(self):
553553
clusters = [Cluster.from_pb(cluster, self) for cluster in resp.clusters]
554554
return clusters, resp.failed_locations
555555

556-
def table(self, table_id, app_profile_id=None):
556+
def table(self, table_id, mutation_timeout=None, app_profile_id=None):
557557
"""Factory to create a table associated with this instance.
558558
559559
For example:
@@ -571,7 +571,12 @@ def table(self, table_id, app_profile_id=None):
571571
:rtype: :class:`Table <google.cloud.bigtable.table.Table>`
572572
:returns: The table owned by this instance.
573573
"""
574-
return Table(table_id, self, app_profile_id=app_profile_id)
574+
return Table(
575+
table_id,
576+
self,
577+
app_profile_id=app_profile_id,
578+
mutation_timeout=mutation_timeout,
579+
)
575580

576581
def list_tables(self):
577582
"""List the tables in this instance.

bigtable/google/cloud/bigtable/table.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
from grpc import StatusCode
1919

20+
from google.api_core import timeout
2021
from google.api_core.exceptions import RetryError
2122
from google.api_core.exceptions import NotFound
2223
from google.api_core.retry import if_exception_type
@@ -100,10 +101,11 @@ class Table(object):
100101
:param app_profile_id: (Optional) The unique name of the AppProfile.
101102
"""
102103

103-
def __init__(self, table_id, instance, app_profile_id=None):
104+
def __init__(self, table_id, instance, mutation_timeout=None, app_profile_id=None):
104105
self.table_id = table_id
105106
self._instance = instance
106107
self._app_profile_id = app_profile_id
108+
self.mutation_timeout = mutation_timeout
107109

108110
@property
109111
def name(self):
@@ -503,7 +505,11 @@ def mutate_rows(self, rows, retry=DEFAULT_RETRY):
503505
sent. These will be in the same order as the `rows`.
504506
"""
505507
retryable_mutate_rows = _RetryableMutateRowsWorker(
506-
self._instance._client, self.name, rows, app_profile_id=self._app_profile_id
508+
self._instance._client,
509+
self.name,
510+
rows,
511+
app_profile_id=self._app_profile_id,
512+
timeout=self.mutation_timeout,
507513
)
508514
return retryable_mutate_rows(retry=retry)
509515

@@ -658,12 +664,13 @@ class _RetryableMutateRowsWorker(object):
658664
)
659665
# pylint: enable=unsubscriptable-object
660666

661-
def __init__(self, client, table_name, rows, app_profile_id=None):
667+
def __init__(self, client, table_name, rows, app_profile_id=None, timeout=None):
662668
self.client = client
663669
self.table_name = table_name
664670
self.rows = rows
665671
self.app_profile_id = app_profile_id
666672
self.responses_statuses = [None] * len(self.rows)
673+
self.timeout = timeout
667674

668675
def __call__(self, retry=DEFAULT_RETRY):
669676
"""Attempt to mutate all rows and retry rows with transient errors.
@@ -729,7 +736,10 @@ def _do_mutate_retryable_rows(self):
729736
inner_api_calls = data_client._inner_api_calls
730737
if "mutate_rows" not in inner_api_calls:
731738
default_retry = (data_client._method_configs["MutateRows"].retry,)
732-
default_timeout = data_client._method_configs["MutateRows"].timeout
739+
if self.timeout is None:
740+
default_timeout = data_client._method_configs["MutateRows"].timeout
741+
else:
742+
default_timeout = timeout.ExponentialTimeout(deadline=self.timeout)
733743
data_client._inner_api_calls["mutate_rows"] = wrap_method(
734744
data_client.transport.mutate_rows,
735745
default_retry=default_retry,

0 commit comments

Comments (0)