1717package com .google .cloud .pubsublite .kafka ;
1818
1919import static com .google .cloud .pubsublite .kafka .KafkaExceptionUtils .toKafka ;
20+ import static com .google .common .base .Preconditions .checkState ;
21+ import static java .util .concurrent .TimeUnit .MILLISECONDS ;
2022
2123import com .google .api .core .ApiFuture ;
2224import com .google .api .core .ApiFutureCallback ;
2325import com .google .api .core .ApiFutures ;
26+ import com .google .api .core .SettableApiFuture ;
2427import com .google .cloud .pubsublite .Offset ;
2528import com .google .cloud .pubsublite .Partition ;
2629import com .google .cloud .pubsublite .SequencedMessage ;
2730import com .google .cloud .pubsublite .TopicPath ;
31+ import com .google .cloud .pubsublite .internal .BlockingPullSubscriber ;
2832import com .google .cloud .pubsublite .internal .CloseableMonitor ;
2933import com .google .cloud .pubsublite .internal .ExtractStatus ;
30- import com .google .cloud .pubsublite .internal .PullSubscriber ;
3134import com .google .cloud .pubsublite .internal .wire .Committer ;
3235import com .google .cloud .pubsublite .proto .SeekRequest ;
3336import com .google .cloud .pubsublite .proto .SeekRequest .NamedTarget ;
4043import com .google .errorprone .annotations .concurrent .GuardedBy ;
4144import java .time .Duration ;
4245import java .util .ArrayDeque ;
43- import java .util .Collections ;
4446import java .util .HashMap ;
4547import java .util .List ;
4648import java .util .Map ;
4749import java .util .Optional ;
4850import java .util .Queue ;
4951import java .util .Set ;
52+ import java .util .concurrent .TimeoutException ;
5053import java .util .stream .Collectors ;
5154import org .apache .kafka .clients .consumer .CommitFailedException ;
5255import org .apache .kafka .clients .consumer .ConsumerRecord ;
@@ -67,17 +70,21 @@ class SingleSubscriptionConsumerImpl implements SingleSubscriptionConsumer {
6770 private final CloseableMonitor monitor = new CloseableMonitor ();
6871
6972 static class SubscriberState {
70- PullSubscriber < SequencedMessage > subscriber ;
73+ BlockingPullSubscriber subscriber ;
7174 Committer committer ;
72- Optional <Offset > lastUncommitted = Optional .empty ();
75+ boolean needsCommitting = false ;
76+ Optional <Offset > lastReceived = Optional .empty ();
7377 }
7478
7579 @ GuardedBy ("monitor.monitor" )
76- private Map <Partition , SubscriberState > partitions = new HashMap <>();
77-
78- // Set to true when wakeup() has been called once .
80+ private final Map <Partition , SubscriberState > partitions = new HashMap <>();
81+ // When the set of assignments changes, this future will be set and swapped with a new future to
82+ // let ongoing pollers know that they should pick up new assignments .
7983 @ GuardedBy ("monitor.monitor" )
80- private boolean wakeupTriggered = false ;
84+ private SettableApiFuture <Void > assignmentChanged = SettableApiFuture .create ();
85+
86+ // Set when wakeup() has been called once.
87+ private final SettableApiFuture <Void > wakeupTriggered = SettableApiFuture .create ();
8188
8289 SingleSubscriptionConsumerImpl (
8390 TopicPath topic ,
@@ -118,6 +125,8 @@ public void setAssignment(Set<Partition> assignment) {
118125 s .committer .startAsync ().awaitRunning ();
119126 partitions .put (partition , s );
120127 }));
128+ assignmentChanged .set (null );
129+ assignmentChanged = SettableApiFuture .create ();
121130 } catch (Throwable t ) {
122131 throw ExtractStatus .toCanonical (t ).underlying ;
123132 }
@@ -136,27 +145,33 @@ private Map<Partition, Queue<SequencedMessage>> fetchAll() {
136145 partitions .forEach (
137146 ExtractStatus .rethrowAsRuntime (
138147 (partition , state ) -> {
139- List <SequencedMessage > messages = state .subscriber .pull ();
140- if (messages .isEmpty ()) return ;
141- partitionQueues .computeIfAbsent (partition , x -> new ArrayDeque <>()).addAll (messages );
148+ ArrayDeque <SequencedMessage > messages = new ArrayDeque <>();
149+ for (Optional <SequencedMessage > message = state .subscriber .messageIfAvailable ();
150+ message .isPresent ();
151+ message = state .subscriber .messageIfAvailable ()) {
152+ messages .add (message .get ());
153+ }
154+ partitionQueues .put (partition , messages );
142155 }));
143156 return partitionQueues ;
144157 }
145158
146159 private Map <Partition , Queue <SequencedMessage >> doPoll (Duration duration ) {
147160 try {
148- while (!duration .isZero ()) {
149- try (CloseableMonitor .Hold h = monitor .enter ()) {
150- if (wakeupTriggered ) throw new WakeupException ();
151- Map <Partition , Queue <SequencedMessage >> partitionQueues = fetchAll ();
152- if (!partitionQueues .isEmpty ()) return partitionQueues ;
153- }
154- Duration sleepFor = Collections .min (ImmutableList .of (duration , Duration .ofMillis (10 )));
155- Thread .sleep (sleepFor .toMillis ());
156- duration = duration .minus (sleepFor );
161+ ImmutableList .Builder <ApiFuture <Void >> stopSleepingSignals = ImmutableList .builder ();
162+ try (CloseableMonitor .Hold h = monitor .enter ()) {
163+ stopSleepingSignals .add (wakeupTriggered );
164+ stopSleepingSignals .add (assignmentChanged );
165+ partitions .values ().forEach (state -> stopSleepingSignals .add (state .subscriber .onData ()));
166+ }
167+ try {
168+ ApiFuturesExtensions .whenFirstDone (stopSleepingSignals .build ())
169+ .get (duration .toMillis (), MILLISECONDS );
170+ } catch (TimeoutException e ) {
171+ return ImmutableMap .of ();
157172 }
158- // Last fetch to handle duration originally being 0 and last time window sleep.
159173 try (CloseableMonitor .Hold h = monitor .enter ()) {
174+ if (wakeupTriggered .isDone ()) throw new WakeupException ();
160175 return fetchAll ();
161176 }
162177 } catch (Throwable t ) {
@@ -166,8 +181,6 @@ private Map<Partition, Queue<SequencedMessage>> doPoll(Duration duration) {
166181
167182 @ Override
168183 public ConsumerRecords <byte [], byte []> poll (Duration duration ) {
169- Map <Partition , Queue <SequencedMessage >> partitionQueues = doPoll (duration );
170- Map <TopicPartition , List <ConsumerRecord <byte [], byte []>>> records = new HashMap <>();
171184 if (autocommit ) {
172185 ApiFuture <?> future = commitAll ();
173186 ApiFutures .addCallback (
@@ -183,14 +196,16 @@ public void onSuccess(Object result) {}
183196 },
184197 MoreExecutors .directExecutor ());
185198 }
199+ Map <Partition , Queue <SequencedMessage >> partitionQueues = doPoll (duration );
200+ Map <TopicPartition , List <ConsumerRecord <byte [], byte []>>> records = new HashMap <>();
186201 partitionQueues .forEach (
187202 (partition , queue ) -> {
188203 if (queue .isEmpty ()) return ;
189204 try (CloseableMonitor .Hold h = monitor .enter ()) {
190205 SubscriberState state = partitions .getOrDefault (partition , null );
191- if (state != null ) {
192- state .lastUncommitted = Optional .of (Iterables .getLast (queue ).offset ());
193- }
206+ if (state == null ) return ;
207+ state .lastReceived = Optional .of (Iterables .getLast (queue ).offset ());
208+ state . needsCommitting = true ;
194209 }
195210 List <ConsumerRecord <byte [], byte []>> partitionRecords =
196211 queue .stream ()
@@ -207,17 +222,16 @@ public ApiFuture<Map<Partition, Offset>> commitAll() {
207222 try (CloseableMonitor .Hold h = monitor .enter ()) {
208223 ImmutableMap .Builder <Partition , Offset > builder = ImmutableMap .builder ();
209224 ImmutableList .Builder <ApiFuture <?>> commitFutures = ImmutableList .builder ();
210- partitions .entrySet ().stream ()
211- .filter (entry -> entry .getValue ().lastUncommitted .isPresent ())
212- .forEach (
213- entry -> {
214- // The Pub/Sub Lite commit offset is the next to be received.
215- Offset lastUncommitted = entry .getValue ().lastUncommitted .get ();
216- entry .getValue ().lastUncommitted = Optional .empty ();
217- Offset toCommit = Offset .of (lastUncommitted .value () + 1 );
218- builder .put (entry .getKey (), toCommit );
219- commitFutures .add (entry .getValue ().committer .commitOffset (toCommit ));
220- });
225+ partitions .forEach (
226+ (partition , state ) -> {
227+ if (!state .needsCommitting ) return ;
228+ checkState (state .lastReceived .isPresent ());
229+ state .needsCommitting = false ;
230+ // The Pub/Sub Lite commit offset is one more than the last received.
231+ Offset toCommit = Offset .of (state .lastReceived .get ().value () + 1 );
232+ builder .put (partition , toCommit );
233+ commitFutures .add (state .committer .commitOffset (toCommit ));
234+ });
221235 Map <Partition , Offset > map = builder .build ();
222236 return ApiFutures .transform (
223237 ApiFutures .allAsList (commitFutures .build ()),
@@ -269,7 +283,7 @@ public void doSeek(Partition partition, SeekRequest request) throws KafkaExcepti
269283 @ Override
270284 public Optional <Long > position (Partition partition ) {
271285 if (!partitions .containsKey (partition )) return Optional .empty ();
272- return partitions .get (partition ).subscriber . nextOffset (). map (Offset :: value );
286+ return partitions .get (partition ).lastReceived . map (lastReceived -> lastReceived . value () + 1 );
273287 }
274288
275289 @ Override
@@ -286,8 +300,6 @@ public void close(Duration duration) {
286300
287301 @ Override
288302 public void wakeup () {
289- try (CloseableMonitor .Hold h = monitor .enter ()) {
290- wakeupTriggered = true ;
291- }
303+ wakeupTriggered .set (null );
292304 }
293305}
0 commit comments