Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,8 @@ org.elasticsearch.cluster.ClusterState#compatibilityVersions()

@defaultMessage ClusterFeatures#nodeFeatures is for internal use only. Use FeatureService#clusterHasFeature to determine if a feature is present on the cluster.
org.elasticsearch.cluster.ClusterFeatures#nodeFeatures()
@defaultMessage ClusterFeatures#allNodeFeatures is for internal use only. Use FeatureService#clusterHasFeature to determine if a feature is present on the cluster.
org.elasticsearch.cluster.ClusterFeatures#allNodeFeatures()
@defaultMessage ClusterFeatures#clusterHasFeature is for internal use only. Use FeatureService#clusterHasFeature to determine if a feature is present on the cluster.
org.elasticsearch.cluster.ClusterFeatures#clusterHasFeature(org.elasticsearch.features.NodeFeature)
org.elasticsearch.cluster.ClusterFeatures#clusterHasFeature(org.elasticsearch.cluster.node.DiscoveryNodes, org.elasticsearch.features.NodeFeature, org.elasticsearch.common.settings.Settings)

@defaultMessage Do not construct these records outside the source files they are declared in
org.elasticsearch.cluster.SnapshotsInProgress$ShardSnapshotStatus#<init>(java.lang.String, org.elasticsearch.cluster.SnapshotsInProgress$ShardState, org.elasticsearch.repositories.ShardGeneration, java.lang.String, org.elasticsearch.repositories.ShardSnapshotResult)
Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/118143.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 118143
summary: Infrastructure for assuming cluster features in the next major version
area: "Infra/Core"
type: feature
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public void setupServices() {
client,
clusterService,
errorStore,
new FeatureService(List.of(new DataStreamFeatures()))
new FeatureService(Settings.EMPTY, List.of(new DataStreamFeatures()))
),
globalRetentionSettings
);
Expand Down Expand Up @@ -1468,7 +1468,7 @@ public void testTrackingTimeStats() {
getTransportRequestsRecordingClient(),
clusterService,
errorStore,
new FeatureService(List.of(new DataStreamFeatures()))
new FeatureService(Settings.EMPTY, List.of(new DataStreamFeatures()))
),
globalRetentionSettings
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void setupServices() {
client,
clusterService,
errorStore,
new FeatureService(List.of(new DataStreamFeatures()))
new FeatureService(Settings.EMPTY, List.of(new DataStreamFeatures()))
);
}

Expand Down
71 changes: 60 additions & 11 deletions server/src/main/java/org/elasticsearch/cluster/ClusterFeatures.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@

package org.elasticsearch.cluster;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.xcontent.ToXContent;

Expand Down Expand Up @@ -79,28 +82,74 @@ public Map<String, Set<String>> nodeFeatures() {
return nodeFeatures;
}

