1616
1717package com .google .cloud .spanner ;
1818
19+ import static com .google .cloud .spanner .SessionClient .optionMap ;
1920import static com .google .cloud .spanner .SpannerExceptionFactory .newSpannerException ;
2021import static com .google .common .base .Preconditions .checkArgument ;
2122import static com .google .common .base .Preconditions .checkNotNull ;
3233import com .google .cloud .spanner .AsyncResultSet .ReadyCallback ;
3334import com .google .cloud .spanner .Options .QueryOption ;
3435import com .google .cloud .spanner .Options .ReadOption ;
36+ import com .google .cloud .spanner .SessionClient .SessionOption ;
3537import com .google .cloud .spanner .SessionImpl .SessionTransaction ;
3638import com .google .cloud .spanner .spi .v1 .SpannerRpc ;
3739import com .google .common .annotations .VisibleForTesting ;
5254import com .google .spanner .v1 .TransactionOptions ;
5355import com .google .spanner .v1 .TransactionSelector ;
5456import java .util .Map ;
57+ import java .util .concurrent .ThreadLocalRandom ;
5558import java .util .concurrent .atomic .AtomicLong ;
5659import javax .annotation .Nullable ;
5760import javax .annotation .concurrent .GuardedBy ;
@@ -180,9 +183,15 @@ static Builder newBuilder() {
180183 @ GuardedBy ("lock" )
181184 private boolean used ;
182185
186+ private final Map <SpannerRpc .Option , ?> channelHint ;
187+
183188 private SingleReadContext (Builder builder ) {
184189 super (builder );
185190 this .bound = builder .bound ;
191+ // single use transaction have a single RPC and hence there is no need
192+ // of a channel hint. GAX will automatically choose a hint when used
193+ // with a multiplexed session.
194+ this .channelHint = getChannelHintOptions (session .getOptions (), null );
186195 }
187196
188197 @ Override
@@ -209,6 +218,11 @@ TransactionSelector getTransactionSelector() {
209218 .setSingleUse (TransactionOptions .newBuilder ().setReadOnly (bound .toProto ()))
210219 .build ();
211220 }
221+
222+ @ Override
223+ Map <SpannerRpc .Option , ?> getTransactionChannelHint () {
224+ return channelHint ;
225+ }
212226 }
213227
214228 private static void assertTimestampAvailable (boolean available ) {
@@ -217,6 +231,7 @@ private static void assertTimestampAvailable(boolean available) {
217231
218232 static class SingleUseReadOnlyTransaction extends SingleReadContext
219233 implements ReadOnlyTransaction {
234+
220235 @ GuardedBy ("lock" )
221236 private Timestamp timestamp ;
222237
@@ -300,6 +315,8 @@ static Builder newBuilder() {
300315 @ GuardedBy ("txnLock" )
301316 private ByteString transactionId ;
302317
318+ private final Map <SpannerRpc .Option , ?> channelHint ;
319+
303320 MultiUseReadOnlyTransaction (Builder builder ) {
304321 super (builder );
305322 checkArgument (
@@ -318,6 +335,14 @@ static Builder newBuilder() {
318335 this .timestamp = builder .timestamp ;
319336 this .transactionId = builder .transactionId ;
320337 }
338+ this .channelHint =
339+ getChannelHintOptions (
340+ session .getOptions (), ThreadLocalRandom .current ().nextLong (Long .MAX_VALUE ));
341+ }
342+
343+ @ Override
344+ public Map <SpannerRpc .Option , ?> getTransactionChannelHint () {
345+ return channelHint ;
321346 }
322347
323348 @ Override
@@ -380,7 +405,7 @@ void initTransaction() {
380405 .setOptions (options )
381406 .build ();
382407 Transaction transaction =
383- rpc .beginTransaction (request , session . getOptions (), isRouteToLeader ());
408+ rpc .beginTransaction (request , getTransactionChannelHint (), isRouteToLeader ());
384409 if (!transaction .hasReadTimestamp ()) {
385410 throw SpannerExceptionFactory .newSpannerException (
386411 ErrorCode .INTERNAL , "Missing expected transaction.read_timestamp metadata field" );
@@ -727,7 +752,10 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
727752 }
728753 SpannerRpc .StreamingCall call =
729754 rpc .executeQuery (
730- request .build (), stream .consumer (), session .getOptions (), isRouteToLeader ());
755+ request .build (),
756+ stream .consumer (),
757+ getTransactionChannelHint (),
758+ isRouteToLeader ());
731759 session .markUsed (clock .instant ());
732760 call .request (prefetchChunks );
733761 stream .setCall (call , request .getTransaction ().hasBegin ());
@@ -738,6 +766,16 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
738766 stream , this , options .hasDecodeMode () ? options .decodeMode () : defaultDecodeMode );
739767 }
740768
769+ Map <SpannerRpc .Option , ?> getChannelHintOptions (
770+ Map <SpannerRpc .Option , ?> channelHintForSession , Long channelHintForTransaction ) {
771+ if (channelHintForSession != null ) {
772+ return channelHintForSession ;
773+ } else if (channelHintForTransaction != null ) {
774+ return optionMap (SessionOption .channelHint (channelHintForTransaction ));
775+ }
776+ return null ;
777+ }
778+
741779 /**
742780 * Called before any read or query is started to perform state checks and initializations.
743781 * Subclasses should call {@code super.beforeReadOrQuery()} if overriding.
@@ -782,6 +820,12 @@ public void close() {
782820 @ Nullable
783821 abstract TransactionSelector getTransactionSelector ();
784822
823+ /**
824+ * Channel hint to be used for a transaction. This enables soft-stickiness per transaction by
825+ * ensuring all RPCs within a transaction land up on the same channel.
826+ */
827+ abstract Map <SpannerRpc .Option , ?> getTransactionChannelHint ();
828+
785829 /**
786830 * Returns the transaction tag for this {@link AbstractReadContext} or <code>null</code> if this
787831 * {@link AbstractReadContext} does not have a transaction tag.
@@ -872,7 +916,10 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
872916 builder .setRequestOptions (buildRequestOptions (readOptions ));
873917 SpannerRpc .StreamingCall call =
874918 rpc .read (
875- builder .build (), stream .consumer (), session .getOptions (), isRouteToLeader ());
919+ builder .build (),
920+ stream .consumer (),
921+ getTransactionChannelHint (),
922+ isRouteToLeader ());
876923 session .markUsed (clock .instant ());
877924 call .request (prefetchChunks );
878925 stream .setCall (call , /* withBeginTransaction = */ builder .getTransaction ().hasBegin ());
0 commit comments