Skip to content

Commit 7d50f13

Browse files
committed
fix: Refine connecitivity metrics to capture RPCs with no response headers
1 parent 102cb4c commit 7d50f13

File tree

5 files changed

+110
-170
lines changed

5 files changed

+110
-170
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/BuiltInMetricsRecorder.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -99,21 +99,23 @@ class BuiltInMetricsRecorder extends OpenTelemetryMetricsRecorder {
9999
void recordServerTimingHeaderMetrics(
100100
Float gfeLatency,
101101
Float afeLatency,
102-
Long gfeHeaderMissingCount,
103-
Long afeHeaderMissingCount,
104-
Map<String, String> attributes) {
102+
Map<String, String> attributes,
103+
boolean isDirectPathUsed,
104+
boolean isAfeEnabled) {
105105
io.opentelemetry.api.common.Attributes otelAttributes = toOtelAttributes(attributes);
106-
if (gfeLatency != null) {
107-
gfeLatencyRecorder.record(gfeLatency, otelAttributes);
106+
if (!isDirectPathUsed) {
107+
if (gfeLatency != null) {
108+
gfeLatencyRecorder.record(gfeLatency, otelAttributes);
109+
} else {
110+
gfeHeaderMissingCountRecorder.add(1, otelAttributes);
111+
}
108112
}
109-
if (gfeHeaderMissingCount > 0) {
110-
gfeHeaderMissingCountRecorder.add(gfeHeaderMissingCount, otelAttributes);
111-
}
112-
if (afeLatency != null) {
113-
afeLatencyRecorder.record(afeLatency, otelAttributes);
114-
}
115-
if (afeHeaderMissingCount > 0) {
116-
afeHeaderMissingCountRecorder.add(afeHeaderMissingCount, otelAttributes);
113+
if (isAfeEnabled) {
114+
if (afeLatency != null) {
115+
afeLatencyRecorder.record(afeLatency, otelAttributes);
116+
} else {
117+
afeHeaderMissingCountRecorder.add(1, otelAttributes);
118+
}
117119
}
118120
}
119121

google-cloud-spanner/src/main/java/com/google/cloud/spanner/BuiltInMetricsTracer.java

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,10 @@ class BuiltInMetricsTracer extends MetricsTracer implements ApiTracer {
3939
private final Map<String, String> attributes = new HashMap<>();
4040
private Float gfeLatency = null;
4141
private Float afeLatency = null;
42-
private TraceWrapper traceWrapper;
43-
private long gfeHeaderMissingCount = 0;
44-
private long afeHeaderMissingCount = 0;
42+
private final TraceWrapper traceWrapper;
4543
private final ISpan currentSpan;
44+
private boolean isDirectPathUsed;
45+
private boolean isAfeEnabled;
4646

4747
BuiltInMetricsTracer(
4848
MethodName methodName,
@@ -66,7 +66,7 @@ public void attemptSucceeded() {
6666
super.attemptSucceeded();
6767
attributes.put(STATUS_ATTRIBUTE, StatusCode.Code.OK.toString());
6868
builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
69-
gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
69+
gfeLatency, afeLatency, attributes, isDirectPathUsed, isAfeEnabled);
7070
}
7171
}
7272

@@ -80,7 +80,7 @@ public void attemptCancelled() {
8080
super.attemptCancelled();
8181
attributes.put(STATUS_ATTRIBUTE, StatusCode.Code.CANCELLED.toString());
8282
builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
83-
gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
83+
gfeLatency, afeLatency, attributes, isDirectPathUsed, isAfeEnabled);
8484
}
8585
}
8686

@@ -98,7 +98,7 @@ public void attemptFailedDuration(Throwable error, java.time.Duration delay) {
9898
super.attemptFailedDuration(error, delay);
9999
attributes.put(STATUS_ATTRIBUTE, extractStatus(error));
100100
builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
101-
gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
101+
gfeLatency, afeLatency, attributes, isDirectPathUsed, isAfeEnabled);
102102
}
103103
}
104104

@@ -115,7 +115,7 @@ public void attemptFailedRetriesExhausted(Throwable error) {
115115
super.attemptFailedRetriesExhausted(error);
116116
attributes.put(STATUS_ATTRIBUTE, extractStatus(error));
117117
builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
118-
gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
118+
gfeLatency, afeLatency, attributes, isDirectPathUsed, isAfeEnabled);
119119
}
120120
}
121121

