Skip to content

Commit a7b0f87

Browse files
committed
fix: recalculate remaining statement timeout after retry
If a connection has a statement timeout and a statement is executed in a read/write transaction, the statement timeout would be reset to the original value during each retry, instead of being reduced after each attempt. This could cause multiple transaction retries to make the execution of a single statement take much longer than the configured timeout value.
1 parent 468b7c0 commit a7b0f87

File tree

4 files changed

+89
-12
lines changed

4 files changed

+89
-12
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/AbstractBaseUnitOfWork.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,18 @@
3939
import com.google.cloud.spanner.Struct;
4040
import com.google.cloud.spanner.Type.StructField;
4141
import com.google.cloud.spanner.connection.AbstractStatementParser.ParsedStatement;
42-
import com.google.cloud.spanner.connection.ReadWriteTransaction.Builder;
4342
import com.google.cloud.spanner.connection.StatementExecutor.StatementTimeout;
4443
import com.google.common.base.Preconditions;
4544
import com.google.common.collect.ImmutableList;
4645
import com.google.common.util.concurrent.MoreExecutors;
4746
import io.grpc.Context;
47+
import io.grpc.Deadline;
4848
import io.grpc.MethodDescriptor;
4949
import io.grpc.Status;
5050
import io.opentelemetry.api.common.AttributeKey;
5151
import io.opentelemetry.api.trace.Span;
5252
import io.opentelemetry.context.Scope;
53+
import java.time.Duration;
5354
import java.util.Collection;
5455
import java.util.Collections;
5556
import java.util.HashSet;
@@ -358,6 +359,9 @@ <T> ApiFuture<T> executeStatementAsync(
358359
}
359360
Context context = Context.current();
360361
if (statementTimeout.hasTimeout() && !applyStatementTimeoutToMethods.isEmpty()) {
362+
Deadline deadline =
363+
Deadline.after(
364+
statementTimeout.getTimeoutValue(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
361365
context =
362366
context.withValue(
363367
SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY,
@@ -367,8 +371,14 @@ public <ReqT, RespT> ApiCallContext configure(
367371
ApiCallContext context, ReqT request, MethodDescriptor<ReqT, RespT> method) {
368372
if (statementTimeout.hasTimeout()
369373
&& applyStatementTimeoutToMethods.contains(method)) {
374+
// Calculate the remaining timeout. This method could be called multiple times
375+
// if the transaction is retried.
376+
long remainingTimeout = deadline.timeRemaining(TimeUnit.MILLISECONDS);
377+
if (remainingTimeout <= 0) {
378+
remainingTimeout = 1;
379+
}
370380
return GrpcCallContext.createDefault()
371-
.withTimeoutDuration(statementTimeout.asDuration());
381+
.withTimeoutDuration(Duration.ofMillis(remainingTimeout));
372382
}
373383
return null;
374384
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbortedTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.google.cloud.Timestamp;
2828
import com.google.cloud.spanner.AbortedDueToConcurrentModificationException;
2929
import com.google.cloud.spanner.ErrorCode;
30+
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
3031
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
3132
import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode;
3233
import com.google.cloud.spanner.ResultSet;
@@ -52,6 +53,7 @@
5253
import io.grpc.StatusRuntimeException;
5354
import java.util.Collections;
5455
import java.util.List;
56+
import java.util.concurrent.TimeUnit;
5557
import java.util.stream.Collectors;
5658
import java.util.stream.LongStream;
5759
import org.junit.Test;
@@ -583,6 +585,29 @@ public void testAbortedWithBitReversedSequence() {
583585
}
584586
}
585587

588+
@Test
589+
public void testTimeoutWithRetries() {
590+
// Verifies that even though a single execution of a statement does not exceed the deadline,
591+
// repeated retries of the statement does cause the deadline to be exceeded.
592+
try (ITConnection connection = createConnection()) {
593+
for (boolean autoCommit : new boolean[] {true, false}) {
594+
connection.setAutocommit(autoCommit);
595+
mockSpanner.setAbortProbability(1.0);
596+
mockSpanner.setExecuteSqlExecutionTime(SimulatedExecutionTime.ofMinimumAndRandomTime(1, 0));
597+
598+
connection.setStatementTimeout(10, TimeUnit.MILLISECONDS);
599+
SpannerException exception =
600+
assertThrows(SpannerException.class, () -> connection.execute(INSERT_STATEMENT));
601+
assertEquals(ErrorCode.DEADLINE_EXCEEDED, exception.getErrorCode());
602+
if (!autoCommit) {
603+
connection.rollback();
604+
}
605+
}
606+
} finally {
607+
mockSpanner.setAbortProbability(0.0);
608+
}
609+
}
610+
586611
static com.google.spanner.v1.ResultSet createBitReversedSequenceResultSet(
587612
long startValue, long endValue) {
588613
return com.google.spanner.v1.ResultSet.newBuilder()

google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/AbstractMockServerTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ public static void stopServer() {
249249
@Before
250250
public void setupResults() {
251251
mockSpanner.clearRequests();
252+
mockSpanner.removeAllExecutionTimes();
252253
mockDatabaseAdmin.getRequests().clear();
253254
mockInstanceAdmin.getRequests().clear();
254255
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionAsyncApiTest.java

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.google.cloud.spanner.SpannerApiFutures.get;
2020
import static com.google.common.truth.Truth.assertThat;
2121
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertFalse;
2223
import static org.junit.Assert.assertNull;
2324
import static org.junit.Assert.assertThrows;
2425
import static org.junit.Assert.fail;
@@ -535,8 +536,16 @@ private void testExecuteQuery(Function<Connection, Void> connectionConfigurator)
535536
}
536537
assertThat(timeout).isFalse();
537538
} catch (SpannerException e) {
538-
assertThat(timeout).isTrue();
539-
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.DEADLINE_EXCEEDED);
539+
if (e.getErrorCode() == ErrorCode.FAILED_PRECONDITION
540+
&& e.getMessage()
541+
.contains(
542+
"This transaction has been invalidated by a later transaction in the same session")) {
543+
// Ignore for regular sessions.
544+
assertFalse(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
545+
} else {
546+
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.DEADLINE_EXCEEDED);
547+
assertThat(timeout).isTrue();
548+
}
540549
// Start a new transaction if a timeout occurred on a read/write transaction, as that will
541550
// invalidate that transaction.
542551
if (!connection.isReadOnly() && connection.isInTransaction()) {
@@ -577,8 +586,16 @@ private void testExecuteUpdateAsync(
577586
assertThat(connection.getCommitTimestamp()).isNotNull();
578587
assertThat(timeout).isFalse();
579588
} catch (SpannerException e) {
580-
assertThat(timeout).isTrue();
581-
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.DEADLINE_EXCEEDED);
589+
if (e.getErrorCode() == ErrorCode.FAILED_PRECONDITION
590+
&& e.getMessage()
591+
.contains(
592+
"This transaction has been invalidated by a later transaction in the same session")) {
593+
// Ignore for regular sessions.
594+
assertFalse(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
595+
} else {
596+
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.DEADLINE_EXCEEDED);
597+
assertThat(timeout).isTrue();
598+
}
582599
// Start a new transaction if a timeout occurred on a read/write transaction, as that will
583600
// invalidate that transaction.
584601
if (!connection.isReadOnly() && connection.isInTransaction()) {
@@ -611,8 +628,16 @@ private void testExecuteUpdate(Function<Connection, Void> connectionConfigurator
611628
assertThat(connection.getCommitTimestamp()).isNotNull();
612629
assertThat(timeout).isFalse();
613630
} catch (SpannerException e) {
614-
assertThat(timeout).isTrue();
615-
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.DEADLINE_EXCEEDED);
631+
if (e.getErrorCode() == ErrorCode.FAILED_PRECONDITION
632+
&& e.getMessage()
633+
.contains(
634+
"This transaction has been invalidated by a later transaction in the same session")) {
635+
// Ignore for regular sessions.
636+
assertFalse(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
637+
} else {
638+
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.DEADLINE_EXCEEDED);
639+
assertThat(timeout).isTrue();
640+
}
616641
// Start a new transaction if a timeout occurred on a read/write transaction, as that will
617642
// invalidate that transaction.
618643
if (!connection.isReadOnly() && connection.isInTransaction()) {
@@ -647,8 +672,16 @@ private void testExecuteBatchUpdateAsync(Function<Connection, Void> connectionCo
647672
assertThat(connection.getCommitTimestamp()).isNotNull();
648673
assertThat(timeout).isFalse();
649674
} catch (SpannerException e) {
650-
assertThat(timeout).isTrue();
651-
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.DEADLINE_EXCEEDED);
675+
if (e.getErrorCode() == ErrorCode.FAILED_PRECONDITION
676+
&& e.getMessage()
677+
.contains(
678+
"This transaction has been invalidated by a later transaction in the same session")) {
679+
// Ignore for regular sessions.
680+
assertFalse(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
681+
} else {
682+
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.DEADLINE_EXCEEDED);
683+
assertThat(timeout).isTrue();
684+
}
652685
// Start a new transaction if a timeout occurred on a read/write transaction, as that will
653686
// invalidate that transaction.
654687
if (!connection.isReadOnly() && connection.isInTransaction()) {
@@ -682,8 +715,16 @@ private void testExecuteBatchUpdate(Function<Connection, Void> connectionConfigu
682715
assertThat(connection.getCommitTimestamp()).isNotNull();
683716
assertThat(timeout).isFalse();
684717
} catch (SpannerException e) {
685-
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.DEADLINE_EXCEEDED);
686-
assertThat(timeout).isTrue();
718+
if (e.getErrorCode() == ErrorCode.FAILED_PRECONDITION
719+
&& e.getMessage()
720+
.contains(
721+
"This transaction has been invalidated by a later transaction in the same session")) {
722+
// Ignore for regular sessions.
723+
assertFalse(isMultiplexedSessionsEnabledForRW(connection.getSpanner()));
724+
} else {
725+
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.DEADLINE_EXCEEDED);
726+
assertThat(timeout).isTrue();
727+
}
687728
// Start a new transaction if a timeout occurred on a read/write transaction, as that will
688729
// invalidate that transaction.
689730
if (!connection.isReadOnly() && connection.isInTransaction()) {

0 commit comments

Comments
 (0)