Skip to content

Commit 219491e

Browse files
authored
Fix: Handle duplicate acks with streaming pull (#662)
1 parent 1a07d7c commit 219491e

File tree

2 files changed

+336
-5
lines changed

2 files changed

+336
-5
lines changed

google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828

2929
from google.cloud.pubsub_v1.subscriber._protocol import helper_threads
3030
from google.cloud.pubsub_v1.subscriber._protocol import requests
31+
from google.cloud.pubsub_v1.subscriber.exceptions import (
32+
AcknowledgeStatus,
33+
)
3134

3235
if typing.TYPE_CHECKING: # pragma: NO COVER
3336
import queue
@@ -128,17 +131,50 @@ def dispatch_callback(self, items: Sequence[RequestItem]) -> None:
128131
nack_requests: List[requests.NackRequest] = []
129132
drop_requests: List[requests.DropRequest] = []
130133

134+
lease_ids = set()
135+
modack_ids = set()
136+
ack_ids = set()
137+
nack_ids = set()
138+
drop_ids = set()
139+
exactly_once_delivery_enabled = self._manager._exactly_once_delivery_enabled()
140+
131141
for item in items:
132142
if isinstance(item, requests.LeaseRequest):
133-
lease_requests.append(item)
143+
if (
144+
item.ack_id not in lease_ids
145+
): # LeaseRequests have no futures to handle.
146+
lease_ids.add(item.ack_id)
147+
lease_requests.append(item)
134148
elif isinstance(item, requests.ModAckRequest):
135-
modack_requests.append(item)
149+
if item.ack_id in modack_ids:
150+
self._handle_duplicate_request_future(
151+
exactly_once_delivery_enabled, item
152+
)
153+
else:
154+
modack_ids.add(item.ack_id)
155+
modack_requests.append(item)
136156
elif isinstance(item, requests.AckRequest):
137-
ack_requests.append(item)
157+
if item.ack_id in ack_ids:
158+
self._handle_duplicate_request_future(
159+
exactly_once_delivery_enabled, item
160+
)
161+
else:
162+
ack_ids.add(item.ack_id)
163+
ack_requests.append(item)
138164
elif isinstance(item, requests.NackRequest):
139-
nack_requests.append(item)
165+
if item.ack_id in nack_ids:
166+
self._handle_duplicate_request_future(
167+
exactly_once_delivery_enabled, item
168+
)
169+
else:
170+
nack_ids.add(item.ack_id)
171+
nack_requests.append(item)
140172
elif isinstance(item, requests.DropRequest):
141-
drop_requests.append(item)
173+
if (
174+
item.ack_id not in drop_ids
175+
): # DropRequests have no futures to handle.
176+
drop_ids.add(item.ack_id)
177+
drop_requests.append(item)
142178
else:
143179
warnings.warn(
144180
f'Skipping unknown request item of type "{type(item)}"',
@@ -164,6 +200,29 @@ def dispatch_callback(self, items: Sequence[RequestItem]) -> None:
164200
if drop_requests:
165201
self.drop(drop_requests)
166202

203+
def _handle_duplicate_request_future(
204+
self,
205+
exactly_once_delivery_enabled: bool,
206+
item: Union[requests.AckRequest, requests.ModAckRequest, requests.NackRequest],
207+
) -> None:
208+
_LOGGER.debug(
209+
"This is a duplicate %s with the same ack_id: %s.",
210+
type(item),
211+
item.ack_id,
212+
)
213+
if item.future:
214+
if exactly_once_delivery_enabled:
215+
item.future.set_exception(
216+
ValueError(f"Duplicate ack_id for {type(item)}")
217+
)
218+
# Futures may be present even with exactly-once delivery
219+
# disabled, in transition periods after the setting is changed on
220+
# the subscription.
221+
else:
222+
# When exactly-once delivery is NOT enabled, acks/modacks are considered
223+
# best-effort, so the future should succeed even though this is a duplicate.
224+
item.future.set_result(AcknowledgeStatus.SUCCESS)
225+
167226
def ack(self, items: Sequence[requests.AckRequest]) -> None:
168227
"""Acknowledge the given messages.
169228

tests/unit/pubsub_v1/subscriber/test_dispatcher.py

Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424

2525
import mock
2626
import pytest
27+
from google.cloud.pubsub_v1.subscriber.exceptions import (
28+
AcknowledgeStatus,
29+
)
2730

2831

2932
@pytest.mark.parametrize(
@@ -48,6 +51,7 @@ def test_dispatch_callback_active_manager(item, method_name):
4851
dispatcher_.dispatch_callback(items)
4952

5053
method.assert_called_once_with([item])
54+
manager._exactly_once_delivery_enabled.assert_called()
5155

5256

5357
@pytest.mark.parametrize(
@@ -73,6 +77,274 @@ def test_dispatch_callback_inactive_manager(item, method_name):
7377
dispatcher_.dispatch_callback(items)
7478

7579
method.assert_called_once_with([item])
80+
manager._exactly_once_delivery_enabled.assert_called()
81+
82+
83+
@pytest.mark.parametrize(
84+
"items,method_name",
85+
[
86+
(
87+
[
88+
requests.AckRequest("0", 0, 0, "", None),
89+
requests.AckRequest("0", 0, 1, "", None),
90+
],
91+
"ack",
92+
),
93+
(
94+
[
95+
requests.DropRequest("0", 0, ""),
96+
requests.DropRequest("0", 1, ""),
97+
],
98+
"drop",
99+
),
100+
(
101+
[
102+
requests.LeaseRequest("0", 0, ""),
103+
requests.LeaseRequest("0", 1, ""),
104+
],
105+
"lease",
106+
),
107+
(
108+
[
109+
requests.ModAckRequest("0", 0, None),
110+
requests.ModAckRequest("0", 1, None),
111+
],
112+
"modify_ack_deadline",
113+
),
114+
(
115+
[
116+
requests.NackRequest("0", 0, "", None),
117+
requests.NackRequest("0", 1, "", None),
118+
],
119+
"nack",
120+
),
121+
],
122+
)
123+
def test_dispatch_duplicate_items_callback_active_manager_no_futures(
124+
items, method_name
125+
):
126+
manager = mock.create_autospec(
127+
streaming_pull_manager.StreamingPullManager, instance=True
128+
)
129+
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)
130+
131+
manager._exactly_once_delivery_enabled.return_value = False
132+
with mock.patch.object(dispatcher_, method_name) as method:
133+
dispatcher_.dispatch_callback(items)
134+
135+
method.assert_called_once_with([items[0]])
136+
manager._exactly_once_delivery_enabled.assert_called()
137+
138+
139+
@pytest.mark.parametrize(
140+
"items,method_name",
141+
[
142+
(
143+
[
144+
requests.AckRequest("0", 0, 0, "", None),
145+
requests.AckRequest("0", 0, 1, "", futures.Future()),
146+
],
147+
"ack",
148+
),
149+
(
150+
[
151+
requests.DropRequest("0", 0, ""),
152+
requests.DropRequest("0", 1, ""),
153+
],
154+
"drop",
155+
),
156+
(
157+
[
158+
requests.LeaseRequest("0", 0, ""),
159+
requests.LeaseRequest("0", 1, ""),
160+
],
161+
"lease",
162+
),
163+
(
164+
[
165+
requests.ModAckRequest("0", 0, None),
166+
requests.ModAckRequest("0", 1, futures.Future()),
167+
],
168+
"modify_ack_deadline",
169+
),
170+
(
171+
[
172+
requests.NackRequest("0", 0, "", None),
173+
requests.NackRequest("0", 1, "", futures.Future()),
174+
],
175+
"nack",
176+
),
177+
],
178+
)
179+
def test_dispatch_duplicate_items_callback_active_manager_with_futures_no_eod(
180+
items, method_name
181+
):
182+
manager = mock.create_autospec(
183+
streaming_pull_manager.StreamingPullManager, instance=True
184+
)
185+
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)
186+
187+
manager._exactly_once_delivery_enabled.return_value = False
188+
with mock.patch.object(dispatcher_, method_name) as method:
189+
dispatcher_.dispatch_callback(items)
190+
191+
method.assert_called_once_with([items[0]])
192+
manager._exactly_once_delivery_enabled.assert_called()
193+
194+
if method_name != "drop" and method_name != "lease":
195+
assert items[1].future.result() == AcknowledgeStatus.SUCCESS
196+
197+
198+
@pytest.mark.parametrize(
199+
"items,method_name",
200+
[
201+
(
202+
[
203+
requests.AckRequest("0", 0, 0, "", None),
204+
requests.AckRequest("0", 0, 1, "", futures.Future()),
205+
],
206+
"ack",
207+
),
208+
(
209+
[
210+
requests.DropRequest("0", 0, ""),
211+
requests.DropRequest("0", 1, ""),
212+
],
213+
"drop",
214+
),
215+
(
216+
[
217+
requests.LeaseRequest("0", 0, ""),
218+
requests.LeaseRequest("0", 1, ""),
219+
],
220+
"lease",
221+
),
222+
(
223+
[
224+
requests.ModAckRequest("0", 0, None),
225+
requests.ModAckRequest("0", 1, futures.Future()),
226+
],
227+
"modify_ack_deadline",
228+
),
229+
(
230+
[
231+
requests.NackRequest("0", 0, "", None),
232+
requests.NackRequest("0", 1, "", futures.Future()),
233+
],
234+
"nack",
235+
),
236+
],
237+
)
238+
def test_dispatch_duplicate_items_callback_active_manager_with_futures_eod(
239+
items, method_name
240+
):
241+
manager = mock.create_autospec(
242+
streaming_pull_manager.StreamingPullManager, instance=True
243+
)
244+
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)
245+
246+
manager._exactly_once_delivery_enabled.return_value = True
247+
with mock.patch.object(dispatcher_, method_name) as method:
248+
dispatcher_.dispatch_callback(items)
249+
250+
method.assert_called_once_with([items[0]])
251+
manager._exactly_once_delivery_enabled.assert_called()
252+
253+
if method_name != "drop" and method_name != "lease":
254+
with pytest.raises(ValueError) as err:
255+
items[1].future.result()
256+
assert err.errisinstance(ValueError)
257+
258+
259+
def test_dispatch_duplicate_items_diff_types_callback_active_manager_with_futures_eod():
260+
ack_future = futures.Future()
261+
ack_request = requests.AckRequest("0", 0, 1, "", ack_future)
262+
drop_request = requests.DropRequest("0", 1, "")
263+
lease_request = requests.LeaseRequest("0", 1, "")
264+
nack_future = futures.Future()
265+
nack_request = requests.NackRequest("0", 1, "", nack_future)
266+
modack_future = futures.Future()
267+
modack_request = requests.ModAckRequest("0", 1, modack_future)
268+
269+
items = [ack_request, drop_request, lease_request, nack_request, modack_request]
270+
271+
manager = mock.create_autospec(
272+
streaming_pull_manager.StreamingPullManager, instance=True
273+
)
274+
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)
275+
276+
manager._exactly_once_delivery_enabled.return_value = True
277+
with mock.patch.multiple(
278+
dispatcher_,
279+
ack=mock.DEFAULT,
280+
nack=mock.DEFAULT,
281+
drop=mock.DEFAULT,
282+
lease=mock.DEFAULT,
283+
modify_ack_deadline=mock.DEFAULT,
284+
):
285+
dispatcher_.dispatch_callback(items)
286+
manager._exactly_once_delivery_enabled.assert_called()
287+
dispatcher_.ack.assert_called_once_with([ack_request])
288+
dispatcher_.drop.assert_called_once_with([drop_request])
289+
dispatcher_.lease.assert_called_once_with([lease_request])
290+
dispatcher_.nack.assert_called_once_with([nack_request])
291+
dispatcher_.modify_ack_deadline.assert_called_once_with([modack_request])
292+
293+
294+
@pytest.mark.parametrize(
295+
"items,method_name",
296+
[
297+
(
298+
[
299+
requests.AckRequest("0", 0, 0, "", None),
300+
requests.AckRequest("0", 0, 1, "", None),
301+
],
302+
"ack",
303+
),
304+
(
305+
[
306+
requests.DropRequest("0", 0, ""),
307+
requests.DropRequest("0", 1, ""),
308+
],
309+
"drop",
310+
),
311+
(
312+
[
313+
requests.LeaseRequest("0", 0, ""),
314+
requests.LeaseRequest("0", 1, ""),
315+
],
316+
"lease",
317+
),
318+
(
319+
[
320+
requests.ModAckRequest("0", 0, None),
321+
requests.ModAckRequest("0", 1, None),
322+
],
323+
"modify_ack_deadline",
324+
),
325+
(
326+
[
327+
requests.NackRequest("0", 0, "", None),
328+
requests.NackRequest("0", 1, "", None),
329+
],
330+
"nack",
331+
),
332+
],
333+
)
334+
def test_dispatch_duplicate_items_callback_active_manager_no_futures_eod(
335+
items, method_name
336+
):
337+
manager = mock.create_autospec(
338+
streaming_pull_manager.StreamingPullManager, instance=True
339+
)
340+
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)
341+
342+
manager._exactly_once_delivery_enabled.return_value = True
343+
with mock.patch.object(dispatcher_, method_name) as method:
344+
dispatcher_.dispatch_callback(items)
345+
346+
method.assert_called_once_with([items[0]])
347+
manager._exactly_once_delivery_enabled.assert_called()
76348

77349

78350
def test_unknown_request_type():

0 commit comments

Comments
 (0)