104104import java .util .concurrent .atomic .AtomicBoolean ;
105105import java .util .concurrent .atomic .AtomicInteger ;
106106import java .util .concurrent .atomic .AtomicLong ;
107+ import java .util .concurrent .atomic .AtomicReference ;
107108import java .util .logging .Level ;
108109import java .util .logging .Logger ;
109110import javax .annotation .Nullable ;
@@ -146,6 +147,7 @@ void maybeWaitOnMinSessions() {
146147 }
147148
148149 private abstract static class CachedResultSetSupplier implements Supplier <ResultSet > {
150+
149151 private ResultSet cached ;
150152
151153 abstract ResultSet load ();
@@ -1155,6 +1157,46 @@ private PooledSessionFuture createPooledSessionFuture(
11551157 return new PooledSessionFuture (future , span );
11561158 }
11571159
1160+ /** Wrapper class for the {@link SessionFuture} implementations. */
1161+ interface SessionFutureWrapper <T extends SessionFuture > {
1162+
1163+ /** Method to resolve {@link SessionFuture} implementation for different use-cases. */
1164+ T get ();
1165+ }
1166+
1167+ class PooledSessionFutureWrapper implements SessionFutureWrapper <PooledSessionFuture > {
1168+ PooledSessionFuture pooledSessionFuture ;
1169+
1170+ public PooledSessionFutureWrapper (PooledSessionFuture pooledSessionFuture ) {
1171+ this .pooledSessionFuture = pooledSessionFuture ;
1172+ }
1173+
1174+ @ Override
1175+ public PooledSessionFuture get () {
1176+ return this .pooledSessionFuture ;
1177+ }
1178+ }
1179+
1180+ class MultiplexedSessionFutureWrapper implements SessionFutureWrapper <MultiplexedSessionFuture > {
1181+ SettableApiFuture <MultiplexedSessionFuture > multiplexedSessionSettableApiFuture ;
1182+
1183+ public MultiplexedSessionFutureWrapper (
1184+ SettableApiFuture <MultiplexedSessionFuture > multiplexedSessionSettableApiFuture ) {
1185+ this .multiplexedSessionSettableApiFuture = multiplexedSessionSettableApiFuture ;
1186+ }
1187+
1188+ @ Override
1189+ public MultiplexedSessionFuture get () {
1190+ try {
1191+ return this .multiplexedSessionSettableApiFuture .get ();
1192+ } catch (InterruptedException interruptedException ) {
1193+ throw SpannerExceptionFactory .propagateInterrupt (interruptedException );
1194+ } catch (ExecutionException executionException ) {
1195+ throw SpannerExceptionFactory .asSpannerException (executionException .getCause ());
1196+ }
1197+ }
1198+ }
1199+
11581200 interface SessionFuture extends Session {
11591201
11601202 /**
@@ -1435,12 +1477,9 @@ PooledSession get(final boolean eligibleForLongRunning) {
14351477
14361478 class MultiplexedSessionFuture extends SimpleForwardingListenableFuture <MultiplexedSession >
14371479 implements SessionFuture {
1438- private final ISpan span ;
1439-
14401480 @ VisibleForTesting
1441- MultiplexedSessionFuture (ListenableFuture <MultiplexedSession > delegate , ISpan span ) {
1481+ MultiplexedSessionFuture (ListenableFuture <MultiplexedSession > delegate ) {
14421482 super (delegate );
1443- this .span = span ;
14441483 }
14451484
14461485 @ Override
@@ -1645,15 +1684,6 @@ private MultiplexedSession getOrNull() {
16451684
16461685 @ Override
16471686 public MultiplexedSession get () {
1648- MultiplexedSession res = null ;
1649- try {
1650- res = super .get ();
1651- } catch (Throwable e ) {
1652- // ignore the exception as it will be handled by the call to super.get() below.
1653- }
1654- if (res != null ) {
1655- res .markBusy (span );
1656- }
16571687 try {
16581688 return super .get ();
16591689 } catch (ExecutionException e ) {
@@ -2231,6 +2261,9 @@ private PooledSession pollUninterruptiblyWithTimeout(
22312261 */
22322262 final class PoolMaintainer {
22332263
2264+ // Delay post which the maintainer will retry creating/replacing the current multiplexed session
2265+ private final Duration multiplexedSessionCreationRetryDelay = Duration .ofMinutes (10 );
2266+
22342267 // Length of the window in millis over which we keep track of maximum number of concurrent
22352268 // sessions in use.
22362269 private final Duration windowLength = Duration .ofMillis (TimeUnit .MINUTES .toMillis (10 ));
@@ -2254,6 +2287,8 @@ final class PoolMaintainer {
22542287 */
22552288 @ VisibleForTesting Instant lastExecutionTime ;
22562289
2290+ @ VisibleForTesting Instant multiplexedSessionReplacementAttemptTime ;
2291+
22572292 /**
22582293 * The previous numSessionsAcquired seen by the maintainer. This is used to calculate the
22592294 * transactions per second, which again is used to determine whether to randomize the order of
@@ -2271,6 +2306,8 @@ final class PoolMaintainer {
22712306
22722307 void init () {
22732308 lastExecutionTime = clock .instant ();
2309+ multiplexedSessionReplacementAttemptTime = clock .instant ();
2310+
22742311 // Scheduled pool maintenance worker.
22752312 synchronized (lock ) {
22762313 scheduledFuture =
@@ -2312,6 +2349,7 @@ void maintainPool() {
23122349 this .prevNumSessionsAcquired = SessionPool .this .numSessionsAcquired ;
23132350 }
23142351 Instant currTime = clock .instant ();
2352+ maintainMultiplexedSession (currTime );
23152353 removeIdleSessions (currTime );
23162354 // Now go over all the remaining sessions and see if they need to be kept alive explicitly.
23172355 keepAliveSessions (currTime );
@@ -2480,6 +2518,46 @@ private void removeLongRunningSessions(
24802518 }
24812519 }
24822520 }
2521+
2522+ void maintainMultiplexedSession (Instant currentTime ) {
2523+ try {
2524+ if (options .getUseMultiplexedSession ()) {
2525+ synchronized (lock ) {
2526+ if (getMultiplexedSession ().isDone ()
2527+ && getMultiplexedSession ().get () != null
2528+ && isMultiplexedSessionStale (currentTime )) {
2529+ final Instant minExecutionTime =
2530+ multiplexedSessionReplacementAttemptTime .plus (
2531+ multiplexedSessionCreationRetryDelay );
2532+ if (currentTime .isBefore (minExecutionTime )) {
2533+ return ;
2534+ }
2535+
2536+ /*
2537+ This will attempt to create a new multiplexed session. if successfully created then
2538+ the existing session will be replaced. Note that there maybe active transactions
2539+ running on the stale session. Hence, it is important that we only replace the reference
2540+ and not invoke a DeleteSession RPC.
2541+ */
2542+ maybeCreateMultiplexedSession (multiplexedMaintainerConsumer );
2543+
2544+ // update this only after we have attempted to replace the multiplexed session
2545+ multiplexedSessionReplacementAttemptTime = currentTime ;
2546+ }
2547+ }
2548+ }
2549+ } catch (final Throwable t ) {
2550+ logger .log (Level .WARNING , "Failed to maintain multiplexed session" , t );
2551+ }
2552+ }
2553+
2554+ boolean isMultiplexedSessionStale (Instant currentTime ) {
2555+ final CachedSession session = getMultiplexedSession ().get ();
2556+ final Duration durationFromCreationTime =
2557+ Duration .between (session .getDelegate ().getCreateTime (), currentTime );
2558+ return durationFromCreationTime .compareTo (options .getMultiplexedSessionMaintenanceDuration ())
2559+ > 0 ;
2560+ }
24832561 }
24842562
24852563 enum Position {
@@ -2556,6 +2634,9 @@ enum Position {
25562634 @ GuardedBy ("lock" )
25572635 private int numSessionsBeingCreated = 0 ;
25582636
2637+ @ GuardedBy ("lock" )
2638+ private boolean multiplexedSessionBeingCreated = false ;
2639+
25592640 @ GuardedBy ("lock" )
25602641 private int numSessionsInUse = 0 ;
25612642
@@ -2585,6 +2666,10 @@ enum Position {
25852666
25862667 private AtomicLong numWaiterTimeouts = new AtomicLong ();
25872668
2669+ private final AtomicReference <SettableApiFuture <MultiplexedSessionFuture >>
2670+ currentMultiplexedSessionReference = new AtomicReference <>(SettableApiFuture .create ());
2671+ MultiplexedSessionFutureWrapper wrappedMultiplexedSessionFuture = null ;
2672+
25882673 @ GuardedBy ("lock" )
25892674 private final Set <PooledSession > allSessions = new HashSet <>();
25902675
@@ -2593,9 +2678,16 @@ enum Position {
25932678 final Set <PooledSessionFuture > checkedOutSessions = new HashSet <>();
25942679
25952680 private final SessionConsumer sessionConsumer = new SessionConsumerImpl ();
2681+
2682+ private final MultiplexedSessionInitializationConsumer multiplexedSessionInitializationConsumer =
2683+ new MultiplexedSessionInitializationConsumer ();
2684+ private final MultiplexedSessionMaintainerConsumer multiplexedMaintainerConsumer =
2685+ new MultiplexedSessionMaintainerConsumer ();
2686+
25962687 @ VisibleForTesting Function <PooledSession , Void > idleSessionRemovedListener ;
25972688
25982689 @ VisibleForTesting Function <PooledSession , Void > longRunningSessionRemovedListener ;
2690+ @ VisibleForTesting Function <MultiplexedSession , Void > multiplexedSessionRemovedListener ;
25992691 private final CountDownLatch waitOnMinSessionsLatch ;
26002692 private final SessionReplacementHandler pooledSessionReplacementHandler =
26012693 new PooledSessionReplacementHandler ();
@@ -2839,6 +2931,9 @@ private void initPool() {
28392931 if (options .getMinSessions () > 0 ) {
28402932 createSessions (options .getMinSessions (), true );
28412933 }
2934+ if (options .getUseMultiplexedSession ()) {
2935+ maybeCreateMultiplexedSession (multiplexedSessionInitializationConsumer );
2936+ }
28422937 }
28432938 }
28442939
@@ -2900,6 +2995,38 @@ boolean isValid() {
29002995 }
29012996 }
29022997
2998+ /**
2999+ * Returns a multiplexed session. The method fallbacks to a regular session if {@link
3000+ * SessionPoolOptions#useMultiplexedSession} is not set.
3001+ */
3002+ SessionFutureWrapper getMultiplexedSessionWithFallback () throws SpannerException {
3003+ if (options .getUseMultiplexedSession ()) {
3004+ try {
3005+ SessionFutureWrapper sessionFuture = getWrappedMultiplexedSessionFuture ();
3006+ incrementNumSessionsInUse (true );
3007+ return sessionFuture ;
3008+ } catch (Throwable t ) {
3009+ ISpan span = tracer .getCurrentSpan ();
3010+ span .addAnnotation ("No multiplexed session available." );
3011+ throw SpannerExceptionFactory .asSpannerException (t .getCause ());
3012+ }
3013+ } else {
3014+ return new PooledSessionFutureWrapper (getSession ());
3015+ }
3016+ }
3017+
3018+ SessionFutureWrapper getWrappedMultiplexedSessionFuture () {
3019+ return wrappedMultiplexedSessionFuture ;
3020+ }
3021+
3022+ /**
3023+ * This method is a blocking method. It will block until the underlying {@code
3024+ * SettableApiFuture<MultiplexedSessionFuture>} is resolved.
3025+ */
3026+ MultiplexedSessionFuture getMultiplexedSession () {
3027+ return (MultiplexedSessionFuture ) getWrappedMultiplexedSessionFuture ().get ();
3028+ }
3029+
29033030 /**
29043031 * Returns a session to be used for requests to spanner. This method is always non-blocking and
29053032 * returns a {@link PooledSessionFuture}. In case the pool is exhausted and {@link
@@ -3303,6 +3430,20 @@ private boolean canCreateSession() {
33033430 }
33043431 }
33053432
3433+ private void maybeCreateMultiplexedSession (SessionConsumer sessionConsumer ) {
3434+ synchronized (lock ) {
3435+ if (!multiplexedSessionBeingCreated ) {
3436+ logger .log (Level .FINE , String .format ("Creating multiplexed sessions" ));
3437+ try {
3438+ multiplexedSessionBeingCreated = true ;
3439+ sessionClient .createMultiplexedSession (sessionConsumer );
3440+ } catch (Throwable ignore ) {
3441+ // such an exception will never be thrown. the exception will be passed onto the consumer.
3442+ }
3443+ }
3444+ }
3445+ }
3446+
33063447 private void createSessions (final int sessionCount , boolean distributeOverChannels ) {
33073448 logger .log (Level .FINE , String .format ("Creating %d sessions" , sessionCount ));
33083449 synchronized (lock ) {
@@ -3325,6 +3466,103 @@ private void createSessions(final int sessionCount, boolean distributeOverChanne
33253466 }
33263467 }
33273468
3469+ /**
3470+ * Callback interface which is invoked when a multiplexed session is being replaced by the
3471+ * background maintenance thread. When a multiplexed session creation fails during background
3472+ * thread, it would simply log the exception and retry the session creation in the next background
3473+ * thread invocation.
3474+ *
3475+ * <p>This consumer is not used when the multiplexed session is getting initialized for the first
3476+ * time during application startup. We instead use {@link
3477+ * MultiplexedSessionInitializationConsumer} for the first time when multiplexed session is
3478+ * getting created.
3479+ */
3480+ class MultiplexedSessionMaintainerConsumer implements SessionConsumer {
3481+ @ Override
3482+ public void onSessionReady (SessionImpl sessionImpl ) {
3483+ final SettableFuture <MultiplexedSession > settableFuture = SettableFuture .create ();
3484+ final MultiplexedSession newSession = new MultiplexedSession (sessionImpl );
3485+ settableFuture .set (newSession );
3486+
3487+ synchronized (lock ) {
3488+ MultiplexedSession oldSession = null ;
3489+ if (currentMultiplexedSessionReference .get ().isDone ()) {
3490+ oldSession = getMultiplexedSession ().get ();
3491+ }
3492+ SettableApiFuture <MultiplexedSessionFuture > settableApiFuture = SettableApiFuture .create ();
3493+ settableApiFuture .set (new MultiplexedSessionFuture (settableFuture ));
3494+ currentMultiplexedSessionReference .set (settableApiFuture );
3495+ wrappedMultiplexedSessionFuture = new MultiplexedSessionFutureWrapper (settableApiFuture );
3496+ if (oldSession != null ) {
3497+ logger .log (
3498+ Level .INFO ,
3499+ String .format (
3500+ "Removed Multiplexed Session => %s created at => %s and" ,
3501+ oldSession .getName (), oldSession .getDelegate ().getCreateTime ()));
3502+ if (multiplexedSessionRemovedListener != null ) {
3503+ multiplexedSessionRemovedListener .apply (oldSession );
3504+ }
3505+ }
3506+ multiplexedSessionBeingCreated = false ;
3507+ }
3508+ }
3509+
3510+ /**
3511+ * Method which logs the exception so that session creation can be re-attempted in the next
3512+ * background thread invocation.
3513+ */
3514+ @ Override
3515+ public void onSessionCreateFailure (Throwable t , int createFailureForSessionCount ) {
3516+ synchronized (lock ) {
3517+ multiplexedSessionBeingCreated = false ;
3518+ wrappedMultiplexedSessionFuture =
3519+ new MultiplexedSessionFutureWrapper (currentMultiplexedSessionReference .get ());
3520+ }
3521+ logger .log (
3522+ Level .WARNING ,
3523+ String .format (
3524+ "Failed to create multiplexed session. "
3525+ + "Pending replacing stale multiplexed session" ,
3526+ t ));
3527+ }
3528+ }
3529+
3530+ /**
3531+ * Callback interface which is invoked when a multiplexed session is getting initialised for the
3532+ * first time when a session is getting created.
3533+ */
3534+ class MultiplexedSessionInitializationConsumer implements SessionConsumer {
3535+ @ Override
3536+ public void onSessionReady (SessionImpl sessionImpl ) {
3537+ final SettableFuture <MultiplexedSession > settableFuture = SettableFuture .create ();
3538+ final MultiplexedSession newSession = new MultiplexedSession (sessionImpl );
3539+ settableFuture .set (newSession );
3540+
3541+ synchronized (lock ) {
3542+ SettableApiFuture <MultiplexedSessionFuture > settableApiFuture =
3543+ currentMultiplexedSessionReference .get ();
3544+ settableApiFuture .set (new MultiplexedSessionFuture (settableFuture ));
3545+ wrappedMultiplexedSessionFuture = new MultiplexedSessionFutureWrapper (settableApiFuture );
3546+ multiplexedSessionBeingCreated = false ;
3547+ }
3548+ }
3549+
3550+ /**
3551+ * When a multiplexed session fails during initialization we would like all pending threads to
3552+ * receive the exception and throw the error. This is done because at the time of start up there
3553+ * is no other multiplexed session which could have been assigned to the pending requests.
3554+ */
3555+ @ Override
3556+ public void onSessionCreateFailure (Throwable t , int createFailureForSessionCount ) {
3557+ synchronized (lock ) {
3558+ multiplexedSessionBeingCreated = false ;
3559+ wrappedMultiplexedSessionFuture =
3560+ new MultiplexedSessionFutureWrapper (currentMultiplexedSessionReference .get ());
3561+ currentMultiplexedSessionReference .get ().setException (newSpannerException (t ));
3562+ }
3563+ }
3564+ }
3565+
33283566 /**
33293567 * {@link SessionConsumer} that receives the created sessions from a {@link SessionClient} and
33303568 * releases these into the pool. The session pool only needs one instance of this, as all sessions
0 commit comments