Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -359,4 +359,11 @@
<className>com/google/cloud/spanner/connection/Connection</className>
<method>boolean isDelayTransactionStartUntilFirstWrite()</method>
</difference>

<!-- (Internal change, use stream timeout) -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc$StreamingCall</className>
<method>com.google.api.gax.rpc.ApiCallContext getCallContext()</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.api.client.util.BackOff;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
Expand Down Expand Up @@ -74,6 +75,7 @@
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.threeten.bp.Duration;

/** Implementation of {@link ResultSet}. */
abstract class AbstractResultSet<R> extends AbstractStructReader implements ResultSet {
Expand Down Expand Up @@ -944,6 +946,8 @@ static class GrpcStreamIterator extends AbstractIterator<PartialResultSet>

private SpannerRpc.StreamingCall call;
private volatile boolean withBeginTransaction;
private TimeUnit streamWaitTimeoutUnit;
private long streamWaitTimeoutValue;
private SpannerException error;

@VisibleForTesting
Expand All @@ -965,6 +969,20 @@ protected final SpannerRpc.ResultStreamConsumer consumer() {
public void setCall(SpannerRpc.StreamingCall call, boolean withBeginTransaction) {
this.call = call;
this.withBeginTransaction = withBeginTransaction;
ApiCallContext callContext = call.getCallContext();
Duration streamWaitTimeout = callContext == null ? null : callContext.getStreamWaitTimeout();
if (streamWaitTimeout != null) {
// Determine the timeout unit to use. This reduces the precision to seconds if the timeout
// value is more than 1 second, which is lower than the precision that would normally be
// used by the stream watchdog (which uses a precision of 10 seconds by default).
if (streamWaitTimeout.getSeconds() > 0L) {
streamWaitTimeoutValue = streamWaitTimeout.getSeconds();
streamWaitTimeoutUnit = TimeUnit.SECONDS;
} else if (streamWaitTimeout.getNano() > 0) {
streamWaitTimeoutValue = streamWaitTimeout.getNano();
streamWaitTimeoutUnit = TimeUnit.NANOSECONDS;
}
}
}

@Override
Expand All @@ -983,11 +1001,15 @@ public boolean isWithBeginTransaction() {
protected final PartialResultSet computeNext() {
PartialResultSet next;
try {
// TODO: Ideally honor io.grpc.Context while blocking here. In practice,
// cancellation/deadline results in an error being delivered to "stream", which
// should mean that we do not block significantly longer afterwards, but it would
// be more robust to use poll() with a timeout.
next = stream.take();
if (streamWaitTimeoutUnit != null) {
next = stream.poll(streamWaitTimeoutValue, streamWaitTimeoutUnit);
if (next == null) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.DEADLINE_EXCEEDED, "stream wait timeout");
}
} else {
next = stream.take();
}
} catch (InterruptedException e) {
// Treat interrupt as a request to cancel the read.
throw SpannerExceptionFactory.propagateInterrupt(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1596,20 +1596,7 @@ public StreamingCall read(
options, request.getSession(), request, SpannerGrpc.getReadMethod(), routeToLeader);
SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer);
spannerStub.streamingReadCallable().call(request, responseObserver, context);
final StreamController controller = responseObserver.getController();
return new StreamingCall() {
@Override
public void request(int numMessage) {
controller.request(numMessage);
}

// TODO(hzyi): streamController currently does not support cancel with message. Add
// this in gax and update this method later
@Override
public void cancel(String message) {
controller.cancel();
}
};
return new GrpcStreamingCall(context, responseObserver.getController());
}

@Override
Expand Down Expand Up @@ -1673,22 +1660,10 @@ public StreamingCall executeQuery(
request,
SpannerGrpc.getExecuteStreamingSqlMethod(),
routeToLeader);

SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer);
spannerStub.executeStreamingSqlCallable().call(request, responseObserver, context);
final StreamController controller = responseObserver.getController();
return new StreamingCall() {
@Override
public void request(int numMessage) {
controller.request(numMessage);
}

// TODO(hzyi): streamController currently does not support cancel with message. Add
// this in gax and update this method later
@Override
public void cancel(String message) {
controller.cancel();
}
};
return new GrpcStreamingCall(context, responseObserver.getController());
}

@Override
Expand Down Expand Up @@ -1957,6 +1932,31 @@ public boolean isClosed() {
return rpcIsClosed;
}

private static final class GrpcStreamingCall implements StreamingCall {
private final ApiCallContext callContext;
private final StreamController controller;

GrpcStreamingCall(ApiCallContext callContext, StreamController controller) {
this.callContext = callContext;
this.controller = controller;
}

@Override
public ApiCallContext getCallContext() {
return callContext;
}

@Override
public void request(int numMessages) {
controller.request(numMessages);
}

@Override
public void cancel(@Nullable String message) {
controller.cancel();
}
}

