Skip to content
5 changes: 5 additions & 0 deletions docs/changelog/117230.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 117230
summary: Make various alias retrieval APIs wait for cluster to unblock
area: Distributed
type: enhancement
issues: []
2 changes: 2 additions & 0 deletions docs/reference/cat/alias.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ include::{es-ref-dir}/rest-api/common-parms.asciidoc[tag=cat-v]

include::{es-ref-dir}/rest-api/common-parms.asciidoc[tag=expand-wildcards]

include::{es-ref-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if these docs and rest-api-spec changes are out of scope or not for this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems ok to me, tho note that we have never before supported ?master_timeout on these APIs, this isn't reinstating a parameter that previously was removed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I realized that, but I figured it made sense to add them if they're parameters we're accepting.


[[cat-alias-api-example]]
==== {api-examples-title}

Expand Down
2 changes: 2 additions & 0 deletions docs/reference/indices/alias-exists.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ Defaults to `all`.
(Optional, Boolean) If `false`, requests that include a missing data stream or
index in the `<target>` return an error. Defaults to `false`.

include::{es-ref-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]

[[alias-exists-api-response-codes]]
==== {api-response-codes-title}

Expand Down
2 changes: 2 additions & 0 deletions docs/reference/indices/get-alias.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,5 @@ Defaults to `all`.
`ignore_unavailable`::
(Optional, Boolean) If `false`, requests that include a missing data stream or
index in the `<target>` return an error. Defaults to `false`.

include::{es-ref-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@
],
"default": "all",
"description":"Whether to expand wildcard expression to concrete indices that are open, closed or both."
},
"master_timeout":{
"type":"time",
"description":"Timeout for waiting for new cluster state in case it is blocked",
"default":"30s"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@
],
"default":"all",
"description":"Whether to expand wildcard expression to concrete indices that are open, closed or both."
},
"master_timeout":{
"type":"time",
"description":"Timeout for waiting for new cluster state in case it is blocked",
"default":"30s"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@
],
"default": "all",
"description":"Whether to expand wildcard expression to concrete indices that are open, closed or both."
},
"master_timeout":{
"type":"time",
"description":"Timeout for waiting for new cluster state in case it is blocked",
"default":"30s"
}
}
}
Expand Down
1 change: 1 addition & 0 deletions server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@
exports org.elasticsearch.action.support.master;
exports org.elasticsearch.action.support.master.info;
exports org.elasticsearch.action.support.nodes;
exports org.elasticsearch.action.support.local;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved TransportLocalClusterStateAction to its own local package - together with the new LocalClusterStateRequest.

exports org.elasticsearch.action.support.replication;
exports org.elasticsearch.action.support.single.instance;
exports org.elasticsearch.action.support.single.shard;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,20 @@
*/
package org.elasticsearch.action.admin.indices.alias.get;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.AliasesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
import java.util.Map;

