Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/96662.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 96662
summary: Refactor `RestoreClusterStateListener` to use `ClusterStateObserver`
area: Snapshot/Restore
type: bug
issues:
- 96425
Original file line number Diff line number Diff line change
Expand Up @@ -11,84 +11,107 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService;

import java.util.Map;
import java.util.function.Supplier;
import java.util.function.Predicate;

import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;

public class RestoreClusterStateListener implements ClusterStateListener {
public class RestoreClusterStateListener {

private static final Logger logger = LogManager.getLogger(RestoreClusterStateListener.class);

private final ClusterService clusterService;
private final String uuid;
private final ActionListener<RestoreSnapshotResponse> listener;
private final Supplier<ThreadContext.StoredContext> contextSupplier;
private RestoreClusterStateListener() {}

private RestoreClusterStateListener(
/**
* Creates a cluster state listener and registers it with the cluster service. The listener passed as a
* parameter will be called when the restore is complete.
*/
public static void createAndRegisterListener(
ClusterService clusterService,
RestoreService.RestoreCompletionResponse response,
ActionListener<RestoreSnapshotResponse> listener,
Supplier<ThreadContext.StoredContext> contextSupplier
ThreadContext threadContext
) {
this.clusterService = clusterService;
this.uuid = response.getUuid();
this.listener = listener;
this.contextSupplier = contextSupplier;
}

@Override
public void clusterChanged(ClusterChangedEvent changedEvent) {
try (ThreadContext.StoredContext stored = contextSupplier.get()) {
final RestoreInProgress.Entry prevEntry = restoreInProgress(changedEvent.previousState(), uuid);
final RestoreInProgress.Entry newEntry = restoreInProgress(changedEvent.state(), uuid);
if (prevEntry == null) {
// When there is a master failure after a restore has been started, this listener might not be registered
// on the current master and as such it might miss some intermediary cluster states due to batching.
// Clean up listener in that case and acknowledge completion of restore operation to client.
clusterService.removeListener(this);
listener.onResponse(new RestoreSnapshotResponse((RestoreInfo) null));
} else if (newEntry == null) {
clusterService.removeListener(this);
Map<ShardId, RestoreInProgress.ShardRestoreStatus> shards = prevEntry.shards();
assert prevEntry.state().completed() : "expected completed snapshot state but was " + prevEntry.state();
final String uuid = response.getUuid();
final DiscoveryNode localNode = clusterService.localNode();
waitForClusterState(clusterService, threadContext, new RestoreListener(listener, localNode) {
@Override
public void onNewClusterState(ClusterState state) {
var restoreState = restoreInProgress(state, uuid);
if (restoreState == null) {
// we are too late and the restore is gone from the cluster state already
listener.onResponse(new RestoreSnapshotResponse((RestoreInfo) null));
return;
}
Map<ShardId, RestoreInProgress.ShardRestoreStatus> shards = restoreState.shards();
assert restoreState.state().completed() : "expected completed snapshot state but was " + restoreState.state();
assert RestoreService.completed(shards) : "expected all restore entries to be completed";
RestoreInfo ri = new RestoreInfo(
prevEntry.snapshot().getSnapshotId().getName(),
prevEntry.indices(),
var restoreInfo = new RestoreInfo(
restoreState.snapshot().getSnapshotId().getName(),
restoreState.indices(),
shards.size(),
shards.size() - RestoreService.failedShards(shards)
);
RestoreSnapshotResponse response = new RestoreSnapshotResponse(ri);
logger.debug("restore of [{}] completed", prevEntry.snapshot().getSnapshotId());
listener.onResponse(response);
} else {
// restore not completed yet, wait for next cluster state update
waitForClusterState(clusterService, threadContext, new RestoreListener(listener, localNode) {
@Override
public void onNewClusterState(ClusterState state) {
logger.debug("restore of [{}] completed", response.getSnapshot().getSnapshotId());
listener.onResponse(new RestoreSnapshotResponse(restoreInfo));
}
}, clusterState -> restoreInProgress(clusterState, uuid) == null);
}
}
}, clusterState -> {
var restoreState = restoreInProgress(clusterState, uuid);
return restoreState == null || RestoreService.completed(restoreState.shards());
});
}

/**
* Creates a cluster state listener and registers it with the cluster service. The listener passed as a
* parameter will be called when the restore is complete.
*/
public static void createAndRegisterListener(
private static void waitForClusterState(
ClusterService clusterService,
RestoreService.RestoreCompletionResponse response,
ActionListener<RestoreSnapshotResponse> listener,
ThreadContext threadContext
ThreadContext threadContext,
RestoreListener restoreListener,
Predicate<ClusterState> statePredicate
) {
clusterService.addListener(
new RestoreClusterStateListener(clusterService, response, listener, threadContext.newRestorableContext(true))
);
ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadContext);
ClusterState state = observer.setAndGetObservedState();
if (statePredicate.test(state)) {
restoreListener.onNewClusterState(state);
} else {
observer.waitForNextChange(restoreListener, statePredicate);
}
}

private abstract static class RestoreListener implements ClusterStateObserver.Listener {

protected final ActionListener<RestoreSnapshotResponse> listener;

private final DiscoveryNode localNode;

protected RestoreListener(ActionListener<RestoreSnapshotResponse> listener, DiscoveryNode localNode) {
this.listener = listener;
this.localNode = localNode;
}

@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(localNode));
}

@Override
public void onTimeout(TimeValue timeout) {
assert false : "impossible, no timeout set";
}
}
}