@@ -1093,6 +1093,7 @@ private static enum Position {
10931093 private final ScheduledExecutorService executor ;
10941094 private final ExecutorFactory <ScheduledExecutorService > executorFactory ;
10951095 private final ScheduledExecutorService prepareExecutor ;
1096+ private final int prepareThreadPoolSize ;
10961097 final PoolMaintainer poolMaintainer ;
10971098 private final Clock clock ;
10981099 private final Object lock = new Object ();
@@ -1137,6 +1138,12 @@ private static enum Position {
11371138 @ GuardedBy ("lock" )
11381139 private long numSessionsReleased = 0 ;
11391140
1141+ @ GuardedBy ("lock" )
1142+ private long numSessionsInProcessPrepared = 0 ;
1143+
1144+ @ GuardedBy ("lock" )
1145+ private long numSessionsAsyncPrepared = 0 ;
1146+
11401147 private AtomicLong numWaiterTimeouts = new AtomicLong ();
11411148
11421149 @ GuardedBy ("lock" )
@@ -1213,15 +1220,14 @@ private SessionPool(
12131220 this .options = options ;
12141221 this .executorFactory = executorFactory ;
12151222 this .executor = executor ;
1216- int prepareThreads ;
12171223 if (executor instanceof ThreadPoolExecutor ) {
1218- prepareThreads = Math .max (((ThreadPoolExecutor ) executor ).getCorePoolSize (), 1 );
1224+ prepareThreadPoolSize = Math .max (((ThreadPoolExecutor ) executor ).getCorePoolSize (), 1 );
12191225 } else {
1220- prepareThreads = 8 ;
1226+ prepareThreadPoolSize = 8 ;
12211227 }
12221228 this .prepareExecutor =
12231229 Executors .newScheduledThreadPool (
1224- prepareThreads ,
1230+ prepareThreadPoolSize ,
12251231 new ThreadFactoryBuilder ()
12261232 .setDaemon (true )
12271233 .setNameFormat ("session-pool-prepare-%d" )
@@ -1232,6 +1238,20 @@ private SessionPool(
12321238 this .initMetricsCollection (metricRegistry , labelValues );
12331239 }
12341240
1241+ @ VisibleForTesting
1242+ long getNumberOfSessionsInProcessPrepared () {
1243+ synchronized (lock ) {
1244+ return numSessionsInProcessPrepared ;
1245+ }
1246+ }
1247+
1248+ @ VisibleForTesting
1249+ long getNumberOfSessionsAsyncPrepared () {
1250+ synchronized (lock ) {
1251+ return numSessionsAsyncPrepared ;
1252+ }
1253+ }
1254+
12351255 @ VisibleForTesting
12361256 int getNumberOfAvailableWritePreparedSessions () {
12371257 synchronized (lock ) {
@@ -1416,46 +1436,93 @@ PooledSession getReadSession() throws SpannerException {
14161436 PooledSession getReadWriteSession () {
14171437 Span span = Tracing .getTracer ().getCurrentSpan ();
14181438 span .addAnnotation ("Acquiring read write session" );
1419- Waiter waiter = null ;
14201439 PooledSession sess = null ;
1421- synchronized (lock ) {
1422- if (closureFuture != null ) {
1423- span .addAnnotation ("Pool has been closed" );
1424- throw new IllegalStateException ("Pool has been closed" );
1440+ // Loop to retry SessionNotFoundExceptions that might occur during in-process prepare of a
1441+ // session.
1442+ while (true ) {
1443+ Waiter waiter = null ;
1444+ boolean inProcessPrepare = false ;
1445+ synchronized (lock ) {
1446+ if (closureFuture != null ) {
1447+ span .addAnnotation ("Pool has been closed" );
1448+ throw new IllegalStateException ("Pool has been closed" );
1449+ }
1450+ if (resourceNotFoundException != null ) {
1451+ span .addAnnotation ("Database has been deleted" );
1452+ throw SpannerExceptionFactory .newSpannerException (
1453+ ErrorCode .NOT_FOUND ,
1454+ String .format (
1455+ "The session pool has been invalidated because a previous RPC returned 'Database not found': %s" ,
1456+ resourceNotFoundException .getMessage ()),
1457+ resourceNotFoundException );
1458+ }
1459+ sess = writePreparedSessions .poll ();
1460+ if (sess == null ) {
1461+ if (numSessionsBeingPrepared <= prepareThreadPoolSize ) {
1462+ if (numSessionsBeingPrepared <= readWriteWaiters .size ()) {
1463+ PooledSession readSession = readSessions .poll ();
1464+ if (readSession != null ) {
1465+ span .addAnnotation (
1466+ "Acquired read only session. Preparing for read write transaction" );
1467+ prepareSession (readSession );
1468+ } else {
1469+ span .addAnnotation ("No session available" );
1470+ maybeCreateSession ();
1471+ }
1472+ }
1473+ } else {
1474+ inProcessPrepare = true ;
1475+ numSessionsInProcessPrepared ++;
1476+ PooledSession readSession = readSessions .poll ();
1477+ if (readSession != null ) {
1478+ // Create a read/write transaction in-process if there is already a queue for prepared
1479+ // sessions. This is more efficient than doing it asynchronously, as it scales with
1480+ // the number of user threads. The thread pool for asynchronously preparing sessions
1481+ // is fixed.
1482+ span .addAnnotation (
1483+ "Acquired read only session. Preparing in-process for read write transaction" );
1484+ sess = readSession ;
1485+ } else {
1486+ span .addAnnotation ("No session available" );
1487+ maybeCreateSession ();
1488+ }
1489+ }
1490+ if (sess == null ) {
1491+ waiter = new Waiter ();
1492+ if (inProcessPrepare ) {
1493+ readWaiters .add (waiter );
1494+ } else {
1495+ readWriteWaiters .add (waiter );
1496+ }
1497+ }
1498+ } else {
1499+ span .addAnnotation ("Acquired read write session" );
1500+ }
14251501 }
1426- if (resourceNotFoundException != null ) {
1427- span .addAnnotation ("Database has been deleted" );
1428- throw SpannerExceptionFactory .newSpannerException (
1429- ErrorCode .NOT_FOUND ,
1430- String .format (
1431- "The session pool has been invalidated because a previous RPC returned 'Database not found': %s" ,
1432- resourceNotFoundException .getMessage ()),
1433- resourceNotFoundException );
1502+ if (waiter != null ) {
1503+ logger .log (
1504+ Level .FINE ,
1505+ "No session available in the pool. Blocking for one to become available/created" );
1506+ span .addAnnotation ("Waiting for read write session to be available" );
1507+ sess = waiter .take ();
14341508 }
1435- sess = writePreparedSessions .poll ();
1436- if (sess == null ) {
1437- if (numSessionsBeingPrepared <= readWriteWaiters .size ()) {
1438- PooledSession readSession = readSessions .poll ();
1439- if (readSession != null ) {
1440- span .addAnnotation ("Acquired read only session. Preparing for read write transaction" );
1441- prepareSession (readSession );
1442- } else {
1443- span .addAnnotation ("No session available" );
1444- maybeCreateSession ();
1509+ if (inProcessPrepare ) {
1510+ try {
1511+ sess .prepareReadWriteTransaction ();
1512+ } catch (Throwable t ) {
1513+ sess = null ;
1514+ SpannerException e = newSpannerException (t );
1515+ if (!isClosed ()) {
1516+ handlePrepareSessionFailure (e , sess , false );
1517+ }
1518+ if (!isSessionNotFound (e )) {
1519+ throw e ;
14451520 }
14461521 }
1447- waiter = new Waiter ();
1448- readWriteWaiters .add (waiter );
1449- } else {
1450- span .addAnnotation ("Acquired read write session" );
14511522 }
1452- }
1453- if (waiter != null ) {
1454- logger .log (
1455- Level .FINE ,
1456- "No session available in the pool. Blocking for one to become available/created" );
1457- span .addAnnotation ("Waiting for read write session to be available" );
1458- sess = waiter .take ();
1523+ if (sess != null ) {
1524+ break ;
1525+ }
14591526 }
14601527 sess .markBusy ();
14611528 incrementNumSessionsInUse ();
@@ -1583,7 +1650,8 @@ private void handleCreateSessionsFailure(SpannerException e, int count) {
15831650 }
15841651 }
15851652
1586- private void handlePrepareSessionFailure (SpannerException e , PooledSession session ) {
1653+ private void handlePrepareSessionFailure (
1654+ SpannerException e , PooledSession session , boolean informFirstWaiter ) {
15871655 synchronized (lock ) {
15881656 if (isSessionNotFound (e )) {
15891657 invalidateSession (session );
@@ -1606,7 +1674,7 @@ private void handlePrepareSessionFailure(SpannerException e, PooledSession sessi
16061674 MoreObjects .firstNonNull (
16071675 this .resourceNotFoundException ,
16081676 isDatabaseOrInstanceNotFound (e ) ? (ResourceNotFoundException ) e : null );
1609- } else if (readWriteWaiters .size () > 0 ) {
1677+ } else if (informFirstWaiter && readWriteWaiters .size () > 0 ) {
16101678 releaseSession (session , Position .FIRST );
16111679 readWriteWaiters .poll ().put (e );
16121680 } else {
@@ -1755,6 +1823,7 @@ public void run() {
17551823 sess .prepareReadWriteTransaction ();
17561824 logger .log (Level .FINE , "Session prepared" );
17571825 synchronized (lock ) {
1826+ numSessionsAsyncPrepared ++;
17581827 numSessionsBeingPrepared --;
17591828 if (!isClosed ()) {
17601829 if (readWriteWaiters .size () > 0 ) {
@@ -1770,7 +1839,7 @@ public void run() {
17701839 synchronized (lock ) {
17711840 numSessionsBeingPrepared --;
17721841 if (!isClosed ()) {
1773- handlePrepareSessionFailure (newSpannerException (t ), sess );
1842+ handlePrepareSessionFailure (newSpannerException (t ), sess , true );
17741843 }
17751844 }
17761845 }
0 commit comments