Skip to content

Commit 358b724

Browse files
authored
Deduplicate monitoring of balancer settings (#126752)
1 parent 46c3657 commit 358b724

File tree

10 files changed

+128
-102
lines changed

10 files changed

+128
-102
lines changed

server/src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.cluster.routing.allocation.NodeAllocationStatsAndWeightsCalculator;
3838
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
3939
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
40+
import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings;
4041
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
4142
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.DesiredBalanceReconcilerAction;
4243
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
@@ -144,13 +145,12 @@ public ClusterModule(
144145
this.clusterPlugins = clusterPlugins;
145146
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
146147
this.allocationDeciders = new AllocationDeciders(deciderList);
147-
var nodeAllocationStatsAndWeightsCalculator = new NodeAllocationStatsAndWeightsCalculator(
148-
writeLoadForecaster,
149-
clusterService.getClusterSettings()
150-
);
148+
final BalancerSettings balancerSettings = new BalancerSettings(clusterService.getClusterSettings());
149+
var nodeAllocationStatsAndWeightsCalculator = new NodeAllocationStatsAndWeightsCalculator(writeLoadForecaster, balancerSettings);
151150
this.shardsAllocator = createShardsAllocator(
152151
settings,
153152
clusterService.getClusterSettings(),
153+
balancerSettings,
154154
threadPool,
155155
clusterPlugins,
156156
clusterService,
@@ -438,6 +438,7 @@ private static void addAllocationDecider(Map<Class<?>, AllocationDecider> decide
438438
private static ShardsAllocator createShardsAllocator(
439439
Settings settings,
440440
ClusterSettings clusterSettings,
441+
BalancerSettings balancerSettings,
441442
ThreadPool threadPool,
442443
List<ClusterPlugin> clusterPlugins,
443444
ClusterService clusterService,
@@ -447,12 +448,12 @@ private static ShardsAllocator createShardsAllocator(
447448
NodeAllocationStatsAndWeightsCalculator nodeAllocationStatsAndWeightsCalculator
448449
) {
449450
Map<String, Supplier<ShardsAllocator>> allocators = new HashMap<>();
450-
allocators.put(BALANCED_ALLOCATOR, () -> new BalancedShardsAllocator(clusterSettings, writeLoadForecaster));
451+
allocators.put(BALANCED_ALLOCATOR, () -> new BalancedShardsAllocator(balancerSettings, writeLoadForecaster));
451452
allocators.put(
452453
DESIRED_BALANCE_ALLOCATOR,
453454
() -> new DesiredBalanceShardsAllocator(
454455
clusterSettings,
455-
new BalancedShardsAllocator(clusterSettings, writeLoadForecaster),
456+
new BalancedShardsAllocator(balancerSettings, writeLoadForecaster),
456457
threadPool,
457458
clusterService,
458459
reconciler,

server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsAndWeightsCalculator.java

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,9 @@
1515
import org.elasticsearch.cluster.routing.RoutingNode;
1616
import org.elasticsearch.cluster.routing.RoutingNodes;
1717
import org.elasticsearch.cluster.routing.ShardRouting;
18-
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
18+
import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings;
1919
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance;
2020
import org.elasticsearch.cluster.routing.allocation.allocator.WeightFunction;
21-
import org.elasticsearch.common.settings.ClusterSettings;
2221
import org.elasticsearch.common.util.Maps;
2322
import org.elasticsearch.core.Nullable;
2423

@@ -29,11 +28,7 @@
2928
*/
3029
public class NodeAllocationStatsAndWeightsCalculator {
3130
private final WriteLoadForecaster writeLoadForecaster;
32-
33-
private volatile float indexBalanceFactor;
34-
private volatile float shardBalanceFactor;
35-
private volatile float writeLoadBalanceFactor;
36-
private volatile float diskUsageBalanceFactor;
31+
private final BalancerSettings balancerSettings;
3732

3833
/**
3934
* Node shard allocation stats and the total node weight.
@@ -47,18 +42,9 @@ public record NodeAllocationStatsAndWeight(
4742
float currentNodeWeight
4843
) {}
4944

50-
public NodeAllocationStatsAndWeightsCalculator(WriteLoadForecaster writeLoadForecaster, ClusterSettings clusterSettings) {
45+
public NodeAllocationStatsAndWeightsCalculator(WriteLoadForecaster writeLoadForecaster, BalancerSettings balancerSettings) {
5146
this.writeLoadForecaster = writeLoadForecaster;
52-
clusterSettings.initializeAndWatch(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, value -> this.shardBalanceFactor = value);
53-
clusterSettings.initializeAndWatch(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, value -> this.indexBalanceFactor = value);
54-
clusterSettings.initializeAndWatch(
55-
BalancedShardsAllocator.WRITE_LOAD_BALANCE_FACTOR_SETTING,
56-
value -> this.writeLoadBalanceFactor = value
57-
);
58-
clusterSettings.initializeAndWatch(
59-
BalancedShardsAllocator.DISK_USAGE_BALANCE_FACTOR_SETTING,
60-
value -> this.diskUsageBalanceFactor = value
61-
);
47+
this.balancerSettings = balancerSettings;
6248
}
6349

6450
/**
@@ -74,7 +60,12 @@ public Map<String, NodeAllocationStatsAndWeight> nodesAllocationStatsAndWeights(
7460
// must not use licensed features when just starting up
7561
writeLoadForecaster.refreshLicense();
7662
}
77-
var weightFunction = new WeightFunction(shardBalanceFactor, indexBalanceFactor, writeLoadBalanceFactor, diskUsageBalanceFactor);
63+
var weightFunction = new WeightFunction(
64+
balancerSettings.getShardBalanceFactor(),
65+
balancerSettings.getIndexBalanceFactor(),
66+
balancerSettings.getWriteLoadBalanceFactor(),
67+
balancerSettings.getDiskUsageBalanceFactor()
68+
);
7869
var avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes);
7970
var avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes);
8071
var avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(clusterInfo, metadata, routingNodes);

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java

Lines changed: 14 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
3636
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
3737
import org.elasticsearch.cluster.routing.allocation.decider.Decision.Type;
38-
import org.elasticsearch.common.settings.ClusterSettings;
3938
import org.elasticsearch.common.settings.Setting;
4039
import org.elasticsearch.common.settings.Setting.Property;
4140
import org.elasticsearch.common.settings.Settings;
@@ -60,7 +59,6 @@
6059
import static org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type.REPLACE;
6160
import static org.elasticsearch.cluster.routing.ExpectedShardSizeEstimator.getExpectedShardSize;
6261
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
63-
import static org.elasticsearch.common.settings.ClusterSettings.createBuiltInClusterSettings;
6462

6563
/**
6664
* The {@link BalancedShardsAllocator} allocates and balances shards on the cluster nodes using {@link WeightFunction}.
@@ -114,34 +112,20 @@ public class BalancedShardsAllocator implements ShardsAllocator {
114112
Property.NodeScope
115113
);
116114

117-
// TODO: deduplicate these fields, use the fields in NodeAllocationStatsAndWeightsCalculator instead.
118-
private volatile float indexBalanceFactor;
119-
private volatile float shardBalanceFactor;
120-
private volatile float writeLoadBalanceFactor;
121-
private volatile float diskUsageBalanceFactor;
122-
private volatile float threshold;
123-
115+
private final BalancerSettings balancerSettings;
124116
private final WriteLoadForecaster writeLoadForecaster;
125117

126118
public BalancedShardsAllocator() {
127119
this(Settings.EMPTY);
128120
}
129121

130122
public BalancedShardsAllocator(Settings settings) {
131-
this(createBuiltInClusterSettings(settings), WriteLoadForecaster.DEFAULT);
132-
}
133-
134-
public BalancedShardsAllocator(ClusterSettings clusterSettings) {
135-
this(clusterSettings, WriteLoadForecaster.DEFAULT);
123+
this(new BalancerSettings(settings), WriteLoadForecaster.DEFAULT);
136124
}
137125

138126
@Inject
139-
public BalancedShardsAllocator(ClusterSettings clusterSettings, WriteLoadForecaster writeLoadForecaster) {
140-
clusterSettings.initializeAndWatch(SHARD_BALANCE_FACTOR_SETTING, value -> this.shardBalanceFactor = value);
141-
clusterSettings.initializeAndWatch(INDEX_BALANCE_FACTOR_SETTING, value -> this.indexBalanceFactor = value);
142-
clusterSettings.initializeAndWatch(WRITE_LOAD_BALANCE_FACTOR_SETTING, value -> this.writeLoadBalanceFactor = value);
143-
clusterSettings.initializeAndWatch(DISK_USAGE_BALANCE_FACTOR_SETTING, value -> this.diskUsageBalanceFactor = value);
144-
clusterSettings.initializeAndWatch(THRESHOLD_SETTING, value -> this.threshold = value);
127+
public BalancedShardsAllocator(BalancerSettings balancerSettings, WriteLoadForecaster writeLoadForecaster) {
128+
this.balancerSettings = balancerSettings;
145129
this.writeLoadForecaster = writeLoadForecaster;
146130
}
147131

@@ -159,12 +143,12 @@ public void allocate(RoutingAllocation allocation) {
159143
return;
160144
}
161145
final WeightFunction weightFunction = new WeightFunction(
162-
shardBalanceFactor,
163-
indexBalanceFactor,
164-
writeLoadBalanceFactor,
165-
diskUsageBalanceFactor
146+
balancerSettings.getShardBalanceFactor(),
147+
balancerSettings.getIndexBalanceFactor(),
148+
balancerSettings.getWriteLoadBalanceFactor(),
149+
balancerSettings.getDiskUsageBalanceFactor()
166150
);
167-
final Balancer balancer = new Balancer(writeLoadForecaster, allocation, weightFunction, threshold);
151+
final Balancer balancer = new Balancer(writeLoadForecaster, allocation, weightFunction, balancerSettings.getThreshold());
168152
balancer.allocateUnassigned();
169153
balancer.moveShards();
170154
balancer.balance();
@@ -196,12 +180,12 @@ private void collectAndRecordNodeWeightStats(Balancer balancer, WeightFunction w
196180
@Override
197181
public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, final RoutingAllocation allocation) {
198182
WeightFunction weightFunction = new WeightFunction(
199-
shardBalanceFactor,
200-
indexBalanceFactor,
201-
writeLoadBalanceFactor,
202-
diskUsageBalanceFactor
183+
balancerSettings.getShardBalanceFactor(),
184+
balancerSettings.getIndexBalanceFactor(),
185+
balancerSettings.getWriteLoadBalanceFactor(),
186+
balancerSettings.getDiskUsageBalanceFactor()
203187
);
204-
Balancer balancer = new Balancer(writeLoadForecaster, allocation, weightFunction, threshold);
188+
Balancer balancer = new Balancer(writeLoadForecaster, allocation, weightFunction, balancerSettings.getThreshold());
205189
AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN;
206190
MoveDecision moveDecision = MoveDecision.NOT_TAKEN;
207191
final ProjectIndex index = new ProjectIndex(allocation, shard);
@@ -244,27 +228,6 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) {
244228
}
245229
}
246230

247-
/**
248-
* Returns the currently configured delta threshold
249-
*/
250-
public float getThreshold() {
251-
return threshold;
252-
}
253-
254-
/**
255-
* Returns the index related weight factor.
256-
*/
257-
public float getIndexBalance() {
258-
return indexBalanceFactor;
259-
}
260-
261-
/**
262-
* Returns the shard related weight factor.
263-
*/
264-
public float getShardBalance() {
265-
return shardBalanceFactor;
266-
}
267-
268231
/**
269232
* A {@link Balancer}
270233
*/
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing.allocation.allocator;
11+
12+
import org.elasticsearch.common.settings.ClusterSettings;
13+
import org.elasticsearch.common.settings.Settings;
14+
15+
import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.DISK_USAGE_BALANCE_FACTOR_SETTING;
16+
import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING;
17+
import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING;
18+
import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.THRESHOLD_SETTING;
19+
import static org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.WRITE_LOAD_BALANCE_FACTOR_SETTING;
20+
21+
public class BalancerSettings {
22+
public static final BalancerSettings DEFAULT = new BalancerSettings(ClusterSettings.createBuiltInClusterSettings());
23+
24+
private volatile float indexBalanceFactor;
25+
private volatile float shardBalanceFactor;
26+
private volatile float writeLoadBalanceFactor;
27+
private volatile float diskUsageBalanceFactor;
28+
private volatile float threshold;
29+
30+
public BalancerSettings(Settings settings) {
31+
this(ClusterSettings.createBuiltInClusterSettings(settings));
32+
}
33+
34+
public BalancerSettings(ClusterSettings clusterSettings) {
35+
clusterSettings.initializeAndWatch(SHARD_BALANCE_FACTOR_SETTING, value -> this.shardBalanceFactor = value);
36+
clusterSettings.initializeAndWatch(INDEX_BALANCE_FACTOR_SETTING, value -> this.indexBalanceFactor = value);
37+
clusterSettings.initializeAndWatch(WRITE_LOAD_BALANCE_FACTOR_SETTING, value -> this.writeLoadBalanceFactor = value);
38+
clusterSettings.initializeAndWatch(DISK_USAGE_BALANCE_FACTOR_SETTING, value -> this.diskUsageBalanceFactor = value);
39+
clusterSettings.initializeAndWatch(THRESHOLD_SETTING, value -> this.threshold = value);
40+
}
41+
42+
/**
43+
* Returns the index related weight factor.
44+
*/
45+
public float getIndexBalanceFactor() {
46+
return indexBalanceFactor;
47+
}
48+
49+
/**
50+
* Returns the shard related weight factor.
51+
*/
52+
public float getShardBalanceFactor() {
53+
return shardBalanceFactor;
54+
}
55+
56+
public float getWriteLoadBalanceFactor() {
57+
return writeLoadBalanceFactor;
58+
}
59+
60+
public float getDiskUsageBalanceFactor() {
61+
return diskUsageBalanceFactor;
62+
}
63+
64+
/**
65+
* Returns the currently configured delta threshold
66+
*/
67+
public float getThreshold() {
68+
return threshold;
69+
}
70+
}

server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.cluster.routing.RoutingTable;
2222
import org.elasticsearch.cluster.routing.ShardRouting;
2323
import org.elasticsearch.cluster.routing.ShardRoutingState;
24+
import org.elasticsearch.cluster.routing.allocation.allocator.BalancerSettings;
2425
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalance;
2526
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator;
2627
import org.elasticsearch.cluster.routing.allocation.allocator.ShardAssignment;
@@ -84,7 +85,7 @@ public void testShardStats() {
8485
clusterService,
8586
() -> clusterInfo,
8687
createShardAllocator(),
87-
new NodeAllocationStatsAndWeightsCalculator(TEST_WRITE_LOAD_FORECASTER, ClusterSettings.createBuiltInClusterSettings())
88+
new NodeAllocationStatsAndWeightsCalculator(TEST_WRITE_LOAD_FORECASTER, BalancerSettings.DEFAULT)
8889
);
8990
assertThat(
9091
service.stats(),
@@ -125,7 +126,7 @@ public void testRelocatingShardIsOnlyCountedOnceOnTargetNode() {
125126
clusterService,
126127
EmptyClusterInfoService.INSTANCE,
127128
createShardAllocator(),
128-
new NodeAllocationStatsAndWeightsCalculator(TEST_WRITE_LOAD_FORECASTER, ClusterSettings.createBuiltInClusterSettings())
129+
new NodeAllocationStatsAndWeightsCalculator(TEST_WRITE_LOAD_FORECASTER, BalancerSettings.DEFAULT)
129130
);
130131
assertThat(
131132
service.stats(),
@@ -182,7 +183,7 @@ public DesiredBalance getDesiredBalance() {
182183
);
183184
}
184185
},
185-
new NodeAllocationStatsAndWeightsCalculator(TEST_WRITE_LOAD_FORECASTER, ClusterSettings.createBuiltInClusterSettings())
186+
new NodeAllocationStatsAndWeightsCalculator(TEST_WRITE_LOAD_FORECASTER, BalancerSettings.DEFAULT)
186187
);
187188
assertThat(
188189
service.stats(),

0 commit comments

Comments
 (0)