Skip to content

Commit 12a2498

Browse files
feat: add an experimental feature to skip waiting for trailers for unary ops
This is off by default and can be enabled using an environment variable. When enabled, BigtableUnaryOperationCallable will resolve the user visible future immediately when a response is available and will tell metrics to freeze all timers. Metrics will still wait for the trailers in the background for necessary metadata to publish the frozen timer values. Change-Id: I2101ff375de711693720af4fd2e9535aa5355f9d
1 parent 066ae08 commit 12a2498

File tree

6 files changed

+105
-41
lines changed

6 files changed

+105
-41
lines changed

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java

Lines changed: 54 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,16 @@
2424
import com.google.api.gax.rpc.ServerStreamingCallable;
2525
import com.google.api.gax.rpc.StreamController;
2626
import com.google.api.gax.rpc.UnaryCallable;
27-
import com.google.api.gax.tracing.ApiTracer;
2827
import com.google.api.gax.tracing.ApiTracerFactory;
2928
import com.google.api.gax.tracing.SpanName;
29+
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer;
3030
import com.google.common.base.Preconditions;
31+
import com.google.common.util.concurrent.Futures;
3132
import io.grpc.Status;
33+
import java.util.concurrent.ExecutionException;
3234
import java.util.concurrent.atomic.AtomicBoolean;
3335
import java.util.logging.Level;
3436
import java.util.logging.Logger;
35-
import javax.annotation.Nullable;
3637