public class GetAliasesRequest extends ActionRequest implements AliasesRequest {
public class GetAliasesRequest extends LocalClusterStateRequest implements AliasesRequest {

public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.strictExpandHidden();

Expand All @@ -31,18 +30,20 @@ public class GetAliasesRequest extends ActionRequest implements AliasesRequest {
private String[] indices = Strings.EMPTY_ARRAY;
private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS;

@Deprecated
public GetAliasesRequest(String... aliases) {
this.aliases = aliases;
this.originalAliases = aliases;
this(MasterNodeRequest.TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, aliases);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It felt out of scope for this PR to refactor all these constructor callers to provide an explicit master timeout - that will have to be done in a follow-up PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather we did those changes first to set things up for this PR to go through without having to add back this trappy parameter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would we do those changes first if there is currently - on main - no timeout to set in these constructors? Are you suggesting adding a constructor on main that takes a timeout but doesn't actually do anything with it and swapping that constructor with the ones on this branch afterward?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, hm, good point. Ok, let's do it this way round.

}

@Deprecated
public GetAliasesRequest() {
this(Strings.EMPTY_ARRAY);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
TransportAction.localOnly();
public GetAliasesRequest(TimeValue masterTimeout, String... aliases) {
super(masterTimeout);
this.aliases = aliases;
this.originalAliases = aliases;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportLocalClusterStateAction;
import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.action.support.local;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.TimeValue;

import java.io.IOException;
import java.util.Objects;

/**
* A base request for actions that are executed locally on the node that receives the request.
*/
public abstract class LocalClusterStateRequest extends ActionRequest {

/**
* The timeout for waiting until the cluster is unblocked.
* We use the name <code>masterTimeout</code> to be consistent with the master node actions.
*/
private final TimeValue masterTimeout;

protected LocalClusterStateRequest(TimeValue masterTimeout) {
this.masterTimeout = Objects.requireNonNull(masterTimeout);
}

@Override
public final void writeTo(StreamOutput out) throws IOException {
TransportAction.localOnly();
}

public TimeValue masterTimeout() {
return masterTimeout;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.action.support.local;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;

import java.util.concurrent.Executor;

import static org.elasticsearch.common.Strings.format;

/**
* Analogue of {@link org.elasticsearch.action.support.master.TransportMasterNodeReadAction} except that it runs on the local node rather
* than delegating to the master.
*/
public abstract class TransportLocalClusterStateAction<Request extends LocalClusterStateRequest, Response extends ActionResponse> extends
TransportAction<Request, Response> {

private static final Logger logger = LogManager.getLogger(TransportLocalClusterStateAction.class);

protected final ClusterService clusterService;
protected final Executor executor;

protected TransportLocalClusterStateAction(
String actionName,
ActionFilters actionFilters,
TaskManager taskManager,
ClusterService clusterService,
Executor executor
) {
// TODO replace DIRECT_EXECUTOR_SERVICE when removing workaround for https://github.com/elastic/elasticsearch/issues/97916
super(actionName, actionFilters, taskManager, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.clusterService = clusterService;
this.executor = executor;
}

protected abstract ClusterBlockException checkBlock(Request request, ClusterState state);

protected abstract void localClusterStateOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)
throws Exception;

@Override
protected final void doExecute(Task task, Request request, ActionListener<Response> listener) {
final var state = clusterService.state();
final var clusterBlockException = checkBlock(request, state);
if (clusterBlockException != null) {
if (clusterBlockException.retryable() == false) {
listener.onFailure(clusterBlockException);
} else {
waitForClusterUnblock(task, request, listener, state, clusterBlockException);
}
} else {
innerDoExecute(task, request, listener, state);
}
}

private void innerDoExecute(Task task, Request request, ActionListener<Response> listener, ClusterState state) {
if (task instanceof CancellableTask cancellableTask && cancellableTask.notifyIfCancelled(listener)) {
return;
}
// Workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can
executor.execute(ActionRunnable.wrap(listener, l -> localClusterStateOperation(task, request, state, l)));
}

private void waitForClusterUnblock(
Task task,
Request request,
ActionListener<Response> listener,
ClusterState initialState,
ClusterBlockException exception
) {
var observer = new ClusterStateObserver(
initialState,
clusterService,
request.masterTimeout(),
logger,
clusterService.threadPool().getThreadContext()
);
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
logger.trace("retrying with cluster state version [{}]", state.version());
innerDoExecute(task, request, listener, state);
}

@Override
public void onClusterServiceClose() {
listener.onFailure(new NodeClosedException(clusterService.localNode()));
}

@Override
public void onTimeout(TimeValue timeout) {
logger.debug(
() -> format("timed out while waiting for cluster to unblock in [%s] (timeout [%s])", actionName, timeout),
exception
);
listener.onFailure(new ElasticsearchTimeoutException("timed out while waiting for cluster to unblock", exception));
}
}, clusterState -> isTaskCancelled(task) || checkBlock(request, clusterState) == null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means that if the task is cancelled while waiting for an unblocking cluster state then it'll wait for the next cluster state update before completing, which could be arbitrarily far in the future. Could we use org.elasticsearch.tasks.CancellableTask#addListener to react more promptly to cancellation instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea 👍. It looks like we're not doing that in TransportMasterNodeAction either. Am I missing something or should we add it there too?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eh we probably should. But that's a good point, let's leave this with the same behaviour as in TransportMasterNodeAction and we can come back to this change later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened #117971 for this.

}

private boolean isTaskCancelled(Task task) {
return task instanceof CancellableTask cancellableTask && cancellableTask.isCancelled();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestBuilderListener;
Expand Down Expand Up @@ -207,7 +208,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC

final boolean namesProvided = request.hasParam("name");
final String[] aliases = request.paramAsStringArrayOrEmptyIfAll("name");
final GetAliasesRequest getAliasesRequest = new GetAliasesRequest(aliases);
final var masterNodeTimeout = RestUtils.getMasterNodeTimeout(request);
final GetAliasesRequest getAliasesRequest = new GetAliasesRequest(masterNodeTimeout, aliases);
final String[] indices = Strings.splitStringByCommaToArray(request.param("index"));
getAliasesRequest.indices(indices);
getAliasesRequest.indicesOptions(IndicesOptions.fromRequest(request, getAliasesRequest.indicesOptions()));
Expand Down
Loading