Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/96281.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 96281
summary: Fix autoexpand during node replace
area: Allocation
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,20 @@ public DiscoveryNode findByAddress(TransportAddress address) {
return null;
}

/**
* Check if a node with provided name exists
*
* @return {@code true} node identified with provided name exists or {@code false} otherwise
*/
public boolean hasByName(String name) {
for (DiscoveryNode node : nodes.values()) {
if (node.getName().equals(name)) {
return true;
}
}
return false;
}

/**
* Returns the version of the node with the oldest version in the cluster that is not a client node
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;

/**
* An allocation decider that ensures that all the shards allocated to the node scheduled for removal are relocated to the replacement node.
* It also ensures that auto-expands replicas are expanded to only the replacement source or target (not both at the same time)
* and only of the shards that were already present on the source node.
*/
public class NodeReplacementAllocationDecider extends AllocationDecider {

public static final String NAME = "node_replacement";
Expand All @@ -38,8 +43,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
Decision.YES,
NAME,
"node [%s] is replacing node [%s], and may receive shards from it",
shardRouting.currentNodeId(),
node.nodeId()
node.nodeId(),
shardRouting.currentNodeId()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The order was flipped

);
} else if (isReplacementSource(allocation, shardRouting.currentNodeId())) {
if (allocation.isReconciling()) {
Expand Down Expand Up @@ -110,27 +115,64 @@ public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNod
return YES__NO_REPLACEMENTS;
} else if (isReplacementTargetName(allocation, node.getName())) {
final SingleNodeShutdownMetadata shutdown = allocation.replacementTargetShutdowns().get(node.getName());
return allocation.decision(
Decision.NO,
NAME,
"node [%s] is a node replacement target for node [%s], "
+ "shards cannot auto expand to be on it until the replacement is complete",
node.getId(),
shutdown == null ? null : shutdown.getNodeId()
);
final String sourceNodeId = shutdown != null ? shutdown.getNodeId() : null;
final boolean hasShardsAllocatedOnSourceOrTarget = hasShardOnNode(indexMetadata, node.getId(), allocation)
|| (sourceNodeId != null && hasShardOnNode(indexMetadata, sourceNodeId, allocation));

if (hasShardsAllocatedOnSourceOrTarget) {
return allocation.decision(
Decision.YES,
NAME,
"node [%s] is a node replacement target for node [%s], "
+ "shard can auto expand to it as it was already present on the source node",
node.getId(),
sourceNodeId
);
} else {
return allocation.decision(
Decision.NO,
NAME,
"node [%s] is a node replacement target for node [%s], "
+ "shards cannot auto expand to be on it until the replacement is complete",
node.getId(),
sourceNodeId
);
}
} else if (isReplacementSource(allocation, node.getId())) {
return allocation.decision(
Decision.NO,
NAME,
"node [%s] is being replaced by [%s], shards cannot auto expand to be on it",
node.getId(),
getReplacementName(allocation, node.getId())
);
final SingleNodeShutdownMetadata shutdown = allocation.getClusterState().metadata().nodeShutdowns().get(node.getId());
final String replacementNodeName = shutdown != null ? shutdown.getTargetNodeName() : null;
final boolean hasShardOnSource = hasShardOnNode(indexMetadata, node.getId(), allocation)
&& shutdown != null
&& allocation.getClusterState().getNodes().hasByName(replacementNodeName) == false;

if (hasShardOnSource) {
return allocation.decision(
Decision.YES,
NAME,
"node [%s] is being replaced by [%s], shards can auto expand to be on it "
+ "while replacement node has not joined the cluster",
node.getId(),
replacementNodeName
);
} else {
return allocation.decision(
Decision.NO,
NAME,
"node [%s] is being replaced by [%s], shards cannot auto expand to be on it",
node.getId(),
replacementNodeName
);
}
} else {
return YES__NO_APPLICABLE_REPLACEMENTS;
}
}

private static boolean hasShardOnNode(IndexMetadata indexMetadata, String nodeId, RoutingAllocation allocation) {
RoutingNode node = allocation.routingNodes().node(nodeId);
return node != null && node.numberOfOwningShardsForIndex(indexMetadata.getIndex()) >= 1;
}

@Override
public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (replacementFromSourceToTarget(allocation, shardRouting.currentNodeId(), node.node().getName())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class NodeShutdownAllocationDecider extends AllocationDecider {
*/
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return getDecision(allocation, node.nodeId());
return getDecision(allocation, node.nodeId(), false);
}

/**
Expand All @@ -51,10 +51,10 @@ public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting
*/
@Override
public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) {
return getDecision(allocation, node.getId());
return getDecision(allocation, node.getId(), true);
}

private static Decision getDecision(RoutingAllocation allocation, String nodeId) {
private static Decision getDecision(RoutingAllocation allocation, String nodeId, boolean canAllocateBeforeReplacementIsReady) {
final var nodeShutdowns = allocation.metadata().nodeShutdowns().getAll();
if (nodeShutdowns.isEmpty()) {
return YES_EMPTY_SHUTDOWN_METADATA;
Expand All @@ -66,12 +66,16 @@ private static Decision getDecision(RoutingAllocation allocation, String nodeId)
}

return switch (thisNodeShutdownMetadata.getType()) {
case REPLACE, REMOVE, SIGTERM -> allocation.decision(
Decision.NO,
NAME,
"node [%s] is preparing to be removed from the cluster",
nodeId
);
case REMOVE, SIGTERM -> allocation.decision(Decision.NO, NAME, "node [%s] is preparing to be removed from the cluster", nodeId);
case REPLACE -> canAllocateBeforeReplacementIsReady
&& allocation.getClusterState().getNodes().hasByName(thisNodeShutdownMetadata.getTargetNodeName()) == false
? allocation.decision(
Decision.YES,
NAME,
"node [%s] is preparing to be removed from the cluster, but replacement is not yet present",
nodeId
)
: allocation.decision(Decision.NO, NAME, "node [%s] is preparing to be removed from the cluster", nodeId);
case RESTART -> allocation.decision(
Decision.YES,
NAME,
Expand Down
Loading