Skip to content

Commit b498978

Browse files
authored
Respect timeout on client.read_rows. Don't resume on DEADLINE_EXCEEDED errors. (#8025)
* Respect timeout on `client.read_rows`. Don't resume on `DEADLINE_EXCEEDED` errors. DEADLINE_EXCEEDED should not be resumed. Currently, you can set a timeout on reads, but the reader will reconnect after the timeout value finishes. Always resuming the stream masked internal issue 132959978 from clients, where the default deadline is much too low for read streams. Wait for the deadline configuration fix for read rows to go in before merging. * Blacken
1 parent df93a68 commit b498978

File tree

2 files changed

+45
-8
lines changed

2 files changed

+45
-8
lines changed

bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,7 @@
3232
from google.cloud.bigquery_storage_v1beta1 import types
3333

3434

35-
_STREAM_RESUMPTION_EXCEPTIONS = (
36-
google.api_core.exceptions.DeadlineExceeded,
37-
google.api_core.exceptions.ServiceUnavailable,
38-
)
35+
_STREAM_RESUMPTION_EXCEPTIONS = (google.api_core.exceptions.ServiceUnavailable,)
3936
_FASTAVRO_REQUIRED = "fastavro is required to parse Avro blocks"
4037
_PANDAS_REQUIRED = "pandas is required to create a DataFrame"
4138

bigquery_storage/tests/unit/test_reader.py

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,16 @@ def _bq_to_avro_blocks(bq_blocks, avro_schema_json):
131131
return avro_blocks
132132

133133

134+
def _avro_blocks_w_unavailable(avro_blocks):
135+
for block in avro_blocks:
136+
yield block
137+
raise google.api_core.exceptions.ServiceUnavailable("test: please reconnect")
138+
139+
134140
def _avro_blocks_w_deadline(avro_blocks):
135141
for block in avro_blocks:
136142
yield block
137-
raise google.api_core.exceptions.DeadlineExceeded("test: please reconnect")
143+
raise google.api_core.exceptions.DeadlineExceeded("test: timeout, don't reconnect")
138144

139145

140146
def _generate_read_session(avro_schema_json):
@@ -205,7 +211,7 @@ def test_rows_w_scalars(class_under_test, mock_client):
205211
assert got == expected
206212

207213

208-
def test_rows_w_reconnect(class_under_test, mock_client):
214+
def test_rows_w_timeout(class_under_test, mock_client):
209215
bq_columns = [{"name": "int_col", "type": "int64"}]
210216
avro_schema = _bq_to_avro_schema(bq_columns)
211217
read_session = _generate_read_session(avro_schema)
@@ -219,6 +225,40 @@ def test_rows_w_reconnect(class_under_test, mock_client):
219225
bq_blocks_2 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]]
220226
avro_blocks_2 = _bq_to_avro_blocks(bq_blocks_2, avro_schema)
221227

228+
mock_client.read_rows.return_value = avro_blocks_2
229+
stream_position = bigquery_storage_v1beta1.types.StreamPosition(
230+
stream={"name": "test"}
231+
)
232+
233+
reader = class_under_test(
234+
avro_blocks_1,
235+
mock_client,
236+
stream_position,
237+
{"metadata": {"test-key": "test-value"}},
238+
)
239+
240+
with pytest.raises(google.api_core.exceptions.DeadlineExceeded):
241+
list(reader.rows(read_session))
242+
243+
# Don't reconnect on DeadlineException. This allows user-specified timeouts
244+
# to be respected.
245+
mock_client.read_rows.assert_not_called()
246+
247+
248+
def test_rows_w_reconnect(class_under_test, mock_client):
249+
bq_columns = [{"name": "int_col", "type": "int64"}]
250+
avro_schema = _bq_to_avro_schema(bq_columns)
251+
read_session = _generate_read_session(avro_schema)
252+
bq_blocks_1 = [
253+
[{"int_col": 123}, {"int_col": 234}],
254+
[{"int_col": 345}, {"int_col": 456}],
255+
]
256+
avro_blocks_1 = _avro_blocks_w_unavailable(
257+
_bq_to_avro_blocks(bq_blocks_1, avro_schema)
258+
)
259+
bq_blocks_2 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]]
260+
avro_blocks_2 = _bq_to_avro_blocks(bq_blocks_2, avro_schema)
261+
222262
for block in avro_blocks_2:
223263
block.status.estimated_row_count = 7
224264

@@ -275,7 +315,7 @@ def test_rows_w_reconnect_by_page(class_under_test, mock_client):
275315
)
276316

277317
reader = class_under_test(
278-
_avro_blocks_w_deadline(avro_blocks_1),
318+
_avro_blocks_w_unavailable(avro_blocks_1),
279319
mock_client,
280320
stream_position,
281321
{"metadata": {"test-key": "test-value"}},
@@ -436,7 +476,7 @@ def test_to_dataframe_by_page(class_under_test, mock_client):
436476
)
437477

438478
reader = class_under_test(
439-
_avro_blocks_w_deadline(avro_blocks_1),
479+
_avro_blocks_w_unavailable(avro_blocks_1),
440480
mock_client,
441481
stream_position,
442482
{"metadata": {"test-key": "test-value"}},

0 commit comments

Comments
 (0)