Skip to content

Commit 20ae0ba

Browse files
authored
feat: Reset Consumer upon out-of-band seek (#172)
Resets the Consumer state to handle out-of-band seeks pushed from the server.
1 parent 783b3fc commit 20ae0ba

File tree

7 files changed

+416
-90
lines changed

7 files changed

+416
-90
lines changed

.readme-partials.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,9 @@ about: |
8080
- Producers operate on a single topic, and Consumers on a single subscription.
8181
- ProducerRecord may not specify partition explicitly.
8282
- Consumers may not dynamically create consumer groups (subscriptions).
83+
84+
Note:
85+
- In order to use Pub/Sub Lite [seek operations](https://cloud.google.com/pubsub/lite/docs/seek),
86+
Consumers must have auto-commit enabled. Consumer seek methods are client-initiated, whereas
87+
Pub/Sub Lite seek operations are initiated out-of-band and pushed to Consumers. Both types of
88+
seeks should not be used concurrently, as they would interfere with one another.

src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
128128
}
129129
};
130130
PullSubscriberFactory pullSubscriberFactory =
131-
(partition, initialSeek) -> {
131+
(partition, initialSeek, resetHandler) -> {
132132
SubscriberFactory subscriberFactory =
133133
consumer -> {
134134
try {
@@ -145,6 +145,7 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
145145
RoutingMetadata.of(subscriptionPath(), partition),
146146
SubscriberServiceSettings.newBuilder()))))
147147
.setInitialLocation(initialSeek)
148+
.setResetHandler(resetHandler)
148149
.build();
149150
} catch (Throwable t) {
150151
throw toCanonical(t).underlying;

src/main/java/com/google/cloud/pubsublite/kafka/PullSubscriberFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919
import com.google.cloud.pubsublite.Partition;
2020
import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
2121
import com.google.cloud.pubsublite.internal.CheckedApiException;
22+
import com.google.cloud.pubsublite.internal.wire.SubscriberResetHandler;
2223
import com.google.cloud.pubsublite.proto.SeekRequest;
2324

2425
/** A factory for making new PullSubscribers for a given partition of a subscription. */
2526
interface PullSubscriberFactory {
26-
BlockingPullSubscriber newPullSubscriber(Partition partition, SeekRequest initial)
27+
BlockingPullSubscriber newPullSubscriber(
28+
Partition partition, SeekRequest initial, SubscriberResetHandler resetHandler)
2729
throws CheckedApiException;
2830
}
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.kafka;
18+
19+
import static com.google.common.base.Preconditions.checkState;
20+
21+
import com.google.api.core.ApiFuture;
22+
import com.google.api.core.ApiFutures;
23+
import com.google.cloud.pubsublite.Offset;
24+
import com.google.cloud.pubsublite.Partition;
25+
import com.google.cloud.pubsublite.SequencedMessage;
26+
import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
27+
import com.google.cloud.pubsublite.internal.CheckedApiException;
28+
import com.google.cloud.pubsublite.internal.CloseableMonitor;
29+
import com.google.cloud.pubsublite.internal.ProxyService;
30+
import com.google.cloud.pubsublite.internal.wire.Committer;
31+
import com.google.cloud.pubsublite.proto.SeekRequest;
32+
import com.google.common.collect.Iterables;
33+
import com.google.common.util.concurrent.MoreExecutors;
34+
import com.google.errorprone.annotations.concurrent.GuardedBy;
35+
import java.util.ArrayDeque;
36+
import java.util.Optional;
37+
38+
/** Pulls messages and manages commits for a single partition of a subscription. */
39+
class SinglePartitionSubscriber extends ProxyService {
40+
private final PullSubscriberFactory subscriberFactory;
41+
private final Partition partition;
42+
private final Committer committer;
43+
private final boolean enableReset;
44+
45+
private final CloseableMonitor monitor = new CloseableMonitor();
46+
47+
@GuardedBy("monitor.monitor")
48+
private BlockingPullSubscriber subscriber;
49+
50+
@GuardedBy("monitor.monitor")
51+
private boolean needsCommitting = false;
52+
53+
@GuardedBy("monitor.monitor")
54+
private Optional<Offset> lastReceived = Optional.empty();
55+
56+
SinglePartitionSubscriber(
57+
PullSubscriberFactory subscriberFactory,
58+
Partition partition,
59+
SeekRequest initialSeek,
60+
Committer committer,
61+
boolean enableReset)
62+
throws CheckedApiException {
63+
this.subscriberFactory = subscriberFactory;
64+
this.partition = partition;
65+
this.committer = committer;
66+
this.enableReset = enableReset;
67+
this.subscriber =
68+
subscriberFactory.newPullSubscriber(partition, initialSeek, this::onSubscriberReset);
69+
addServices(committer);
70+
}
71+
72+
// ProxyService implementation.
73+
@Override
74+
protected void start() {}
75+
76+
@Override
77+
protected void stop() {
78+
try (CloseableMonitor.Hold h = monitor.enter()) {
79+
subscriber.close();
80+
}
81+
}
82+
83+
@Override
84+
protected void handlePermanentError(CheckedApiException error) {
85+
stop();
86+
}
87+
88+
/** Executes a client-initiated seek. */
89+
void clientSeek(SeekRequest request) throws CheckedApiException {
90+
try (CloseableMonitor.Hold h = monitor.enter()) {
91+
subscriber.close();
92+
subscriber = subscriberFactory.newPullSubscriber(partition, request, this::onSubscriberReset);
93+
}
94+
}
95+
96+
ApiFuture<Void> onData() {
97+
try (CloseableMonitor.Hold h = monitor.enter()) {
98+
return subscriber.onData();
99+
}
100+
}
101+
102+
@GuardedBy("monitor.monitor")
103+
private ArrayDeque<SequencedMessage> pullMessages() throws CheckedApiException {
104+
ArrayDeque<SequencedMessage> messages = new ArrayDeque<>();
105+
for (Optional<SequencedMessage> message = subscriber.messageIfAvailable();
106+
message.isPresent();
107+
message = subscriber.messageIfAvailable()) {
108+
messages.add(message.get());
109+
}
110+
return messages;
111+
}
112+
113+
/** Pulls all available messages. */
114+
ArrayDeque<SequencedMessage> getMessages() throws CheckedApiException {
115+
try (CloseableMonitor.Hold h = monitor.enter()) {
116+
ArrayDeque<SequencedMessage> messages = pullMessages();
117+
if (!messages.isEmpty()) {
118+
lastReceived = Optional.of(Iterables.getLast(messages).offset());
119+
needsCommitting = true;
120+
}
121+
return messages;
122+
}
123+
}
124+
125+
Optional<Long> position() {
126+
try (CloseableMonitor.Hold h = monitor.enter()) {
127+
return lastReceived.map(lastReceived -> lastReceived.value() + 1);
128+
}
129+
}
130+
131+
/** Executes a client-initiated commit. */
132+
ApiFuture<Void> commitOffset(Offset offset) {
133+
return committer.commitOffset(offset);
134+
}
135+
136+
/** Auto-commits the offset of the last received message. */
137+
Optional<ApiFuture<Offset>> autoCommit() {
138+
try (CloseableMonitor.Hold h = monitor.enter()) {
139+
if (!needsCommitting) return Optional.empty();
140+
checkState(lastReceived.isPresent());
141+
needsCommitting = false;
142+
// The Pub/Sub Lite commit offset is one more than the last received.
143+
Offset toCommit = Offset.of(lastReceived.get().value() + 1);
144+
return Optional.of(
145+
ApiFutures.transform(
146+
committer.commitOffset(toCommit),
147+
ignored -> toCommit,
148+
MoreExecutors.directExecutor()));
149+
}
150+
}
151+
152+
private boolean onSubscriberReset() throws CheckedApiException {
153+
if (!enableReset) {
154+
return false;
155+
}
156+
157+
// Handle an out-of-band seek notification from the server. There must be no pending commits
158+
// after this function returns.
159+
try (CloseableMonitor.Hold h = monitor.enter()) {
160+
// Discard undelivered messages.
161+
pullMessages();
162+
// Prevent further auto-commits until post-seek messages are received.
163+
needsCommitting = false;
164+
}
165+
committer.waitUntilEmpty();
166+
return true;
167+
}
168+
}

0 commit comments

Comments
 (0)