3131import com .google .cloud .bigquery .storage .v1 .StreamConnection .RequestCallback ;
3232import com .google .common .annotations .VisibleForTesting ;
3333import com .google .common .base .Preconditions ;
34+ import com .google .common .collect .ImmutableList ;
3435import com .google .common .util .concurrent .Uninterruptibles ;
3536import com .google .protobuf .Int64Value ;
3637import io .grpc .Status ;
4041import io .opentelemetry .api .common .Attributes ;
4142import io .opentelemetry .api .common .AttributesBuilder ;
4243import io .opentelemetry .api .metrics .LongCounter ;
44+ import io .opentelemetry .api .metrics .LongHistogram ;
4345import io .opentelemetry .api .metrics .Meter ;
4446import io .opentelemetry .api .metrics .MeterProvider ;
4547import java .io .IOException ;
@@ -265,6 +267,7 @@ class ConnectionWorker implements AutoCloseable {
265267 private static Pattern streamPatternTable = Pattern .compile (tableMatching );
266268 private Meter writeMeter ;
267269 static AttributeKey <String > telemetryKeyTableId = AttributeKey .stringKey ("table_id" );
270+ static AttributeKey <String > telemetryKeyWriterId = AttributeKey .stringKey ("writer_id" );
268271 private static String dataflowPrefix = "dataflow:" ;
269272 static List <AttributeKey <String >> telemetryKeysTraceId =
270273 new ArrayList <AttributeKey <String >>() {
@@ -274,10 +277,25 @@ class ConnectionWorker implements AutoCloseable {
274277 add (AttributeKey .stringKey ("trace_field_3" ));
275278 }
276279 };
280+ static AttributeKey <String > telemetryKeyErrorCode = AttributeKey .stringKey ("error_code" );
281+ static AttributeKey <String > telemetryKeyIsRetry = AttributeKey .stringKey ("is_retry" );
277282 private Attributes telemetryAttributes ;
278- private LongCounter instrumentIncomingRequestCount ;
279- private LongCounter instrumentIncomingRequestSize ;
280- private LongCounter instrumentIncomingRequestRows ;
283+ private static final List <Long > METRICS_MILLISECONDS_LATENCY_BUCKETS =
284+ ImmutableList .of (0L , 50L , 100L , 500L , 1000L , 5000L , 10000L , 20000L , 30000L , 60000L , 120000L );
285+
286+ private static final class OpenTelemetryMetrics {
287+ private LongCounter instrumentSentRequestCount ;
288+ private LongCounter instrumentSentRequestSize ;
289+ private LongCounter instrumentSentRequestRows ;
290+ private LongCounter instrumentAckedRequestCount ;
291+ private LongCounter instrumentAckedRequestSize ;
292+ private LongCounter instrumentAckedRequestRows ;
293+ private LongHistogram instrumentNetworkResponseLatency ;
294+ private LongCounter instrumentConnectionStartCount ;
295+ private LongCounter instrumentConnectionEndCount ;
296+ }
297+
298+ private OpenTelemetryMetrics telemetryMetrics = new OpenTelemetryMetrics ();
281299
282300 public static Boolean isDefaultStreamName (String streamName ) {
283301 Matcher matcher = DEFAULT_STREAM_PATTERN .matcher (streamName );
@@ -339,6 +357,7 @@ private Attributes buildOpenTelemetryAttributes() {
339357 if (!tableName .isEmpty ()) {
340358 builder .put (telemetryKeyTableId , tableName );
341359 }
360+ builder .put (telemetryKeyWriterId , writerId );
342361 setTraceIdAttributes (builder );
343362 return builder .build ();
344363 }
@@ -353,6 +372,20 @@ private void refreshOpenTelemetryTableNameAttributes() {
353372 }
354373 }
355374
375+ private Attributes augmentAttributesWithErrorCode (Attributes attributes , String errorCode ) {
376+ AttributesBuilder builder = attributes .toBuilder ();
377+ if ((errorCode != null ) && !errorCode .isEmpty ()) {
378+ builder .put (telemetryKeyErrorCode , errorCode );
379+ }
380+ return builder .build ();
381+ }
382+
383+ private Attributes augmentAttributesWithRetry (Attributes attributes ) {
384+ AttributesBuilder builder = attributes .toBuilder ();
385+ builder .put (telemetryKeyIsRetry , "1" );
386+ return builder .build ();
387+ }
388+
356389 @ VisibleForTesting
357390 Attributes getTelemetryAttributes () {
358391 return telemetryAttributes ;
@@ -366,20 +399,87 @@ private void registerOpenTelemetryMetrics() {
366399 .setInstrumentationVersion (
367400 ConnectionWorker .class .getPackage ().getImplementationVersion ())
368401 .build ();
369- instrumentIncomingRequestCount =
402+ telemetryMetrics .instrumentSentRequestCount =
403+ writeMeter
404+ .counterBuilder ("append_requests_sent" )
405+ .setDescription ("Counts number of requests sent over the network" )
406+ .build ();
407+ telemetryMetrics .instrumentSentRequestSize =
408+ writeMeter
409+ .counterBuilder ("append_request_bytes_sent" )
410+ .setDescription ("Counts byte size of requests sent over the network" )
411+ .build ();
412+ telemetryMetrics .instrumentSentRequestRows =
413+ writeMeter
414+ .counterBuilder ("append_rows_sent" )
415+ .setDescription ("Counts number of request rows sent over the network" )
416+ .build ();
417+ telemetryMetrics .instrumentAckedRequestCount =
370418 writeMeter
371- .counterBuilder ("append_requests " )
372- .setDescription ("Counts number of incoming requests" )
419+ .counterBuilder ("append_requests_acked " )
420+ .setDescription ("Counts number of requests acked by the server " )
373421 .build ();
374- instrumentIncomingRequestSize =
422+ telemetryMetrics . instrumentAckedRequestSize =
375423 writeMeter
376- .counterBuilder ("append_request_bytes " )
377- .setDescription ("Counts byte size of incoming requests" )
424+ .counterBuilder ("append_request_bytes_acked " )
425+ .setDescription ("Counts byte size of requests acked by the server " )
378426 .build ();
379- instrumentIncomingRequestRows =
427+ telemetryMetrics . instrumentAckedRequestRows =
380428 writeMeter
381- .counterBuilder ("append_rows" )
382- .setDescription ("Counts number of incoming request rows" )
429+ .counterBuilder ("append_rows_acked" )
430+ .setDescription ("Counts number of request rows acked by the server" )
431+ .build ();
432+ writeMeter
433+ .gaugeBuilder ("active_connection_count" )
434+ .ofLongs ()
435+ .setDescription ("Reports number of active connections" )
436+ .buildWithCallback (
437+ measurement -> {
438+ int count = 0 ;
439+ this .lock .lock ();
440+ try {
441+ if (streamConnectionIsConnected ) {
442+ count = 1 ;
443+ }
444+ } finally {
445+ this .lock .unlock ();
446+ }
447+ measurement .record (count , getTelemetryAttributes ());
448+ });
449+ writeMeter
450+ .gaugeBuilder ("inflight_queue_length" )
451+ .ofLongs ()
452+ .setDescription (
453+ "Reports length of inflight queue. This queue contains sent append requests waiting for response from the server." )
454+ .buildWithCallback (
455+ measurement -> {
456+ int length = 0 ;
457+ this .lock .lock ();
458+ try {
459+ length = inflightRequestQueue .size ();
460+ } finally {
461+ this .lock .unlock ();
462+ }
463+ measurement .record (length , getTelemetryAttributes ());
464+ });
465+ telemetryMetrics .instrumentNetworkResponseLatency =
466+ writeMeter
467+ .histogramBuilder ("network_response_latency" )
468+ .ofLongs ()
469+ .setDescription (
470+ "Reports time taken in milliseconds for a response to arrive once a message has been sent over the network." )
471+ .setExplicitBucketBoundariesAdvice (METRICS_MILLISECONDS_LATENCY_BUCKETS )
472+ .build ();
473+ telemetryMetrics .instrumentConnectionStartCount =
474+ writeMeter
475+ .counterBuilder ("connection_start_count" )
476+ .setDescription (
477+ "Counts number of connection attempts made, regardless of whether these are initial or retry." )
478+ .build ();
479+ telemetryMetrics .instrumentConnectionEndCount =
480+ writeMeter
481+ .counterBuilder ("connection_end_count" )
482+ .setDescription ("Counts number of connection end events." )
383483 .build ();
384484 }
385485
@@ -469,6 +569,7 @@ public void run() {
469569
470570 private void resetConnection () {
471571 log .info ("Start connecting stream: " + streamName + " id: " + writerId );
572+ telemetryMetrics .instrumentConnectionStartCount .add (1 , getTelemetryAttributes ());
472573 if (this .streamConnection != null ) {
473574 // It's safe to directly close the previous connection as the in flight messages
474575 // will be picked up by the next connection.
@@ -615,9 +716,6 @@ private ApiFuture<AppendRowsResponse> appendInternal(
615716 + requestWrapper .messageSize )));
616717 return requestWrapper .appendResult ;
617718 }
618- instrumentIncomingRequestCount .add (1 , getTelemetryAttributes ());
619- instrumentIncomingRequestSize .add (requestWrapper .messageSize , getTelemetryAttributes ());
620- instrumentIncomingRequestRows .add (message .getProtoRows ().getRows ().getSerializedRowsCount ());
621719 this .lock .lock ();
622720 try {
623721 if (userClosed ) {
@@ -876,7 +974,8 @@ private void appendLoop() {
876974 }
877975 while (!localQueue .isEmpty ()) {
878976 localQueue .peekFirst ().setRequestSendQueueTime ();
879- AppendRowsRequest originalRequest = localQueue .pollFirst ().message ;
977+ AppendRequestAndResponse requestWrapper = localQueue .pollFirst ();
978+ AppendRowsRequest originalRequest = requestWrapper .message ;
880979 AppendRowsRequest .Builder originalRequestBuilder = originalRequest .toBuilder ();
881980 // Always respect the first writer schema seen by the loop.
882981 if (writerSchema == null ) {
@@ -928,6 +1027,15 @@ private void appendLoop() {
9281027 // In the close case, the request is in the inflight queue, and will either be returned
9291028 // to the user with an error, or will be resent.
9301029 this .streamConnection .send (originalRequestBuilder .build ());
1030+ Attributes telemetryAttributes = getTelemetryAttributes ();
1031+ if (requestWrapper .retryCount > 0 ) {
1032+ telemetryAttributes = augmentAttributesWithRetry (telemetryAttributes );
1033+ }
1034+ telemetryMetrics .instrumentSentRequestCount .add (1 , telemetryAttributes );
1035+ telemetryMetrics .instrumentSentRequestSize .add (
1036+ requestWrapper .messageSize , telemetryAttributes );
1037+ telemetryMetrics .instrumentSentRequestRows .add (
1038+ originalRequest .getProtoRows ().getRows ().getSerializedRowsCount (), telemetryAttributes );
9311039 }
9321040 }
9331041 cleanupConnectionAndRequests (/* avoidBlocking= */ false );
@@ -1195,6 +1303,13 @@ private void requestCallback(AppendRowsResponse response) {
11951303 connectionRetryStartTime = 0 ;
11961304 }
11971305 if (!this .inflightRequestQueue .isEmpty ()) {
1306+ Instant sendInstant = inflightRequestQueue .getFirst ().requestSendTimeStamp ;
1307+ if (sendInstant != null ) {
1308+ Duration durationLatency = Duration .between (sendInstant , Instant .now ());
1309+ telemetryMetrics .instrumentNetworkResponseLatency .record (
1310+ durationLatency .toMillis (), getTelemetryAttributes ());
1311+ }
1312+
11981313 requestWrapper = pollFirstInflightRequestQueue ();
11991314 } else if (inflightCleanuped ) {
12001315 // It is possible when requestCallback is called, the inflight queue is already drained
@@ -1213,6 +1328,19 @@ private void requestCallback(AppendRowsResponse response) {
12131328 this .lock .unlock ();
12141329 }
12151330
1331+ Attributes augmentedTelemetryAttributes =
1332+ augmentAttributesWithErrorCode (
1333+ getTelemetryAttributes (),
1334+ Code .values ()[
1335+ response .hasError () ? response .getError ().getCode () : Status .Code .OK .ordinal ()]
1336+ .toString ());
1337+ telemetryMetrics .instrumentAckedRequestCount .add (1 , augmentedTelemetryAttributes );
1338+ telemetryMetrics .instrumentAckedRequestSize .add (
1339+ requestWrapper .messageSize , augmentedTelemetryAttributes );
1340+ telemetryMetrics .instrumentAckedRequestRows .add (
1341+ requestWrapper .message .getProtoRows ().getRows ().getSerializedRowsCount (),
1342+ augmentedTelemetryAttributes );
1343+
12161344 // Retries need to happen on the same thread as queue locking may occur
12171345 if (response .hasError ()) {
12181346 if (retryOnRetryableError (Code .values ()[response .getError ().getCode ()], requestWrapper )) {
@@ -1290,6 +1418,11 @@ private void doneCallback(Throwable finalStatus) {
12901418 this .lock .lock ();
12911419 try {
12921420 this .streamConnectionIsConnected = false ;
1421+ this .telemetryMetrics .instrumentConnectionEndCount .add (
1422+ 1 ,
1423+ augmentAttributesWithErrorCode (
1424+ getTelemetryAttributes (),
1425+ Code .values ()[Status .fromThrowable (finalStatus ).getCode ().ordinal ()].toString ()));
12931426 if (connectionFinalStatus == null ) {
12941427 if (connectionRetryStartTime == 0 ) {
12951428 connectionRetryStartTime = System .currentTimeMillis ();
@@ -1301,6 +1434,8 @@ private void doneCallback(Throwable finalStatus) {
13011434 || System .currentTimeMillis () - connectionRetryStartTime
13021435 <= maxRetryDuration .toMillis ())) {
13031436 this .conectionRetryCountWithoutCallback ++;
1437+ this .telemetryMetrics .instrumentConnectionStartCount .add (
1438+ 1 , augmentAttributesWithRetry (getTelemetryAttributes ()));
13041439 log .info (
13051440 "Connection is going to be reestablished with the next request. Retriable error "
13061441 + finalStatus .toString ()
0 commit comments