Commit be00437

Changed expectedOperationsPerItem from an AtomicInteger[] array to an AtomicReferenceArray<AtomicInteger>.
Made node stopping / starting less aggressive in RecoveryPercolatorTests.
1 parent c9c8501 commit be00437
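
For context, a minimal sketch (not from the commit) of why the container swap matters: stores into a plain AtomicInteger[] are ordinary writes with no happens-before edge, so a shard-response thread racing with request setup could observe a stale null slot. AtomicReferenceArray.set/get have volatile semantics per element, which safely publishes each counter. Class and variable names below are illustrative only.

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

public class PublicationSketch {
    public static void main(String[] args) throws InterruptedException {
        final AtomicInteger[] plain = new AtomicInteger[1];
        final AtomicReferenceArray<AtomicInteger> published =
                new AtomicReferenceArray<AtomicInteger>(1);

        plain[0] = new AtomicInteger(3);        // plain store: an unsynchronized reader may still see null
        published.set(0, new AtomicInteger(3)); // volatile store: safely published to other threads

        Thread responder = new Thread(new Runnable() {
            @Override
            public void run() {
                // get() pairs with set(), so the element is guaranteed visible here
                published.get(0).decrementAndGet();
            }
        });
        responder.start();
        responder.join();
        System.out.println(published.get(0).get()); // prints 2
    }
}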

File tree

2 files changed: +37, -31 lines changed

src/main/java/org/elasticsearch/action/percolate/TransportMultiPercolateAction.java

Lines changed: 10 additions & 10 deletions
@@ -137,15 +137,13 @@ public void onFailure(Throwable e) {
     private void multiPercolate(MultiPercolateRequest multiPercolateRequest, final AtomicReferenceArray<Object> percolateRequests,
                                 final ActionListener<MultiPercolateResponse> listener, ClusterState clusterState) {

-        final AtomicInteger[] expectedOperationsPerItem = new AtomicInteger[percolateRequests.length()];
+        final AtomicReferenceArray<AtomicInteger> expectedOperationsPerItem = new AtomicReferenceArray<AtomicInteger>(percolateRequests.length());
         final AtomicReferenceArray<AtomicReferenceArray> responsesByItemAndShard = new AtomicReferenceArray<AtomicReferenceArray>(multiPercolateRequest.requests().size());
         final AtomicArray<Object> reducedResponses = new AtomicArray<Object>(percolateRequests.length());

-        // Keep track what slots belong to what shard, in case a request to a shard fails on all copies
-        final ConcurrentMap<ShardId, AtomicIntegerArray> shardToSlots = ConcurrentCollections.newConcurrentMap();
-
         // Resolving concrete indices and routing and grouping the requests by shard
         Map<ShardId, TransportShardMultiPercolateAction.Request> requestsByShard = new HashMap<ShardId, TransportShardMultiPercolateAction.Request>();
+        // Keep track what slots belong to what shard, in case a request to a shard fails on all copies
         Map<ShardId, TIntArrayList> shardToSlotsBuilder = new HashMap<ShardId, TIntArrayList>();
         int expectedResults = 0;
         for (int i = 0; i < percolateRequests.length(); i++) {
@@ -161,7 +159,7 @@ private void multiPercolate(MultiPercolateRequest multiPercolateRequest, final A
             );

             responsesByItemAndShard.set(i, new AtomicReferenceArray(shards.size()));
-            expectedOperationsPerItem[i] = new AtomicInteger(shards.size());
+            expectedOperationsPerItem.set(i, new AtomicInteger(shards.size()));
             for (ShardIterator shard : shards) {
                 ShardId shardId = shard.shardId();
                 TransportShardMultiPercolateAction.Request requests = requestsByShard.get(shardId);
@@ -180,7 +178,7 @@ private void multiPercolate(MultiPercolateRequest multiPercolateRequest, final A
             } else if (element instanceof Throwable) {
                 reducedResponses.set(i, element);
                 responsesByItemAndShard.set(i, new AtomicReferenceArray(0));
-                expectedOperationsPerItem[i] = new AtomicInteger(0);
+                expectedOperationsPerItem.set(i, new AtomicInteger(0));
             }
         }

@@ -189,6 +187,8 @@ private void multiPercolate(MultiPercolateRequest multiPercolateRequest, final A
             return;
         }

+        // Move slot-to-shard tracking from a normal map to a concurrency-safe map
+        final ConcurrentMap<ShardId, AtomicIntegerArray> shardToSlots = ConcurrentCollections.newConcurrentMap();
         for (Map.Entry<ShardId, TIntArrayList> entry : shardToSlotsBuilder.entrySet()) {
             shardToSlots.put(entry.getKey(), new AtomicIntegerArray(entry.getValue().toArray()));
         }
@@ -215,8 +215,8 @@ public void onResponse(TransportShardMultiPercolateAction.Response response) {
                         shardResults.set(shardId.id(), item.response());
                     }

-                    assert expectedOperationsPerItem[item.slot()].get() >= 1 : "slot[" + item.slot() + "] can't be lower than one";
-                    if (expectedOperationsPerItem[item.slot()].decrementAndGet() == 0) {
+                    assert expectedOperationsPerItem.get(item.slot()).get() >= 1 : "slot[" + item.slot() + "] can't be lower than one";
+                    if (expectedOperationsPerItem.get(item.slot()).decrementAndGet() == 0) {
                         // Failure won't bubble up, since we fail the whole request now via the catch clause below,
                         // so expectedOperationsPerItem will not be decremented twice.
                         reduce(item.slot(), percolateRequests, expectedOperations, reducedResponses, listener, responsesByItemAndShard);
@@ -242,8 +242,8 @@ public void onFailure(Throwable e) {
                 }

                 shardResults.set(shardId.id(), new BroadcastShardOperationFailedException(shardId, e));
-                assert expectedOperationsPerItem[slot].get() >= 1 : "slot[" + slot + "] can't be lower than one. Caused by: " + e.getMessage();
-                if (expectedOperationsPerItem[slot].decrementAndGet() == 0) {
+                assert expectedOperationsPerItem.get(slot).get() >= 1 : "slot[" + slot + "] can't be lower than one. Caused by: " + e.getMessage();
+                if (expectedOperationsPerItem.get(slot).decrementAndGet() == 0) {
                     reduce(slot, percolateRequests, expectedOperations, reducedResponses, listener, responsesByItemAndShard);
                 }
             }
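
The assertions in the last two hunks guard a per-slot countdown: each item slot starts at the number of shards it fans out to, and whichever responder thread decrements the counter to zero performs the reduce exactly once. A hedged sketch of that pattern, with hypothetical names (CountdownSketch, onShardResult, reduce) standing in for the fields above:

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

public class CountdownSketch {
    private final AtomicReferenceArray<AtomicInteger> slots;

    public CountdownSketch(int items, int shardsPerItem) {
        slots = new AtomicReferenceArray<AtomicInteger>(items);
        for (int i = 0; i < items; i++) {
            slots.set(i, new AtomicInteger(shardsPerItem)); // one countdown per item slot
        }
    }

    // Invoked once per shard response (success or failure), possibly concurrently.
    public void onShardResult(int slot) {
        assert slots.get(slot).get() >= 1 : "slot[" + slot + "] can't be lower than one";
        if (slots.get(slot).decrementAndGet() == 0) {
            // decrementAndGet is atomic: exactly one caller observes zero,
            // so the per-slot reduce runs exactly once
            reduce(slot);
        }
    }

    private void reduce(int slot) {
        System.out.println("reducing slot " + slot);
    }
}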

src/test/java/org/elasticsearch/test/integration/percolator/RecoveryPercolatorTests.java

Lines changed: 27 additions & 21 deletions
@@ -31,6 +31,7 @@
 import org.elasticsearch.client.Requests;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.gateway.Gateway;
@@ -100,8 +101,8 @@ public void testRestartNodePercolator1() throws Exception {
         PercolateResponse percolate = client.preparePercolate()
                 .setIndices("test").setDocumentType("type1")
                 .setSource(jsonBuilder().startObject().startObject("doc")
-                    .field("field1", "value1")
-                    .endObject().endObject())
+                        .field("field1", "value1")
+                        .endObject().endObject())
                 .execute().actionGet();
         assertThat(percolate.getMatches(), arrayWithSize(1));

@@ -120,8 +121,8 @@ public void testRestartNodePercolator1() throws Exception {
         percolate = client.preparePercolate()
                 .setIndices("test").setDocumentType("type1")
                 .setSource(jsonBuilder().startObject().startObject("doc")
-                    .field("field1", "value1")
-                    .endObject().endObject())
+                        .field("field1", "value1")
+                        .endObject().endObject())
                 .execute().actionGet();
         assertThat(percolate.getMatches(), arrayWithSize(1));
     }
@@ -153,8 +154,8 @@ public void testRestartNodePercolator2() throws Exception {
         PercolateResponse percolate = client.preparePercolate()
                 .setIndices("test").setDocumentType("type1")
                 .setSource(jsonBuilder().startObject().startObject("doc")
-                    .field("field1", "value1")
-                    .endObject().endObject())
+                        .field("field1", "value1")
+                        .endObject().endObject())
                 .execute().actionGet();
         assertThat(percolate.getMatches(), arrayWithSize(1));

@@ -184,8 +185,8 @@ public void testRestartNodePercolator2() throws Exception {
         percolate = client.preparePercolate()
                 .setIndices("test").setDocumentType("type1")
                 .setSource(jsonBuilder().startObject().startObject("doc")
-                    .field("field1", "value1")
-                    .endObject().endObject())
+                        .field("field1", "value1")
+                        .endObject().endObject())
                 .execute().actionGet();
         assertThat(percolate.getMatches(), emptyArray());

@@ -203,8 +204,8 @@ public void testRestartNodePercolator2() throws Exception {
         percolate = client.preparePercolate()
                 .setIndices("test").setDocumentType("type1")
                 .setSource(jsonBuilder().startObject().startObject("doc")
-                    .field("field1", "value1")
-                    .endObject().endObject())
+                        .field("field1", "value1")
+                        .endObject().endObject())
                 .execute().actionGet();
         assertThat(percolate.getMatches(), arrayWithSize(1));
     }
@@ -370,7 +371,7 @@ public void run() {
                     for (MultiPercolateResponse.Item item : response) {
                         assertThat(item.isFailure(), equalTo(false));
                         assertNoFailures(item.getResponse());
-                        assertThat(item.getResponse().getSuccessfulShards(), equalTo(2));
+                        assertThat(item.getResponse().getSuccessfulShards(), equalTo(item.getResponse().getTotalShards()));
                         assertThat(item.getResponse().getCount(), equalTo((long) numQueries));
                         assertThat(item.getResponse().getMatches().length, equalTo(numQueries));
                     }
@@ -390,7 +391,7 @@ public void run() {
                             .execute().actionGet();
                 }
                 assertNoFailures(response);
-                assertThat(response.getSuccessfulShards(), equalTo(2));
+                assertThat(response.getSuccessfulShards(), equalTo(response.getTotalShards()));
                 assertThat(response.getCount(), equalTo((long) numQueries));
                 assertThat(response.getMatches().length, equalTo(numQueries));
             }
@@ -407,33 +408,38 @@ public void run() {
         new Thread(r).start();

         try {
+            // 1 index, 2 primaries, 2 replicas per primary
             for (int i = 0; i < 4; i++) {
                 closeNode("node3");
-                client.admin().cluster().prepareHealth()
+                client.admin().cluster().prepareHealth("test")
                         .setWaitForEvents(Priority.LANGUID)
+                        .setTimeout(TimeValue.timeValueMinutes(2))
                         .setWaitForYellowStatus()
-                        .setWaitForNodes("2")
+                        .setWaitForActiveShards(4) // 2 nodes, so 4 shards (2 primaries, 2 replicas)
                         .execute().actionGet();
                 assertThat(error.get(), nullValue());
                 closeNode("node2");
-                client.admin().cluster().prepareHealth()
+                client.admin().cluster().prepareHealth("test")
                         .setWaitForEvents(Priority.LANGUID)
+                        .setTimeout(TimeValue.timeValueMinutes(2))
                         .setWaitForYellowStatus()
-                        .setWaitForNodes("1")
+                        .setWaitForActiveShards(2) // 1 node, so 2 shards (2 primaries, 0 replicas)
                         .execute().actionGet();
                 assertThat(error.get(), nullValue());
                 startNode("node3");
-                client.admin().cluster().prepareHealth()
+                client.admin().cluster().prepareHealth("test")
                         .setWaitForEvents(Priority.LANGUID)
+                        .setTimeout(TimeValue.timeValueMinutes(2))
                         .setWaitForYellowStatus()
-                        .setWaitForNodes("2")
+                        .setWaitForActiveShards(4) // 2 nodes, so 4 shards (2 primaries, 2 replicas)
                         .execute().actionGet();
                 assertThat(error.get(), nullValue());
                 startNode("node2");
-                client.admin().cluster().prepareHealth()
+                client.admin().cluster().prepareHealth("test")
                         .setWaitForEvents(Priority.LANGUID)
-                        .setWaitForYellowStatus()
-                        .setWaitForNodes("3")
+                        .setTimeout(TimeValue.timeValueMinutes(2))
+                        .setWaitForGreenStatus() // We're confirming the shard settings, so green instead of yellow
+                        .setWaitForActiveShards(6) // 3 nodes, so 6 shards (2 primaries, 4 replicas)
                         .execute().actionGet();
                 assertThat(error.get(), nullValue());
             }
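
The test changes above all follow one pattern: scope the health check to the test index, bound the wait with a timeout, and wait on active shard counts rather than node counts, since shard allocation (not node membership) is what the percolate requests actually depend on. A hedged sketch of that pattern, assuming the test's client and the 2-primary / 2-replica layout noted in the comments:

import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;

// Inside the test, after stopping or starting a node:
ClusterHealthResponse health = client.admin().cluster().prepareHealth("test")
        .setWaitForEvents(Priority.LANGUID)        // let queued cluster-state updates drain first
        .setTimeout(TimeValue.timeValueMinutes(2)) // bound the wait instead of hanging the test
        .setWaitForYellowStatus()                  // all primaries assigned
        .setWaitForActiveShards(4)                 // 2 primaries + 2 replicas fit on 2 nodes
        .execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));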
