Skip to content

Commit 8f93a29

Browse files
committed
reset both
1 parent defbd96 commit 8f93a29

File tree

4 files changed

+400
-2
lines changed

4 files changed

+400
-2
lines changed

server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/AllocationFailuresResetIT.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@
1010
package org.elasticsearch.cluster.routing.allocation;
1111

1212
import org.elasticsearch.cluster.routing.ShardRouting;
13+
import org.elasticsearch.cluster.routing.ShardRoutingState;
1314
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
1415
import org.elasticsearch.common.settings.Settings;
16+
import org.elasticsearch.index.Index;
1517
import org.elasticsearch.index.shard.IndexEventListener;
1618
import org.elasticsearch.plugins.Plugin;
1719
import org.elasticsearch.test.ESIntegTestCase;
@@ -20,6 +22,12 @@
2022
import org.elasticsearch.test.MockIndexEventListener;
2123

2224
import java.util.List;
25+
import java.util.concurrent.atomic.AtomicBoolean;
26+
27+
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_PREFIX;
28+
import static org.hamcrest.CoreMatchers.equalTo;
29+
import static org.hamcrest.CoreMatchers.not;
30+
import static org.hamcrest.CoreMatchers.notNullValue;
2331

2432
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
2533
public class AllocationFailuresResetIT extends ESIntegTestCase {
@@ -72,7 +80,7 @@ private void awaitShardAllocSucceed() throws Exception {
7280
});
7381
}
7482

75-
public void testResetFailuresOnNodeJoin() throws Exception {
83+
public void testResetAllocationFailuresOnNodeJoin() throws Exception {
7684
var node1 = internalCluster().startNode();
7785
injectAllocationFailures(node1);
7886
prepareCreate(INDEX, indexSettings(1, 0)).execute();
@@ -82,4 +90,44 @@ public void testResetFailuresOnNodeJoin() throws Exception {
8290
awaitShardAllocSucceed();
8391
}
8492

93+
/**
 * Verifies that exhausted relocation-failure counters are reset when a new node joins,
 * allowing the shard to relocate off its original node on a later attempt.
 */
public void testResetRelocationFailuresOnNodeJoin() throws Exception {
    String node1 = internalCluster().startNode();
    createIndex(INDEX, 1, 0);
    ensureGreen(INDEX);
    // While true, the listener below makes every relocation attempt onto node2 fail.
    final var failRelocation = new AtomicBoolean(true);
    String node2 = internalCluster().startNode();
    internalCluster().getInstance(MockIndexEventListener.TestEventListener.class, node2).setNewDelegate(new IndexEventListener() {
        @Override
        public void beforeIndexCreated(Index index, Settings indexSettings) {
            // Throwing here aborts shard creation on node2, which fails the relocation.
            if (failRelocation.get()) {
                throw new RuntimeException("FAIL");
            }
        }
    });
    // Excluding node1 forces the allocator to try relocating the primary to node2.
    updateIndexSettings(Settings.builder().put(INDEX_ROUTING_EXCLUDE_GROUP_PREFIX + "._name", node1), INDEX);
    ensureGreen(INDEX);
    // Await until all relocation attempts are exhausted
    var maxAttempts = MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.get(Settings.EMPTY);
    assertBusy(() -> {
        var state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
        var shard = state.routingTable().index(INDEX).shard(SHARD).primaryShard();
        assertThat(shard, notNullValue());
        assertThat(shard.relocationFailureInfo().failedRelocations(), equalTo(maxAttempts));
    });
    // Ensure the shard remains started on node1: exhausted retries must not unassign it.
    var state = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
    var shard = state.routingTable().index(INDEX).shard(SHARD).primaryShard();
    assertThat(shard, notNullValue());
    assertThat(shard.state(), equalTo(ShardRoutingState.STARTED));
    assertThat(state.nodes().get(shard.currentNodeId()).getName(), equalTo(node1));
    // Stop failing relocations before triggering the reset, so the retried relocation can succeed.
    failRelocation.set(false);
    // A new node joining should reset the counter and allow more relocation retries
    internalCluster().startNode();
    assertBusy(() -> {
        var stateAfterNodeJoin = internalCluster().clusterService().state();
        var relocatedShard = stateAfterNodeJoin.routingTable().index(INDEX).shard(SHARD).primaryShard();
        assertThat(relocatedShard, notNullValue());
        // The primary must have moved off the excluded node1 (to node2 or the new node).
        assertThat(stateAfterNodeJoin.nodes().get(relocatedShard.currentNodeId()).getName(), not(equalTo(node1)));
    });
}
85133
}

server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1298,6 +1298,17 @@ public boolean hasAllocationFailures() {
12981298
}));
12991299
}
13001300