/**
* A {@code ResponseObserver} that exposes the {@code StreamController} and delegates callbacks to
* the {@link ResultStreamConsumer}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.api.core.InternalApi;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.ServiceRpc;
import com.google.cloud.spanner.BackupId;
Expand Down Expand Up @@ -150,6 +151,9 @@ interface ResultStreamConsumer {
/** Handle for cancellation of a streaming read or query call. */
interface StreamingCall {

/** Returns the {@link ApiCallContext} that is used for this streaming call. */
ApiCallContext getCallContext();

/**
* Requests more messages from the stream. We disable the auto flow control mechanism in grpc,
* so we need to request messages ourself. This gives us more control over how much buffer we
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.gax.grpc.testing.LocalChannelProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.ByteArray;
import com.google.cloud.NoCredentials;
import com.google.cloud.Timestamp;
Expand All @@ -51,6 +52,7 @@
import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode;
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.cloud.spanner.SpannerOptions.CallContextConfigurator;
import com.google.cloud.spanner.SpannerOptions.SpannerCallContextTimeoutConfigurator;
import com.google.cloud.spanner.Type.Code;
import com.google.cloud.spanner.connection.RandomResultSetGenerator;
Expand All @@ -77,6 +79,7 @@
import com.google.spanner.v1.TypeAnnotationCode;
import com.google.spanner.v1.TypeCode;
import io.grpc.Context;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
Expand Down Expand Up @@ -2963,6 +2966,37 @@ public void testStatementWithBytesArrayParameter() {
}
}

@Test
public void testStreamWaitTimeout() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
// Add a wait time to the mock server. Note that the test won't actually wait 100ms, as it uses
// a 1ns time out.
mockSpanner.setExecuteStreamingSqlExecutionTime(
SimulatedExecutionTime.ofMinimumAndRandomTime(100, 0));
// Create a custom call configuration that uses a 1 nanosecond stream timeout value. This will
// always time out, as a call to the mock server will always take more than 1 nanosecond.
CallContextConfigurator configurator =
new CallContextConfigurator() {
@Override
public <ReqT, RespT> ApiCallContext configure(
ApiCallContext context, ReqT request, MethodDescriptor<ReqT, RespT> method) {
return context.withStreamWaitTimeout(Duration.ofNanos(1L));
}
};
Context context =
Context.current().withValue(SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY, configurator);
context.run(
() -> {
try (ResultSet resultSet = client.singleUse().executeQuery(SELECT1)) {
SpannerException exception = assertThrows(SpannerException.class, resultSet::next);
assertEquals(ErrorCode.DEADLINE_EXCEEDED, exception.getErrorCode());
assertTrue(
exception.getMessage(), exception.getMessage().contains("stream wait timeout"));
}
});
}

static void assertAsString(String expected, ResultSet resultSet, int col) {
assertEquals(expected, resultSet.getValue(col).getAsString());
assertEquals(ImmutableList.of(expected), resultSet.getValue(col).getAsStringList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
Expand Down Expand Up @@ -50,6 +52,7 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;

/** Unit tests for {@link com.google.cloud.spanner.AbstractResultSet.GrpcResultSet}. */
@RunWith(JUnit4.class)
Expand All @@ -58,6 +61,7 @@ public class GrpcResultSetTest {
private AbstractResultSet.GrpcResultSet resultSet;
private SpannerRpc.ResultStreamConsumer consumer;
private AbstractResultSet.GrpcStreamIterator stream;
private final Duration streamWaitTimeout = Duration.ofNanos(1L);

private static class NoOpListener implements AbstractResultSet.Listener {
@Override
Expand All @@ -78,6 +82,11 @@ public void setUp() {
stream = new AbstractResultSet.GrpcStreamIterator(10);
stream.setCall(
new SpannerRpc.StreamingCall() {
@Override
public ApiCallContext getCallContext() {
return GrpcCallContext.createDefault().withStreamWaitTimeout(streamWaitTimeout);
}

@Override
public void cancel(@Nullable String message) {}

Expand All @@ -93,6 +102,14 @@ public AbstractResultSet.GrpcResultSet resultSetWithMode(QueryMode queryMode) {
return new AbstractResultSet.GrpcResultSet(stream, new NoOpListener());
}

@Test
public void testStreamTimeout() {
// We don't add any results to the stream. That means that it will time out after 1ns.
SpannerException exception = assertThrows(SpannerException.class, resultSet::next);
assertEquals(ErrorCode.DEADLINE_EXCEEDED, exception.getErrorCode());
assertTrue(exception.getMessage(), exception.getMessage().contains("stream wait timeout"));
}

@Test
public void metadata() {
Type rowType = Type.struct(Type.StructField.of("f", Type.string()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import static com.google.common.truth.Truth.assertThat;

import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.ByteArray;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.io.Resources;
Expand Down Expand Up @@ -115,6 +117,11 @@ private void run() throws Exception {
stream = new AbstractResultSet.GrpcStreamIterator(10);
stream.setCall(
new SpannerRpc.StreamingCall() {
@Override
public ApiCallContext getCallContext() {
return GrpcCallContext.createDefault();
}

@Override
public void cancel(@Nullable String message) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@

import com.google.api.core.ApiFutures;
import com.google.api.core.NanoClock;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.cloud.Timestamp;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
Expand Down Expand Up @@ -407,6 +409,11 @@ public void singleUseReadOnlyTransactionReturnsEmptyTransactionMetadata() {
}

private static class NoOpStreamingCall implements SpannerRpc.StreamingCall {
@Override
public ApiCallContext getCallContext() {
return GrpcCallContext.createDefault();
}

@Override
public void cancel(@Nullable String message) {}

Expand Down