@@ -235,39 +235,9 @@ protected TransportRequestOptions transportOptions(Settings settings) {
235235 return TransportRequestOptions .EMPTY ;
236236 }
237237
238- private String concreteIndex (final ClusterState state , final ReplicationRequest request ) {
239- return resolveIndex () ? indexNameExpressionResolver .concreteSingleIndex (state , request ).getName () : request .index ();
240- }
241-
242- private ClusterBlockException blockExceptions (final ClusterState state , final String indexName ) {
243- ClusterBlockLevel globalBlockLevel = globalBlockLevel ();
244- if (globalBlockLevel != null ) {
245- ClusterBlockException blockException = state .blocks ().globalBlockedException (globalBlockLevel );
246- if (blockException != null ) {
247- return blockException ;
248- }
249- }
250- ClusterBlockLevel indexBlockLevel = indexBlockLevel ();
251- if (indexBlockLevel != null ) {
252- ClusterBlockException blockException = state .blocks ().indexBlockedException (indexBlockLevel , indexName );
253- if (blockException != null ) {
254- return blockException ;
255- }
256- }
257- return null ;
258- }
259-
260238 protected boolean retryPrimaryException (final Throwable e ) {
261239 return e .getClass () == ReplicationOperation .RetryOnPrimaryException .class
262- || TransportActions .isShardNotAvailableException (e )
263- || isRetryableClusterBlockException (e );
264- }
265-
266- boolean isRetryableClusterBlockException (final Throwable e ) {
267- if (e instanceof ClusterBlockException ) {
268- return ((ClusterBlockException ) e ).retryable ();
269- }
270- return false ;
240+ || TransportActions .isShardNotAvailableException (e );
271241 }
272242
273243 protected class OperationTransportHandler implements TransportRequestHandler <Request > {
@@ -340,15 +310,6 @@ protected void doRun() throws Exception {
340310 @ Override
341311 public void onResponse (PrimaryShardReference primaryShardReference ) {
342312 try {
343- final ClusterState clusterState = clusterService .state ();
344- final IndexMetaData indexMetaData = clusterState .metaData ().getIndexSafe (primaryShardReference .routingEntry ().index ());
345-
346- final ClusterBlockException blockException = blockExceptions (clusterState , indexMetaData .getIndex ().getName ());
347- if (blockException != null ) {
348- logger .trace ("cluster is blocked, action failed on primary" , blockException );
349- throw blockException ;
350- }
351-
352313 if (primaryShardReference .isRelocated ()) {
353314 primaryShardReference .close (); // release shard operation lock as soon as possible
354315 setPhase (replicationTask , "primary_delegation" );
@@ -362,7 +323,7 @@ public void onResponse(PrimaryShardReference primaryShardReference) {
362323 response .readFrom (in );
363324 return response ;
364325 };
365- DiscoveryNode relocatingNode = clusterState .nodes ().get (primary .relocatingNodeId ());
326+ DiscoveryNode relocatingNode = clusterService . state () .nodes ().get (primary .relocatingNodeId ());
366327 transportService .sendRequest (relocatingNode , transportPrimaryAction ,
367328 new ConcreteShardRequest <>(request , primary .allocationId ().getRelocationId (), primaryTerm ),
368329 transportOptions ,
@@ -735,42 +696,35 @@ public void onFailure(Exception e) {
735696 protected void doRun () {
736697 setPhase (task , "routing" );
737698 final ClusterState state = observer .setAndGetObservedState ();
738- final String concreteIndex = concreteIndex (state , request );
739- final ClusterBlockException blockException = blockExceptions (state , concreteIndex );
740- if (blockException != null ) {
741- if (blockException .retryable ()) {
742- logger .trace ("cluster is blocked, scheduling a retry" , blockException );
743- retry (blockException );
744- } else {
745- finishAsFailed (blockException );
746- }
747- } else {
748- // request does not have a shardId yet, we need to pass the concrete index to resolve shardId
749- final IndexMetaData indexMetaData = state .metaData ().index (concreteIndex );
750- if (indexMetaData == null ) {
751- retry (new IndexNotFoundException (concreteIndex ));
752- return ;
753- }
754- if (indexMetaData .getState () == IndexMetaData .State .CLOSE ) {
755- throw new IndexClosedException (indexMetaData .getIndex ());
756- }
699+ if (handleBlockExceptions (state )) {
700+ return ;
701+ }
702+
703+ // request does not have a shardId yet, we need to pass the concrete index to resolve shardId
704+ final String concreteIndex = concreteIndex (state );
705+ final IndexMetaData indexMetaData = state .metaData ().index (concreteIndex );
706+ if (indexMetaData == null ) {
707+ retry (new IndexNotFoundException (concreteIndex ));
708+ return ;
709+ }
710+ if (indexMetaData .getState () == IndexMetaData .State .CLOSE ) {
711+ throw new IndexClosedException (indexMetaData .getIndex ());
712+ }
757713
758- // resolve all derived request fields, so we can route and apply it
759- resolveRequest (indexMetaData , request );
760- assert request .shardId () != null : "request shardId must be set in resolveRequest" ;
761- assert request .waitForActiveShards () != ActiveShardCount .DEFAULT :
762- "request waitForActiveShards must be set in resolveRequest" ;
714+ // resolve all derived request fields, so we can route and apply it
715+ resolveRequest (indexMetaData , request );
716+ assert request .shardId () != null : "request shardId must be set in resolveRequest" ;
717+ assert request .waitForActiveShards () != ActiveShardCount .DEFAULT : "request waitForActiveShards must be set in resolveRequest" ;
763718
764- final ShardRouting primary = primary (state );
765- if (retryIfUnavailable (state , primary )) {
766- return ;
767- }
768- final DiscoveryNode node = state .nodes ().get (primary .currentNodeId ());
769- if (primary .currentNodeId ().equals (state .nodes ().getLocalNodeId ())) {
770- performLocalAction (state , primary , node , indexMetaData );
771- } else {
772- performRemoteAction (state , primary , node );
773- }
719+ final ShardRouting primary = primary (state );
720+ if (retryIfUnavailable (state , primary )) {
721+ return ;
722+ }
723+ final DiscoveryNode node = state .nodes ().get (primary .currentNodeId ());
724+ if (primary .currentNodeId ().equals (state .nodes ().getLocalNodeId ())) {
725+ performLocalAction (state , primary , node , indexMetaData );
726+ } else {
727+ performRemoteAction (state , primary , node );
774728 }
775729 }
776730
@@ -822,11 +776,44 @@ private boolean retryIfUnavailable(ClusterState state, ShardRouting primary) {
822776 return false ;
823777 }
824778
779+ private String concreteIndex (ClusterState state ) {
780+ return resolveIndex () ? indexNameExpressionResolver .concreteSingleIndex (state , request ).getName () : request .index ();
781+ }
782+
825783 private ShardRouting primary (ClusterState state ) {
826784 IndexShardRoutingTable indexShard = state .getRoutingTable ().shardRoutingTable (request .shardId ());
827785 return indexShard .primaryShard ();
828786 }
829787
788+ private boolean handleBlockExceptions (ClusterState state ) {
789+ ClusterBlockLevel globalBlockLevel = globalBlockLevel ();
790+ if (globalBlockLevel != null ) {
791+ ClusterBlockException blockException = state .blocks ().globalBlockedException (globalBlockLevel );
792+ if (blockException != null ) {
793+ handleBlockException (blockException );
794+ return true ;
795+ }
796+ }
797+ ClusterBlockLevel indexBlockLevel = indexBlockLevel ();
798+ if (indexBlockLevel != null ) {
799+ ClusterBlockException blockException = state .blocks ().indexBlockedException (indexBlockLevel , concreteIndex (state ));
800+ if (blockException != null ) {
801+ handleBlockException (blockException );
802+ return true ;
803+ }
804+ }
805+ return false ;
806+ }
807+
808+ private void handleBlockException (ClusterBlockException blockException ) {
809+ if (blockException .retryable ()) {
810+ logger .trace ("cluster is blocked, scheduling a retry" , blockException );
811+ retry (blockException );
812+ } else {
813+ finishAsFailed (blockException );
814+ }
815+ }
816+
830817 private void performAction (final DiscoveryNode node , final String action , final boolean isPrimaryAction ,
831818 final TransportRequest requestToPerform ) {
832819 transportService .sendRequest (node , action , requestToPerform , transportOptions , new TransportResponseHandler <Response >() {
0 commit comments