@@ -132,24 +132,16 @@ public void attemptPermanentFailure(Throwable error) {
132132
super.attemptPermanentFailure(error);
133133
attributes.put(STATUS_ATTRIBUTE, extractStatus(error));
134134
builtInOpenTelemetryMetricsRecorder.recordServerTimingHeaderMetrics(
135-
gfeLatency, afeLatency, gfeHeaderMissingCount, afeHeaderMissingCount, attributes);
135+
gfeLatency, afeLatency, attributes, isDirectPathUsed, isAfeEnabled);
136136
}
137137
}
138138

139-
void recordGFELatency(Float gfeLatency) {
139+
public void recordServerTimingHeaderMetrics(
140+
Float gfeLatency, Float afeLatency, boolean isDirectPathUsed, boolean isAfeEnabled) {
140141
this.gfeLatency = gfeLatency;
141-
}
142-
143-
void recordAFELatency(Float afeLatency) {
142+
this.isDirectPathUsed = isDirectPathUsed;
144143
this.afeLatency = afeLatency;
145-
}
146-
147-
void recordGfeHeaderMissingCount(Long value) {
148-
this.gfeHeaderMissingCount = value;
149-
}
150-
151-
void recordAfeHeaderMissingCount(Long value) {
152-
this.afeHeaderMissingCount = value;
144+
this.isAfeEnabled = isAfeEnabled;
153145
}
154146

155147
@Override

google-cloud-spanner/src/main/java/com/google/cloud/spanner/CompositeTracer.java

Lines changed: 5 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -191,50 +191,13 @@ public void addAttributes(Map<String, String> attributes) {
191191
}
192192
}
193193

