Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/87458.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 87458
summary: Fork to WRITE thread before failing shard in `updateCheckPoints`
area: Engine
type: bug
issues:
- 87094
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,12 @@ private void handlePrimaryResult(final PrimaryResultT primaryResult) {
@Override
public void onResponse(Void aVoid) {
successfulShards.incrementAndGet();
try {
updateCheckPoints(primary.routingEntry(), primary::localCheckpoint, primary::globalCheckpoint);
} finally {
decPendingAndFinishIfNeeded();
}
updateCheckPoints(
primary.routingEntry(),
primary::localCheckpoint,
primary::globalCheckpoint,
() -> decPendingAndFinishIfNeeded()
);
}

@Override
Expand Down Expand Up @@ -221,11 +222,7 @@ private void performOnReplica(
@Override
public void onResponse(ReplicaResponse response) {
successfulShards.incrementAndGet();
try {
updateCheckPoints(shard, response::localCheckpoint, response::globalCheckpoint);
} finally {
decPendingAndFinishIfNeeded();
}
updateCheckPoints(shard, response::localCheckpoint, response::globalCheckpoint, () -> decPendingAndFinishIfNeeded());
}

@Override
Expand Down Expand Up @@ -302,16 +299,46 @@ public boolean shouldRetry(Exception e) {
replicationAction.run();
}

private void updateCheckPoints(ShardRouting shard, LongSupplier localCheckpointSupplier, LongSupplier globalCheckpointSupplier) {
private void updateCheckPoints(
ShardRouting shard,
LongSupplier localCheckpointSupplier,
LongSupplier globalCheckpointSupplier,
Runnable onCompletion
) {
boolean forked = false;
try {
primary.updateLocalCheckpointForShard(shard.allocationId().getId(), localCheckpointSupplier.getAsLong());
primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), globalCheckpointSupplier.getAsLong());
} catch (final AlreadyClosedException e) {
// the index was deleted or this shard was never activated after a relocation; fall through and finish normally
} catch (final Exception e) {
// fail the primary but fall through and let the rest of operation processing complete
final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
primary.failShard(message, e);
threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
// Not expected to be reached: trips an assertion (when assertions are enabled) that carries the exception.
// NOTE(review): with assertions disabled the exception is silently dropped here — confirm that is intended.
assert false : e;
}

@Override
public boolean isForceExecution() {
// Always true. Presumably this asks the WRITE executor to accept the task even when it would otherwise
// reject it, so the shard failure is not lost — TODO confirm against AbstractRunnable's contract.
return true;
}

@Override
protected void doRun() {
    // Fail the primary with the exception caught by the enclosing catch block; completing the
    // operation is handled separately in onAfter.
    final String reason = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
    primary.failShard(reason, e);
}

@Override
public void onAfter() {
// Runs the caller-supplied completion callback. The enclosing method sets `forked` after submitting this
// task, so its finally block does not invoke the callback a second time.
onCompletion.run();
}
});
forked = true;
} finally {
if (forked == false) {
onCompletion.run();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,8 +510,9 @@ public void testPrimaryFailureHandlingReplicaResponse() throws Exception {
final Set<String> trackedShards = shardRoutingTable.getAllAllocationIds();
final ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards, 0);

final Thread testThread = Thread.currentThread();
final boolean fatal = randomBoolean();
final AtomicBoolean primaryFailed = new AtomicBoolean();
final PlainActionFuture<Void> primaryFailedFuture = new PlainActionFuture<>();
final ReplicationOperation.Primary<Request, Request, TestPrimary.Result> primary = new TestPrimary(
primaryRouting,
() -> initialReplicationGroup,
Expand All @@ -520,7 +521,10 @@ public void testPrimaryFailureHandlingReplicaResponse() throws Exception {

@Override
public void failShard(String message, Exception exception) {
primaryFailed.set(true);
assertNotSame(testThread, Thread.currentThread());
assertThat(Thread.currentThread().getName(), containsString('[' + ThreadPool.Names.WRITE + ']'));
assertTrue(fatal);
primaryFailedFuture.onResponse(null);
}

@Override
Expand All @@ -543,7 +547,10 @@ public void updateLocalCheckpointForShard(String allocationId, long checkpoint)
TestReplicationOperation operation = new TestReplicationOperation(request, primary, listener, replicas, primaryTerm);
operation.execute();

assertThat(primaryFailed.get(), equalTo(fatal));
if (fatal) {
primaryFailedFuture.get(10, TimeUnit.SECONDS);
}

final ShardInfo shardInfo = listener.actionGet().getShardInfo();
assertThat(shardInfo.getFailed(), equalTo(0));
assertThat(shardInfo.getFailures(), arrayWithSize(0));
Expand Down