Make TransportLocalClusterStateAction wait for cluster to unblock #117230
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| pr: 117230 | ||
| summary: Make various alias retrieval APIs wait for cluster to unblock | ||
| area: Distributed | ||
| type: enhancement | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -147,6 +147,7 @@ | |
| exports org.elasticsearch.action.support.master; | ||
| exports org.elasticsearch.action.support.master.info; | ||
| exports org.elasticsearch.action.support.nodes; | ||
| exports org.elasticsearch.action.support.local; | ||
| Contributor Author: I moved | ||
| exports org.elasticsearch.action.support.replication; | ||
| exports org.elasticsearch.action.support.single.instance; | ||
| exports org.elasticsearch.action.support.single.shard; | ||
| | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -8,21 +8,20 @@ | |
| */ | ||
| package org.elasticsearch.action.admin.indices.alias.get; | ||
| | ||
| import org.elasticsearch.action.ActionRequest; | ||
| import org.elasticsearch.action.ActionRequestValidationException; | ||
| import org.elasticsearch.action.AliasesRequest; | ||
| import org.elasticsearch.action.support.IndicesOptions; | ||
| import org.elasticsearch.action.support.TransportAction; | ||
| import org.elasticsearch.action.support.local.LocalClusterStateRequest; | ||
| import org.elasticsearch.action.support.master.MasterNodeRequest; | ||
| import org.elasticsearch.common.Strings; | ||
| import org.elasticsearch.common.io.stream.StreamOutput; | ||
| import org.elasticsearch.core.TimeValue; | ||
| import org.elasticsearch.tasks.CancellableTask; | ||
| import org.elasticsearch.tasks.Task; | ||
| import org.elasticsearch.tasks.TaskId; | ||
| | ||
| import java.io.IOException; | ||
| import java.util.Map; | ||
| | ||
| public class GetAliasesRequest extends ActionRequest implements AliasesRequest { | ||
| public class GetAliasesRequest extends LocalClusterStateRequest implements AliasesRequest { | ||
| | ||
| public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandHidden(); | ||
| | ||
| | @@ -31,18 +30,20 @@ public class GetAliasesRequest extends ActionRequest implements AliasesRequest { | |
| private String[] indices = Strings.EMPTY_ARRAY; | ||
| private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS; | ||
| | ||
| @Deprecated | ||
| public GetAliasesRequest(String... aliases) { | ||
| this.aliases = aliases; | ||
| this.originalAliases = aliases; | ||
| this(MasterNodeRequest.TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, aliases); | ||
| Contributor Author: It felt out of scope for this PR to refactor all these constructor callers to provide an explicit master timeout; that will have to be done in a follow-up PR. Contributor: I would rather we did those changes first to set things up for this PR to go through without having to add back this trappy parameter. Contributor Author: How would we do those changes first if there is currently - on Contributor: Ah, hm, good point. Ok, let's do it this way round. | ||
| } | ||
| | ||
| @Deprecated | ||
| public GetAliasesRequest() { | ||
| this(Strings.EMPTY_ARRAY); | ||
| } | ||
| | ||
| @Override | ||
| public void writeTo(StreamOutput out) throws IOException { | ||
| TransportAction.localOnly(); | ||
| public GetAliasesRequest(TimeValue masterTimeout, String... aliases) { | ||
| super(masterTimeout); | ||
| this.aliases = aliases; | ||
| this.originalAliases = aliases; | ||
| } | ||
| | ||
| @Override | ||
| | ||
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the "Elastic License | ||
| * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
| * Public License v 1"; you may not use this file except in compliance with, at | ||
| * your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
| * License v3.0 only", or the "Server Side Public License, v 1". | ||
| */ | ||
| | ||
| package org.elasticsearch.action.support.local; | ||
| | ||
| import org.elasticsearch.action.ActionRequest; | ||
| import org.elasticsearch.action.support.TransportAction; | ||
| import org.elasticsearch.common.io.stream.StreamOutput; | ||
| import org.elasticsearch.core.TimeValue; | ||
| | ||
| import java.io.IOException; | ||
| import java.util.Objects; | ||
| | ||
| /** | ||
| * A base request for actions that are executed locally on the node that receives the request. | ||
| */ | ||
| public abstract class LocalClusterStateRequest extends ActionRequest { | ||
| | ||
| /** | ||
| * The timeout for waiting until the cluster is unblocked. | ||
| * We use the name <code>masterTimeout</code> to be consistent with the master node actions. | ||
| */ | ||
| private final TimeValue masterTimeout; | ||
| | ||
| protected LocalClusterStateRequest(TimeValue masterTimeout) { | ||
| this.masterTimeout = Objects.requireNonNull(masterTimeout); | ||
| } | ||
| | ||
| @Override | ||
| public final void writeTo(StreamOutput out) throws IOException { | ||
| TransportAction.localOnly(); | ||
| } | ||
| | ||
| public TimeValue masterTimeout() { | ||
| return masterTimeout; | ||
| } | ||
| } |
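
The base request above combines two constraints: the timeout must be non-null, and the request must never be serialized. The following is a minimal plain-Java sketch of that pattern, not the Elasticsearch classes themselves. `LocalRequest`, `GetNamesRequest`, and the use of `java.time.Duration` in place of `TimeValue` are hypothetical stand-ins, and modelling `TransportAction.localOnly()` as an `UnsupportedOperationException` is an assumption.

```java
import java.time.Duration;
import java.util.Objects;

final class LocalOnlyRequestSketch {

    /** Hypothetical stand-in for LocalClusterStateRequest; Duration replaces TimeValue. */
    abstract static class LocalRequest {
        private final Duration masterTimeout;

        protected LocalRequest(Duration masterTimeout) {
            // Reject null up front so every caller has to choose a timeout explicitly.
            this.masterTimeout = Objects.requireNonNull(masterTimeout, "masterTimeout");
        }

        // Final so subclasses cannot accidentally make the request wire-serializable.
        // Assumption: the real localOnly() helper rejects serialization in a similar way.
        public final void writeTo(StringBuilder out) {
            throw new UnsupportedOperationException("local-only request");
        }

        public Duration masterTimeout() {
            return masterTimeout;
        }
    }

    /** Hypothetical concrete request, loosely shaped like GetAliasesRequest. */
    static final class GetNamesRequest extends LocalRequest {
        private final String[] names;

        GetNamesRequest(Duration masterTimeout, String... names) {
            super(masterTimeout);
            this.names = names;
        }

        String[] names() {
            return names;
        }
    }

    public static void main(String[] args) {
        GetNamesRequest request = new GetNamesRequest(Duration.ofSeconds(30), "logs", "metrics");
        System.out.println(request.masterTimeout() + " for " + request.names().length + " names");
    }
}
```

Keeping `writeTo` final in the base class means a subclass only has to pass a timeout to `super(...)`, which is the shape `GetAliasesRequest` takes in this PR.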
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,128 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the "Elastic License | ||
| * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side | ||
| * Public License v 1"; you may not use this file except in compliance with, at | ||
| * your election, the "Elastic License 2.0", the "GNU Affero General Public | ||
| * License v3.0 only", or the "Server Side Public License, v 1". | ||
| */ | ||
| | ||
| package org.elasticsearch.action.support.local; | ||
| | ||
| import org.apache.logging.log4j.LogManager; | ||
| import org.apache.logging.log4j.Logger; | ||
| import org.elasticsearch.ElasticsearchTimeoutException; | ||
| import org.elasticsearch.action.ActionListener; | ||
| import org.elasticsearch.action.ActionResponse; | ||
| import org.elasticsearch.action.ActionRunnable; | ||
| import org.elasticsearch.action.support.ActionFilters; | ||
| import org.elasticsearch.action.support.TransportAction; | ||
| import org.elasticsearch.cluster.ClusterState; | ||
| import org.elasticsearch.cluster.ClusterStateObserver; | ||
| import org.elasticsearch.cluster.block.ClusterBlockException; | ||
| import org.elasticsearch.cluster.service.ClusterService; | ||
| import org.elasticsearch.common.util.concurrent.EsExecutors; | ||
| import org.elasticsearch.core.TimeValue; | ||
| import org.elasticsearch.node.NodeClosedException; | ||
| import org.elasticsearch.tasks.CancellableTask; | ||
| import org.elasticsearch.tasks.Task; | ||
| import org.elasticsearch.tasks.TaskManager; | ||
| | ||
| import java.util.concurrent.Executor; | ||
| | ||
| import static org.elasticsearch.common.Strings.format; | ||
| | ||
| /** | ||
| * Analogue of {@link org.elasticsearch.action.support.master.TransportMasterNodeReadAction} except that it runs on the local node rather | ||
| * than delegating to the master. | ||
| */ | ||
| public abstract class TransportLocalClusterStateAction<Request extends LocalClusterStateRequest, Response extends ActionResponse> extends | ||
| TransportAction<Request, Response> { | ||
| | ||
| private static final Logger logger = LogManager.getLogger(TransportLocalClusterStateAction.class); | ||
| | ||
| protected final ClusterService clusterService; | ||
| protected final Executor executor; | ||
| | ||
| protected TransportLocalClusterStateAction( | ||
| String actionName, | ||
| ActionFilters actionFilters, | ||
| TaskManager taskManager, | ||
| ClusterService clusterService, | ||
| Executor executor | ||
| ) { | ||
| // TODO replace DIRECT_EXECUTOR_SERVICE when removing workaround for https://github.com/elastic/elasticsearch/issues/97916 | ||
| super(actionName, actionFilters, taskManager, EsExecutors.DIRECT_EXECUTOR_SERVICE); | ||
| this.clusterService = clusterService; | ||
| this.executor = executor; | ||
| } | ||
| | ||
| protected abstract ClusterBlockException checkBlock(Request request, ClusterState state); | ||
| | ||
| protected abstract void localClusterStateOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) | ||
| throws Exception; | ||
| | ||
| @Override | ||
| protected final void doExecute(Task task, Request request, ActionListener<Response> listener) { | ||
| final var state = clusterService.state(); | ||
| final var clusterBlockException = checkBlock(request, state); | ||
| if (clusterBlockException != null) { | ||
| if (clusterBlockException.retryable() == false) { | ||
| listener.onFailure(clusterBlockException); | ||
| } else { | ||
| waitForClusterUnblock(task, request, listener, state, clusterBlockException); | ||
| } | ||
| } else { | ||
| innerDoExecute(task, request, listener, state); | ||
| } | ||
| } | ||
| | ||
| private void innerDoExecute(Task task, Request request, ActionListener<Response> listener, ClusterState state) { | ||
| if (task instanceof CancellableTask cancellableTask && cancellableTask.notifyIfCancelled(listener)) { | ||
| return; | ||
| } | ||
| // Workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can | ||
| executor.execute(ActionRunnable.wrap(listener, l -> localClusterStateOperation(task, request, state, l))); | ||
| } | ||
| | ||
| private void waitForClusterUnblock( | ||
| Task task, | ||
| Request request, | ||
| ActionListener<Response> listener, | ||
| ClusterState initialState, | ||
| ClusterBlockException exception | ||
| ) { | ||
| var observer = new ClusterStateObserver( | ||
| initialState, | ||
| clusterService, | ||
| request.masterTimeout(), | ||
| logger, | ||
| clusterService.threadPool().getThreadContext() | ||
| ); | ||
| observer.waitForNextChange(new ClusterStateObserver.Listener() { | ||
| @Override | ||
| public void onNewClusterState(ClusterState state) { | ||
| logger.trace("retrying with cluster state version [{}]", state.version()); | ||
| innerDoExecute(task, request, listener, state); | ||
| } | ||
| | ||
| @Override | ||
| public void onClusterServiceClose() { | ||
| listener.onFailure(new NodeClosedException(clusterService.localNode())); | ||
| } | ||
| | ||
| @Override | ||
| public void onTimeout(TimeValue timeout) { | ||
| logger.debug( | ||
| () -> format("timed out while waiting for cluster to unblock in [%s] (timeout [%s])", actionName, timeout), | ||
| exception | ||
| ); | ||
| listener.onFailure(new ElasticsearchTimeoutException("timed out while waiting for cluster to unblock", exception)); | ||
| } | ||
| }, clusterState -> isTaskCancelled(task) || checkBlock(request, clusterState) == null); | ||
| Contributor: This means that if the task is cancelled while waiting for an unblocking cluster state then it'll wait for the next cluster state update before completing, which could be arbitrarily far in the future. Could we use Contributor Author: Good idea 👍. It looks like we're not doing that in Contributor: Eh we probably should. But that's a good point, let's leave this with the same behaviour as in Contributor Author: I opened #117971 for this. | ||
| } | ||
| | ||
| private boolean isTaskCancelled(Task task) { | ||
| return task instanceof CancellableTask cancellableTask && cancellableTask.isCancelled(); | ||
| } | ||
| } | ||
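
The control flow of `doExecute` and the cancellation concern raised in the review thread can be sketched in plain Java. This is a simplified model under stated assumptions, not the Elasticsearch implementation: `State`, `StateHolder`, and the string log are hypothetical stand-ins for `ClusterState`, `ClusterStateObserver`, and the listener, and the timeout and node-close paths are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;

final class LocalBlockRetrySketch {

    /** Hypothetical stand-in for ClusterState: a version plus block flags. */
    record State(long version, boolean blocked, boolean retryable) {}

    /** Hypothetical stand-in for the observer: waiters are re-checked only when a new state is applied. */
    static final class StateHolder {
        private record Waiter(Predicate<State> predicate, Consumer<State> onMatch) {}

        private final List<Waiter> waiters = new ArrayList<>();
        private State current;

        StateHolder(State initial) {
            this.current = initial;
        }

        State state() {
            return current;
        }

        void waitForNextChange(Predicate<State> predicate, Consumer<State> onMatch) {
            waiters.add(new Waiter(predicate, onMatch));
        }

        void apply(State next) {
            current = next;
            // Iterate over a copy because matching waiters are removed as they fire.
            for (Waiter waiter : new ArrayList<>(waiters)) {
                if (waiter.predicate().test(next)) {
                    waiters.remove(waiter);
                    waiter.onMatch().accept(next);
                }
            }
        }
    }

    /** Three branches: run immediately, fail fast on a non-retryable block, or wait for an unblocking state. */
    static void execute(StateHolder holder, AtomicBoolean cancelled, List<String> log) {
        State state = holder.state();
        if (state.blocked() == false) {
            run(state, cancelled, log);
        } else if (state.retryable() == false) {
            log.add("failed: non-retryable block");
        } else {
            // Same shape as the predicate in the diff: cancelled, or no longer blocked.
            holder.waitForNextChange(s -> cancelled.get() || s.blocked() == false, s -> run(s, cancelled, log));
        }
    }

    /** Checks cancellation before doing the work, as innerDoExecute does. */
    static void run(State state, AtomicBoolean cancelled, List<String> log) {
        log.add(cancelled.get() ? "cancelled" : "ran on version " + state.version());
    }

    public static void main(String[] args) {
        StateHolder holder = new StateHolder(new State(1, true, true));
        AtomicBoolean cancelled = new AtomicBoolean(false);
        List<String> log = new ArrayList<>();
        execute(holder, cancelled, log);
        cancelled.set(true);                       // nothing is logged yet
        holder.apply(new State(2, true, true));    // cancellation is only noticed here
        System.out.println(log);
    }
}
```

In this model, setting `cancelled` has no visible effect until the next call to `apply`, which mirrors the reviewer's point that a cancelled task waits for the next cluster state update before completing.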
Not sure if these docs and rest-api-spec changes are out of scope or not for this PR.

Seems ok to me, tho note that we have never before supported `?master_timeout` on these APIs, this isn't reinstating a parameter that previously was removed.

Yeah I realized that, but I figured it made sense to add them if they're parameters we're accepting.