194-
public void recordGFELatency(Long gfeLatency) {
194+
public void recordServerTimingHeaderMetrics(
195+
Float gfeLatency, Float afeLatency, boolean isDirectPathUsed, boolean isAfeEnabled) {
195196
for (ApiTracer child : children) {
196197
if (child instanceof BuiltInMetricsTracer) {
197-
((BuiltInMetricsTracer) child).recordGFELatency(Float.valueOf(gfeLatency));
198-
}
199-
}
200-
}
201-
202-
public void recordGfeHeaderMissingCount(Long value) {
203-
for (ApiTracer child : children) {
204-
if (child instanceof BuiltInMetricsTracer) {
205-
((BuiltInMetricsTracer) child).recordGfeHeaderMissingCount(value);
206-
}
207-
}
208-
}
209-
210-
public void recordAFELatency(Long afeLatency) {
211-
for (ApiTracer child : children) {
212-
if (child instanceof BuiltInMetricsTracer) {
213-
((BuiltInMetricsTracer) child).recordAFELatency(Float.valueOf(afeLatency));
214-
}
215-
}
216-
}
217-
218-
public void recordAfeHeaderMissingCount(Long value) {
219-
for (ApiTracer child : children) {
220-
if (child instanceof BuiltInMetricsTracer) {
221-
((BuiltInMetricsTracer) child).recordAfeHeaderMissingCount(value);
222-
}
223-
}
224-
}
225-
226-
public void recordGFELatency(Float gfeLatency) {
227-
for (ApiTracer child : children) {
228-
if (child instanceof BuiltInMetricsTracer) {
229-
((BuiltInMetricsTracer) child).recordGFELatency(gfeLatency);
230-
}
231-
}
232-
}
233-
234-
public void recordAFELatency(Float afeLatency) {
235-
for (ApiTracer child : children) {
236-
if (child instanceof BuiltInMetricsTracer) {
237-
((BuiltInMetricsTracer) child).recordAFELatency(afeLatency);
198+
((BuiltInMetricsTracer) child)
199+
.recordServerTimingHeaderMetrics(
200+
gfeLatency, afeLatency, isDirectPathUsed, isAfeEnabled);
238201
}
239202
}
240203
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/HeaderInterceptor.java

Lines changed: 67 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,9 @@
2828
import com.google.common.cache.Cache;
2929
import com.google.common.cache.CacheBuilder;
3030
import com.google.spanner.admin.database.v1.DatabaseName;
31-
import io.grpc.CallOptions;
32-
import io.grpc.Channel;
33-
import io.grpc.ClientCall;
34-
import io.grpc.ClientInterceptor;
31+
import io.grpc.*;
3532
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
3633
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
37-
import io.grpc.Metadata;
38-
import io.grpc.MethodDescriptor;
3934
import io.grpc.alts.AltsContextUtil;
4035
import io.opencensus.stats.MeasureMap;
4136
import io.opencensus.stats.Stats;
@@ -91,6 +86,8 @@ class HeaderInterceptor implements ClientInterceptor {
9186
private static final Logger LOGGER = Logger.getLogger(HeaderInterceptor.class.getName());
9287
private static final Level LEVEL = Level.INFO;
9388
private final SpannerRpcMetrics spannerRpcMetrics;
89+
private Float gfeLatency;
90+
private Float afeLatency;
9491

9592
HeaderInterceptor(SpannerRpcMetrics spannerRpcMetrics) {
9693
this.spannerRpcMetrics = spannerRpcMetrics;
@@ -113,24 +110,46 @@ public void start(Listener<RespT> responseListener, Metadata headers) {
113110
TagContext tagContext = getTagContext(key, method.getFullMethodName(), databaseName);
114111
Attributes attributes =
115112
getMetricAttributes(key, method.getFullMethodName(), databaseName);
116-
Map<String, String> builtInMetricsAttributes =
117-
getBuiltInMetricAttributes(key, databaseName);
118-
builtInMetricsAttributes.put(BuiltInMetricsConstant.REQUEST_ID_KEY.getKey(), requestId);
119-
addBuiltInMetricAttributes(compositeTracer, builtInMetricsAttributes);
120-
if (span != null) {
121-
span.setAttribute(XGoogSpannerRequestId.REQUEST_ID, requestId);
122-
}
113+
123114
super.start(
124115
new SimpleForwardingClientCallListener<RespT>(responseListener) {
125116
@Override
126117
public void onHeaders(Metadata metadata) {
127-
// Check if the call uses DirectPath by inspecting the ALTS context.
128-
boolean isDirectPathUsed = AltsContextUtil.check(getAttributes());
129-
addDirectPathUsedAttribute(compositeTracer, isDirectPathUsed);
130-
processHeader(
131-
metadata, tagContext, attributes, span, compositeTracer, isDirectPathUsed);
118+
String serverTiming = metadata.get(SERVER_TIMING_HEADER_KEY);
119+
try {
120+
// Get gfe and afe Latency value
121+
Map<String, Float> serverTimingMetrics = parseServerTimingHeader(serverTiming);
122+
gfeLatency = serverTimingMetrics.get(GFE_TIMING_HEADER);
123+
afeLatency = serverTimingMetrics.get(AFE_TIMING_HEADER);
124+
} catch (NumberFormatException e) {
125+
LOGGER.log(LEVEL, "Invalid server-timing object in header: {}", serverTiming);
126+
}
127+
132128
super.onHeaders(metadata);
133129
}
130+
131+
@Override
132+
public void onClose(Status status, Metadata trailers) {
133+
// Record Built-in Metrics
134+
boolean isDirectPathUsed = AltsContextUtil.check(getAttributes());
135+
boolean isAfeEnabled = GapicSpannerRpc.isEnableAFEServerTiming();
136+
recordSpan(span, requestId);
137+
recordCustomMetrics(tagContext, attributes, isDirectPathUsed);
138+
Map<String, String> builtInMetricsAttributes = new HashMap<>();
139+
try {
140+
builtInMetricsAttributes = getBuiltInMetricAttributes(key, databaseName);
141+
} catch (ExecutionException e) {
142+
LOGGER.log(
143+
LEVEL, "Unable to get built-in metric attributes {}", e.getMessage());
144+
}
145+
recordBuiltInMetrics(
146+
compositeTracer,
147+
builtInMetricsAttributes,
148+
requestId,
149+
isDirectPathUsed,
150+
isAfeEnabled);
151+
super.onClose(status, trailers);
152+
}
134153
},
135154
headers);
136155
} catch (ExecutionException executionException) {
@@ -141,29 +160,12 @@ public void onHeaders(Metadata metadata) {
141160
};
142161
}
143162

144-
private void processHeader(
145-
Metadata metadata,
146-
TagContext tagContext,
147-
Attributes attributes,
148-
Span span,
149-
CompositeTracer compositeTracer,
150-
boolean isDirectPathUsed) {
163+
private void recordCustomMetrics(
164+
TagContext tagContext, Attributes attributes, Boolean isDirectPathUsed) {
165+
// Record OpenCensus and Custom OpenTelemetry Metrics
151166
MeasureMap measureMap = STATS_RECORDER.newMeasureMap();
152-
String serverTiming = metadata.get(SERVER_TIMING_HEADER_KEY);
153-
try {
154-
// Previous implementation parsed the GFE latency directly using:
155-
// long latency = Long.parseLong(serverTiming.substring("gfet4t7; dur=".length()));
156-
// This approach assumed the serverTiming header contained exactly one metric "gfet4t7".
157-
// If additional metrics were introduced in the header, older versions of the library
158-
// would fail to parse it correctly. To make the parsing more robust, the logic has been
159-
// updated to handle multiple metrics gracefully.
160-
161-
Map<String, Float> serverTimingMetrics = parseServerTimingHeader(serverTiming);
162-
Float gfeLatency = serverTimingMetrics.get(GFE_TIMING_HEADER);
163-
boolean isAfeEnabled = GapicSpannerRpc.isEnableAFEServerTiming();
164-
Float afeLatency = isAfeEnabled ? serverTimingMetrics.get(AFE_TIMING_HEADER) : null;
165167

166-
// Record OpenCensus and Custom OpenTelemetry Metrics
168+
if (!isDirectPathUsed) {
167169
if (gfeLatency != null) {
168170
long gfeVal = gfeLatency.longValue();
169171
measureMap.put(SPANNER_GFE_LATENCY, gfeVal);
@@ -174,39 +176,35 @@ private void processHeader(
174176
measureMap.put(SPANNER_GFE_HEADER_MISSING_COUNT, 1L);
175177
spannerRpcMetrics.recordGfeHeaderMissingCount(1L, attributes);
176178
}
177-
measureMap.record(tagContext);
179+
}
180+
measureMap.record(tagContext);
181+
}
178182

179-
// Record Built-in Metrics
180-
if (compositeTracer != null) {
181-
// GFE Latency Metrics
182-
if (!isDirectPathUsed) {
183-
if (gfeLatency != null) {
184-
compositeTracer.recordGFELatency(gfeLatency);
185-
} else {
186-
compositeTracer.recordGfeHeaderMissingCount(1L);
187-
}
188-
}
189-
// AFE Tracing
190-
if (isAfeEnabled) {
191-
if (afeLatency != null) {
192-
compositeTracer.recordAFELatency(afeLatency);
193-
} else {
194-
compositeTracer.recordAfeHeaderMissingCount(1L);
195-
}
196-
}
183+
private void recordSpan(Span span, String requestId) {
184+
if (span != null) {
185+
if (gfeLatency != null) {
186+
span.setAttribute("gfe_latency", gfeLatency.toString());
197187
}
198-
199-
// Record Span Attributes
200-
if (span != null) {
201-
if (gfeLatency != null) {
202-
span.setAttribute("gfe_latency", gfeLatency.toString());
203-
}
204-
if (afeLatency != null) {
205-
span.setAttribute("afe_latency", afeLatency.toString());
206-
}
188+
if (afeLatency != null) {
189+
span.setAttribute("afe_latency", afeLatency.toString());
207190
}
208-
} catch (NumberFormatException e) {
209-
LOGGER.log(LEVEL, "Invalid server-timing object in header: {}", serverTiming);
191+
span.setAttribute(XGoogSpannerRequestId.REQUEST_ID, requestId);
192+
}
193+
}
194+
195+
private void recordBuiltInMetrics(
196+
CompositeTracer compositeTracer,
197+
Map<String, String> builtInMetricsAttributes,
198+
String requestId,
199+
Boolean isDirectPathUsed,
200+
Boolean isAfeEnabled) {
201+
if (compositeTracer != null) {
202+
builtInMetricsAttributes.put(BuiltInMetricsConstant.REQUEST_ID_KEY.getKey(), requestId);
203+
builtInMetricsAttributes.put(
204+
BuiltInMetricsConstant.DIRECT_PATH_USED_KEY.getKey(), Boolean.toString(isDirectPathUsed));
205+
compositeTracer.addAttributes(builtInMetricsAttributes);
206+
compositeTracer.recordServerTimingHeaderMetrics(
207+
gfeLatency, afeLatency, isDirectPathUsed, isAfeEnabled);
210208
}
211209
}
212210

@@ -309,19 +307,4 @@ private Map<String, String> getBuiltInMetricAttributes(String key, DatabaseName
309307
return attributes;
310308
});
311309
}
312-
313-
private void addBuiltInMetricAttributes(
314-
CompositeTracer compositeTracer, Map<String, String> builtInMetricsAttributes) {
315-
if (compositeTracer != null) {
316-
compositeTracer.addAttributes(builtInMetricsAttributes);
317-
}
318-
}
319-
320-
private void addDirectPathUsedAttribute(
321-
CompositeTracer compositeTracer, Boolean isDirectPathUsed) {
322-
if (compositeTracer != null) {
323-
compositeTracer.addAttributes(
324-
BuiltInMetricsConstant.DIRECT_PATH_USED_KEY.getKey(), Boolean.toString(isDirectPathUsed));
325-
}
326-
}
327310
}

0 commit comments

Comments
 (0)