Skip to content

Commit 668bcd0

Browse files
committed
Bulk execution while a shard is replication might send erroneous version conflict failures for certain items
fixes elastic#2642
1 parent a7bbab7 commit 668bcd0

File tree

2 files changed

+49
-14
lines changed

2 files changed

+49
-14
lines changed

src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
133133
Set<Tuple<String, String>> mappingsToUpdate = null;
134134

135135
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
136+
long[] versions = new long[request.items().length];
136137
for (int i = 0; i < request.items().length; i++) {
137138
BulkItemRequest item = request.items()[i];
138139
if (item.request() instanceof IndexRequest) {
@@ -163,6 +164,7 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
163164
version = create.version();
164165
op = create;
165166
}
167+
versions[i] = indexRequest.version();
166168
// update the version on request so it will happen on the replicas
167169
indexRequest.version(version);
168170

@@ -188,6 +190,10 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
188190
} catch (Exception e) {
189191
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
190192
if (retryPrimaryException(e)) {
193+
// restore updated versions...
194+
for (int j = 0; j < i; j++) {
195+
applyVersion(request.items()[j], versions[j]);
196+
}
191197
throw (ElasticSearchException) e;
192198
}
193199
if (e instanceof ElasticSearchException && ((ElasticSearchException) e).status() == RestStatus.CONFLICT) {
@@ -214,6 +220,10 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
214220
} catch (Exception e) {
215221
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
216222
if (retryPrimaryException(e)) {
223+
// restore updated versions...
224+
for (int j = 0; j < i; j++) {
225+
applyVersion(request.items()[j], versions[j]);
226+
}
217227
throw (ElasticSearchException) e;
218228
}
219229
if (e instanceof ElasticSearchException && ((ElasticSearchException) e).status() == RestStatus.CONFLICT) {
@@ -352,4 +362,14 @@ public void onFailure(Throwable e) {
352362
logger.warn("failed to update master on updated mapping for index [{}], type [{}]", e, index, type);
353363
}
354364
}
365+
366+
private void applyVersion(BulkItemRequest item, long version) {
367+
if (item.request() instanceof IndexRequest) {
368+
((IndexRequest) item.request()).version(version);
369+
} else if (item.request() instanceof DeleteRequest) {
370+
((DeleteRequest) item.request()).version(version);
371+
} else {
372+
// log?
373+
}
374+
}
355375
}

src/test/java/org/elasticsearch/test/integration/recovery/RelocationTests.java

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -121,20 +121,20 @@ public void testPrimaryRelocationWhileIndexingWith10RelocationAnd5Writers() thro
121121
testPrimaryRelocationWhileIndexing(10, 5, false);
122122
}
123123

124-
// @Test
125-
// public void testPrimaryRelocationWhileBulkIndexingWith1RelocationAnd1Writer() throws Exception {
126-
// testPrimaryRelocationWhileIndexing(1, 1, true);
127-
// }
128-
//
129-
// @Test
130-
// public void testPrimaryRelocationWhileBulkIndexingWith10RelocationAnd1Writer() throws Exception {
131-
// testPrimaryRelocationWhileIndexing(10, 1, true);
132-
// }
133-
//
134-
// @Test
135-
// public void testPrimaryRelocationWhileBulkIndexingWith10RelocationAnd5Writers() throws Exception {
136-
// testPrimaryRelocationWhileIndexing(10, 5, true);
137-
// }
124+
@Test
125+
public void testPrimaryRelocationWhileBulkIndexingWith1RelocationAnd1Writer() throws Exception {
126+
testPrimaryRelocationWhileIndexing(1, 1, true);
127+
}
128+
129+
@Test
130+
public void testPrimaryRelocationWhileBulkIndexingWith10RelocationAnd1Writer() throws Exception {
131+
testPrimaryRelocationWhileIndexing(10, 1, true);
132+
}
133+
134+
@Test
135+
public void testPrimaryRelocationWhileBulkIndexingWith10RelocationAnd5Writers() throws Exception {
136+
testPrimaryRelocationWhileIndexing(10, 5, true);
137+
}
138138

139139
private void testPrimaryRelocationWhileIndexing(final int numberOfRelocations, final int numberOfWriters, final boolean batch) throws Exception {
140140
logger.info("--> starting [node1] ...");
@@ -288,6 +288,21 @@ public void testReplicaRelocationWhileIndexingWith10RelocationAnd5Writers() thro
288288
testReplicaRelocationWhileIndexing(10, 5, false);
289289
}
290290

291+
@Test
292+
public void testReplicaRelocationWhileBulkIndexingWith1RelocationAnd1Writer() throws Exception {
293+
testReplicaRelocationWhileIndexing(1, 1, true);
294+
}
295+
296+
@Test
297+
public void testReplicaRelocationWhileBulkIndexingWith10RelocationAnd1Writer() throws Exception {
298+
testReplicaRelocationWhileIndexing(10, 1, true);
299+
}
300+
301+
@Test
302+
public void testReplicaRelocationWhileBulkIndexingWith10RelocationAnd5Writers() throws Exception {
303+
testReplicaRelocationWhileIndexing(10, 5, true);
304+
}
305+
291306
private void testReplicaRelocationWhileIndexing(final int numberOfRelocations, final int numberOfWriters, final boolean batch) throws Exception {
292307
logger.info("--> starting [node1] ...");
293308
startNode("node1");

0 commit comments

Comments
 (0)