2323import org .elasticsearch .cluster .ClusterService ;
2424import org .elasticsearch .cluster .ClusterState ;
2525import org .elasticsearch .cluster .ClusterStateUpdateTask ;
26+ import org .elasticsearch .cluster .ProcessedClusterStateUpdateTask ;
2627import org .elasticsearch .cluster .node .DiscoveryNodes ;
2728import org .elasticsearch .cluster .routing .IndexRoutingTable ;
2829import org .elasticsearch .cluster .routing .IndexShardRoutingTable ;
4445import java .util .ArrayList ;
4546import java .util .List ;
4647import java .util .concurrent .BlockingQueue ;
48+ import java .util .concurrent .atomic .AtomicBoolean ;
4749
4850import static org .elasticsearch .cluster .ClusterState .newClusterStateBuilder ;
4951import static org .elasticsearch .cluster .routing .ImmutableShardRouting .readShardRoutingEntry ;
@@ -59,6 +61,7 @@ public class ShardStateAction extends AbstractComponent {
5961 private final ThreadPool threadPool ;
6062
6163 private final BlockingQueue <ShardRouting > startedShardsQueue = ConcurrentCollections .newBlockingQueue ();
64+ private final AtomicBoolean rerouteRequired = new AtomicBoolean ();
6265
6366 @ Inject
6467 public ShardStateAction (Settings settings , ClusterService clusterService , TransportService transportService ,
@@ -137,7 +140,7 @@ private void innerShardStarted(final ShardRouting shardRouting, final String rea
137140 // process started events as fast as possible, to make shards available
138141 startedShardsQueue .add (shardRouting );
139142
140- clusterService .submitStateUpdateTask ("shard-started (" + shardRouting + "), reason [" + reason + "]" , Priority .HIGH , new ClusterStateUpdateTask () {
143+ clusterService .submitStateUpdateTask ("shard-started (" + shardRouting + "), reason [" + reason + "]" , Priority .HIGH , new ProcessedClusterStateUpdateTask () {
141144 @ Override
142145 public ClusterState execute (ClusterState currentState ) {
143146
@@ -182,12 +185,32 @@ public ClusterState execute(ClusterState currentState) {
182185 if (logger .isDebugEnabled ()) {
183186 logger .debug ("applying started shards {}, reason [{}]" , shards , reason );
184187 }
185- RoutingAllocation .Result routingResult = allocationService .applyStartedShards (currentState , shards );
188+ // we don't do reroute right away, we do it after publishing the fact that it was started
189+ RoutingAllocation .Result routingResult = allocationService .applyStartedShards (currentState , shards , false );
186190 if (!routingResult .changed ()) {
187191 return currentState ;
188192 }
189193 return newClusterStateBuilder ().state (currentState ).routingResult (routingResult ).build ();
190194 }
195+
196+ @ Override
197+ public void clusterStateProcessed (ClusterState clusterState ) {
198+ rerouteRequired .set (true );
199+ clusterService .submitStateUpdateTask ("reroute post shard-started (" + shardRouting + "), reason [" + reason + "]" , new ClusterStateUpdateTask () {
200+ @ Override
201+ public ClusterState execute (ClusterState currentState ) {
202+ if (rerouteRequired .compareAndSet (true , false )) {
203+ RoutingAllocation .Result routingResult = allocationService .reroute (currentState );
204+ if (!routingResult .changed ()) {
205+ return currentState ;
206+ }
207+ return newClusterStateBuilder ().state (currentState ).routingResult (routingResult ).build ();
208+ } else {
209+ return currentState ;
210+ }
211+ }
212+ });
213+ }
191214 });
192215 }
193216
0 commit comments