Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.cloud.pubsublite.proto.SubscriberServiceGrpc.SubscriberServiceStub;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Monitor;
import io.grpc.Status;
import io.grpc.StatusException;
import java.util.Optional;
Expand All @@ -56,11 +57,24 @@ public class SubscriberImpl extends ProxyService
private final TokenCounter tokenCounter = new TokenCounter();

@GuardedBy("monitor.monitor")
private Optional<SettableApiFuture<Offset>> inFlightSeek = Optional.empty();
private Optional<InFlightSeek> inFlightSeek = Optional.empty();

@GuardedBy("monitor.monitor")
private boolean internalSeekInFlight = false;

@GuardedBy("monitor.monitor")
private boolean shutdown = false;

private static class InFlightSeek {
final SeekRequest seekRequest;
final SettableApiFuture<Offset> seekFuture;

InFlightSeek(SeekRequest request, SettableApiFuture<Offset> future) {
seekRequest = request;
seekFuture = future;
}
}

@VisibleForTesting
SubscriberImpl(
SubscriberServiceStub stub,
Expand Down Expand Up @@ -91,7 +105,7 @@ public SubscriberImpl(
protected void handlePermanentError(StatusException error) {
try (CloseableMonitor.Hold h = monitor.enter()) {
shutdown = true;
inFlightSeek.ifPresent(inFlight -> inFlight.setException(error));
inFlightSeek.ifPresent(inFlight -> inFlight.seekFuture.setException(error));
inFlightSeek = Optional.empty();
onPermanentError(error);
}
Expand All @@ -106,7 +120,7 @@ protected void stop() {
shutdown = true;
inFlightSeek.ifPresent(
inFlight ->
inFlight.setException(
inFlight.seekFuture.setException(
Status.ABORTED
.withDescription("Client stopped while seek in flight.")
.asException()));
Expand All @@ -115,13 +129,21 @@ protected void stop() {

@Override
public ApiFuture<Offset> seek(SeekRequest request) {
try (CloseableMonitor.Hold h = monitor.enter()) {
try (CloseableMonitor.Hold h =
monitor.enterWhenUninterruptibly(
new Monitor.Guard(monitor.monitor) {
@Override
public boolean isSatisfied() {
return !internalSeekInFlight || shutdown;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If shutdown == true, we'll checkState-fail right below this. I think we should return an immediately-failed Future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should already happen. The catch block will catch the checkState failure and will return an immediateFailedFuture.

}
})) {
checkArgument(
Predicates.isValidSeekRequest(request), "Sent SeekRequest with no location set.");
checkState(!shutdown, "Seeked after the stream shut down.");
checkState(!inFlightSeek.isPresent(), "Seeked while seek is already in flight.");
SettableApiFuture<Offset> future = SettableApiFuture.create();
inFlightSeek = Optional.of(future);
inFlightSeek = Optional.of(new InFlightSeek(request, future));
tokenCounter.onClientSeek();
connection.modifyConnection(
connectedSubscriber ->
connectedSubscriber.ifPresent(subscriber -> subscriber.seek(request)));
Expand Down Expand Up @@ -164,13 +186,17 @@ public void triggerReinitialize() {
connectedSubscriber -> {
checkArgument(monitor.monitor.isOccupiedByCurrentThread());
checkArgument(connectedSubscriber.isPresent());
nextOffsetTracker
.requestForRestart()
.ifPresent(
request -> {
inFlightSeek = Optional.of(SettableApiFuture.create());
connectedSubscriber.get().seek(request);
});
if (inFlightSeek.isPresent()) {
connectedSubscriber.get().seek(inFlightSeek.get().seekRequest);
} else {
nextOffsetTracker
.requestForRestart()
.ifPresent(
request -> {
internalSeekInFlight = true;
connectedSubscriber.get().seek(request);
});
}
tokenCounter
.requestForRestart()
.ifPresent(request -> connectedSubscriber.get().allowFlow(request));
Expand Down Expand Up @@ -212,9 +238,13 @@ private Status onSeekResponse(Offset seekOffset) {
if (shutdown) {
return Status.OK;
}
if (internalSeekInFlight) {
internalSeekInFlight = false;
return Status.OK;
}
checkState(inFlightSeek.isPresent(), "No in flight seek, but received a seek response.");
nextOffsetTracker.onClientSeek(seekOffset);
inFlightSeek.get().set(seekOffset);
inFlightSeek.get().seekFuture.set(seekOffset);
inFlightSeek = Optional.empty();
return Status.OK;
} catch (StatusException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ void onMessages(Collection<SequencedMessage> received) throws StatusException {
messages -= received.size();
}

void onClientSeek() {
bytes = 0;
messages = 0;
}

Optional<FlowControlRequest> requestForRestart() {
if (bytes == 0 && messages == 0) return Optional.empty();
return Optional.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.cloud.pubsublite.internal.StatusExceptionMatcher.assertFutureThrowsCode;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
Expand All @@ -40,6 +41,7 @@
import com.google.cloud.pubsublite.SubscriptionPaths;
import com.google.cloud.pubsublite.internal.StatusExceptionMatcher;
import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriber.Response;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.InitialSubscribeRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
Expand Down Expand Up @@ -96,7 +98,7 @@ private static SubscribeRequest initialRequest() {

private final Listener permanentErrorHandler = mock(Listener.class);

private Subscriber subscriber;
private SubscriberImpl subscriber;
private StreamObserver<Response> leakedResponseObserver;

@Before
Expand Down Expand Up @@ -222,4 +224,37 @@ public void messageResponseSubtracts() {
verify(permanentErrorHandler)
.failed(any(), argThat(new StatusExceptionMatcher(Code.FAILED_PRECONDITION)));
}

@Test
public void reinitialize_resendsInFlightSeek() {
Offset offset = Offset.of(1);
SeekRequest seekRequest =
SeekRequest.newBuilder().setCursor(Cursor.newBuilder().setOffset(offset.value())).build();
ApiFuture<Offset> future = subscriber.seek(seekRequest);
assertThat(subscriber.seekInFlight()).isTrue();

subscriber.triggerReinitialize();
verify(mockConnectedSubscriber, times(2)).seek(seekRequest);

leakedResponseObserver.onNext(Response.ofSeekOffset(offset));
assertTrue(future.isDone());
assertThat(subscriber.seekInFlight()).isFalse();
}

@Test
public void reinitialize_sendsNextOffsetSeek() {
subscriber.allowFlow(bigFlowControlRequest());
ImmutableList<SequencedMessage> messages =
ImmutableList.of(
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(0), 10),
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(1), 10));
leakedResponseObserver.onNext(Response.ofMessages(messages));
verify(mockMessageConsumer).accept(messages);

subscriber.triggerReinitialize();
verify(mockConnectedSubscriber)
.seek(SeekRequest.newBuilder().setCursor(Cursor.newBuilder().setOffset(2)).build());
assertThat(subscriber.seekInFlight()).isFalse();
leakedResponseObserver.onNext(Response.ofSeekOffset(Offset.of(2)));
}
}