Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
f36bfd5
Different approach at allowing separate weights per tier
nicktindall Apr 2, 2025
45f8c44
Merge remote-tracking branch 'origin/main' into separate_weights_per_…
nicktindall Apr 2, 2025
b40f3a4
Compiling without errors
nicktindall Apr 3, 2025
91b2072
Minimised
nicktindall Apr 3, 2025
4807773
Hack in tiered partitions for serverless
nicktindall Apr 3, 2025
05a27b6
Merge branch 'main' into separate_weights_per_tier_v2
nicktindall Apr 4, 2025
539499f
Register settings
nicktindall Apr 4, 2025
2db4d2e
Use the same PartitionedClusterFactory for NodeAllocationStatsAndWeig…
nicktindall Apr 4, 2025
8bc0aba
Use the same PartitionedClusterFactory for NodeAllocationStatsAndWeig…
nicktindall Apr 4, 2025
6e7d2e0
Move balancer settings testing to use BalancerSettings
nicktindall Apr 4, 2025
453e819
Tidy
nicktindall Apr 4, 2025
daff958
Add assertion around tiering
nicktindall Apr 4, 2025
5fbf544
Handle case when there are no search nodes
nicktindall Apr 4, 2025
f8f579c
Handle zero nodes in NodeSorter
nicktindall Apr 8, 2025
afea2ef
Merge remote-tracking branch 'origin/main' into separate_weights_per_…
nicktindall Apr 8, 2025
2f87308
Update docs/changelog/126091.yaml
nicktindall Apr 8, 2025
321ce06
Rename PartitionedCluster(Factory), add javadoc
nicktindall Apr 14, 2025
3557b54
[CI] Auto commit changes from spotless
Apr 14, 2025
f186c56
TieredBalancingWeightsFactory -> StatelessBalancingWeightsFactory
nicktindall Apr 14, 2025
0679990
Allow balancer weights factory to be provided by ClusterPlugin
nicktindall Apr 14, 2025
0dbd4ca
Merge remote-tracking branch 'origin/main' into separate_weights_per_…
nicktindall Apr 14, 2025
49429f1
Move stateless parts to a separate PR
nicktindall Apr 14, 2025
6952169
Fix changelog message
nicktindall Apr 14, 2025
2d1ec90
Make ModelNode and NodeSorter public
nicktindall Apr 14, 2025
9e7f13d
Merge branch 'main' into separate_weights_per_tier_v2
nicktindall Apr 15, 2025
99c9144
Merge branch 'main' into separate_weights_per_tier_v2
nicktindall Apr 15, 2025
bfdc214
Merge remote-tracking branch 'origin/main' into separate_weights_per_…
nicktindall Apr 17, 2025
b9d0241
PartitionedNodeSorter -> NodeSorters
nicktindall Apr 17, 2025
218901f
Add tests for weights-by-partition
nicktindall Apr 17, 2025
da8a69e
Merge remote-tracking branch 'origin/main' into separate_weights_per_…
nicktindall Apr 17, 2025
e50bd1b
Merge remote-tracking branch 'origin/main' into separate_weights_per_…
nicktindall Apr 22, 2025
3361bbe
Make NodeSorters iterable
nicktindall Apr 22, 2025
b9403bb
Javadoc on NodeSorter
nicktindall Apr 22, 2025
da14071
Merge remote-tracking branch 'origin/main' into separate_weights_per_…
nicktindall Apr 25, 2025
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/126091.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 126091
summary: Allow balancing weights to be set per tier
area: Allocation
type: enhancement
issues: []
37 changes: 34 additions & 3 deletions server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeightsFactory;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.DesiredBalanceReconcilerAction;
import org.elasticsearch.cluster.routing.allocation.allocator.GlobalBalancingWeightsFactory;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
Expand Down Expand Up @@ -146,11 +148,20 @@ public ClusterModule(
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
this.allocationDeciders = new AllocationDeciders(deciderList);
final BalancerSettings balancerSettings = new BalancerSettings(clusterService.getClusterSettings());
var nodeAllocationStatsAndWeightsCalculator = new NodeAllocationStatsAndWeightsCalculator(writeLoadForecaster, balancerSettings);
final BalancingWeightsFactory balancingWeightsFactory = getBalancingWeightsFactory(
clusterPlugins,
balancerSettings,
clusterService.getClusterSettings()
);
var nodeAllocationStatsAndWeightsCalculator = new NodeAllocationStatsAndWeightsCalculator(
writeLoadForecaster,
balancingWeightsFactory
);
this.shardsAllocator = createShardsAllocator(
settings,
clusterService.getClusterSettings(),
balancerSettings,
balancingWeightsFactory,
threadPool,
clusterPlugins,
clusterService,
Expand Down Expand Up @@ -203,6 +214,22 @@ public ShardRouting.Role newEmptyRole(int copyIndex) {
};
}

static BalancingWeightsFactory getBalancingWeightsFactory(
List<ClusterPlugin> clusterPlugins,
BalancerSettings balancerSettings,
ClusterSettings clusterSettings
) {
final var strategies = clusterPlugins.stream()
.map(pl -> pl.getBalancingWeightsFactory(balancerSettings, clusterSettings))
.filter(Objects::nonNull)
.toList();
return switch (strategies.size()) {
case 0 -> new GlobalBalancingWeightsFactory(balancerSettings);
case 1 -> strategies.getFirst();
default -> throw new IllegalArgumentException("multiple plugins define balancing weights factories, which is not permitted");
};
}

private ClusterState reconcile(ClusterState clusterState, RerouteStrategy rerouteStrategy) {
return allocationService.executeWithRoutingAllocation(clusterState, "reconcile-desired-balance", rerouteStrategy);
}
Expand Down Expand Up @@ -439,6 +466,7 @@ private static ShardsAllocator createShardsAllocator(
Settings settings,
ClusterSettings clusterSettings,
BalancerSettings balancerSettings,
BalancingWeightsFactory balancingWeightsFactory,
ThreadPool threadPool,
List<ClusterPlugin> clusterPlugins,
ClusterService clusterService,
Expand All @@ -448,12 +476,15 @@ private static ShardsAllocator createShardsAllocator(
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
) {
Map<String, Supplier<ShardsAllocator>> allocators = new HashMap<>();
allocators.put(BALANCED_ALLOCATOR, () -> new BalancedShardsAllocator(balancerSettings, writeLoadForecaster));
allocators.put(
BALANCED_ALLOCATOR,
() -> new BalancedShardsAllocator(balancerSettings, writeLoadForecaster, balancingWeightsFactory)
);
allocators.put(
DESIRED_BALANCE_ALLOCATOR,
() -> new DesiredBalanceShardsAllocator(
clusterSettings,
new BalancedShardsAllocator(balancerSettings, writeLoadForecaster),
new BalancedShardsAllocator(balancerSettings, writeLoadForecaster, balancingWeightsFactory),
threadPool,
clusterService,
reconciler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeights;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancingWeightsFactory;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance;
import org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction;
import org.elasticsearch.common.util.Maps;
Expand All @@ -28,7 +29,7 @@
*/
public class NodeAllocationStatsAndWeightsCalculator {
private final WriteLoadForecaster writeLoadForecaster;
private final BalancerSettings balancerSettings;
private final BalancingWeightsFactory balancingWeightsFactory;

/**
* Node shard allocation stats and the total node weight.
Expand All @@ -42,9 +43,12 @@ public record NodeAllocationStatsAndWeight(
float currentNodeWeight
) {}

public NodeAllocationStatsAndWeightsCalculator(WriteLoadForecaster writeLoadForecaster, BalancerSettings balancerSettings) {
public NodeAllocationStatsAndWeightsCalculator(
WriteLoadForecaster writeLoadForecaster,
BalancingWeightsFactory balancingWeightsFactory
) {
this.writeLoadForecaster = writeLoadForecaster;
this.balancerSettings = balancerSettings;
this.balancingWeightsFactory = balancingWeightsFactory;
}

/**
Expand All @@ -60,18 +64,14 @@ public Map<String, NodeAllocationStatsAndWeight> nodesAllocationStatsAndWeights(
// must not use licensed features when just starting up
writeLoadForecaster.refreshLicense();
}
var weightFunction = new WeightFunction(
balancerSettings.getShardBalanceFactor(),
balancerSettings.getIndexBalanceFactor(),
balancerSettings.getWriteLoadBalanceFactor(),
balancerSettings.getDiskUsageBalanceFactor()
);
final BalancingWeights balancingWeights = balancingWeightsFactory.create();
var avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes);
var avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes);
var avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(clusterInfo, metadata, routingNodes);

var nodeAllocationStatsAndWeights = Maps.<String, NodeAllocationStatsAndWeight>newMapWithExpectedSize(routingNodes.size());
for (RoutingNode node : routingNodes) {
WeightFunction weightFunction = balancingWeights.weightFunctionForNode(node);
int shards = 0;
int undesiredShards = 0;
double forecastedWriteLoad = 0.0;
Expand Down
Loading
Loading