Skip to content

Commit 8fadf5e

Browse files
authored
fix: Add an exception interceptor to convert RST_STREAM errors to unavailable (#3701)
* fix: fix rst error in mutations
* fix beam test
* use interceptor
* remove test for checkAndMutate
* fix format
* fix integration test
* remove running in parallel
* remove extra config
* increase timeout
1 parent 2ec77be commit 8fadf5e

File tree

8 files changed

+218
-40
lines changed

8 files changed

+218
-40
lines changed

bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/BigtableSession.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.google.cloud.bigtable.grpc.async.BulkMutation;
3737
import com.google.cloud.bigtable.grpc.async.BulkMutationWrapper;
3838
import com.google.cloud.bigtable.grpc.async.BulkRead;
39+
import com.google.cloud.bigtable.grpc.async.ConvertExceptionInterceptor;
3940
import com.google.cloud.bigtable.grpc.async.ResourceLimiter;
4041
import com.google.cloud.bigtable.grpc.async.ResourceLimiterStats;
4142
import com.google.cloud.bigtable.grpc.async.ThrottlingClientInterceptor;
@@ -339,6 +340,7 @@ private List<ClientInterceptor> createDataApiInterceptors(
339340
if (options.getInstanceName() != null) {
340341
interceptors.add(
341342
new GoogleCloudResourcePrefixInterceptor(options.getInstanceName().toString()));
343+
interceptors.add(new ConvertExceptionInterceptor());
342344
}
343345

344346
interceptors.add(createGaxHeaderInterceptor());

bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/async/AbstractRetryingOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ protected void onError(Status status, Metadata trailers) {
206206
|| !isStatusRetryable(status)
207207
// Unauthenticated is special because the request never made it to
208208
// the server, so all requests are retryable
209-
|| !(isRequestRetryable() || code == Code.UNAUTHENTICATED || code == Code.UNAVAILABLE)) {
209+
|| !(isRequestRetryable() || code == Code.UNAUTHENTICATED)) {
210210
LOG.error(
211211
"Could not complete RPC. Failure #%d, got: %s on channel %s.\nTrailers: %s",
212212
status.getCause(), failedCount, status, channelId, trailers);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2022 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.grpc.async;
17+
18+
import io.grpc.CallOptions;
19+
import io.grpc.Channel;
20+
import io.grpc.ClientCall;
21+
import io.grpc.ClientInterceptor;
22+
import io.grpc.ForwardingClientCall;
23+
import io.grpc.ForwardingClientCallListener;
24+
import io.grpc.Metadata;
25+
import io.grpc.MethodDescriptor;
26+
import io.grpc.Status;
27+
28+
public class ConvertExceptionInterceptor implements ClientInterceptor {
29+
@Override
30+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
31+
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
32+
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
33+
channel.newCall(methodDescriptor, callOptions)) {
34+
@Override
35+
public void start(Listener<RespT> responseListener, Metadata headers) {
36+
this.delegate()
37+
.start(
38+
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
39+
responseListener) {
40+
@Override
41+
public void onClose(Status status, Metadata trailers) {
42+
if (status.getCode() == Status.INTERNAL.getCode()) {
43+
String description = status.getDescription();
44+
if (description != null
45+
&& (description.toLowerCase().contains("rst_stream")
46+
|| description.toLowerCase().contains("rst stream"))) {
47+
this.delegate()
48+
.onClose(
49+
Status.UNAVAILABLE.withDescription(status.getDescription()),
50+
trailers);
51+
return;
52+
}
53+
}
54+
this.delegate().onClose(status, trailers);
55+
}
56+
},
57+
headers);
58+
}
59+
};
60+
}
61+
}

bigtable-client-core-parent/bigtable-client-core/src/main/java/com/google/cloud/bigtable/grpc/scanner/RetryingReadRowsOperation.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import com.google.protobuf.ByteString;
3232
import io.grpc.Metadata;
3333
import io.grpc.Status;
34-
import io.grpc.Status.Code;
3534
import io.grpc.stub.ClientResponseObserver;
3635
import io.grpc.stub.StreamObserver;
3736
import io.opencensus.trace.AttributeValue;
@@ -244,20 +243,6 @@ protected boolean isRequestRetryable() {
244243
return true;
245244
}
246245

