Skip to content

Commit 114b096

Browse files
committed
Respect timeout on client.read_rows. Don't resume on DEADLINE_EXCEEDED errors.
Streams that fail with DEADLINE_EXCEEDED should not be resumed. Currently, you can set a timeout on reads, but the reader reconnects as soon as the timeout elapses, so the timeout is effectively ignored. Always resuming the stream also hid internal issue 132959978 from clients: the default deadline is much too low for read streams. Wait for the read rows deadline configuration fix to go in before merging.
1 parent 78ae0d5 commit 114b096

File tree

2 files changed

+44
-5
lines changed

2 files changed

+44
-5
lines changed

bigquery_storage/google/cloud/bigquery_storage_v1beta1/reader.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333

3434

3535
_STREAM_RESUMPTION_EXCEPTIONS = (
36-
google.api_core.exceptions.DeadlineExceeded,
3736
google.api_core.exceptions.ServiceUnavailable,
3837
)
3938
_FASTAVRO_REQUIRED = "fastavro is required to parse Avro blocks"

bigquery_storage/tests/unit/test_reader.py

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,16 @@ def _bq_to_avro_blocks(bq_blocks, avro_schema_json):
131131
return avro_blocks
132132

133133

134+
def _avro_blocks_w_unavailable(avro_blocks):
135+
for block in avro_blocks:
136+
yield block
137+
raise google.api_core.exceptions.ServiceUnavailable("test: please reconnect")
138+
139+
134140
def _avro_blocks_w_deadline(avro_blocks):
135141
for block in avro_blocks:
136142
yield block
137-
raise google.api_core.exceptions.DeadlineExceeded("test: please reconnect")
143+
raise google.api_core.exceptions.DeadlineExceeded("test: timeout, don't reconnect")
138144

139145

140146
def _generate_read_session(avro_schema_json):
@@ -205,7 +211,7 @@ def test_rows_w_scalars(class_under_test, mock_client):
205211
assert got == expected
206212

207213

208-
def test_rows_w_reconnect(class_under_test, mock_client):
214+
def test_rows_w_timeout(class_under_test, mock_client):
209215
bq_columns = [{"name": "int_col", "type": "int64"}]
210216
avro_schema = _bq_to_avro_schema(bq_columns)
211217
read_session = _generate_read_session(avro_schema)
@@ -219,6 +225,40 @@ def test_rows_w_reconnect(class_under_test, mock_client):
219225
bq_blocks_2 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]]
220226
avro_blocks_2 = _bq_to_avro_blocks(bq_blocks_2, avro_schema)
221227

228+
mock_client.read_rows.return_value = avro_blocks_2
229+
stream_position = bigquery_storage_v1beta1.types.StreamPosition(
230+
stream={"name": "test"}
231+
)
232+
233+
reader = class_under_test(
234+
avro_blocks_1,
235+
mock_client,
236+
stream_position,
237+
{"metadata": {"test-key": "test-value"}},
238+
)
239+
240+
with pytest.raises(google.api_core.exceptions.DeadlineExceeded):
241+
list(reader.rows(read_session))
242+
243+
# Don't reconnect on DeadlineExceeded. This allows user-specified timeouts
244+
# to be respected.
245+
mock_client.read_rows.assert_not_called()
246+
247+
248+
def test_rows_w_reconnect(class_under_test, mock_client):
249+
bq_columns = [{"name": "int_col", "type": "int64"}]
250+
avro_schema = _bq_to_avro_schema(bq_columns)
251+
read_session = _generate_read_session(avro_schema)
252+
bq_blocks_1 = [
253+
[{"int_col": 123}, {"int_col": 234}],
254+
[{"int_col": 345}, {"int_col": 456}],
255+
]
256+
avro_blocks_1 = _avro_blocks_w_unavailable(
257+
_bq_to_avro_blocks(bq_blocks_1, avro_schema)
258+
)
259+
bq_blocks_2 = [[{"int_col": 567}, {"int_col": 789}], [{"int_col": 890}]]
260+
avro_blocks_2 = _bq_to_avro_blocks(bq_blocks_2, avro_schema)
261+
222262
for block in avro_blocks_2:
223263
block.status.estimated_row_count = 7
224264

@@ -275,7 +315,7 @@ def test_rows_w_reconnect_by_page(class_under_test, mock_client):
275315
)
276316

277317
reader = class_under_test(
278-
_avro_blocks_w_deadline(avro_blocks_1),
318+
_avro_blocks_w_unavailable(avro_blocks_1),
279319
mock_client,
280320
stream_position,
281321
{"metadata": {"test-key": "test-value"}},
@@ -436,7 +476,7 @@ def test_to_dataframe_by_page(class_under_test, mock_client):
436476
)
437477

438478
reader = class_under_test(
439-
_avro_blocks_w_deadline(avro_blocks_1),
479+
_avro_blocks_w_unavailable(avro_blocks_1),
440480
mock_client,
441481
stream_position,
442482
{"metadata": {"test-key": "test-value"}},

0 commit comments

Comments
 (0)