Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/96421.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 96421
summary: Promptly fail recovery from snapshot
area: Recovery
type: bug
issues:
- 95525
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,11 @@ public Releasable tryAcquireSnapshotDownloadPermits() {
return recoverySettings.tryAcquireSnapshotDownloadPermits();
}

// Visible for testing
public int ongoingRecoveryCount() {
return onGoingRecoveries.size();
}

/**
* Prepare the start recovery request.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.repositories.IndexId;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
Expand Down Expand Up @@ -586,7 +587,19 @@ public void restoreFileFromSnapshot(
) {
StoreFileMetadata metadata = fileInfo.metadata();
int readSnapshotFileBufferSize = snapshotFilesProvider.getReadSnapshotFileBufferSizeForRepo(repository);
multiFileWriter.writeFile(metadata, readSnapshotFileBufferSize, inputStream);
multiFileWriter.writeFile(metadata, readSnapshotFileBufferSize, new FilterInputStream(inputStream) {
@Override
public int read() throws IOException {
cancellableThreads.checkForCancel();
return super.read();
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
cancellableThreads.checkForCancel();
return super.read(b, off, len);
}
});
listener.onResponse(null);
} catch (Exception e) {
logger.debug(() -> format("Unable to recover snapshot file %s from repository %s", fileInfo, repository), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,74 @@ public void testRecoveryIsCancelledAfterDeletingTheIndex() throws Exception {
}
}

public void testCancelledRecoveryAbortsDownloadPromptly() throws Exception {
updateSetting(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(), "1");

try {
internalCluster().ensureAtLeastNumDataNodes(2);

String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(
indexName,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build()
);
ensureGreen(indexName);

int numDocs = randomIntBetween(1, 1000);
indexDocs(indexName, numDocs, numDocs);

String repoName = "repo";
createRepo(repoName, TestRepositoryPlugin.FILTER_TYPE);
createSnapshot(repoName, "snap", Collections.singletonList(indexName));

final AtomicBoolean isCancelled = new AtomicBoolean();
final CountDownLatch readFromBlobCalledLatch = new CountDownLatch(1);
final CountDownLatch readFromBlobRespondLatch = new CountDownLatch(1);

FilterFsRepository.wrapReadBlobMethod((blobName, stream) -> {
if (blobName.startsWith("__")) {
return new FilterInputStream(stream) {
@Override
public int read() throws IOException {
beforeRead();
return super.read();
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
beforeRead();
return super.read(b, off, len);
}

private void beforeRead() {
assertFalse(isCancelled.get()); // should have no further reads once the index is deleted
readFromBlobCalledLatch.countDown();
safeAwait(readFromBlobRespondLatch);
}
};
} else {
return stream;
}
});

updateIndexSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1), indexName);
safeAwait(readFromBlobCalledLatch);

assertAcked(client().admin().indices().prepareDelete(indexName).get());
// cancellation flag is set when applying the cluster state that deletes the index, so no further waiting is necessary
isCancelled.set(true);
readFromBlobRespondLatch.countDown();

assertThat(indexExists(indexName), is(equalTo(false)));
assertBusy(
() -> internalCluster().getInstances(PeerRecoveryTargetService.class)
.forEach(peerRecoveryTargetService -> assertEquals(0, peerRecoveryTargetService.ongoingRecoveryCount()))
);
} finally {
updateSetting(INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.getKey(), null);
}
}

public void testRecoveryAfterRestoreUsesSnapshots() throws Exception {
String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(
Expand Down