Skip to content

Commit 7f88458

Browse files
feat: Implement AckSetTracker which tracks message acknowledgements. (#19)
* feat: Implement AckSetTracker which tracks message acknowledgements. Note that it is awkward to structure this like the java version, as there is no "AsyncCallable" type in python. * fix: Fix comments on ack_set_tracker.
1 parent 3068da5 commit 7f88458

File tree

7 files changed

+130
-0
lines changed

7 files changed

+130
-0
lines changed

google/cloud/pubsublite/cloudpubsub/__init__.py

Whitespace-only changes.

google/cloud/pubsublite/cloudpubsub/internal/__init__.py

Whitespace-only changes.
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from abc import abstractmethod
2+
from typing import AsyncContextManager
3+
4+
5+
class AckSetTracker(AsyncContextManager):
6+
"""
7+
An AckSetTracker tracks disjoint acknowledged messages and commits them when a contiguous prefix of tracked offsets
8+
is aggregated.
9+
"""
10+
@abstractmethod
11+
def track(self, offset: int):
12+
"""
13+
Track the provided offset.
14+
15+
Args:
16+
offset: the offset to track.
17+
18+
Raises:
19+
GoogleAPICallError: On an invalid offset to track.
20+
"""
21+
22+
@abstractmethod
23+
async def ack(self, offset: int):
24+
"""
25+
Acknowledge the message with the provided offset. The offset must have previously been tracked.
26+
27+
Args:
28+
offset: the offset to acknowledge.
29+
30+
Returns:
31+
GoogleAPICallError: On a commit failure.
32+
"""
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import queue
2+
from collections import deque
3+
from typing import Optional
4+
5+
from google.api_core.exceptions import FailedPrecondition
6+
from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker import AckSetTracker
7+
from google.cloud.pubsublite.internal.wire.committer import Committer
8+
from google.cloud.pubsublite_v1 import Cursor
9+
10+
11+
class AckSetTrackerImpl(AckSetTracker):
12+
_committer: Committer
13+
14+
_receipts: "deque[int]"
15+
_acks: "queue.PriorityQueue[int]"
16+
17+
def __init__(self, committer: Committer):
18+
self._committer = committer
19+
self._receipts = deque()
20+
self._acks = queue.PriorityQueue()
21+
22+
def track(self, offset: int):
23+
if len(self._receipts) > 0:
24+
last = self._receipts[0]
25+
if last >= offset:
26+
raise FailedPrecondition(f"Tried to track message {offset} which is before last tracked message {last}.")
27+
self._receipts.append(offset)
28+
29+
async def ack(self, offset: int):
30+
# Note: put_nowait is used here and below to ensure that the below logic is executed without yielding
31+
# to another coroutine in the event loop. The queue is unbounded so it will never throw.
32+
self._acks.put_nowait(offset)
33+
prefix_acked_offset: Optional[int] = None
34+
while len(self._receipts) != 0 and not self._acks.empty():
35+
receipt = self._receipts.popleft()
36+
ack = self._acks.get_nowait()
37+
if receipt == ack:
38+
prefix_acked_offset = receipt
39+
continue
40+
self._receipts.append(receipt)
41+
self._acks.put(ack)
42+
break
43+
if prefix_acked_offset is None:
44+
return
45+
# Convert from last acked to first unacked.
46+
await self._committer.commit(Cursor(offset=prefix_acked_offset+1))
47+
48+
async def __aenter__(self):
49+
await self._committer.__aenter__()
50+
51+
async def __aexit__(self, exc_type, exc_value, traceback):
52+
await self._committer.__aexit__(exc_type, exc_value, traceback)

tests/unit/pubsublite/cloudpubsub/__init__.py

Whitespace-only changes.

tests/unit/pubsublite/cloudpubsub/internal/__init__.py

Whitespace-only changes.
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from asynctest.mock import MagicMock, call
2+
import pytest
3+
4+
# All test coroutines will be treated as marked.
5+
from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker import AckSetTracker
6+
from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker_impl import AckSetTrackerImpl
7+
from google.cloud.pubsublite.internal.wire.committer import Committer
8+
from google.cloud.pubsublite_v1 import Cursor
9+
10+
pytestmark = pytest.mark.asyncio
11+
12+
13+
@pytest.fixture()
14+
def committer():
15+
committer = MagicMock(spec=Committer)
16+
committer.__aenter__.return_value = committer
17+
return committer
18+
19+
20+
@pytest.fixture()
21+
def tracker(committer):
22+
return AckSetTrackerImpl(committer)
23+
24+
25+
async def test_track_and_aggregate_acks(committer, tracker: AckSetTracker):
26+
async with tracker:
27+
committer.__aenter__.assert_called_once()
28+
tracker.track(offset=1)
29+
tracker.track(offset=3)
30+
tracker.track(offset=5)
31+
tracker.track(offset=7)
32+
33+
committer.commit.assert_has_calls([])
34+
await tracker.ack(offset=3)
35+
committer.commit.assert_has_calls([])
36+
await tracker.ack(offset=5)
37+
committer.commit.assert_has_calls([])
38+
await tracker.ack(offset=1)
39+
committer.commit.assert_has_calls([call(Cursor(offset=6))])
40+
41+
tracker.track(offset=8)
42+
await tracker.ack(offset=7)
43+
committer.commit.assert_has_calls([call(Cursor(offset=6)), call(Cursor(offset=8))])
44+
committer.__aexit__.assert_called_once()
45+
46+

0 commit comments

Comments
 (0)