Skip to content

Commit 56f5357

Browse files
authored
fix: Prevent sending full table scan when retrying (#554)
* fix: Prevent sending full table scan when retrying

  Update the retry logic: don't send an empty row_key and empty row_ranges if the original message didn't ask for those.

  Closes internal issue 214449800

* Create InvalidRetryRequest exception

  Raise InvalidRetryRequest instead of StopIteration, and catch InvalidRetryRequest. Stop the retry request if row_limit has been reached.

* Improve test coverage

* Improve test coverage
1 parent ec7cc42 commit 56f5357

File tree

3 files changed

+150
-12
lines changed

3 files changed

+150
-12
lines changed

google/cloud/bigtable/row_data.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -321,11 +321,15 @@ def cell_values(self, column_family_id, column, max_count=None):
321321

322322

323323
class InvalidReadRowsResponse(RuntimeError):
324-
"""Exception raised to to invalid response data from back-end."""
324+
"""Exception raised to invalid response data from back-end."""
325325

326326

327327
class InvalidChunk(RuntimeError):
328-
"""Exception raised to to invalid chunk data from back-end."""
328+
"""Exception raised to invalid chunk data from back-end."""
329+
330+
331+
class InvalidRetryRequest(RuntimeError):
332+
"""Exception raised when retry request is invalid."""
329333

330334

331335
def _retry_read_rows_exception(exc):
@@ -486,6 +490,9 @@ def __iter__(self):
486490
if self.state != self.NEW_ROW:
487491
raise ValueError("The row remains partial / is not committed.")
488492
break
493+
except InvalidRetryRequest:
494+
self._cancelled = True
495+
break
489496

490497
for chunk in response.chunks:
491498
if self._cancelled:
@@ -629,10 +636,11 @@ def build_updated_request(self):
629636
data_messages_v2_pb2.ReadRowsRequest.copy_from(resume_request, self.message)
630637

631638
if self.message.rows_limit != 0:
632-
# TODO: Throw an error if rows_limit - read_so_far is 0 or negative.
633-
resume_request.rows_limit = max(
634-
1, self.message.rows_limit - self.rows_read_so_far
635-
)
639+
row_limit_remaining = self.message.rows_limit - self.rows_read_so_far
640+
if row_limit_remaining > 0:
641+
resume_request.rows_limit = row_limit_remaining
642+
else:
643+
raise InvalidRetryRequest
636644

637645
# if neither RowSet.row_keys nor RowSet.row_ranges currently exist,
638646
# add row_range that starts with last_scanned_key as start_key_open
@@ -643,6 +651,12 @@ def build_updated_request(self):
643651
else:
644652
row_keys = self._filter_rows_keys()
645653
row_ranges = self._filter_row_ranges()
654+
655+
if len(row_keys) == 0 and len(row_ranges) == 0:
656+
# Avoid sending empty row_keys and row_ranges
657+
# if that was not the intention
658+
raise InvalidRetryRequest
659+
646660
resume_request.rows = data_v2_pb2.RowSet(
647661
row_keys=row_keys, row_ranges=row_ranges
648662
)

tests/unit/test_row_data.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -810,6 +810,18 @@ def test_RRRM__filter_row_key():
810810
assert expected_row_keys == row_keys
811811

812812

813+
def test_RRRM__filter_row_key_is_empty():
814+
table_name = "table_name"
815+
request = _ReadRowsRequestPB(table_name=table_name)
816+
request.rows.row_keys.extend([b"row_key1", b"row_key2", b"row_key3", b"row_key4"])
817+
818+
last_scanned_key = b"row_key4"
819+
request_manager = _make_read_rows_request_manager(request, last_scanned_key, 4)
820+
row_keys = request_manager._filter_rows_keys()
821+
822+
assert row_keys == []
823+
824+
813825
def test_RRRM__filter_row_ranges_all_ranges_added_back(rrrm_data):
814826
from google.cloud.bigtable_v2.types import data as data_v2_pb2
815827

@@ -1036,6 +1048,76 @@ def test_RRRM__key_already_read():
10361048
assert not request_manager._key_already_read(b"row_key16")
10371049

10381050

1051+
def test_RRRM__rows_limit_reached():
1052+
from google.cloud.bigtable.row_data import InvalidRetryRequest
1053+
1054+
last_scanned_key = b"row_key14"
1055+
request = _ReadRowsRequestPB(table_name=TABLE_NAME)
1056+
request.rows_limit = 2
1057+
request_manager = _make_read_rows_request_manager(
1058+
request, last_scanned_key=last_scanned_key, rows_read_so_far=2
1059+
)
1060+
with pytest.raises(InvalidRetryRequest):
1061+
request_manager.build_updated_request()
1062+
1063+
1064+
def test_RRRM_build_updated_request_last_row_read_raises_invalid_retry_request():
1065+
from google.cloud.bigtable.row_data import InvalidRetryRequest
1066+
1067+
last_scanned_key = b"row_key4"
1068+
request = _ReadRowsRequestPB(table_name=TABLE_NAME)
1069+
request.rows.row_keys.extend([b"row_key1", b"row_key2", b"row_key4"])
1070+
1071+
request_manager = _make_read_rows_request_manager(
1072+
request, last_scanned_key, rows_read_so_far=3
1073+
)
1074+
with pytest.raises(InvalidRetryRequest):
1075+
request_manager.build_updated_request()
1076+
1077+
1078+
def test_RRRM_build_updated_request_row_ranges_read_raises_invalid_retry_request():
1079+
from google.cloud.bigtable.row_data import InvalidRetryRequest
1080+
from google.cloud.bigtable import row_set
1081+
1082+
row_range1 = row_set.RowRange(b"row_key21", b"row_key29")
1083+
1084+
request = _ReadRowsRequestPB(table_name=TABLE_NAME)
1085+
request.rows.row_ranges.append(row_range1.get_range_kwargs())
1086+
1087+
last_scanned_key = b"row_key4"
1088+
request = _ReadRowsRequestPB(
1089+
table_name=TABLE_NAME,
1090+
)
1091+
request.rows.row_ranges.append(row_range1.get_range_kwargs())
1092+
1093+
request_manager = _make_read_rows_request_manager(
1094+
request, last_scanned_key, rows_read_so_far=2
1095+
)
1096+
with pytest.raises(InvalidRetryRequest):
1097+
request_manager.build_updated_request()
1098+
1099+
1100+
def test_RRRM_build_updated_request_row_ranges_valid():
1101+
from google.cloud.bigtable import row_set
1102+
1103+
row_range1 = row_set.RowRange(b"row_key21", b"row_key29")
1104+
1105+
request = _ReadRowsRequestPB(table_name=TABLE_NAME)
1106+
request.rows.row_ranges.append(row_range1.get_range_kwargs())
1107+
1108+
last_scanned_key = b"row_key21"
1109+
request = _ReadRowsRequestPB(
1110+
table_name=TABLE_NAME,
1111+
)
1112+
request.rows.row_ranges.append(row_range1.get_range_kwargs())
1113+
1114+
request_manager = _make_read_rows_request_manager(
1115+
request, last_scanned_key, rows_read_so_far=1
1116+
)
1117+
updated_request = request_manager.build_updated_request()
1118+
assert len(updated_request.rows.row_ranges) > 0
1119+
1120+
10391121
@pytest.fixture(scope="session")
10401122
def json_tests():
10411123
dirname = os.path.dirname(__file__)

tests/unit/test_table.py

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -910,7 +910,6 @@ def mock_create_row_request(table_name, **kwargs):
910910

911911
def test_table_read_retry_rows():
912912
from google.api_core import retry
913-
from google.cloud.bigtable.table import _create_row_request
914913

915914
credentials = _make_credentials()
916915
client = _make_client(project="project-id", credentials=credentials, admin=True)
@@ -965,12 +964,55 @@ def test_table_read_retry_rows():
965964
result = rows[1]
966965
assert result.row_key == ROW_KEY_2
967966

968-
expected_request = _create_row_request(
969-
table.name,
970-
start_key=ROW_KEY_1,
971-
end_key=ROW_KEY_2,
967+
assert len(data_api.read_rows.mock_calls) == 3
968+
969+
970+
def test_table_read_retry_rows_no_full_table_scan():
971+
from google.api_core import retry
972+
973+
credentials = _make_credentials()
974+
client = _make_client(project="project-id", credentials=credentials, admin=True)
975+
data_api = client._table_data_client = _make_data_api()
976+
instance = client.instance(instance_id=INSTANCE_ID)
977+
table = _make_table(TABLE_ID, instance)
978+
979+
retry_read_rows = retry.Retry(predicate=_read_rows_retry_exception)
980+
981+
# Create response_iterator
982+
chunk_1 = _ReadRowsResponseCellChunkPB(
983+
row_key=ROW_KEY_2,
984+
family_name=FAMILY_NAME,
985+
qualifier=QUALIFIER,
986+
timestamp_micros=TIMESTAMP_MICROS,
987+
value=VALUE,
988+
commit_row=True,
972989
)
973-
data_api.read_rows.mock_calls = [expected_request] * 3
990+
991+
response_1 = _ReadRowsResponseV2([chunk_1])
992+
response_failure_iterator_2 = _MockFailureIterator_2([response_1])
993+
994+
data_api.table_path.return_value = (
995+
f"projects/{PROJECT_ID}/instances/{INSTANCE_ID}/tables/{TABLE_ID}"
996+
)
997+
998+
data_api.read_rows.side_effect = [
999+
response_failure_iterator_2,
1000+
]
1001+
1002+
rows = [
1003+
row
1004+
for row in table.read_rows(
1005+
start_key="doesn't matter", end_key=ROW_KEY_2, retry=retry_read_rows
1006+
)
1007+
]
1008+
assert len(rows) == 1
1009+
result = rows[0]
1010+
assert result.row_key == ROW_KEY_2
1011+
1012+
assert len(data_api.read_rows.mock_calls) == 1
1013+
assert (
1014+
len(data_api.read_rows.mock_calls[0].args[0].rows.row_ranges) > 0
1015+
) # not empty row_ranges
9741016

9751017

9761018
def test_table_yield_retry_rows():

0 commit comments

Comments (0)