6666import com .google .cloud .bigtable .data .v2 .models .RowMutation ;
6767import com .google .cloud .bigtable .data .v2 .models .RowMutationEntry ;
6868import com .google .cloud .bigtable .data .v2 .stub .metrics .CompositeTracerFactory ;
69+ import com .google .cloud .bigtable .data .v2 .stub .metrics .HeaderTracerStreamingCallable ;
70+ import com .google .cloud .bigtable .data .v2 .stub .metrics .HeaderTracerUnaryCallable ;
6971import com .google .cloud .bigtable .data .v2 .stub .metrics .MetricsTracerFactory ;
7072import com .google .cloud .bigtable .data .v2 .stub .metrics .RpcMeasureConstants ;
7173import com .google .cloud .bigtable .data .v2 .stub .mutaterows .BulkMutateRowsUserFacingCallable ;
@@ -162,6 +164,15 @@ public static EnhancedBigtableStubSettings finalizeSettings(
162164 .build ());
163165 }
164166
167+ ImmutableMap <TagKey , TagValue > attributes =
168+ ImmutableMap .<TagKey , TagValue >builder ()
169+ .put (RpcMeasureConstants .BIGTABLE_PROJECT_ID , TagValue .create (settings .getProjectId ()))
170+ .put (
171+ RpcMeasureConstants .BIGTABLE_INSTANCE_ID , TagValue .create (settings .getInstanceId ()))
172+ .put (
173+ RpcMeasureConstants .BIGTABLE_APP_PROFILE_ID ,
174+ TagValue .create (settings .getAppProfileId ()))
175+ .build ();
165176 // Inject Opencensus instrumentation
166177 builder .setTracerFactory (
167178 new CompositeTracerFactory (
@@ -187,23 +198,17 @@ public static EnhancedBigtableStubSettings finalizeSettings(
187198 GaxProperties .getLibraryVersion (EnhancedBigtableStubSettings .class ))
188199 .build ()),
189200 // Add OpenCensus Metrics
190- MetricsTracerFactory .create (
191- tagger ,
192- stats ,
193- ImmutableMap .<TagKey , TagValue >builder ()
194- .put (
195- RpcMeasureConstants .BIGTABLE_PROJECT_ID ,
196- TagValue .create (settings .getProjectId ()))
197- .put (
198- RpcMeasureConstants .BIGTABLE_INSTANCE_ID ,
199- TagValue .create (settings .getInstanceId ()))
200- .put (
201- RpcMeasureConstants .BIGTABLE_APP_PROFILE_ID ,
202- TagValue .create (settings .getAppProfileId ()))
203- .build ()),
201+ MetricsTracerFactory .create (tagger , stats , attributes ),
204202 // Add user configured tracer
205203 settings .getTracerFactory ())));
206-
204+ builder .setHeaderTracer (
205+ builder
206+ .getHeaderTracer ()
207+ .toBuilder ()
208+ .setStats (stats )
209+ .setTagger (tagger )
210+ .setStatsAttributes (attributes )
211+ .build ());
207212 return builder .build ();
208213 }
209214
@@ -268,11 +273,10 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
268273 ServerStreamingCallable <Query , RowT > readRowsUserCallable =
269274 new ReadRowsUserCallable <>(readRowsCallable , requestContext );
270275
276+ SpanName span = getSpanName ("ReadRows" );
271277 ServerStreamingCallable <Query , RowT > traced =
272278 new TracedServerStreamingCallable <>(
273- readRowsUserCallable ,
274- clientContext .getTracerFactory (),
275- SpanName .of (CLIENT_NAME , "ReadRows" ));
279+ readRowsUserCallable , clientContext .getTracerFactory (), span );
276280
277281 return traced .withDefaultCallContext (clientContext .getDefaultCallContext ());
278282 }
@@ -315,6 +319,7 @@ public <RowT> UnaryCallable<Query, RowT> createReadRowCallable(RowAdapter<RowT>
315319 * <li>Upon receiving the response stream, it will merge the {@link
316320 * com.google.bigtable.v2.ReadRowsResponse.CellChunk}s in logical rows. The actual row
317321 * implementation can be configured by the {@code rowAdapter} parameter.
322+ * <li>Add header tracer for tracking GFE metrics.
318323 * <li>Retry/resume on failure.
319324 * <li>Filter out marker rows.
320325 * </ul>
@@ -356,10 +361,14 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
356361 ServerStreamingCallable <ReadRowsRequest , RowT > watched =
357362 Callables .watched (merging , innerSettings , clientContext );
358363
364+ ServerStreamingCallable <ReadRowsRequest , RowT > withHeaderTracer =
365+ new HeaderTracerStreamingCallable <>(
366+ watched , settings .getHeaderTracer (), getSpanName ("ReadRows" ).toString ());
367+
359368 // Retry logic is split into 2 parts to workaround a rare edge case described in
360369 // ReadRowsRetryCompletedCallable
361370 ServerStreamingCallable <ReadRowsRequest , RowT > retrying1 =
362- new ReadRowsRetryCompletedCallable <>(watched );
371+ new ReadRowsRetryCompletedCallable <>(withHeaderTracer );
363372
364373 ServerStreamingCallable <ReadRowsRequest , RowT > retrying2 =
365374 Callables .retrying (retrying1 , innerSettings , clientContext );
@@ -380,6 +389,8 @@ public Map<String, String> extract(ReadRowsRequest readRowsRequest) {
380389 * </ul>
381390 */
382391 private UnaryCallable <String , List <KeyOffset >> createSampleRowKeysCallable () {
392+ String methodName = "SampleRowKeys" ;
393+
383394 ServerStreamingCallable <SampleRowKeysRequest , SampleRowKeysResponse > base =
384395 GrpcRawCallableFactory .createServerStreamingCallable (
385396 GrpcCallSettings .<SampleRowKeysRequest , SampleRowKeysResponse >newBuilder ()
@@ -399,11 +410,15 @@ public Map<String, String> extract(
399410
400411 UnaryCallable <SampleRowKeysRequest , List <SampleRowKeysResponse >> spoolable = base .all ();
401412
413+ UnaryCallable <SampleRowKeysRequest , List <SampleRowKeysResponse >> withHeaderTracer =
414+ new HeaderTracerUnaryCallable <>(
415+ spoolable , settings .getHeaderTracer (), getSpanName (methodName ).toString ());
416+
402417 UnaryCallable <SampleRowKeysRequest , List <SampleRowKeysResponse >> retryable =
403- Callables .retrying (spoolable , settings .sampleRowKeysSettings (), clientContext );
418+ Callables .retrying (withHeaderTracer , settings .sampleRowKeysSettings (), clientContext );
404419
405420 return createUserFacingUnaryCallable (
406- "SampleRowKeys" , new SampleRowKeysCallable (retryable , requestContext ));
421+ methodName , new SampleRowKeysCallable (retryable , requestContext ));
407422 }
408423
409424 /**
@@ -415,6 +430,7 @@ public Map<String, String> extract(
415430 * </ul>
416431 */
417432 private UnaryCallable <RowMutation , Void > createMutateRowCallable () {
433+ String methodName = "MutateRow" ;
418434 UnaryCallable <MutateRowRequest , MutateRowResponse > base =
419435 GrpcRawCallableFactory .createUnaryCallable (
420436 GrpcCallSettings .<MutateRowRequest , MutateRowResponse >newBuilder ()
@@ -431,11 +447,15 @@ public Map<String, String> extract(MutateRowRequest mutateRowRequest) {
431447 .build (),
432448 settings .mutateRowSettings ().getRetryableCodes ());
433449
450+ UnaryCallable <MutateRowRequest , MutateRowResponse > withHeaderTracer =
451+ new HeaderTracerUnaryCallable <>(
452+ base , settings .getHeaderTracer (), getSpanName (methodName ).toString ());
453+
434454 UnaryCallable <MutateRowRequest , MutateRowResponse > retrying =
435- Callables .retrying (base , settings .mutateRowSettings (), clientContext );
455+ Callables .retrying (withHeaderTracer , settings .mutateRowSettings (), clientContext );
436456
437457 return createUserFacingUnaryCallable (
438- "MutateRow" , new MutateRowCallable (retrying , requestContext ));
458+ methodName , new MutateRowCallable (retrying , requestContext ));
439459 }
440460
441461 /**
@@ -459,11 +479,13 @@ private UnaryCallable<BulkMutation, Void> createBulkMutateRowsCallable() {
459479 UnaryCallable <BulkMutation , Void > userFacing =
460480 new BulkMutateRowsUserFacingCallable (baseCallable , requestContext );
461481
482+ SpanName spanName = getSpanName ("MutateRows" );
462483 UnaryCallable <BulkMutation , Void > traced =
463- new TracedUnaryCallable <>(
464- userFacing , clientContext .getTracerFactory (), SpanName .of (CLIENT_NAME , "MutateRows" ));
484+ new TracedUnaryCallable <>(userFacing , clientContext .getTracerFactory (), spanName );
485+ UnaryCallable <BulkMutation , Void > withHeaderTracer =
486+ new HeaderTracerUnaryCallable <>(traced , settings .getHeaderTracer (), spanName .toString ());
465487
466- return traced .withDefaultCallContext (clientContext .getDefaultCallContext ());
488+ return withHeaderTracer .withDefaultCallContext (clientContext .getDefaultCallContext ());
467489 }
468490
469491 /**
@@ -569,6 +591,7 @@ public Map<String, String> extract(MutateRowsRequest mutateRowsRequest) {
569591 * </ul>
570592 */
571593 private UnaryCallable <ConditionalRowMutation , Boolean > createCheckAndMutateRowCallable () {
594+ String methodName = "CheckAndMutateRow" ;
572595 UnaryCallable <CheckAndMutateRowRequest , CheckAndMutateRowResponse > base =
573596 GrpcRawCallableFactory .createUnaryCallable (
574597 GrpcCallSettings .<CheckAndMutateRowRequest , CheckAndMutateRowResponse >newBuilder ()
@@ -586,11 +609,15 @@ public Map<String, String> extract(
586609 .build (),
587610 settings .checkAndMutateRowSettings ().getRetryableCodes ());
588611
612+ UnaryCallable <CheckAndMutateRowRequest , CheckAndMutateRowResponse > withHeaderTracer =
613+ new HeaderTracerUnaryCallable <>(
614+ base , settings .getHeaderTracer (), getSpanName (methodName ).toString ());
615+
589616 UnaryCallable <CheckAndMutateRowRequest , CheckAndMutateRowResponse > retrying =
590- Callables .retrying (base , settings .checkAndMutateRowSettings (), clientContext );
617+ Callables .retrying (withHeaderTracer , settings .checkAndMutateRowSettings (), clientContext );
591618
592619 return createUserFacingUnaryCallable (
593- "CheckAndMutateRow" , new CheckAndMutateRowCallable (retrying , requestContext ));
620+ methodName , new CheckAndMutateRowCallable (retrying , requestContext ));
594621 }
595622
596623 /**
@@ -619,12 +646,16 @@ public Map<String, String> extract(ReadModifyWriteRowRequest request) {
619646 })
620647 .build (),
621648 settings .readModifyWriteRowSettings ().getRetryableCodes ());
649+ String methodName = "ReadModifyWriteRow" ;
650+ UnaryCallable <ReadModifyWriteRowRequest , ReadModifyWriteRowResponse > withHeaderTracer =
651+ new HeaderTracerUnaryCallable <>(
652+ base , settings .getHeaderTracer (), getSpanName (methodName ).toString ());
622653
623654 UnaryCallable <ReadModifyWriteRowRequest , ReadModifyWriteRowResponse > retrying =
624- Callables .retrying (base , settings .readModifyWriteRowSettings (), clientContext );
655+ Callables .retrying (withHeaderTracer , settings .readModifyWriteRowSettings (), clientContext );
625656
626657 return createUserFacingUnaryCallable (
627- "ReadModifyWriteRow" , new ReadModifyWriteRowCallable (retrying , requestContext ));
658+ methodName , new ReadModifyWriteRowCallable (retrying , requestContext ));
628659 }
629660
630661 /**
@@ -635,8 +666,7 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin
635666 String methodName , UnaryCallable <RequestT , ResponseT > inner ) {
636667
637668 UnaryCallable <RequestT , ResponseT > traced =
638- new TracedUnaryCallable <>(
639- inner , clientContext .getTracerFactory (), SpanName .of (CLIENT_NAME , methodName ));
669+ new TracedUnaryCallable <>(inner , clientContext .getTracerFactory (), getSpanName (methodName ));
640670
641671 return traced .withDefaultCallContext (clientContext .getDefaultCallContext ());
642672 }
@@ -686,6 +716,10 @@ public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
686716 }
687717 // </editor-fold>
688718
719+ private SpanName getSpanName (String methodName ) {
720+ return SpanName .of (CLIENT_NAME , methodName );
721+ }
722+
689723 @ Override
690724 public void close () {
691725 for (BackgroundResource backgroundResource : clientContext .getBackgroundResources ()) {
0 commit comments