Skip to content

Commit e5884cc

Browse files
egreco12Evan Greco
andauthored
fix: Create next attempt after first attempt to initialize exponential backoff settings. (#2316)
--------- Co-authored-by: Evan Greco <egreco@google.com>
1 parent 75c2552 commit e5884cc

File tree

4 files changed

+74
-10
lines changed

4 files changed

+74
-10
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -971,15 +971,22 @@ private Boolean retryOnRetryableError(Code errorCode, AppendRequestAndResponse r
971971
try {
972972
requestWrapper.retryCount++;
973973
if (this.retrySettings != null && errorCode == Code.RESOURCE_EXHAUSTED) {
974-
// Trigger exponential backoff in append loop when request is resent for quota errors
975-
if (requestWrapper.attemptSettings == null) {
976-
requestWrapper.attemptSettings = requestWrapper.retryAlgorithm.createFirstAttempt();
977-
} else {
978-
requestWrapper.attemptSettings =
979-
requestWrapper.retryAlgorithm.createNextAttempt(requestWrapper.attemptSettings);
980-
}
974+
// Trigger exponential backoff in append loop when request is resent for quota errors.
975+
// createNextAttempt correctly initializes the retry delay; createfirstAttempt does not
976+
// include a positive delay, just 0.
977+
requestWrapper.attemptSettings =
978+
requestWrapper.retryAlgorithm.createNextAttempt(
979+
requestWrapper.attemptSettings == null
980+
? requestWrapper.retryAlgorithm.createFirstAttempt()
981+
: requestWrapper.attemptSettings);
981982
requestWrapper.blockMessageSendDeadline =
982983
Instant.now().plusMillis(requestWrapper.attemptSettings.getRetryDelay().toMillis());
984+
log.info(
985+
"Messages blocked for retry for "
986+
+ java.time.Duration.between(
987+
java.time.Instant.now(), requestWrapper.blockMessageSendDeadline)
988+
+ " until "
989+
+ requestWrapper.blockMessageSendDeadline);
983990
}
984991

985992
Long offset =

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWrite.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import com.google.protobuf.AbstractMessage;
2020
import io.grpc.ServerServiceDefinition;
2121
import io.grpc.Status;
22+
import java.time.Instant;
23+
import java.util.ArrayList;
2224
import java.util.LinkedList;
2325
import java.util.List;
2426
import java.util.concurrent.ScheduledExecutorService;
@@ -128,4 +130,8 @@ public void setReturnErrorDuringExclusiveStreamRetry(boolean retryOnError) {
128130
public void setVerifyOffset(boolean verifyOffset) {
129131
serviceImpl.setVerifyOffset(verifyOffset);
130132
}
133+
134+
public ArrayList<Instant> getLatestRequestReceivedInstants() {
135+
return serviceImpl.getLatestRequestReceivedInstants();
136+
}
131137
}

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/FakeBigQueryWriteImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.rpc.Code;
2121
import io.grpc.Status;
2222
import io.grpc.stub.StreamObserver;
23+
import java.time.Instant;
2324
import java.util.ArrayList;
2425
import java.util.Collections;
2526
import java.util.List;
@@ -73,6 +74,7 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase {
7374
private final Map<StreamObserver<AppendRowsResponse>, Boolean> connectionToFirstRequest =
7475
new ConcurrentHashMap<>();
7576
private Status failedStatus = Status.ABORTED;
77+
private ArrayList<Instant> requestReceivedInstants = new ArrayList<>();
7678

7779
/** Class used to save the state of a possible response. */
7880
public static class Response {
@@ -111,6 +113,10 @@ public String toString() {
111113
}
112114
}
113115

116+
public ArrayList<Instant> getLatestRequestReceivedInstants() {
117+
return requestReceivedInstants;
118+
}
119+
114120
@Override
115121
public void getWriteStream(
116122
GetWriteStreamRequest request, StreamObserver<WriteStream> responseObserver) {
@@ -197,6 +203,7 @@ public StreamObserver<AppendRowsRequest> appendRows(
197203
new StreamObserver<AppendRowsRequest>() {
198204
@Override
199205
public void onNext(AppendRowsRequest value) {
206+
requestReceivedInstants.add(Instant.now());
200207
recordCount++;
201208
requests.add(value);
202209
long offset = value.getOffset().getValue();

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import io.grpc.Status;
5555
import io.grpc.StatusRuntimeException;
5656
import java.io.IOException;
57+
import java.time.Instant;
5758
import java.util.ArrayList;
5859
import java.util.Arrays;
5960
import java.util.HashMap;
@@ -86,12 +87,15 @@ public class StreamWriterTest {
8687
private static final String EXPLICIT_STREAM = "projects/p/datasets/d1/tables/t1/streams/s1";
8788
private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
8889
private static final int MAX_RETRY_NUM_ATTEMPTS = 3;
90+
private static final long INITIAL_RETRY_MILLIS = 500;
91+
private static final double RETRY_MULTIPLIER = 1.3;
92+
private static final int MAX_RETRY_DELAY_MINUTES = 5;
8993
private static final RetrySettings retrySettings =
9094
RetrySettings.newBuilder()
91-
.setInitialRetryDelay(Duration.ofMillis(500))
92-
.setRetryDelayMultiplier(1.1)
95+
.setInitialRetryDelay(Duration.ofMillis(INITIAL_RETRY_MILLIS))
96+
.setRetryDelayMultiplier(RETRY_MULTIPLIER)
9397
.setMaxAttempts(MAX_RETRY_NUM_ATTEMPTS)
94-
.setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(5))
98+
.setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(MAX_RETRY_DELAY_MINUTES))
9599
.build();
96100
private FakeScheduledExecutorService fakeExecutor;
97101
private FakeBigQueryWrite testBigQueryWrite;
@@ -2003,6 +2007,46 @@ public void testExclusiveAppendSuccessAndQuotaErrorRetryMaxRetry() throws Except
20032007
((StatusRuntimeException) ex.getCause()).getStatus().getCode());
20042008
}
20052009

2010+
@Test
2011+
public void testExclusiveAppendQuotaErrorRetryExponentialBackoff() throws Exception {
2012+
testBigQueryWrite.setReturnErrorDuringExclusiveStreamRetry(true);
2013+
StreamWriter writer = getTestStreamWriterExclusiveRetryEnabled();
2014+
2015+
testBigQueryWrite.addResponse(
2016+
new DummyResponseSupplierWillFailThenSucceed(
2017+
new FakeBigQueryWriteImpl.Response(createAppendResponse(0)),
2018+
/* totalFailCount= */ MAX_RETRY_NUM_ATTEMPTS + 1,
2019+
com.google.rpc.Status.newBuilder().setCode(Code.RESOURCE_EXHAUSTED.ordinal()).build()));
2020+
2021+
ApiFuture<AppendRowsResponse> future =
2022+
writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0);
2023+
2024+
ExecutionException ex =
2025+
assertThrows(
2026+
ExecutionException.class,
2027+
() -> {
2028+
future.get();
2029+
});
2030+
assertEquals(
2031+
Status.Code.RESOURCE_EXHAUSTED,
2032+
((StatusRuntimeException) ex.getCause()).getStatus().getCode());
2033+
2034+
ArrayList<Instant> instants = testBigQueryWrite.getLatestRequestReceivedInstants();
2035+
Instant previousInstant = instants.get(0);
2036+
// Include initial attempt
2037+
assertEquals(instants.size(), MAX_RETRY_NUM_ATTEMPTS + 1);
2038+
double minExpectedDelay = INITIAL_RETRY_MILLIS * 0.95;
2039+
for (int i = 1; i < instants.size(); i++) {
2040+
Instant currentInstant = instants.get(i);
2041+
double differenceInMillis =
2042+
java.time.Duration.between(previousInstant, currentInstant).toMillis();
2043+
assertThat(differenceInMillis).isAtLeast((double) INITIAL_RETRY_MILLIS);
2044+
assertThat(differenceInMillis).isGreaterThan(minExpectedDelay);
2045+
minExpectedDelay = minExpectedDelay * RETRY_MULTIPLIER;
2046+
previousInstant = currentInstant;
2047+
}
2048+
}
2049+
20062050
@Test
20072051
public void testAppendSuccessAndNonRetryableError() throws Exception {
20082052
StreamWriter writer = getTestStreamWriterRetryEnabled();

0 commit comments

Comments
 (0)