247-
/** Read rows requests are retryable if the status is a rst stream error. */
248-
@Override
249-
protected boolean isStatusRetryable(Status status) {
250-
return retryOptions.isRetryable(status.getCode()) || isRstStream(status);
251-
}
252-
253-
private boolean isRstStream(Status status) {
254-
if (status.getCode() == Code.INTERNAL && status.getDescription() != null) {
255-
String description = status.getDescription().toLowerCase();
256-
return description.contains("rst stream") || description.contains("rst_stream");
257-
}
258-
return false;
259-
}
260-
261246
/** {@inheritDoc} */
262247
@Override
263248
protected boolean onOK(Metadata trailers) {
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Copyright 2022 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.grpc;
17+
18+
import com.google.api.gax.grpc.GrpcStatusCode;
19+
import com.google.api.gax.rpc.InternalException;
20+
import com.google.bigtable.v2.BigtableGrpc;
21+
import com.google.bigtable.v2.CheckAndMutateRowRequest;
22+
import com.google.bigtable.v2.CheckAndMutateRowResponse;
23+
import com.google.bigtable.v2.MutateRowsRequest;
24+
import com.google.bigtable.v2.MutateRowsResponse;
25+
import com.google.bigtable.v2.ReadRowsRequest;
26+
import com.google.bigtable.v2.ReadRowsResponse;
27+
import com.google.bigtable.v2.Row;
28+
import com.google.cloud.bigtable.config.BigtableOptions;
29+
import com.google.cloud.bigtable.grpc.scanner.ResultScanner;
30+
import io.grpc.Server;
31+
import io.grpc.ServerBuilder;
32+
import io.grpc.Status;
33+
import io.grpc.StatusRuntimeException;
34+
import io.grpc.stub.StreamObserver;
35+
import java.io.IOException;
36+
import java.util.List;
37+
import java.util.concurrent.atomic.AtomicInteger;
38+
import org.junit.After;
39+
import org.junit.Assert;
40+
import org.junit.Before;
41+
import org.junit.Test;
42+
import org.junit.runner.RunWith;
43+
import org.junit.runners.JUnit4;
44+
45+
@RunWith(JUnit4.class)
46+
public class TestRetryRstStream {
47+
48+
private BigtableSession session;
49+
private ServerBuilder serverBuilder = ServerBuilder.forPort(1234);
50+
private Server server;
51+
private AtomicInteger attemptCount = new AtomicInteger(0);
52+
53+
@Before
54+
public void setup() throws IOException {
55+
server = serverBuilder.addService(new FakeBigtableService()).build();
56+
57+
server.start();
58+
59+
session =
60+
new BigtableSession(
61+
BigtableOptions.builder()
62+
.setProjectId("fake-project")
63+
.setInstanceId("fake-instance")
64+
.setUserAgent("test")
65+
.enableEmulator("localhost:1234")
66+
.build());
67+
}
68+
69+
@Test
70+
public void testReadRowsIsRetried() throws IOException {
71+
try {
72+
ResultScanner<Row> rows =
73+
session.getDataClient().readRows(ReadRowsRequest.getDefaultInstance());
74+
rows.next();
75+
} catch (Exception e) {
76+
Assert.fail("rst errors should be retried");
77+
}
78+
Assert.assertEquals(2, attemptCount.get());
79+
}
80+
81+
@Test
82+
public void testMutateRowsIsRetried() {
83+
try {
84+
List<MutateRowsResponse> responses =
85+
session.getDataClient().mutateRows(MutateRowsRequest.getDefaultInstance());
86+
responses.get(0);
87+
} catch (Exception e) {
88+
Assert.fail("rst errors should be retried");
89+
}
90+
Assert.assertEquals(2, attemptCount.get());
91+
}
92+
93+
@After
94+
public void close() {
95+
server.shutdown();
96+
}
97+
98+
private class FakeBigtableService extends BigtableGrpc.BigtableImplBase {
99+
@Override
100+
public void readRows(
101+
ReadRowsRequest request, StreamObserver<ReadRowsResponse> responseObserver) {
102+
if (attemptCount.getAndIncrement() == 0) {
103+
responseObserver.onError(
104+
new InternalException(
105+
new StatusRuntimeException(
106+
Status.INTERNAL.withDescription(
107+
"INTERNAL: HTTP/2 error code: INTERNAL_ERROR\nReceived Rst Stream")),
108+
GrpcStatusCode.of(Status.Code.INTERNAL),
109+
false));
110+
} else {
111+
responseObserver.onNext(ReadRowsResponse.getDefaultInstance());
112+
responseObserver.onCompleted();
113+
}
114+
}
115+
116+
@Override
117+
public void mutateRows(
118+
MutateRowsRequest request, StreamObserver<MutateRowsResponse> responseObserver) {
119+
if (attemptCount.getAndIncrement() == 0) {
120+
responseObserver.onError(
121+
new InternalException(
122+
new StatusRuntimeException(
123+
Status.INTERNAL.withDescription(
124+
"INTERNAL: HTTP/2 error code: INTERNAL_ERROR\nReceived Rst Stream")),
125+
GrpcStatusCode.of(Status.Code.INTERNAL),
126+
false));
127+
} else {
128+
responseObserver.onNext(MutateRowsResponse.getDefaultInstance());
129+
responseObserver.onCompleted();
130+
}
131+
}
132+
133+
@Override
134+
public void checkAndMutateRow(
135+
CheckAndMutateRowRequest request,
136+
StreamObserver<CheckAndMutateRowResponse> responseObserver) {
137+
if (attemptCount.getAndIncrement() == 0) {
138+
responseObserver.onError(
139+
new InternalException(
140+
new StatusRuntimeException(
141+
Status.INTERNAL.withDescription(
142+
"INTERNAL: HTTP/2 error code: INTERNAL_ERROR\nReceived Rst Stream")),
143+
GrpcStatusCode.of(Status.Code.INTERNAL),
144+
false));
145+
} else {
146+
responseObserver.onNext(CheckAndMutateRowResponse.getDefaultInstance());
147+
responseObserver.onCompleted();
148+
}
149+
}
150+
}
151+
}

bigtable-client-core-parent/bigtable-client-core/src/test/java/com/google/cloud/bigtable/grpc/scanner/RetryingReadRowsOperationTest.java

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -518,27 +518,6 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
518518
Assert.assertTrue(underTest.getRowMerger().isComplete());
519519
}
520520

