Skip to content

Commit 9a18523

Browse files
egreco12Evan Greco
andauthored
feat: Enable instream retry for default streams when Multiplexing. (#2376)
* feat: Enable instream retry for default streams when Multiplexing. * enable retrysettings for multiplexing test writers --------- Co-authored-by: Evan Greco <egreco@google.com>
1 parent c00661c commit 9a18523

File tree

4 files changed

+24
-30
lines changed

4 files changed

+24
-30
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ public class ConnectionWorkerPool {
6666
*/
6767
private final java.time.Duration maxRetryDuration;
6868

69+
/*
70+
* Retry settings for in-stream retries.
71+
*/
6972
private RetrySettings retrySettings;
7073

7174
/*
@@ -208,7 +211,8 @@ public abstract static class Builder {
208211
FlowController.LimitExceededBehavior limitExceededBehavior,
209212
String traceId,
210213
@Nullable String comperssorName,
211-
BigQueryWriteSettings clientSettings) {
214+
BigQueryWriteSettings clientSettings,
215+
RetrySettings retrySettings) {
212216
this.maxInflightRequests = maxInflightRequests;
213217
this.maxInflightBytes = maxInflightBytes;
214218
this.maxRetryDuration = maxRetryDuration;
@@ -217,8 +221,7 @@ public abstract static class Builder {
217221
this.compressorName = comperssorName;
218222
this.clientSettings = clientSettings;
219223
this.currentMaxConnectionCount = settings.minConnectionsPerRegion();
220-
// In-stream retry is not enabled for multiplexing.
221-
this.retrySettings = null;
224+
this.retrySettings = retrySettings;
222225
}
223226

224227
/**

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -237,12 +237,6 @@ private StreamWriter(Builder builder) throws IOException {
237237
"Trying to enable connection pool in non-default stream.");
238238
}
239239

240-
if (builder.retrySettings != null) {
241-
log.warning("Retry settings is only allowed when connection pool is not enabled.");
242-
throw new IllegalArgumentException(
243-
"Trying to enable connection pool while providing retry settings.");
244-
}
245-
246240
// We need a client to perform some getWriteStream calls.
247241
BigQueryWriteClient client =
248242
builder.client != null ? builder.client : new BigQueryWriteClient(clientSettings);
@@ -295,7 +289,8 @@ private StreamWriter(Builder builder) throws IOException {
295289
builder.limitExceededBehavior,
296290
builder.traceId,
297291
builder.compressorName,
298-
client.getSettings());
292+
client.getSettings(),
293+
builder.retrySettings);
299294
}));
300295
validateFetchedConnectonPool(builder);
301296
// If the client is not from outside, then shutdown the client we created.

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.api.gax.core.NoCredentialsProvider;
2424
import com.google.api.gax.grpc.testing.MockGrpcService;
2525
import com.google.api.gax.grpc.testing.MockServiceHelper;
26+
import com.google.api.gax.retrying.RetrySettings;
2627
import com.google.cloud.bigquery.storage.test.Test.FooType;
2728
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings;
2829
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -58,6 +59,17 @@ public class ConnectionWorkerPoolTest {
5859
private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
5960
private static final String TEST_STREAM_1 = "projects/p1/datasets/d1/tables/t1/streams/_default";
6061
private static final String TEST_STREAM_2 = "projects/p1/datasets/d1/tables/t2/streams/_default";
62+
private static final int MAX_RETRY_NUM_ATTEMPTS = 3;
63+
private static final long INITIAL_RETRY_MILLIS = 500;
64+
private static final double RETRY_MULTIPLIER = 1.3;
65+
private static final int MAX_RETRY_DELAY_MINUTES = 5;
66+
private static final RetrySettings retrySettings =
67+
RetrySettings.newBuilder()
68+
.setInitialRetryDelay(Duration.ofMillis(INITIAL_RETRY_MILLIS))
69+
.setRetryDelayMultiplier(RETRY_MULTIPLIER)
70+
.setMaxAttempts(MAX_RETRY_NUM_ATTEMPTS)
71+
.setMaxRetryDelay(org.threeten.bp.Duration.ofMinutes(MAX_RETRY_DELAY_MINUTES))
72+
.build();
6173

6274
@Before
6375
public void setUp() throws Exception {
@@ -398,6 +410,7 @@ public void testCloseExternalClient()
398410
.setWriterSchema(createProtoSchema())
399411
.setTraceId(TEST_TRACE_ID)
400412
.setLocation("us")
413+
.setRetrySettings(retrySettings)
401414
.build());
402415
}
403416

@@ -483,6 +496,7 @@ ConnectionWorkerPool createConnectionWorkerPool(
483496
FlowController.LimitExceededBehavior.Block,
484497
TEST_TRACE_ID,
485498
null,
486-
clientSettings);
499+
clientSettings,
500+
retrySettings);
487501
}
488502
}

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -536,25 +536,6 @@ public void testShortenStreamNameAllowed() throws Exception {
536536
.build();
537537
}
538538

539-
@Test
540-
public void testNoRetryWhenConnectionPoolEnabled() throws Exception {
541-
IllegalArgumentException ex =
542-
assertThrows(
543-
IllegalArgumentException.class,
544-
new ThrowingRunnable() {
545-
@Override
546-
public void run() throws Throwable {
547-
StreamWriter.newBuilder(TEST_STREAM_SHORTEN, client)
548-
.setEnableConnectionPool(true)
549-
.setRetrySettings(RetrySettings.newBuilder().build())
550-
.build();
551-
}
552-
});
553-
assertTrue(
554-
ex.getMessage()
555-
.contains("Trying to enable connection pool while providing retry settings."));
556-
}
557-
558539
@Test
559540
public void testAppendSuccessAndConnectionError() throws Exception {
560541
StreamWriter writer =
@@ -1429,6 +1410,7 @@ public StreamWriter getMultiplexingStreamWriter(String streamName) throws IOExce
14291410
.setMaxInflightRequests(10)
14301411
.setLocation("US")
14311412
.setMaxRetryDuration(java.time.Duration.ofMillis(100))
1413+
.setRetrySettings(retrySettings)
14321414
.build();
14331415
}
14341416

0 commit comments

Comments
 (0)