Skip to content

Commit d1281d2

Browse files
imotovkimchy
authored andcommitted
Add index.routing.allocation.require.... and cluster.routing.allocation.require.... settings
Fixes elastic#2404
1 parent ea2732a commit d1281d2

File tree

5 files changed

+229
-37
lines changed

5 files changed

+229
-37
lines changed

src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.io.IOException;
4848
import java.util.*;
4949

50+
import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.*;
5051
import static org.elasticsearch.common.settings.ImmutableSettings.*;
5152

5253
/**
@@ -197,6 +198,7 @@ public static State fromString(String state) {
197198

198199
private transient final int totalNumberOfShards;
199200

201+
private final DiscoveryNodeFilters requireFilters;
200202
private final DiscoveryNodeFilters includeFilters;
201203
private final DiscoveryNodeFilters excludeFilters;
202204

@@ -213,17 +215,23 @@ private IndexMetaData(String index, long version, State state, Settings settings
213215

214216
this.aliases = aliases;
215217

218+
ImmutableMap<String, String> requireMap = settings.getByPrefix("index.routing.allocation.require.").getAsMap();
219+
if (requireMap.isEmpty()) {
220+
requireFilters = null;
221+
} else {
222+
requireFilters = DiscoveryNodeFilters.buildFromKeyValue(AND, requireMap);
223+
}
216224
ImmutableMap<String, String> includeMap = settings.getByPrefix("index.routing.allocation.include.").getAsMap();
217225
if (includeMap.isEmpty()) {
218226
includeFilters = null;
219227
} else {
220-
includeFilters = DiscoveryNodeFilters.buildFromKeyValue(includeMap);
228+
includeFilters = DiscoveryNodeFilters.buildFromKeyValue(OR, includeMap);
221229
}
222230
ImmutableMap<String, String> excludeMap = settings.getByPrefix("index.routing.allocation.exclude.").getAsMap();
223231
if (excludeMap.isEmpty()) {
224232
excludeFilters = null;
225233
} else {
226-
excludeFilters = DiscoveryNodeFilters.buildFromKeyValue(excludeMap);
234+
excludeFilters = DiscoveryNodeFilters.buildFromKeyValue(OR, excludeMap);
227235
}
228236
}
229237

@@ -332,6 +340,11 @@ public <T extends Custom> T custom(String type) {
332340
return (T) customs.get(type);
333341
}
334342

343+
@Nullable
344+
public DiscoveryNodeFilters requireFilters() {
345+
return requireFilters;
346+
}
347+
335348
@Nullable
336349
public DiscoveryNodeFilters includeFilters() {
337350
return includeFilters;

src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeFilters.java

Lines changed: 75 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.elasticsearch.cluster.node;
2121

22-
import com.google.common.collect.ImmutableMap;
2322
import org.elasticsearch.common.Strings;
2423
import org.elasticsearch.common.regex.Regex;
2524
import org.elasticsearch.common.settings.Settings;
@@ -32,13 +31,16 @@
3231
*/
3332
public class DiscoveryNodeFilters {
3433

35-
public static final DiscoveryNodeFilters NO_FILTERS = new DiscoveryNodeFilters(ImmutableMap.<String, String[]>of());
34+
public static enum OpType {
35+
AND,
36+
OR
37+
};
3638

37-
public static DiscoveryNodeFilters buildFromSettings(String prefix, Settings settings) {
38-
return buildFromKeyValue(settings.getByPrefix(prefix).getAsMap());
39+
public static DiscoveryNodeFilters buildFromSettings(OpType opType, String prefix, Settings settings) {
40+
return buildFromKeyValue(opType, settings.getByPrefix(prefix).getAsMap());
3941
}
4042

41-
public static DiscoveryNodeFilters buildFromKeyValue(Map<String, String> filters) {
43+
public static DiscoveryNodeFilters buildFromKeyValue(OpType opType, Map<String, String> filters) {
4244
Map<String, String[]> bFilters = new HashMap<String, String[]>();
4345
for (Map.Entry<String, String> entry : filters.entrySet()) {
4446
String[] values = Strings.splitStringByCommaToArray(entry.getValue());
@@ -47,76 +49,123 @@ public static DiscoveryNodeFilters buildFromKeyValue(Map<String, String> filters
4749
}
4850
}
4951
if (bFilters.isEmpty()) {
50-
return NO_FILTERS;
52+
return null;
5153
}
52-
return new DiscoveryNodeFilters(bFilters);
54+
return new DiscoveryNodeFilters(opType, bFilters);
5355
}
5456

5557
private final Map<String, String[]> filters;
5658

57-
DiscoveryNodeFilters(Map<String, String[]> filters) {
59+
private final OpType opType;
60+
61+
DiscoveryNodeFilters(OpType opType, Map<String, String[]> filters) {
62+
this.opType = opType;
5863
this.filters = filters;
5964
}
6065

6166
public boolean match(DiscoveryNode node) {
62-
if (filters.isEmpty()) {
63-
return true;
64-
}
6567
for (Map.Entry<String, String[]> entry : filters.entrySet()) {
6668
String attr = entry.getKey();
6769
String[] values = entry.getValue();
6870
if ("_ip".equals(attr)) {
6971
if (!(node.address() instanceof InetSocketTransportAddress)) {
70-
return false;
72+
if (opType == OpType.AND) {
73+
return false;
74+
} else {
75+
continue;
76+
}
7177
}
7278
InetSocketTransportAddress inetAddress = (InetSocketTransportAddress) node.address();
7379
for (String value : values) {
7480
if (Regex.simpleMatch(value, inetAddress.address().getAddress().getHostAddress())) {
75-
return true;
81+
if (opType == OpType.OR) {
82+
return true;
83+
}
84+
} else {
85+
if (opType == OpType.AND) {
86+
return false;
87+
}
7688
}
7789
}
78-
return false;
7990
} else if ("_host".equals(attr)) {
8091
if (!(node.address() instanceof InetSocketTransportAddress)) {
81-
return false;
92+
if (opType == OpType.AND) {
93+
return false;
94+
} else {
95+
continue;
96+
}
8297
}
8398
InetSocketTransportAddress inetAddress = (InetSocketTransportAddress) node.address();
8499
for (String value : values) {
85100
if (Regex.simpleMatch(value, inetAddress.address().getHostName())) {
86-
return true;
101+
if (opType == OpType.OR) {
102+
return true;
103+
}
104+
} else {
105+
if (opType == OpType.AND) {
106+
return false;
107+
}
87108
}
88109
if (Regex.simpleMatch(value, inetAddress.address().getAddress().getHostAddress())) {
89-
return true;
110+
if (opType == OpType.OR) {
111+
return true;
112+
}
113+
} else {
114+
if (opType == OpType.AND) {
115+
return false;
116+
}
90117
}
91118
}
92-
return false;
93119
} else if ("_id".equals(attr)) {
94120
for (String value : values) {
95121
if (node.id().equals(value)) {
96-
return true;
122+
if (opType == OpType.OR) {
123+
return true;
124+
}
125+
} else {
126+
if (opType == OpType.AND) {
127+
return false;
128+
}
97129
}
98130
}
99-
return false;
100131
} else if ("_name".equals(attr) || "name".equals(attr)) {
101132
for (String value : values) {
102133
if (Regex.simpleMatch(value, node.name())) {
103-
return true;
134+
if (opType == OpType.OR) {
135+
return true;
136+
}
137+
} else {
138+
if (opType == OpType.AND) {
139+
return false;
140+
}
104141
}
105142
}
106-
return false;
107143
} else {
108144
String nodeAttributeValue = node.attributes().get(attr);
109145
if (nodeAttributeValue == null) {
110-
return false;
146+
if (opType == OpType.AND) {
147+
return false;
148+
} else {
149+
continue;
150+
}
111151
}
112152
for (String value : values) {
113153
if (Regex.simpleMatch(value, nodeAttributeValue)) {
114-
return true;
154+
if (opType == OpType.OR) {
155+
return true;
156+
}
157+
} else {
158+
if (opType == OpType.AND) {
159+
return false;
160+
}
115161
}
116162
}
117-
return false;
118163
}
119164
}
120-
return true;
165+
if (opType == OpType.OR) {
166+
return false;
167+
} else {
168+
return true;
169+
}
121170
}
122171
}

src/main/java/org/elasticsearch/cluster/routing/allocation/decider/FilterAllocationDecider.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,38 +30,49 @@
3030
import org.elasticsearch.common.settings.Settings;
3131
import org.elasticsearch.node.settings.NodeSettingsService;
3232

33+
import static org.elasticsearch.cluster.node.DiscoveryNodeFilters.OpType.*;
34+
3335
/**
3436
*/
3537
public class FilterAllocationDecider extends AllocationDecider {
3638

3739
static {
3840
MetaData.addDynamicSettings(
41+
"cluster.routing.allocation.require.*",
3942
"cluster.routing.allocation.include.*",
4043
"cluster.routing.allocation.exclude.*"
4144
);
4245
IndexMetaData.addDynamicSettings(
46+
"index.routing.allocation.require.*",
4347
"index.routing.allocation.include.*",
4448
"index.routing.allocation.exclude.*"
4549
);
4650
}
4751

52+
private volatile DiscoveryNodeFilters clusterRequireFilters;
4853
private volatile DiscoveryNodeFilters clusterIncludeFilters;
4954
private volatile DiscoveryNodeFilters clusterExcludeFilters;
5055

5156
@Inject
5257
public FilterAllocationDecider(Settings settings, NodeSettingsService nodeSettingsService) {
5358
super(settings);
59+
ImmutableMap<String, String> requireMap = settings.getByPrefix("cluster.routing.allocation.require.").getAsMap();
60+
if (requireMap.isEmpty()) {
61+
clusterRequireFilters = null;
62+
} else {
63+
clusterRequireFilters = DiscoveryNodeFilters.buildFromKeyValue(AND, requireMap);
64+
}
5465
ImmutableMap<String, String> includeMap = settings.getByPrefix("cluster.routing.allocation.include.").getAsMap();
5566
if (includeMap.isEmpty()) {
5667
clusterIncludeFilters = null;
5768
} else {
58-
clusterIncludeFilters = DiscoveryNodeFilters.buildFromKeyValue(includeMap);
69+
clusterIncludeFilters = DiscoveryNodeFilters.buildFromKeyValue(OR, includeMap);
5970
}
6071
ImmutableMap<String, String> excludeMap = settings.getByPrefix("cluster.routing.allocation.exclude.").getAsMap();
6172
if (excludeMap.isEmpty()) {
6273
clusterExcludeFilters = null;
6374
} else {
64-
clusterExcludeFilters = DiscoveryNodeFilters.buildFromKeyValue(excludeMap);
75+
clusterExcludeFilters = DiscoveryNodeFilters.buildFromKeyValue(OR, excludeMap);
6576
}
6677
nodeSettingsService.addListener(new ApplySettings());
6778
}
@@ -77,6 +88,11 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
7788
}
7889

7990
private boolean shouldFilter(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
91+
if (clusterRequireFilters != null) {
92+
if (!clusterRequireFilters.match(node.node())) {
93+
return true;
94+
}
95+
}
8096
if (clusterIncludeFilters != null) {
8197
if (!clusterIncludeFilters.match(node.node())) {
8298
return true;
@@ -89,6 +105,11 @@ private boolean shouldFilter(ShardRouting shardRouting, RoutingNode node, Routin
89105
}
90106

91107
IndexMetaData indexMd = allocation.routingNodes().metaData().index(shardRouting.index());
108+
if (indexMd.requireFilters() != null) {
109+
if (!indexMd.requireFilters().match(node.node())) {
110+
return true;
111+
}
112+
}
92113
if (indexMd.includeFilters() != null) {
93114
if (!indexMd.includeFilters().match(node.node())) {
94115
return true;
@@ -106,13 +127,17 @@ private boolean shouldFilter(ShardRouting shardRouting, RoutingNode node, Routin
106127
class ApplySettings implements NodeSettingsService.Listener {
107128
@Override
108129
public void onRefreshSettings(Settings settings) {
130+
ImmutableMap<String, String> requireMap = settings.getByPrefix("cluster.routing.allocation.require.").getAsMap();
131+
if (!requireMap.isEmpty()) {
132+
clusterRequireFilters = DiscoveryNodeFilters.buildFromKeyValue(AND, requireMap);
133+
}
109134
ImmutableMap<String, String> includeMap = settings.getByPrefix("cluster.routing.allocation.include.").getAsMap();
110135
if (!includeMap.isEmpty()) {
111-
clusterIncludeFilters = DiscoveryNodeFilters.buildFromKeyValue(includeMap);
136+
clusterIncludeFilters = DiscoveryNodeFilters.buildFromKeyValue(OR, includeMap);
112137
}
113138
ImmutableMap<String, String> excludeMap = settings.getByPrefix("cluster.routing.allocation.exclude.").getAsMap();
114139
if (!excludeMap.isEmpty()) {
115-
clusterExcludeFilters = DiscoveryNodeFilters.buildFromKeyValue(excludeMap);
140+
clusterExcludeFilters = DiscoveryNodeFilters.buildFromKeyValue(OR, excludeMap);
116141
}
117142
}
118143
}

src/test/java/org/elasticsearch/test/integration/cluster/allocation/FilteringAllocationTests.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,66 @@ public void testDecommissionNodeNoReplicas() throws Exception {
9393
client("node1").admin().indices().prepareRefresh().execute().actionGet();
9494
assertThat(client("node1").prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(100l));
9595
}
96+
97+
@Test
98+
public void testDisablingAllocationFiltering() throws Exception {
99+
logger.info("--> starting 2 nodes");
100+
startNode("node1");
101+
startNode("node2");
102+
103+
logger.info("--> creating an index with no replicas");
104+
client("node1").admin().indices().prepareCreate("test")
105+
.setSettings(settingsBuilder().put("index.number_of_replicas", 0))
106+
.execute().actionGet();
107+
108+
ClusterHealthResponse clusterHealthResponse = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
109+
assertThat(clusterHealthResponse.timedOut(), equalTo(false));
110+
111+
logger.info("--> index some data");
112+
for (int i = 0; i < 100; i++) {
113+
client("node1").prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
114+
}
115+
client("node1").admin().indices().prepareRefresh().execute().actionGet();
116+
assertThat(client("node1").prepareCount().setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(100l));
117+
118+
logger.info("--> remove index from the first node");
119+
client("node1").admin().indices().prepareUpdateSettings("test")
120+
.setSettings(settingsBuilder().put("index.routing.allocation.exclude._name", "node1"))
121+
.execute().actionGet();
122+
123+
Thread.sleep(200);
124+
125+
clusterHealthResponse = client("node1").admin().cluster().prepareHealth()
126+
.setWaitForGreenStatus()
127+
.setWaitForRelocatingShards(0)
128+
.execute().actionGet();
129+
assertThat(clusterHealthResponse.timedOut(), equalTo(false));
130+
131+
logger.info("--> verify all shards are allocated on node2 now");
132+
ClusterState clusterState = client("node1").admin().cluster().prepareState().execute().actionGet().state();
133+
IndexRoutingTable indexRoutingTable = clusterState.routingTable().index("test");
134+
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
135+
for (ShardRouting shardRouting : indexShardRoutingTable) {
136+
assertThat(clusterState.nodes().get(shardRouting.currentNodeId()).name(), equalTo("node2"));
137+
}
138+
}
139+
140+
logger.info("--> disable allocation filtering ");
141+
client("node1").admin().indices().prepareUpdateSettings("test")
142+
.setSettings(settingsBuilder().put("index.routing.allocation.exclude._name", ""))
143+
.execute().actionGet();
144+
145+
Thread.sleep(200);
146+
147+
clusterHealthResponse = client("node1").admin().cluster().prepareHealth()
148+
.setWaitForGreenStatus()
149+
.setWaitForRelocatingShards(0)
150+
.execute().actionGet();
151+
assertThat(clusterHealthResponse.timedOut(), equalTo(false));
152+
153+
logger.info("--> verify that there are shards allocated on both nodes now");
154+
clusterState = client("node1").admin().cluster().prepareState().execute().actionGet().state();
155+
assertThat(clusterState.routingTable().index("test").numberOfNodesShardsAreAllocatedOn(), equalTo(2));
156+
}
96157
}
158+

0 commit comments

Comments
 (0)