521-
@Test
522-
public void testRetryRstStream() throws Exception {
523-
RetryingReadRowsOperation underTest = createOperation();
524-
start(underTest);
525-
526-
ByteString key1 = ByteString.copyFrom("SomeKey1", "UTF-8");
527-
ByteString key2 = ByteString.copyFrom("SomeKey2", "UTF-8");
528-
underTest.onMessage(buildResponse(key1));
529-
underTest.onClose(
530-
Status.INTERNAL.withDescription(
531-
"INTERNAL: HTTP/2 error code: INTERNAL_ERROR\nReceived Rst Stream"),
532-
null);
533-
Assert.assertFalse(underTest.getRowMerger().isComplete());
534-
underTest.onMessage(buildResponse(key2));
535-
verify(mockFlatRowObserver, times(2)).onNext(any(FlatRow.class));
536-
checkRetryRequest(underTest, key2, 8);
537-
verify(mockClientCall, times(4)).request(eq(1));
538-
539-
finishOK(underTest, 1);
540-
}
541-
542521
protected void performTimeout(RetryingReadRowsOperation underTest) {
543522
underTest.onClose(
544523
Status.CANCELLED.withCause(

bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/EndToEndIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ public void testHBaseSnapshotImport() throws Exception {
292292

293293
// Validate the counters.
294294
Map<String, Long> counters = getCountMap(result);
295-
Assert.assertEquals(counters.get("ranges_matched"), (Long) 101L);
295+
Assert.assertEquals(counters.get("ranges_matched"), (Long) 100L);
296296
Assert.assertEquals(counters.get("ranges_not_matched"), (Long) 0L);
297297
}
298298

@@ -355,7 +355,7 @@ public void testHBaseSnapshotImportWithCorruptions() throws Exception {
355355

356356
// Assert that the output collection is the right one.
357357
Map<String, Long> counters = getCountMap(result);
358-
Assert.assertEquals(counters.get("ranges_matched"), (Long) 97L);
358+
Assert.assertEquals(counters.get("ranges_matched"), (Long) 96L);
359359
Assert.assertEquals(counters.get("ranges_not_matched"), (Long) 4L);
360360
}
361361
}

bigtable-hbase-1.x-parent/bigtable-hbase-1.x-integration-tests/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ limitations under the License.
3535
<google.bigtable.connection.impl>
3636
com.google.cloud.bigtable.hbase1_x.BigtableConnection
3737
</google.bigtable.connection.impl>
38-
<test.timeout>1800</test.timeout>
38+
<test.timeout>2700</test.timeout>
3939
</properties>
4040

4141
<profiles>

0 commit comments

Comments
 (0)