Skip to content

Commit ced4f52

Browse files
fix: Replace asserts with None checks for graceful shutdown (#1244)
1 parent 49c7c61 commit ced4f52

File tree

2 files changed

+61
-2
lines changed

2 files changed

+61
-2
lines changed

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,8 +1104,11 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
11041104
)
11051105

11061106
with self._pause_resume_lock:
1107-
assert self._scheduler is not None
1108-
assert self._leaser is not None
1107+
if self._scheduler is None or self._leaser is None:
1108+
_LOGGER.debug(
1109+
f"self._scheduler={self._scheduler} or self._leaser={self._leaser} is None. Stopping further processing."
1110+
)
1111+
return
11091112

11101113
for received_message in received_messages:
11111114
if (

tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1375,6 +1375,62 @@ def test_close_blocking_scheduler_shutdown():
13751375
scheduler.shutdown.assert_called_once_with(await_msg_callbacks=True)
13761376

13771377

1378+
def test__on_response_none_scheduler():
1379+
manager, _, _, _, _, _ = make_running_manager()
1380+
1381+
manager._callback = mock.sentinel.callback
1382+
manager._scheduler = None
1383+
# Set up the messages.
1384+
response = gapic_types.StreamingPullResponse(
1385+
received_messages=[
1386+
gapic_types.ReceivedMessage(
1387+
ack_id="ack1",
1388+
message=gapic_types.PubsubMessage(data=b"foo", message_id="1"),
1389+
),
1390+
gapic_types.ReceivedMessage(
1391+
ack_id="ack2",
1392+
message=gapic_types.PubsubMessage(data=b"bar", message_id="2"),
1393+
delivery_attempt=6,
1394+
),
1395+
]
1396+
)
1397+
1398+
manager._maybe_release_messages = mock.Mock()
1399+
1400+
# adjust message bookkeeping in leaser
1401+
fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=42)
1402+
manager._on_response(response)
1403+
1404+
manager._maybe_release_messages.assert_not_called
1405+
1406+
1407+
def test__on_response_none_leaser():
1408+
manager, _, _, _, _, _ = make_running_manager()
1409+
1410+
manager._callback = mock.sentinel.callback
1411+
manager._leaser = None
1412+
# Set up the messages.
1413+
response = gapic_types.StreamingPullResponse(
1414+
received_messages=[
1415+
gapic_types.ReceivedMessage(
1416+
ack_id="ack1",
1417+
message=gapic_types.PubsubMessage(data=b"foo", message_id="1"),
1418+
),
1419+
gapic_types.ReceivedMessage(
1420+
ack_id="ack2",
1421+
message=gapic_types.PubsubMessage(data=b"bar", message_id="2"),
1422+
delivery_attempt=6,
1423+
),
1424+
]
1425+
)
1426+
1427+
manager._maybe_release_messages = mock.Mock()
1428+
1429+
manager._on_response(response)
1430+
1431+
manager._maybe_release_messages.assert_not_called
1432+
1433+
13781434
def test_close_nonblocking_scheduler_shutdown():
13791435
manager, _, _, _, _, _ = make_running_manager(await_callbacks_on_shutdown=False)
13801436
scheduler = manager._scheduler

0 commit comments

Comments
 (0)