Skip to content

Commit 253ced2

Browse files
authored
feat: return singleton success future for exactly-once methods in Message (#608)
* Return singleton success future for exactly-once methods in subscriber.Message
1 parent a91bed8 commit 253ced2

File tree

2 files changed

+18
-9
lines changed

2 files changed

+18
-9
lines changed

google/cloud/pubsub_v1/subscriber/message.py

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@
4040
attributes: {}
4141
}}"""
4242

43+
_SUCCESS_FUTURE = futures.Future()
44+
_SUCCESS_FUTURE.set_result(AcknowledgeStatus.SUCCESS)
45+
4346

4447
def _indent(lines: str, prefix: str = " ") -> str:
4548
"""Indent some text.
@@ -291,12 +294,13 @@ def ack_with_response(self) -> "futures.Future":
291294
pubsub_v1.subscriber.exceptions.AcknowledgeError exception
292295
will be thrown.
293296
"""
294-
future = futures.Future()
295-
req_future = None
297+
req_future: Optional[futures.Future]
296298
if self._exactly_once_delivery_enabled_func():
299+
future = futures.Future()
297300
req_future = future
298301
else:
299-
future.set_result(AcknowledgeStatus.SUCCESS)
302+
future = _SUCCESS_FUTURE
303+
req_future = None
300304
time_to_ack = math.ceil(time.time() - self._received_timestamp)
301305
self._request_queue.put(
302306
requests.AckRequest(
@@ -390,12 +394,13 @@ def modify_ack_deadline_with_response(self, seconds: int) -> "futures.Future":
390394
will be thrown.
391395
392396
"""
393-
future = futures.Future()
394-
req_future = None
397+
req_future: Optional[futures.Future]
395398
if self._exactly_once_delivery_enabled_func():
399+
future = futures.Future()
396400
req_future = future
397401
else:
398-
future.set_result(AcknowledgeStatus.SUCCESS)
402+
future = _SUCCESS_FUTURE
403+
req_future = None
399404

400405
self._request_queue.put(
401406
requests.ModAckRequest(
@@ -451,12 +456,13 @@ def nack_with_response(self) -> "futures.Future":
451456
will be thrown.
452457
453458
"""
454-
future = futures.Future()
455-
req_future = None
459+
req_future: Optional[futures.Future]
456460
if self._exactly_once_delivery_enabled_func():
461+
future = futures.Future()
457462
req_future = future
458463
else:
459-
future.set_result(AcknowledgeStatus.SUCCESS)
464+
future = _SUCCESS_FUTURE
465+
req_future = None
460466

461467
self._request_queue.put(
462468
requests.NackRequest(

tests/unit/pubsub_v1/subscriber/test_message.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ def test_ack_with_response_exactly_once_delivery_disabled():
156156
)
157157
)
158158
assert future.result() == AcknowledgeStatus.SUCCESS
159+
assert future == message._SUCCESS_FUTURE
159160
check_call_types(put, requests.AckRequest)
160161

161162

@@ -205,6 +206,7 @@ def test_modify_ack_deadline_with_response_exactly_once_delivery_disabled():
205206
requests.ModAckRequest(ack_id="bogus_ack_id", seconds=60, future=None)
206207
)
207208
assert future.result() == AcknowledgeStatus.SUCCESS
209+
assert future == message._SUCCESS_FUTURE
208210
check_call_types(put, requests.ModAckRequest)
209211

210212

@@ -242,6 +244,7 @@ def test_nack_with_response_exactly_once_delivery_disabled():
242244
)
243245
)
244246
assert future.result() == AcknowledgeStatus.SUCCESS
247+
assert future == message._SUCCESS_FUTURE
245248
check_call_types(put, requests.NackRequest)
246249

247250

0 commit comments

Comments
 (0)