1301+
public boolean hasRelocationFailures() {
1302+
for (var shardRoutings : assignedShards.values()) {
1303+
for (var routing : shardRoutings) {
1304+
if (routing.relocationFailureInfo() != null && routing.relocationFailureInfo().failedRelocations() > 0) {
1305+
return true;
1306+
}
1307+
}
1308+
}
1309+
return false;
1310+
}
1311+
13011312
public void resetFailedCounter(RoutingChangesObserver routingChangesObserver) {
13021313
final var unassignedIterator = unassigned().iterator();
13031314
while (unassignedIterator.hasNext()) {

server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@
1212
import org.apache.logging.log4j.LogManager;
1313
import org.apache.logging.log4j.Logger;
1414
import org.elasticsearch.action.ActionListener;
15+
import org.elasticsearch.cluster.ClusterChangedEvent;
1516
import org.elasticsearch.cluster.ClusterInfoService;
1617
import org.elasticsearch.cluster.ClusterState;
1718
import org.elasticsearch.cluster.RestoreInProgress;
1819
import org.elasticsearch.cluster.health.ClusterHealthStatus;
1920
import org.elasticsearch.cluster.metadata.AutoExpandReplicas;
2021
import org.elasticsearch.cluster.metadata.IndexMetadata;
2122
import org.elasticsearch.cluster.metadata.Metadata;
23+
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
2224
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type;
2325
import org.elasticsearch.cluster.node.DiscoveryNode;
2426
import org.elasticsearch.cluster.routing.IndexRoutingTable;
@@ -51,6 +53,7 @@
5153
import java.util.ArrayList;
5254
import java.util.Collections;
5355
import java.util.Comparator;
56+
import java.util.HashSet;
5457
import java.util.Iterator;
5558
import java.util.List;
5659
import java.util.Map;
@@ -80,6 +83,8 @@ public class AllocationService {
8083
private final ClusterInfoService clusterInfoService;
8184
private final SnapshotsInfoService snapshotsInfoService;
8285
private final ShardRoutingRoleStrategy shardRoutingRoleStrategy;
86+
// Tracks node IDs whose shutdown metadata has already been considered for resetting allocation/relocation failures
87+
private final Set<String> processedNodeShutdowns = new HashSet<>();
8388

8489
// only for tests that use the GatewayAllocator as the unique ExistingShardsAllocator
8590
@SuppressWarnings("this-escape")
@@ -573,12 +578,43 @@ public void addAllocFailuresResetListenerTo(ClusterService clusterService) {
573578
});
574579

575580
clusterService.addListener((changeEvent) -> {
576-
if (changeEvent.nodesAdded() && changeEvent.state().getRoutingNodes().hasAllocationFailures()) {
581+
if (shouldResetAllocationFailures(changeEvent)) {
577582
taskQueue.submitTask("reset-allocation-failures", (e) -> { assert MasterService.isPublishFailureException(e); }, null);
578583
}
579584
});
580585
}
581586

587+
/**
588+
* We should reset allocation/relocation failure count to allow further retries when:
589+
*
590+
* 1. A new node joins the cluster.
591+
* 2. A node shutdown metadata is added that could lead to a node being removed or replaced in the cluster.
592+
*
593+
* Note that removing a non-RESTART shutdown metadata from a node that is still in the cluster is treated similarly and
594+
* will cause resetting the allocation/relocation failures.
595+
*/
596+
private boolean shouldResetAllocationFailures(ClusterChangedEvent changeEvent) {
597+
final var clusterState = changeEvent.state();
598+
boolean hasAllocationFailures = clusterState.getRoutingNodes().hasAllocationFailures();
599+
boolean hasRelocationFailures = clusterState.getRoutingNodes().hasRelocationFailures();
600+
var shutdownEventAffectsAllocation = false;
601+
final var nodes = clusterState.nodes();
602+
final var nodeShutdowns = clusterState.metadata().nodeShutdowns();
603+
// If we remove a shutdown marker from a node, but it is still in the cluster, we could re-attempt failed relocations/allocations.
604+
shutdownEventAffectsAllocation = processedNodeShutdowns.stream()
605+
.anyMatch(nodeId -> nodeShutdowns.contains(nodeId) == false && nodes.get(nodeId) != null);
606+
// Clean up processed shutdowns that are removed from the cluster metadata
607+
processedNodeShutdowns.removeIf(nodeId -> nodeShutdowns.contains(nodeId) == false);
608+
for (var shutdown : nodeShutdowns.getAll().entrySet()) {
609+
// A RESTART doesn't necessarily move around shards, so no need to consider it for a reset.
610+
// Furthermore, once the node rejoins after restarting, there will be a reset if necessary.
611+
if (shutdown.getValue().getType() != SingleNodeShutdownMetadata.Type.RESTART) {
612+
shutdownEventAffectsAllocation |= processedNodeShutdowns.add(shutdown.getKey());
613+
}
614+
}
615+
return (changeEvent.nodesAdded() || shutdownEventAffectsAllocation) && (hasAllocationFailures || hasRelocationFailures);
616+
}
617+
582618
private ClusterState rerouteWithResetFailedCounter(ClusterState clusterState) {
583619
RoutingAllocation allocation = createRoutingAllocation(clusterState, currentNanoTime());
584620
allocation.routingNodes().resetFailedCounter(allocation.changes());

0 commit comments

Comments
 (0)