/**
* The features in all nodes in the cluster.
* <p>
* NOTE: This should not be used directly.
* Please use {@link org.elasticsearch.features.FeatureService#clusterHasFeature} instead.
*/
public Set<String> allNodeFeatures() {
private Set<String> allNodeFeatures() {
if (allNodeFeatures == null) {
allNodeFeatures = Set.copyOf(calculateAllNodeFeatures(nodeFeatures.values()));
}
return allNodeFeatures;
}

/**
 * Determines whether assumed features apply to {@code node}.
 * <p>
 * A node qualifies in either of two cases:
 * <ul>
 *   <li>the cluster is stateless (serverless) — serverless never goes backwards,
 *       so once a feature is in the serverless environment it stays there;</li>
 *   <li>the node is of the next major version — the next major can only ever talk
 *       to the highest minor of the previous major, so any feature added before
 *       that point is always present on the cluster.</li>
 * </ul>
 *
 * @param node     the node to check
 * @param settings the node settings, used to detect a stateless deployment
 * @return {@code true} if {@code node} can have assumed features
 */
public static boolean featuresCanBeAssumedForNode(DiscoveryNode node, Settings settings) {
    if (DiscoveryNode.isStateless(settings)) {
        return true;
    }
    return node.getVersion().major == Version.CURRENT.major + 1;
}

/**
 * Determines whether one or more nodes in {@code nodes} can have assumed features.
 * <p>
 * This holds if the cluster is stateless (serverless), or if any node is of the
 * next major version.<br/>
 * The next major version can be assumed because the next major version can only
 * ever talk to the highest minor of the previous major, so any features added
 * before that point will always exist on the cluster.<br/>
 * It can be assumed for serverless because we never go backwards on serverless,
 * so once a feature is in the serverless environment it will always be there.
 *
 * @param nodes    the cluster's discovery nodes
 * @param settings the node settings, used to detect a stateless deployment
 * @return {@code true} if assumed features apply to at least one node
 */
public static boolean featuresCanBeAssumedForNode(DiscoveryNodes nodes, Settings settings) {
    if (DiscoveryNode.isStateless(settings)) {
        return true;
    }
    final int nextMajor = Version.CURRENT.major + 1;
    for (DiscoveryNode node : nodes.getAllNodes()) {
        if (node.getVersion().major == nextMajor) {
            return true;
        }
    }
    return false;
}

/**
* {@code true} if {@code feature} is present on all nodes in the cluster.
* <p>
* NOTE: This should not be used directly.
* Please use {@link org.elasticsearch.features.FeatureService#clusterHasFeature} instead.
*/
@SuppressForbidden(reason = "directly reading cluster features")
public boolean clusterHasFeature(NodeFeature feature) {
return allNodeFeatures().contains(feature.id());
public boolean clusterHasFeature(DiscoveryNodes nodes, NodeFeature feature, Settings settings) {
assert nodes.getNodes().keySet().equals(nodeFeatures.keySet())
: "Cluster features nodes " + nodeFeatures.keySet() + " is different to discovery nodes " + nodes.getNodes().keySet();

// basic case
boolean allNodesHaveFeature = allNodeFeatures().contains(feature.id());
if (allNodesHaveFeature) {
return true;
}

// if the feature is assumed, check the major versions more closely
// it's actually ok if the feature is assumed, and all nodes missing the feature can assume it
// TODO: do we need some kind of transient cache of this calculation?
if (feature.assumedInNextMajor()) {
for (var nf : nodeFeatures.entrySet()) {
if (nf.getValue().contains(feature.id()) == false
&& featuresCanBeAssumedForNode(nodes.getNodes().get(nf.getKey()), settings) == false) {
return false;
}
}

// all nodes missing the feature can assume it - so that's alright then
return true;
}

return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.features.NodeFeature;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
Expand All @@ -39,6 +40,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -138,7 +140,7 @@ public ClusterState execute(BatchExecutionContext<JoinTask> batchExecutionContex
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(newState.nodes());
Map<String, CompatibilityVersions> compatibilityVersionsMap = new HashMap<>(newState.compatibilityVersions());
Map<String, Set<String>> nodeFeatures = new HashMap<>(newState.nodeFeatures());
Set<String> allNodesFeatures = ClusterFeatures.calculateAllNodeFeatures(nodeFeatures.values());
Set<String> allAssumedNodesFeatures = calculateAllAssumedClusterFeatures(newState.nodes(), nodeFeatures);

assert nodesBuilder.isLocalNodeElectedMaster();

Expand Down Expand Up @@ -174,14 +176,17 @@ public ClusterState execute(BatchExecutionContext<JoinTask> batchExecutionContex
}
blockForbiddenVersions(compatibilityVersions.transportVersion());
ensureNodesCompatibility(node.getVersion(), minClusterNodeVersion, maxClusterNodeVersion);
enforceNodeFeatureBarrier(node.getId(), allNodesFeatures, features);
Set<String> assumedNewNodeFeatures = enforceNodeFeatureBarrier(node, allAssumedNodesFeatures, features);
// we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
// we have to reject nodes that don't support all indices we have in this cluster
ensureIndexCompatibility(node.getMinIndexVersion(), node.getMaxIndexVersion(), initialState.getMetadata());

nodesBuilder.add(node);
compatibilityVersionsMap.put(node.getId(), compatibilityVersions);
// store the actual node features here, not including assumed features, as this is persisted in cluster state
nodeFeatures.put(node.getId(), features);
allNodesFeatures.retainAll(features);

allAssumedNodesFeatures.retainAll(assumedNewNodeFeatures);
nodesChanged = true;
minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion());
maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion());
Expand Down Expand Up @@ -355,6 +360,30 @@ private static void blockForbiddenVersions(TransportVersion joiningTransportVers
}
}

/**
 * Calculates the set of features common to all nodes in the cluster, crediting
 * "assumed" features to every node that is permitted to assume them
 * (see {@code FeatureService#featuresCanBeAssumedForNode}).
 *
 * @param nodes        the current discovery nodes of the cluster
 * @param nodeFeatures per-node feature ids, keyed by node id; not mutated (a copy
 *                     is taken before any per-node adjustment)
 * @return the intersection of all nodes' feature sets, after assumed features
 *         have been added to qualifying nodes
 */
private Set<String> calculateAllAssumedClusterFeatures(DiscoveryNodes nodes, Map<String, Set<String>> nodeFeatures) {
// only do the extra work if at least one node can assume features at all
if (featureService.featuresCanBeAssumedForNodes(nodes)) {
// collect the ids of every locally-registered feature marked assumedInNextMajor
Set<String> assumedFeatures = featureService.getNodeFeatures()
.values()
.stream()
.filter(NodeFeature::assumedInNextMajor)
.map(NodeFeature::id)
.collect(Collectors.toSet());

// add all assumed features to the featureset of all nodes of the next major version
// (copy-on-write so the caller's map is left untouched)
nodeFeatures = new HashMap<>(nodeFeatures);
for (var node : nodes.getNodes().entrySet()) {
if (featureService.featuresCanBeAssumedForNode(node.getValue())) {
// NOTE(review): assumes every node id present in nodes also has an entry in
// nodeFeatures; a missing entry (v == null) would make new HashSet<>(v) throw
// an NPE inside compute — TODO confirm callers guarantee this invariant
nodeFeatures.compute(node.getKey(), (k, v) -> {
var newFeatures = new HashSet<>(v);
// only replace the set if it actually changed, to avoid needless copies
return newFeatures.addAll(assumedFeatures) ? newFeatures : v;
});
}
}
}

// intersection of the (possibly adjusted) per-node feature sets
return ClusterFeatures.calculateAllNodeFeatures(nodeFeatures.values());
}

/**
* Ensures that all indices are compatible with the given index version. This will ensure that all indices in the given metadata
* will not be created with a newer version of elasticsearch as well as that all indices are newer or equal to the minimum index
Expand Down Expand Up @@ -461,13 +490,44 @@ public static void ensureVersionBarrier(Version joiningNodeVersion, Version minC
}
}

private void enforceNodeFeatureBarrier(String nodeId, Set<String> existingNodesFeatures, Set<String> newNodeFeatures) {
/**
* Enforces the feature join barrier - a joining node should have all features already present in all existing nodes in the cluster
*
* @return The set of features that this node has (including assumed features)
*/
private Set<String> enforceNodeFeatureBarrier(DiscoveryNode node, Set<String> existingNodesFeatures, Set<String> newNodeFeatures) {
// prevent join if it does not have one or more features that all other nodes have
Set<String> missingFeatures = new HashSet<>(existingNodesFeatures);
missingFeatures.removeAll(newNodeFeatures);

if (missingFeatures.isEmpty() == false) {
throw new IllegalStateException("Node " + nodeId + " is missing required features " + missingFeatures);
if (missingFeatures.isEmpty()) {
// nothing missing - all ok
return newNodeFeatures;
}

if (featureService.featuresCanBeAssumedForNode(node)) {
// it might still be ok for this node to join if this node can have assumed features,
// and all the missing features are assumed
// we can get the NodeFeature object direct from this node's registered features
// as all existing nodes in the cluster have the features present in existingNodesFeatures, including this one
newNodeFeatures = new HashSet<>(newNodeFeatures);
for (Iterator<String> it = missingFeatures.iterator(); it.hasNext();) {
String feature = it.next();
NodeFeature nf = featureService.getNodeFeatures().get(feature);
if (nf.assumedInNextMajor()) {
// its ok for this feature to be missing from this node
it.remove();
// and it should be assumed to still be in the cluster
newNodeFeatures.add(feature);
}
// even if we don't remove it, still continue, so the set in the exception message below is accurate
}
}

if (missingFeatures.isEmpty()) {
return newNodeFeatures;
} else {
throw new IllegalStateException("Node " + node.getId() + " is missing required features " + missingFeatures);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@

package org.elasticsearch.features;

import org.elasticsearch.cluster.ClusterFeatures;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
Expand All @@ -32,15 +36,15 @@ public class FeatureService {
private static final Logger logger = LogManager.getLogger(FeatureService.class);

private final Map<String, NodeFeature> nodeFeatures;
private final Settings settings;

/**
* Creates a new {@code FeatureService}, reporting all the features declared in {@code specs}
* as the local node's supported feature set
*/
public FeatureService(List<? extends FeatureSpecification> specs) {

var featureData = FeatureData.createFromSpecifications(specs);
nodeFeatures = featureData.getNodeFeatures();
public FeatureService(Settings settings, List<? extends FeatureSpecification> specs) {
this.nodeFeatures = FeatureData.createFromSpecifications(specs).getNodeFeatures();
this.settings = settings;

logger.info("Registered local node features {}", nodeFeatures.keySet().stream().sorted().toList());
}
Expand All @@ -53,11 +57,25 @@ public Map<String, NodeFeature> getNodeFeatures() {
return nodeFeatures;
}

/**
 * Determines whether {@code node} can have assumed features, delegating to
 * {@link ClusterFeatures#featuresCanBeAssumedForNode} with this service's settings.
 *
 * @param node the node to check
 * @return {@code true} if {@code node} can have assumed features
 */
public boolean featuresCanBeAssumedForNode(DiscoveryNode node) {
    return ClusterFeatures.featuresCanBeAssumedForNode(node, this.settings);
}

/**
 * Determines whether one or more nodes in {@code nodes} can have assumed features,
 * delegating to {@link ClusterFeatures#featuresCanBeAssumedForNode} with this
 * service's settings.
 *
 * @param nodes the cluster's discovery nodes
 * @return {@code true} if at least one node can have assumed features
 */
public boolean featuresCanBeAssumedForNodes(DiscoveryNodes nodes) {
    return ClusterFeatures.featuresCanBeAssumedForNode(nodes, this.settings);
}

/**
* Returns {@code true} if all nodes in {@code state} support feature {@code feature}.
*/
@SuppressForbidden(reason = "We need basic feature information from cluster state")
public boolean clusterHasFeature(ClusterState state, NodeFeature feature) {
return state.clusterFeatures().clusterHasFeature(feature);
return state.clusterFeatures().clusterHasFeature(state.nodes(), feature, settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,18 @@
* A feature published by a node.
*
* @param id The feature id. Must be unique in the node.
* @param assumedInNextMajor
* {@code true} if this feature is removed in the next major version,
* and so should be assumed to be true for all nodes of the next major version,
* or on stateless clusters.
*/
public record NodeFeature(String id) {
public record NodeFeature(String id, boolean assumedInNextMajor) {

    // Canonical constructor: a feature id is mandatory; the assumed flag may be anything.
    public NodeFeature(String id, boolean assumedInNextMajor) {
        Objects.requireNonNull(id);
        this.id = id;
        this.assumedInNextMajor = assumedInNextMajor;
    }

    // Convenience constructor for the common case of a feature that is not
    // assumed in the next major version.
    public NodeFeature(String id) {
        this(id, false);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,7 @@ private void construct(

final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry);

FeatureService featureService = new FeatureService(pluginsService.loadServiceProviders(FeatureSpecification.class));
FeatureService featureService = new FeatureService(settings, pluginsService.loadServiceProviders(FeatureSpecification.class));

if (DiscoveryNode.isMasterNode(settings)) {
clusterService.addListener(new SystemIndexMappingUpdateService(systemIndices, client));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,9 @@ protected boolean areFileSettingsApplied(ClusterState clusterState) {
}

@SuppressForbidden(reason = "need to check file settings support on exact cluster state")
private static boolean supportsFileSettings(ClusterState clusterState) {
return clusterState.clusterFeatures().clusterHasFeature(FileSettingsFeatures.FILE_SETTINGS_SUPPORTED);
private boolean supportsFileSettings(ClusterState clusterState) {
return clusterState.clusterFeatures()
.clusterHasFeature(clusterState.nodes(), FileSettingsFeatures.FILE_SETTINGS_SUPPORTED, environment.settings());
}

private void setReady(boolean ready) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public class TransportRolloverActionTests extends ESTestCase {
final DataStreamAutoShardingService dataStreamAutoShardingService = new DataStreamAutoShardingService(
Settings.EMPTY,
mockClusterService,
new FeatureService(List.of()),
new FeatureService(Settings.EMPTY, List.of()),
System::currentTimeMillis
);

Expand Down
Loading