3738
/**
3839
* Helper to convert a fake {@link ServerStreamingCallable} (ie only up to 1 response) into a {@link
@@ -73,9 +74,10 @@ public BigtableUnaryOperationCallable(
7374
public ApiFuture<RespT> futureCall(ReqT req, ApiCallContext apiCallContext) {
7475
apiCallContext = defaultCallContext.merge(apiCallContext);
7576

76-
ApiTracer apiTracer =
77-
tracerFactory.newTracer(
78-
apiCallContext.getTracer(), spanName, ApiTracerFactory.OperationType.Unary);
77+
BigtableTracer apiTracer =
78+
(BigtableTracer)
79+
tracerFactory.newTracer(
80+
apiCallContext.getTracer(), spanName, ApiTracerFactory.OperationType.Unary);
7981

8082
apiCallContext = apiCallContext.withTracer(apiTracer);
8183

@@ -85,18 +87,15 @@ public ApiFuture<RespT> futureCall(ReqT req, ApiCallContext apiCallContext) {
8587
}
8688

8789
class UnaryFuture extends AbstractApiFuture<RespT> implements ResponseObserver<RespT> {
88-
private final ApiTracer tracer;
90+
private final BigtableTracer tracer;
8991
private final boolean allowNoResponse;
9092

9193
private StreamController controller;
9294
private final AtomicBoolean upstreamCancelled = new AtomicBoolean();
93-
private boolean responseReceived;
94-
private @Nullable RespT response;
9595

96-
private UnaryFuture(ApiTracer tracer, boolean allowNoResponse) {
96+
private UnaryFuture(BigtableTracer tracer, boolean allowNoResponse) {
9797
this.tracer = Preconditions.checkNotNull(tracer, "tracer can't be null");
9898
this.allowNoResponse = allowNoResponse;
99-
this.responseReceived = false;
10099
}
101100

102101
@Override
@@ -130,23 +129,39 @@ private void cancelUpstream() {
130129
public void onResponse(RespT resp) {
131130
tracer.responseReceived();
132131

133-
// happy path - buffer the only responsse
134-
if (!responseReceived) {
135-
responseReceived = true;
136-
this.response = resp;
132+
if (set(resp)) {
133+
tracer.operationFinishEarly();
137134
return;
138135
}
139136

140-
String msg =
141-
String.format(
142-
"Received multiple responses for a %s unary operation. Previous: %s, New: %s",
143-
spanName, response, resp);
144-
logger.log(Level.WARNING, msg);
137+
// At this point we are guaranteed that the future has been resolved. However we need to check
138+
// why.
139+
// We know it's not because it was resolved with the current response. Moreover, since the
140+
// future
141+
// is resolved, our only means to flag the error is to log.
142+
// So there are 3 possibilities:
143+
// 1. user cancelled the future
144+
// 2. this is an extra response and the previous one resolved the future
145+
// 3. we got a response after the rpc failed (this should never happen and would be a bad bug)
145146

146-
InternalException error =
147-
new InternalException(msg, null, GrpcStatusCode.of(Status.Code.INTERNAL), false);
148-
if (setException(error)) {
149-
tracer.operationFailed(error);
147+
if (isCancelled()) {
148+
return;
149+
}
150+
151+
try {
152+
RespT prev = Futures.getDone(this);
153+
String msg =
154+
String.format(
155+
"Received response after future is resolved for a %s unary operation. previous: %s, New response: %s",
156+
spanName, prev, resp);
157+
logger.log(Level.WARNING, msg);
158+
} catch (ExecutionException e) {
159+
// Should never happen
160+
String msg =
161+
String.format(
162+
"Received response after future resolved as a failure for a %s unary operation. New response: %s",
163+
spanName, resp);
164+
logger.log(Level.WARNING, msg, e.getCause());
150165
}
151166

152167
cancelUpstream();
@@ -158,18 +173,24 @@ public void onError(Throwable throwable) {
158173
tracer.operationFailed(throwable);
159174
} else if (isCancelled()) {
160175
tracer.operationCancelled();
176+
} else {
177+
// At this point the future has already been resolved, so we ignore the error
178+
tracer.operationSucceeded();
161179
}
162-
// The future might've been resolved due to double response
163180
}
164181

165182
@Override
166183
public void onComplete() {
167-
if (allowNoResponse || responseReceived) {
168-
if (set(response)) {
169-
tracer.operationSucceeded();
170-
return;
171-
}
172-
} else {
184+
if (allowNoResponse && set(null)) {
185+
tracer.operationSucceeded();
186+
return;
187+
188+
// Under normal circumstances the future would've been resolved in onResponse or via
189+
// set(null) if it is expected for
190+
// the rpc to not have a response. So if we aren't done, the only reason is that we didn't get
191+
// a response
192+
// but were expecting one
193+
} else if (!isDone()) {
173194
String msg = spanName + " unary operation completed without a response message";
174195
InternalException e =
175196
new InternalException(msg, null, GrpcStatusCode.of(Status.Code.INTERNAL), false);
@@ -183,7 +204,10 @@ public void onComplete() {
183204
// check cancellation race
184205
if (isCancelled()) {
185206
tracer.operationCancelled();
207+
return;
186208
}
209+
210+
tracer.operationSucceeded();
187211
}
188212
}
189213
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BigtableTracer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ public void afterResponse(long applicationLatency) {
5252
// noop
5353
}
5454

55+
public void operationFinishEarly() {}
56+
5557
/**
5658
* Get the attempt number of the current call. Attempt number for the current call is passed in
5759
* and should be recorded in {@link #attemptStarted(int)}. With the getter we can access it from

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsTracer.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class BuiltinMetricsTracer extends BigtableTracer {
5151
private final SpanName spanName;
5252

5353
// Operation level metrics
54+
private final AtomicBoolean operationFinishedEarly = new AtomicBoolean();
5455
private final AtomicBoolean opFinished = new AtomicBoolean();
5556
private final Stopwatch operationTimer = Stopwatch.createStarted();
5657
private final Stopwatch firstResponsePerOpTimer = Stopwatch.createStarted();
@@ -132,6 +133,13 @@ public void close() {}
132133
};
133134
}
134135

136+
@Override
137+
public void operationFinishEarly() {
138+
operationFinishedEarly.set(true);
139+
attemptTimer.stop();
140+
operationTimer.stop();
141+
}
142+
135143
@Override
136144
public void operationSucceeded() {
137145
recordOperationCompletion(null);
@@ -192,6 +200,11 @@ public void attemptPermanentFailure(Throwable throwable) {
192200
@Override
193201
public void onRequest(int requestCount) {
194202
requestLeft.accumulateAndGet(requestCount, IntMath::saturatedAdd);
203+
204+
if (operationFinishedEarly.get()) {
205+
return;
206+
}
207+
195208
if (flowControlIsDisabled) {
196209
// On request is only called when auto flow control is disabled. When auto flow control is
197210
// disabled, server latency is measured between onRequest and onResponse.
@@ -205,6 +218,10 @@ public void onRequest(int requestCount) {
205218

206219
@Override
207220
public void responseReceived() {
221+
if (operationFinishedEarly.get()) {
222+
return;
223+
}
224+
208225
if (firstResponsePerOpTimer.isRunning()) {
209226
firstResponsePerOpTimer.stop();
210227
}
@@ -226,6 +243,9 @@ public void responseReceived() {
226243
@Override
227244
public void afterResponse(long applicationLatency) {
228245
if (!flowControlIsDisabled || requestLeft.decrementAndGet() > 0) {
246+
if (operationFinishedEarly.get()) {
247+
return;
248+
}
229249
// When auto flow control is enabled, request will never be called, so server latency is
230250
// measured between after the last response is processed and before the next response is
231251
// received. If flow control is disabled but requestLeft is greater than 0,
@@ -272,10 +292,16 @@ public void disableFlowControl() {
272292
}
273293

274294
private void recordOperationCompletion(@Nullable Throwable status) {
295+
if (operationFinishedEarly.get()) {
296+
status = null; // force an ok
297+
}
298+
275299
if (!opFinished.compareAndSet(false, true)) {
276300
return;
277301
}
278-
operationTimer.stop();
302+
if (operationTimer.isRunning()) {
303+
operationTimer.stop();
304+
}
279305

280306
boolean isStreaming = operationType == OperationType.ServerStreaming;
281307
String statusStr = Util.extractStatus(status);
@@ -316,6 +342,9 @@ private void recordOperationCompletion(@Nullable Throwable status) {
316342
}
317343

318344
private void recordAttemptCompletion(@Nullable Throwable status) {
345+
if (operationFinishedEarly.get()) {
346+
status = null; // force an ok
347+
}
319348
// If the attempt failed, the time spent in retry should be counted in application latency.
320349
// Stop the stopwatch and decrement requestLeft.
321350
synchronized (timerLock) {

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/CompositeTracer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,13 @@ public void close() {
6262
};
6363
}
6464

65+
@Override
66+
public void operationFinishEarly() {
67+
for (BigtableTracer tracer : bigtableTracers) {
68+
tracer.operationFinishEarly();
69+
}
70+
}
71+
6572
@Override
6673
public void operationSucceeded() {
6774
for (ApiTracer child : children) {

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/MetricsTracer.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ public void close() {}
8484
};
8585
}
8686

87+
@Override
88+
public void operationFinishEarly() {
89+
attemptTimer.stop();
90+
operationTimer.stop();
91+
}
92+
8793
@Override
8894
public void operationSucceeded() {
8995
recordOperationCompletion(null);
@@ -103,7 +109,11 @@ private void recordOperationCompletion(@Nullable Throwable throwable) {
103109
if (!opFinished.compareAndSet(false, true)) {
104110
return;
105111
}
106-
operationTimer.stop();
112+
113+
// Might've already been stopped in operationFinishEarly()
114+
if (operationTimer.isRunning()) {
115+
operationTimer.stop();
116+
}
107117

108118
long elapsed = operationTimer.elapsed(TimeUnit.MILLISECONDS);
109119

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallableTest.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import com.google.api.core.ApiFuture;
2323
import com.google.api.gax.grpc.GrpcCallContext;
24-
import com.google.api.gax.rpc.InternalException;
2524
import com.google.api.gax.tracing.ApiTracerFactory;
2625
import com.google.api.gax.tracing.SpanName;
2726
import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracer;
@@ -88,18 +87,11 @@ public void testMultipleResponses() throws Exception {
8887
call.getController().getObserver().onResponse("first");
8988
call.getController().getObserver().onResponse("second");
9089

91-
Throwable e = Assert.assertThrows(ExecutionException.class, f::get).getCause();
92-
assertThat(e).isInstanceOf(InternalException.class);
93-
assertThat(e)
94-
.hasMessageThat()
95-
.contains(
96-
"Received multiple responses for a Fake.method unary operation. Previous: first, New: second");
97-
9890
ArgumentCaptor<String> msgCaptor = ArgumentCaptor.forClass(String.class);
9991
verify(callable.logger).log(Mockito.any(), msgCaptor.capture());
10092
assertThat(msgCaptor.getValue())
10193
.isEqualTo(
102-
"Received multiple responses for a Fake.method unary operation. Previous: first, New: second");
94+
"Received response after future is resolved for a Fake.method unary operation. previous: first, New response: second");
10395

10496
assertThat(call.getController().isCancelled()).isTrue();
10597
}

0 commit comments

Comments
 (0)