@@ -506,8 +506,9 @@ private void prewarmCache(ActionListener<Void> listener) {
         }, listener::onFailure), snapshot().totalFileCount());

         for (BlobStoreIndexShardSnapshot.FileInfo file : snapshot().indexFiles()) {
-            if (file.metadata().hashEqualsContents() || isExcludedFromCache(file.physicalName())) {
-                if (file.metadata().hashEqualsContents()) {
+            boolean hashEqualsContents = file.metadata().hashEqualsContents();
+            if (hashEqualsContents || isExcludedFromCache(file.physicalName())) {
+                if (hashEqualsContents) {
                     recoveryState.getIndex().addFileDetail(file.physicalName(), file.length(), true);
                 } else {
                     recoveryState.ignoreFile(file.physicalName());
@@ -516,17 +517,24 @@ private void prewarmCache(ActionListener<Void> listener) {
                 continue;
             }
             recoveryState.getIndex().addFileDetail(file.physicalName(), file.length(), false);
+            boolean submitted = false;
             try {
                 final IndexInput input = openInput(file.physicalName(), CachedBlobContainerIndexInput.CACHE_WARMING_CONTEXT);
                 assert input instanceof CachedBlobContainerIndexInput : "expected cached index input but got " + input.getClass();

                 final int numberOfParts = file.numberOfParts();
                 final StepListener<Collection<Void>> fileCompletionListener = new StepListener<>();
-                fileCompletionListener.whenComplete(voids -> input.close(), e -> IOUtils.closeWhileHandlingException(input));
                 fileCompletionListener.addListener(completionListener.map(voids -> null));
+                fileCompletionListener.whenComplete(voids -> {
+                    logger.debug("{} file [{}] prewarmed", shardId, file.physicalName());
+                    input.close();
+                }, e -> {
+                    logger.warn(() -> new ParameterizedMessage("{} prewarming failed for file [{}]", shardId, file.physicalName()), e);
+                    IOUtils.closeWhileHandlingException(input);
+                });

                 final GroupedActionListener<Void> partsListener = new GroupedActionListener<>(fileCompletionListener, numberOfParts);

+                submitted = true;
                 for (int p = 0; p < numberOfParts; p++) {
                     final int part = p;
                     queue.add(Tuple.tuple(partsListener, () -> {
@@ -555,6 +563,9 @@
                 }
             } catch (IOException e) {
                 logger.warn(() -> new ParameterizedMessage("{} unable to prewarm file [{}]", shardId, file.physicalName()), e);
+                if (submitted == false) {
+                    completionListener.onFailure(e);
Contributor:

I think this attempts to ensure that the recovery state moves to DONE? I guess that is more correct than FINALIZE, but I think we should add a test for it.

Member Author:

It attempts to complete the grouped listener but has no other side effect besides logging a warning message if the shard is a relocation target:

    if (success && shardRouting.isRelocationTarget()) {
        final Runnable preWarmCondition = indexShard.addCleanFilesDependency();
        preWarmListener.whenComplete(v -> preWarmCondition.run(), e -> {
            logger.warn(
                new ParameterizedMessage(
                    "pre-warm operation failed for [{}] while it was the target of primary relocation [{}]",
                    shardRouting.shardId(),
                    shardRouting
                ),
                e
            );
            preWarmCondition.run();
        });

I'm OK with leaving the recovery state in FINALIZE, as this is what allowed us to detect a non-fully-prewarmed shard in the first place. I think that adding more logs would have helped to troubleshoot some issues when accessing repository files.

+                }
             }
         }
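
To make the failure mode discussed in the review thread concrete, here is a minimal, self-contained sketch of the pattern the new submitted flag enables. It is plain Java with no Elasticsearch dependencies: SimpleGroupedListener, openInput and the file names are hypothetical stand-ins, not the real GroupedActionListener/StepListener API. They only model the relevant behaviour, namely that one notification is expected per file, so a file that neither responds nor fails leaves the overall listener (and hence the recovery state) stuck.

// Minimal sketch, not the Elasticsearch implementation: hypothetical names model the
// "one notification per file" contract of the grouped completion listener.

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class PrewarmSubmittedFlagSketch {

    /** Completes once all expected notifications arrive, or fails on the first failure. */
    static final class SimpleGroupedListener {
        private final AtomicInteger remaining;
        private final Runnable onAllDone;
        private final Consumer<Exception> failureHandler;
        private volatile boolean failed;

        SimpleGroupedListener(int expected, Runnable onAllDone, Consumer<Exception> failureHandler) {
            this.remaining = new AtomicInteger(expected);
            this.onAllDone = onAllDone;
            this.failureHandler = failureHandler;
        }

        void onResponse() {
            if (failed == false && remaining.decrementAndGet() == 0) {
                onAllDone.run();
            }
        }

        void onFailure(Exception e) {
            failed = true;
            failureHandler.accept(e);
        }
    }

    public static void main(String[] args) {
        List<String> files = List.of("segments_1", "_0.cfs", "_0.cfe");

        // One notification is expected per file; if a file neither responds nor fails,
        // the grouped listener never completes (the situation the change guards against).
        SimpleGroupedListener completionListener = new SimpleGroupedListener(
            files.size(),
            () -> System.out.println("prewarming finished for all files"),
            e -> System.out.println("prewarming failed: " + e.getMessage())
        );

        for (String file : files) {
            boolean submitted = false;
            try {
                openInput(file); // may throw before any per-part warming task is enqueued
                submitted = true;
                // In the real code the per-part tasks would eventually complete a
                // file-level listener; here we notify the grouped listener directly.
                completionListener.onResponse();
            } catch (IOException e) {
                if (submitted == false) {
                    // Without this, the exception would only be logged and the grouped
                    // listener would wait forever for this file's notification.
                    completionListener.onFailure(e);
                }
            }
        }
    }

    private static void openInput(String file) throws IOException {
        if (file.endsWith(".cfe")) {
            throw new IOException("simulated failure opening " + file);
        }
    }
}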
