Skip to content
5 changes: 5 additions & 0 deletions docs/changelog/95430.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 95430
summary: SIGTERM node shutdown type
area: Infra/Node Lifecycle
type: enhancement
issues: []
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some transport version guards in the writeTo method of this class? We shouldn't ever be in a situation where we're in a mixed cluster with a version that doesn't understand SIGTERM, but given how nasty serialization problems are, better safe than sorry.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
public class SingleNodeShutdownMetadata implements SimpleDiffable<SingleNodeShutdownMetadata>, ToXContentObject {

public static final TransportVersion REPLACE_SHUTDOWN_TYPE_ADDED_VERSION = TransportVersion.V_7_16_0;
public static final TransportVersion SIGTERM_ADDED_VERSION = TransportVersion.V_8_9_0;

public static final ParseField NODE_ID_FIELD = new ParseField("node_id");
public static final ParseField TYPE_FIELD = new ParseField("type");
Expand Down Expand Up @@ -199,7 +200,8 @@ public TimeValue getAllocationDelay() {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(nodeId);
if (out.getTransportVersion().before(REPLACE_SHUTDOWN_TYPE_ADDED_VERSION) && this.type == SingleNodeShutdownMetadata.Type.REPLACE) {
if ((out.getTransportVersion().before(REPLACE_SHUTDOWN_TYPE_ADDED_VERSION) && this.type == SingleNodeShutdownMetadata.Type.REPLACE)
|| (out.getTransportVersion().before(SIGTERM_ADDED_VERSION) && this.type == Type.SIGTERM)) {
out.writeEnum(SingleNodeShutdownMetadata.Type.REMOVE);
} else {
out.writeEnum(type);
Expand Down Expand Up @@ -380,7 +382,8 @@ public SingleNodeShutdownMetadata build() {
public enum Type {
REMOVE,
RESTART,
REPLACE;
REPLACE,
SIGTERM; // locally-initiated version of REMOVE

public static Type parse(String type) {
if ("remove".equals(type.toLowerCase(Locale.ROOT))) {
Expand All @@ -389,6 +392,8 @@ public static Type parse(String type) {
return RESTART;
} else if ("replace".equals(type.toLowerCase(Locale.ROOT))) {
return REPLACE;
} else if ("sigterm".equals(type.toLowerCase(Locale.ROOT))) {
return SIGTERM;
} else {
throw new IllegalArgumentException("unknown shutdown type: " + type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ private static Decision getDecision(RoutingAllocation allocation, String nodeId)
}

return switch (thisNodeShutdownMetadata.getType()) {
case REPLACE, REMOVE -> allocation.decision(Decision.NO, NAME, "node [%s] is preparing to be removed from the cluster", nodeId);
case REPLACE, REMOVE, SIGTERM -> allocation.decision(
Decision.NO,
NAME,
"node [%s] is preparing to be removed from the cluster",
nodeId
);
case RESTART -> allocation.decision(
Decision.YES,
NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -64,36 +66,59 @@ public void testRemoveShutdownMetadata() {
}

public void testIsNodeShuttingDown() {
NodesShutdownMetadata nodesShutdownMetadata = new NodesShutdownMetadata(
Collections.singletonMap(
"this_node",
SingleNodeShutdownMetadata.builder()
.setNodeId("this_node")
.setReason("shutdown for a unit test")
.setType(randomBoolean() ? SingleNodeShutdownMetadata.Type.REMOVE : SingleNodeShutdownMetadata.Type.RESTART)
.setStartedAtMillis(randomNonNegativeLong())
.build()
)
);

DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
nodes.add(DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), "this_node"));
nodes.localNodeId("this_node");
nodes.masterNodeId("this_node");

ClusterState state = ClusterState.builder(ClusterName.DEFAULT).nodes(nodes).build();

state = ClusterState.builder(state)
.metadata(Metadata.builder(state.metadata()).putCustom(NodesShutdownMetadata.TYPE, nodesShutdownMetadata).build())
.nodes(
DiscoveryNodes.builder(state.nodes())
.add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT))
.build()
)
.build();
for (SingleNodeShutdownMetadata.Type type : List.of(
SingleNodeShutdownMetadata.Type.RESTART,
SingleNodeShutdownMetadata.Type.REMOVE,
SingleNodeShutdownMetadata.Type.SIGTERM
)) {
NodesShutdownMetadata nodesShutdownMetadata = new NodesShutdownMetadata(
Collections.singletonMap(
"this_node",
SingleNodeShutdownMetadata.builder()
.setNodeId("this_node")
.setReason("shutdown for a unit test")
.setType(type)
.setStartedAtMillis(randomNonNegativeLong())
.build()
)
);

DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
nodes.add(DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), "this_node"));
nodes.localNodeId("this_node");
nodes.masterNodeId("this_node");

ClusterState state = ClusterState.builder(ClusterName.DEFAULT).nodes(nodes).build();

state = ClusterState.builder(state)
.metadata(Metadata.builder(state.metadata()).putCustom(NodesShutdownMetadata.TYPE, nodesShutdownMetadata).build())
.nodes(
DiscoveryNodes.builder(state.nodes())
.add(new DiscoveryNode("_node_1", buildNewFakeTransportAddress(), Version.CURRENT))
.build()
)
.build();

assertThat(NodesShutdownMetadata.isNodeShuttingDown(state, "this_node"), equalTo(true));
assertThat(NodesShutdownMetadata.isNodeShuttingDown(state, "_node_1"), equalTo(false));
}
}

assertThat(NodesShutdownMetadata.isNodeShuttingDown(state, "this_node"), equalTo(true));
assertThat(NodesShutdownMetadata.isNodeShuttingDown(state, "_node_1"), equalTo(false));
/**
 * Serializes a SIGTERM shutdown twice and checks the type that is read back: on a stream whose
 * transport version is set to {@code V_8_7_1} it must come back as {@code REMOVE}, and on a
 * stream whose transport version is left at its default it must come back as {@code SIGTERM}.
 */
public void testSigtermIsRemoveInOlderVersions() throws IOException {
    final SingleNodeShutdownMetadata sigtermShutdown = SingleNodeShutdownMetadata.builder()
        .setNodeId("myid")
        .setType(SingleNodeShutdownMetadata.Type.SIGTERM)
        .setReason("myReason")
        .setStartedAtMillis(0L)
        .build();

    // Scenario 1: the output stream is pinned to 8.7.1.
    final BytesStreamOutput olderVersionOut = new BytesStreamOutput();
    olderVersionOut.setTransportVersion(TransportVersion.V_8_7_1);
    sigtermShutdown.writeTo(olderVersionOut);
    final SingleNodeShutdownMetadata readFromOlder = new SingleNodeShutdownMetadata(olderVersionOut.bytes().streamInput());
    assertThat(readFromOlder.getType(), equalTo(SingleNodeShutdownMetadata.Type.REMOVE));

    // Scenario 2: a fresh output stream with no transport version set explicitly.
    final BytesStreamOutput defaultVersionOut = new BytesStreamOutput();
    sigtermShutdown.writeTo(defaultVersionOut);
    final SingleNodeShutdownMetadata readFromDefault = new SingleNodeShutdownMetadata(defaultVersionOut.bytes().streamInput());
    assertThat(readFromDefault.getType(), equalTo(SingleNodeShutdownMetadata.Type.SIGTERM));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,17 +646,22 @@ public void testRemainingDelayCalculationsWithUnrelatedShutdowns() throws Except
* Verifies that delay calculation is not impacted when the node the shard was last assigned to was registered for removal.
*/
public void testRemainingDelayCalculationWhenNodeIsShuttingDownForRemoval() throws Exception {
String lastNodeId = "bogusNodeId";
Map<String, SingleNodeShutdownMetadata> shutdowns = new HashMap<>();
SingleNodeShutdownMetadata shutdown = SingleNodeShutdownMetadata.builder()
.setNodeId(lastNodeId)
.setReason(this.getTestName())
.setStartedAtMillis(randomNonNegativeLong())
.setType(SingleNodeShutdownMetadata.Type.REMOVE)
.build();
shutdowns.put(shutdown.getNodeId(), shutdown);
for (SingleNodeShutdownMetadata.Type type : List.of(
SingleNodeShutdownMetadata.Type.REMOVE,
SingleNodeShutdownMetadata.Type.SIGTERM
)) {
String lastNodeId = "bogusNodeId";
Map<String, SingleNodeShutdownMetadata> shutdowns = new HashMap<>();
SingleNodeShutdownMetadata shutdown = SingleNodeShutdownMetadata.builder()
.setNodeId(lastNodeId)
.setReason(this.getTestName())
.setStartedAtMillis(randomNonNegativeLong())
.setType(type)
.build();
shutdowns.put(shutdown.getNodeId(), shutdown);

checkRemainingDelayCalculation(lastNodeId, TimeValue.timeValueNanos(10), shutdowns, TimeValue.timeValueNanos(10), false);
checkRemainingDelayCalculation(lastNodeId, TimeValue.timeValueNanos(10), shutdowns, TimeValue.timeValueNanos(10), false);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings;
import static org.hamcrest.Matchers.equalTo;
Expand Down Expand Up @@ -78,6 +79,12 @@ public class NodeShutdownAllocationDeciderTests extends ESAllocationTestCase {
)
.build();

/**
 * The shutdown types (REPLACE, REMOVE and SIGTERM) that the removal-related tests in this class
 * iterate over, so that each type is checked individually.
 */
private static final List<SingleNodeShutdownMetadata.Type> REMOVE_SHUTDOWN_TYPES = List.of(
SingleNodeShutdownMetadata.Type.REPLACE,
SingleNodeShutdownMetadata.Type.REMOVE,
SingleNodeShutdownMetadata.Type.SIGTERM
);

public void testCanAllocateShardsToRestartingNode() {
ClusterState state = prepareState(
service.reroute(ClusterState.EMPTY_STATE, "initial state", ActionListener.noop()),
Expand All @@ -96,17 +103,16 @@ public void testCanAllocateShardsToRestartingNode() {
}

public void testCannotAllocateShardsToRemovingNode() {
ClusterState state = prepareState(
service.reroute(ClusterState.EMPTY_STATE, "initial state", ActionListener.noop()),
randomFrom(SingleNodeShutdownMetadata.Type.REMOVE, SingleNodeShutdownMetadata.Type.REPLACE)
);
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state, null, null, 0);
RoutingNode routingNode = RoutingNodesHelper.routingNode(DATA_NODE.getId(), DATA_NODE, shard);
allocation.debugDecision(true);

Decision decision = decider.canAllocate(shard, routingNode, allocation);
assertThat(decision.type(), equalTo(Decision.Type.NO));
assertThat(decision.getExplanation(), equalTo("node [" + DATA_NODE.getId() + "] is preparing to be removed from the cluster"));
// Exercise every shutdown type in REMOVE_SHUTDOWN_TYPES; each must yield a NO allocation decision.
for (SingleNodeShutdownMetadata.Type type : REMOVE_SHUTDOWN_TYPES) {
    ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state", ActionListener.noop()), type);
    RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state, null, null, 0);
    RoutingNode routingNode = RoutingNodesHelper.routingNode(DATA_NODE.getId(), DATA_NODE, shard);
    allocation.debugDecision(true);

    Decision decision = decider.canAllocate(shard, routingNode, allocation);
    // The type is passed as the assertion reason so a failure names the iteration that broke.
    assertThat(type.toString(), decision.type(), equalTo(Decision.Type.NO));
    assertThat(
        type.toString(),
        decision.getExplanation(),
        equalTo("node [" + DATA_NODE.getId() + "] is preparing to be removed from the cluster")
    );
}
}

public void testShardsCanRemainOnRestartingNode() {
Expand All @@ -127,17 +133,20 @@ public void testShardsCanRemainOnRestartingNode() {
}

public void testShardsCannotRemainOnRemovingNode() {
ClusterState state = prepareState(
service.reroute(ClusterState.EMPTY_STATE, "initial state", ActionListener.noop()),
randomFrom(SingleNodeShutdownMetadata.Type.REMOVE, SingleNodeShutdownMetadata.Type.REPLACE)
);
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state, null, null, 0);
RoutingNode routingNode = RoutingNodesHelper.routingNode(DATA_NODE.getId(), DATA_NODE, shard);
allocation.debugDecision(true);

Decision decision = decider.canRemain(null, shard, routingNode, allocation);
assertThat(decision.type(), equalTo(Decision.Type.NO));
assertThat(decision.getExplanation(), equalTo("node [" + DATA_NODE.getId() + "] is preparing to be removed from the cluster"));
for (SingleNodeShutdownMetadata.Type type : REMOVE_SHUTDOWN_TYPES) {
ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state", ActionListener.noop()), type);
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state, null, null, 0);
RoutingNode routingNode = RoutingNodesHelper.routingNode(DATA_NODE.getId(), DATA_NODE, shard);
allocation.debugDecision(true);

Decision decision = decider.canRemain(null, shard, routingNode, allocation);
assertThat(type.toString(), decision.type(), equalTo(Decision.Type.NO));
assertThat(
type.toString(),
decision.getExplanation(),
equalTo("node [" + DATA_NODE.getId() + "] is preparing to be removed from the cluster")
);
}
}

public void testCanAutoExpandToRestartingNode() {
Expand Down Expand Up @@ -168,31 +177,32 @@ public void testCanAutoExpandToNodeIfNoNodesShuttingDown() {
}

public void testCanAutoExpandToNodeThatIsNotShuttingDown() {
ClusterState state = prepareState(
service.reroute(ClusterState.EMPTY_STATE, "initial state", ActionListener.noop()),
randomFrom(SingleNodeShutdownMetadata.Type.REMOVE, SingleNodeShutdownMetadata.Type.REPLACE),
"other-node-id"
);

RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state, null, null, 0);
allocation.debugDecision(true);

Decision decision = decider.shouldAutoExpandToNode(indexMetadata, DATA_NODE, allocation);
assertThat(decision.type(), equalTo(Decision.Type.YES));
assertThat(decision.getExplanation(), equalTo("this node is not shutting down"));
for (SingleNodeShutdownMetadata.Type type : REMOVE_SHUTDOWN_TYPES) {
ClusterState state = prepareState(
service.reroute(ClusterState.EMPTY_STATE, "initial state", ActionListener.noop()),
type,
"other-node-id"
);

RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state, null, null, 0);
allocation.debugDecision(true);

Decision decision = decider.shouldAutoExpandToNode(indexMetadata, DATA_NODE, allocation);
assertThat(type.toString(), decision.type(), equalTo(Decision.Type.YES));
assertThat(type.toString(), decision.getExplanation(), equalTo("this node is not shutting down"));
}
}

public void testCannotAutoExpandToRemovingNode() {
ClusterState state = prepareState(
service.reroute(ClusterState.EMPTY_STATE, "initial state", ActionListener.noop()),
randomFrom(SingleNodeShutdownMetadata.Type.REMOVE, SingleNodeShutdownMetadata.Type.REPLACE)
);
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state, null, null, 0);
allocation.debugDecision(true);

Decision decision = decider.shouldAutoExpandToNode(indexMetadata, DATA_NODE, allocation);
assertThat(decision.type(), equalTo(Decision.Type.NO));
assertThat(decision.getExplanation(), equalTo("node [" + DATA_NODE.getId() + "] is preparing to be removed from the cluster"));
// Exercise every shutdown type in REMOVE_SHUTDOWN_TYPES; auto-expand to the node must be refused for each.
for (SingleNodeShutdownMetadata.Type type : REMOVE_SHUTDOWN_TYPES) {
    ClusterState state = prepareState(service.reroute(ClusterState.EMPTY_STATE, "initial state", ActionListener.noop()), type);
    RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state, null, null, 0);
    allocation.debugDecision(true);

    Decision decision = decider.shouldAutoExpandToNode(indexMetadata, DATA_NODE, allocation);
    // The type is passed as the assertion reason so a failure names the iteration that broke.
    assertThat(type.toString(), decision.type(), equalTo(Decision.Type.NO));
    assertThat(
        type.toString(),
        decision.getExplanation(),
        equalTo("node [" + DATA_NODE.getId() + "] is preparing to be removed from the cluster")
    );
}
}

private ClusterState prepareState(ClusterState initialState, SingleNodeShutdownMetadata.Type shutdownType) {
Expand Down
Loading