6868import com .google .common .base .Function ;
6969import com .google .common .base .MoreObjects ;
7070import com .google .common .base .Preconditions ;
71- import com .google .common .base .Supplier ;
7271import com .google .common .collect .ImmutableList ;
7372import com .google .common .util .concurrent .ForwardingListenableFuture ;
7473import com .google .common .util .concurrent .ForwardingListenableFuture .SimpleForwardingListenableFuture ;
@@ -156,7 +155,8 @@ void maybeWaitOnMinSessions() {
156155 }
157156 }
158157
159- private abstract static class CachedResultSetSupplier implements Supplier <ResultSet > {
158+ private abstract static class CachedResultSetSupplier
159+ implements com .google .common .base .Supplier <ResultSet > {
160160
161161 private ResultSet cached ;
162162
@@ -2265,7 +2265,6 @@ public String getName() {
22652265 @ Override
22662266 public void close () {
22672267 synchronized (lock ) {
2268- numMultiplexedSessionsReleased ++;
22692268 if (lastException != null && isDatabaseOrInstanceNotFound (lastException )) {
22702269 SessionPool .this .resourceNotFoundException =
22712270 MoreObjects .firstNonNull (
@@ -2771,15 +2770,9 @@ enum Position {
27712770 @ GuardedBy ("lock" )
27722771 private long numSessionsAcquired = 0 ;
27732772
2774- @ GuardedBy ("lock" )
2775- private long numMultiplexedSessionsAcquired = 0 ;
2776-
27772773 @ GuardedBy ("lock" )
27782774 private long numSessionsReleased = 0 ;
27792775
2780- @ GuardedBy ("lock" )
2781- private long numMultiplexedSessionsReleased = 0 ;
2782-
27832776 @ GuardedBy ("lock" )
27842777 private long numIdleSessionsRemoved = 0 ;
27852778
@@ -2830,7 +2823,9 @@ static SessionPool createPool(
28302823 SessionClient sessionClient ,
28312824 TraceWrapper tracer ,
28322825 List <LabelValue > labelValues ,
2833- Attributes attributes ) {
2826+ Attributes attributes ,
2827+ AtomicLong numMultiplexedSessionsAcquired ,
2828+ AtomicLong numMultiplexedSessionsReleased ) {
28342829 final SessionPoolOptions sessionPoolOptions = spannerOptions .getSessionPoolOptions ();
28352830
28362831 // A clock instance is passed in {@code SessionPoolOptions} in order to allow mocking via tests.
@@ -2846,7 +2841,9 @@ static SessionPool createPool(
28462841 tracer ,
28472842 labelValues ,
28482843 spannerOptions .getOpenTelemetry (),
2849- attributes );
2844+ attributes ,
2845+ numMultiplexedSessionsAcquired ,
2846+ numMultiplexedSessionsReleased );
28502847 }
28512848
28522849 static SessionPool createPool (
@@ -2884,7 +2881,9 @@ static SessionPool createPool(
28842881 tracer ,
28852882 SPANNER_DEFAULT_LABEL_VALUES ,
28862883 openTelemetry ,
2887- null );
2884+ null ,
2885+ new AtomicLong (),
2886+ new AtomicLong ());
28882887 }
28892888
28902889 static SessionPool createPool (
@@ -2898,7 +2897,9 @@ static SessionPool createPool(
28982897 TraceWrapper tracer ,
28992898 List <LabelValue > labelValues ,
29002899 OpenTelemetry openTelemetry ,
2901- Attributes attributes ) {
2900+ Attributes attributes ,
2901+ AtomicLong numMultiplexedSessionsAcquired ,
2902+ AtomicLong numMultiplexedSessionsReleased ) {
29022903 SessionPool pool =
29032904 new SessionPool (
29042905 poolOptions ,
@@ -2912,7 +2913,9 @@ static SessionPool createPool(
29122913 tracer ,
29132914 labelValues ,
29142915 openTelemetry ,
2915- attributes );
2916+ attributes ,
2917+ numMultiplexedSessionsAcquired ,
2918+ numMultiplexedSessionsReleased );
29162919 pool .initPool ();
29172920 return pool ;
29182921 }
@@ -2929,7 +2932,9 @@ private SessionPool(
29292932 TraceWrapper tracer ,
29302933 List <LabelValue > labelValues ,
29312934 OpenTelemetry openTelemetry ,
2932- Attributes attributes ) {
2935+ Attributes attributes ,
2936+ AtomicLong numMultiplexedSessionsAcquired ,
2937+ AtomicLong numMultiplexedSessionsReleased ) {
29332938 this .options = options ;
29342939 this .databaseRole = databaseRole ;
29352940 this .executorFactory = executorFactory ;
@@ -2940,8 +2945,13 @@ private SessionPool(
29402945 this .initialReleasePosition = initialReleasePosition ;
29412946 this .poolMaintainer = new PoolMaintainer ();
29422947 this .tracer = tracer ;
2943- this .initOpenCensusMetricsCollection (metricRegistry , labelValues );
2944- this .initOpenTelemetryMetricsCollection (openTelemetry , attributes );
2948+ this .initOpenCensusMetricsCollection (
2949+ metricRegistry ,
2950+ labelValues ,
2951+ numMultiplexedSessionsAcquired ,
2952+ numMultiplexedSessionsReleased );
2953+ this .initOpenTelemetryMetricsCollection (
2954+ openTelemetry , attributes , numMultiplexedSessionsAcquired , numMultiplexedSessionsReleased );
29452955 this .waitOnMinSessionsLatch =
29462956 options .getMinSessions () > 0 ? new CountDownLatch (1 ) : new CountDownLatch (0 );
29472957 this .waitOnMultiplexedSessionsLatch = new CountDownLatch (1 );
@@ -3143,7 +3153,7 @@ boolean isValid() {
31433153
31443154 /**
31453155 * Returns a multiplexed session. The method fallbacks to a regular session if {@link
3146- * SessionPoolOptions#useMultiplexedSession } is not set.
3156+ * SessionPoolOptions#getUseMultiplexedSession } is not set.
31473157 */
31483158 SessionFutureWrapper getMultiplexedSessionWithFallback () throws SpannerException {
31493159 if (useMultiplexedSessions ()) {
@@ -3250,8 +3260,6 @@ private void incrementNumSessionsInUse(boolean isMultiplexed) {
32503260 maxSessionsInUse = numSessionsInUse ;
32513261 }
32523262 numSessionsAcquired ++;
3253- } else {
3254- numMultiplexedSessionsAcquired ++;
32553263 }
32563264 }
32573265 }
@@ -3775,7 +3783,10 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
37753783 * exporter, it allows users to monitor client behavior.
37763784 */
37773785 private void initOpenCensusMetricsCollection (
3778- MetricRegistry metricRegistry , List <LabelValue > labelValues ) {
3786+ MetricRegistry metricRegistry ,
3787+ List <LabelValue > labelValues ,
3788+ AtomicLong numMultiplexedSessionsAcquired ,
3789+ AtomicLong numMultiplexedSessionsReleased ) {
37793790 if (!SpannerOptions .isEnabledOpenCensusMetrics ()) {
37803791 return ;
37813792 }
@@ -3860,18 +3871,14 @@ private void initOpenCensusMetricsCollection(
38603871 labelValuesWithRegularSessions , this , sessionPool -> sessionPool .numSessionsAcquired );
38613872 numAcquiredSessionsMetric .removeTimeSeries (labelValuesWithMultiplexedSessions );
38623873 numAcquiredSessionsMetric .createTimeSeries (
3863- labelValuesWithMultiplexedSessions ,
3864- this ,
3865- sessionPool -> sessionPool .numMultiplexedSessionsAcquired );
3874+ labelValuesWithMultiplexedSessions , this , unused -> numMultiplexedSessionsAcquired .get ());
38663875
38673876 numReleasedSessionsMetric .removeTimeSeries (labelValuesWithRegularSessions );
38683877 numReleasedSessionsMetric .createTimeSeries (
38693878 labelValuesWithRegularSessions , this , sessionPool -> sessionPool .numSessionsReleased );
38703879 numReleasedSessionsMetric .removeTimeSeries (labelValuesWithMultiplexedSessions );
38713880 numReleasedSessionsMetric .createTimeSeries (
3872- labelValuesWithMultiplexedSessions ,
3873- this ,
3874- sessionPool -> sessionPool .numMultiplexedSessionsReleased );
3881+ labelValuesWithMultiplexedSessions , this , unused -> numMultiplexedSessionsReleased .get ());
38753882
38763883 List <LabelValue > labelValuesWithBeingPreparedType = new ArrayList <>(labelValues );
38773884 labelValuesWithBeingPreparedType .add (NUM_SESSIONS_BEING_PREPARED );
@@ -3909,7 +3916,10 @@ private void initOpenCensusMetricsCollection(
39093916 * an exporter, it allows users to monitor client behavior.
39103917 */
39113918 private void initOpenTelemetryMetricsCollection (
3912- OpenTelemetry openTelemetry , Attributes attributes ) {
3919+ OpenTelemetry openTelemetry ,
3920+ Attributes attributes ,
3921+ AtomicLong numMultiplexedSessionsAcquired ,
3922+ AtomicLong numMultiplexedSessionsReleased ) {
39133923 if (openTelemetry == null || !SpannerOptions .isEnabledOpenTelemetryMetrics ()) {
39143924 return ;
39153925 }
@@ -3981,7 +3991,8 @@ private void initOpenTelemetryMetricsCollection(
39813991 .buildWithCallback (
39823992 measurement -> {
39833993 measurement .record (this .numSessionsAcquired , attributesRegularSession );
3984- measurement .record (this .numMultiplexedSessionsAcquired , attributesMultiplexedSession );
3994+ measurement .record (
3995+ numMultiplexedSessionsAcquired .get (), attributesMultiplexedSession );
39853996 });
39863997
39873998 meter
@@ -3991,7 +4002,8 @@ private void initOpenTelemetryMetricsCollection(
39914002 .buildWithCallback (
39924003 measurement -> {
39934004 measurement .record (this .numSessionsReleased , attributesRegularSession );
3994- measurement .record (this .numMultiplexedSessionsReleased , attributesMultiplexedSession );
4005+ measurement .record (
4006+ numMultiplexedSessionsReleased .get (), attributesMultiplexedSession );
39954007 });
39964008 }
39974009}
0 commit comments