Skip to content

Commit b0d0176

Browse files
committed
feat: Implement updated execute query protocol
1 parent 5527282 commit b0d0176

File tree

14 files changed

+575
-416
lines changed

14 files changed

+575
-416
lines changed

google/cloud/bigtable/data/execute_query/_async/execute_query_iterator.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ def __init__(
9191
"""
9292
Collects responses from ExecuteQuery requests and parses them into QueryResultRows.
9393
94+
**Please Note** this is not meant to be constructed directly by applications. It should always
95+
be created via the client. The constructor is subject to change.
96+
9497
It is **not thread-safe**. It should not be used by multiple {TASK_OR_THREAD}.
9598
9699
Args:
@@ -106,6 +109,7 @@ def __init__(
106109
retryable_excs: a list of errors that will be retried if encountered.
107110
Raises:
108111
{NO_LOOP}
112+
:class:`ValueError <exceptions.ValueError>` as a safeguard if data is processed in an unexpected state
109113
"""
110114
self._table_name = None
111115
self._app_profile_id = app_profile_id
@@ -217,6 +221,11 @@ async def _next_impl(self) -> CrossSync.Iterator[QueryResultRow]:
217221

218222
@CrossSync.convert(sync_name="__next__", replace_symbols={"__anext__": "__next__"})
219223
async def __anext__(self) -> QueryResultRow:
224+
"""
225+
Yields QueryResultRows representing the results of the query.
226+
227+
:raises: :class:`ValueError <exceptions.ValueError>` as a safeguard if data is processed in an unexpected state
228+
"""
220229
if self._is_closed:
221230
raise CrossSync.StopIteration
222231
return await self._result_generator.__anext__()

google/cloud/bigtable/data/execute_query/_byte_cursor.py

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from typing import Optional
15+
from typing import List, Optional
1616

17+
from google.cloud.bigtable.data.execute_query._checksum import _CRC32C
1718
from google.cloud.bigtable_v2 import ExecuteQueryResponse
1819

1920

@@ -31,9 +32,14 @@ class _ByteCursor:
3132
"""
3233

3334
def __init__(self):
34-
self._buffer = bytearray()
35+
self._batch_buffer = bytearray()
36+
self._batches: List[bytes] = []
3537
self._resume_token = None
3638

39+
def reset(self):
40+
self._batch_buffer = bytearray()
41+
self._batches = []
42+
3743
def prepare_for_new_request(self):
3844
"""
3945
Prepares this ``_ByteCursor`` for retrying an ``ExecuteQuery`` request.
@@ -50,13 +56,15 @@ def prepare_for_new_request(self):
5056
Returns:
5157
bytes: Last received resume_token.
5258
"""
53-
self._buffer = bytearray()
59+
# The first response of any retried stream will always contain reset, so
60+
# this isn't actually necessary, but we do it for safety
61+
self.reset()
5462
return self._resume_token
5563

5664
def empty(self) -> bool:
57-
return len(self._buffer) == 0
65+
return not self._batch_buffer and not self._batches
5866

59-
def consume(self, response: ExecuteQueryResponse) -> Optional[bytes]:
67+
def consume(self, response: ExecuteQueryResponse) -> Optional[List[bytes]]:
6068
"""
6169
Reads results bytes from an ``ExecuteQuery`` response and adds them to a buffer.
6270
@@ -72,7 +80,8 @@ def consume(self, response: ExecuteQueryResponse) -> Optional[bytes]:
7280
Response obtained from the stream.
7381
7482
Returns:
75-
bytes or None: bytes if buffers were flushed or None otherwise.
83+
List[bytes] or None: list of bytes batches if buffers were flushed or None otherwise.
84+
Each element in the list represents the bytes of a `ProtoRows` message.
7685
7786
Raises:
7887
ValueError: If provided ``ExecuteQueryResponse`` is not valid
@@ -83,15 +92,31 @@ def consume(self, response: ExecuteQueryResponse) -> Optional[bytes]:
8392

8493
if response_pb.HasField("results"):
8594
results = response_pb.results
95+
if results.reset:
96+
self.reset()
8697
if results.HasField("proto_rows_batch"):
87-
self._buffer.extend(results.proto_rows_batch.batch_data)
98+
self._batch_buffer.extend(results.proto_rows_batch.batch_data)
99+
# Note that 0 is a valid checksum so we must check for field presence
100+
if results.HasField("batch_checksum"):
101+
expected_checksum = results.batch_checksum
102+
checksum = _CRC32C.checksum(self._batch_buffer)
103+
if expected_checksum != checksum:
104+
raise ValueError(
105+
f"Unexpected checksum mismatch. Expected: {expected_checksum}, got: {checksum}"
106+
)
107+
# We have a complete batch so we move it to batches and reset the
108+
# batch_buffer
109+
self._batches.append(memoryview(self._batch_buffer))
110+
self._batch_buffer = bytearray()
88111

89112
if results.resume_token:
90113
self._resume_token = results.resume_token
91114

92-
if self._buffer:
93-
return_value = memoryview(self._buffer)
94-
self._buffer = bytearray()
115+
if self._batches:
116+
if self._batch_buffer:
117+
raise ValueError("Unexpected resume_token without checksum")
118+
return_value = self._batches
119+
self._batches = []
95120
return return_value
96121
else:
97122
raise ValueError(f"Unexpected ExecuteQueryResponse: {response}")
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2025 Google LLC
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
import warnings
17+
18+
with warnings.catch_warnings(record=True) as import_warning:
19+
import google_crc32c # type: ignore
20+
21+
22+
class _CRC32C(object):
    """
    Wrapper around the ``google_crc32c`` library.

    Emits a one-time ``RuntimeWarning`` if the pure-python fallback
    implementation of ``google_crc32c`` is in use (detected via the
    module-level ``import_warning`` captured at import time).
    """

    # Tracks whether the slow-implementation warning has already been
    # emitted, so users see it at most once per process.
    warn_emitted = False

    @classmethod
    def checksum(cls, val: bytearray) -> int:
        """
        Returns the crc32c checksum of the data.

        Args:
            val: buffer of bytes to checksum.

        Returns:
            int: the crc32c checksum of ``val``.
        """
        if import_warning and not cls.warn_emitted:
            cls.warn_emitted = True
            warnings.warn(
                "Using pure python implementation of `google-crc32c` for ExecuteQuery response "
                "validation. This is significantly slower than the c extension. If possible, "
                "run in an environment that supports the c extension.",
                RuntimeWarning,
            )
        # Copy to immutable bytes before hashing: the caller may keep mutating
        # the bytearray, and google_crc32c.value expects a bytes-like object.
        # (The previous memoryview round-trip added nothing before the copy.)
        return google_crc32c.value(bytes(val))

google/cloud/bigtable/data/execute_query/_reader.py

Lines changed: 22 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
from typing import (
16+
List,
1617
TypeVar,
1718
Generic,
1819
Iterable,
@@ -54,14 +55,13 @@ class _Reader(ABC, Generic[T]):
5455

5556
@abstractmethod
5657
def consume(
57-
self, bytes_to_consume: bytes, metadata: Metadata
58+
self, batches_to_consume: List[bytes], metadata: Metadata
5859
) -> Optional[Iterable[T]]:
59-
"""This method receives a parsable chunk of bytes and returns either a None if there is
60-
not enough chunks to return to the user yet (e.g. we haven't received all columns in a
61-
row yet), or a list of appropriate values gathered from one or more parsable chunks.
62-
60+
"""This method receives a list of batches of bytes to be parsed as ProtoRows messages.
61+
It then uses the metadata to group the values in the parsed messages into rows. Returns
62+
None if batches_to_consume is empty
6363
Args:
64-
bytes_to_consume (bytes): chunk of parsable bytes received from
64+
batches_to_consume (List[bytes]): list of parsable byte batches received from
6565
:meth:`google.cloud.bigtable.byte_cursor._ByteCursor.consume`
6666
method.
6767
metadata: metadata used to transform values to rows
@@ -85,11 +85,9 @@ class _QueryResultRowReader(_Reader[QueryResultRow]):
8585
:class:`google.cloud.bigtable.byte_cursor._ByteCursor` passed in the constructor.
8686
"""
8787

88-
def __init__(self):
89-
"""
90-
Constructs new instance of ``_QueryResultRowReader``.
91-
"""
92-
self._values = []
88+
def _parse_proto_rows(self, bytes_to_parse: bytes) -> Iterable[PBValue]:
89+
proto_rows = ProtoRows.pb().FromString(bytes_to_parse)
90+
return proto_rows.values
9391

9492
def _construct_query_result_row(
9593
self, values: Sequence[PBValue], metadata: ProtoMetadata
@@ -106,36 +104,23 @@ def _construct_query_result_row(
106104
result.add_field(column.column_name, parsed_value)
107105
return result
108106

109-
def _parse_proto_rows(self, bytes_to_parse: bytes) -> Iterable[PBValue]:
110-
proto_rows = ProtoRows.pb().FromString(bytes_to_parse)
111-
return proto_rows.values
112-
113107
def consume(
114-
self, bytes_to_consume: bytes, metadata: Metadata
108+
self, batches_to_consume: List[bytes], metadata: Metadata
115109
) -> Optional[Iterable[QueryResultRow]]:
116-
if bytes_to_consume is None:
117-
raise ValueError("bytes_to_consume shouldn't be None")
118-
119-
self._values.extend(self._parse_proto_rows(bytes_to_consume))
120-
121-
# The logic, not defined by mypy types, ensures that the value of
122-
# "metadata" is never null at the time it is retrieved here
123110
proto_metadata = cast(ProtoMetadata, metadata)
124111
num_columns = len(proto_metadata.columns)
125-
126-
if len(self._values) < num_columns:
127-
return None
128-
129112
rows = []
130-
for batch in batched(self._values, n=num_columns):
131-
if len(batch) == num_columns:
132-
rows.append(self._construct_query_result_row(batch, proto_metadata))
133-
else:
134-
raise ValueError(
135-
"Server error, recieved bad number of values. "
136-
f"Expected {num_columns} got {len(batch)}."
137-
)
138-
139-
self._values = []
113+
for batch_bytes in batches_to_consume:
114+
values = self._parse_proto_rows(batch_bytes)
115+
for row_data in batched(values, n=num_columns):
116+
if len(row_data) == num_columns:
117+
rows.append(
118+
self._construct_query_result_row(row_data, proto_metadata)
119+
)
120+
else:
121+
raise ValueError(
122+
"Unexpected error, received bad number of values. "
123+
f"Expected {num_columns} got {len(row_data)}."
124+
)
140125

141126
return rows

google/cloud/bigtable/data/execute_query/_sync_autogen/execute_query_iterator.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ def __init__(
6666
) -> None:
6767
"""Collects responses from ExecuteQuery requests and parses them into QueryResultRows.
6868
69+
**Please Note** this is not meant to be constructed directly by applications. It should always
70+
be created via the client. The constructor is subject to change.
71+
6972
It is **not thread-safe**. It should not be used by multiple threads.
7073
7174
Args:
@@ -80,7 +83,9 @@ def __init__(
8083
req_metadata: metadata used while sending the gRPC request
8184
retryable_excs: a list of errors that will be retried if encountered.
8285
Raises:
83-
None"""
86+
None
87+
:class:`ValueError <exceptions.ValueError>` as a safeguard if data is processed in an unexpected state
88+
"""
8489
self._table_name = None
8590
self._app_profile_id = app_profile_id
8691
self._client = client
@@ -173,6 +178,10 @@ def _next_impl(self) -> CrossSync._Sync_Impl.Iterator[QueryResultRow]:
173178
self.close()
174179

175180
def __next__(self) -> QueryResultRow:
181+
"""Yields QueryResultRows representing the results of the query.
182+
183+
:raises: :class:`ValueError <exceptions.ValueError>` as a safeguard if data is processed in an unexpected state
184+
"""
176185
if self._is_closed:
177186
raise CrossSync._Sync_Impl.StopIteration
178187
return self._result_generator.__next__()

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
"proto-plus >= 1.22.3, <2.0.0dev",
4545
"proto-plus >= 1.25.0, <2.0.0dev; python_version>='3.13'",
4646
"protobuf>=3.20.2,<6.0.0dev,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5",
47+
"google-crc32c>=1.5.0, <2.0.0dev",
4748
]
4849
extras = {"libcst": "libcst >= 0.2.5"}
4950

0 commit comments

Comments
 (0)