Skip to content

Commit e600ad8

Browse files
authored
fix: Update stream_ack_deadline with ack_deadline (#763)
1 parent 260bd18 commit e600ad8

File tree

2 files changed

+11
-3
lines changed

2 files changed

+11
-3
lines changed

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,10 @@ def _obtain_ack_deadline(self, maybe_update: bool) -> float:
451451
self._ack_deadline = max(
452452
self._ack_deadline, _MIN_ACK_DEADLINE_SECS_WHEN_EXACTLY_ONCE_ENABLED
453453
)
454-
454+
# If we have updated the ack_deadline and it is longer than the stream_ack_deadline
455+
# set the stream_ack_deadline to the new ack_deadline.
456+
if self._ack_deadline > self._stream_ack_deadline:
457+
self._stream_ack_deadline = self._ack_deadline
455458
return self._ack_deadline
456459

457460
@property
@@ -818,7 +821,7 @@ def open(
818821
)
819822

820823
# Create the RPC
821-
stream_ack_deadline_seconds = self.ack_deadline
824+
stream_ack_deadline_seconds = self._stream_ack_deadline
822825

823826
get_initial_request = functools.partial(
824827
self._get_initial_request, stream_ack_deadline_seconds

tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ def test__obtain_ack_deadline_with_min_duration_per_lease_extension():
257257
# The deadline configured in flow control should prevail.
258258
deadline = manager._obtain_ack_deadline(maybe_update=True)
259259
assert deadline == histogram.MAX_ACK_DEADLINE
260+
assert manager._stream_ack_deadline == histogram.MAX_ACK_DEADLINE
260261

261262

262263
def test__obtain_ack_deadline_with_max_duration_per_lease_extension_too_low():
@@ -283,6 +284,7 @@ def test__obtain_ack_deadline_with_min_duration_per_lease_extension_too_high():
283284
# The deadline configured in flow control should be adjusted to the maximum allowed.
284285
deadline = manager._obtain_ack_deadline(maybe_update=True)
285286
assert deadline == histogram.MAX_ACK_DEADLINE
287+
assert manager._stream_ack_deadline == histogram.MAX_ACK_DEADLINE
286288

287289

288290
def test__obtain_ack_deadline_with_exactly_once_enabled():
@@ -299,6 +301,7 @@ def test__obtain_ack_deadline_with_exactly_once_enabled():
299301
# Since the 60-second min ack_deadline value for exactly_once subscriptions
300302
# seconds is higher than the histogram value, the deadline should be 60 sec.
301303
assert deadline == 60
304+
assert manager._stream_ack_deadline == 60
302305

303306

304307
def test__obtain_ack_deadline_with_min_duration_per_lease_extension_with_exactly_once_enabled():
@@ -316,6 +319,7 @@ def test__obtain_ack_deadline_with_min_duration_per_lease_extension_with_exactly
316319
# User-defined custom min ack_deadline value takes precedence over
317320
# exactly_once default of 60 seconds.
318321
assert deadline == histogram.MAX_ACK_DEADLINE
322+
assert manager._stream_ack_deadline == histogram.MAX_ACK_DEADLINE
319323

320324

321325
def test__obtain_ack_deadline_no_value_update():
@@ -1148,7 +1152,7 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi
11481152
)
11491153
initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"]
11501154
assert initial_request_arg.func == manager._get_initial_request
1151-
assert initial_request_arg.args[0] == 18
1155+
assert initial_request_arg.args[0] == 60
11521156
assert not manager._client.get_subscription.called
11531157

11541158
resumable_bidi_rpc.return_value.add_done_callback.assert_called_once_with(
@@ -1833,6 +1837,7 @@ def test__on_response_disable_exactly_once():
18331837
# exactly_once minimum since exactly_once has been disabled.
18341838
deadline = manager._obtain_ack_deadline(maybe_update=True)
18351839
assert deadline == histogram.MIN_ACK_DEADLINE
1840+
assert manager._stream_ack_deadline == 60
18361841

18371842

18381843
def test__on_response_exactly_once_immediate_modacks_fail():

0 commit comments

Comments
 (0)