Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/122857.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 122857
summary: Run `TransportGetWatcherSettingsAction` on local node
area: Watcher
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,20 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Map;

public class GetWatcherSettingsAction extends ActionType<GetWatcherSettingsAction.Response> {

Expand All @@ -30,12 +35,17 @@ public GetWatcherSettingsAction() {
super(NAME);
}

public static class Request extends MasterNodeReadRequest<Request> {
public static class Request extends LocalClusterStateRequest {

public Request(TimeValue masterNodeTimeout) {
super(masterNodeTimeout);
}

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
public static Request readFrom(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
return new Request(in);
Expand All @@ -49,15 +59,13 @@ private Request(StreamInput in) throws IOException {
}

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) {
super.writeTo(out);
}
public ActionRequestValidationException validate() {
return null;
}

@Override
public ActionRequestValidationException validate() {
return null;
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new CancellableTask(id, type, action, "", parentTaskId, headers);
}
}

Expand All @@ -69,10 +77,11 @@ public Response(Settings settings) {
this.settings = settings;
}

public Response(StreamInput in) throws IOException {
this.settings = Settings.readSettingsFromStream(in);
}

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@Override
public void writeTo(StreamOutput out) throws IOException {
this.settings.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.watcher.transport.actions.put.GetWatcherSettingsAction;

Expand All @@ -37,6 +38,10 @@ public List<Route> routes() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
GetWatcherSettingsAction.Request req = new GetWatcherSettingsAction.Request(RestUtils.getMasterNodeTimeout(request));
return channel -> client.execute(GetWatcherSettingsAction.INSTANCE, req, new RestToXContentListener<>(channel));
return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).execute(
GetWatcherSettingsAction.INSTANCE,
req,
new RestToXContentListener<>(channel)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
Expand All @@ -18,9 +19,10 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.watcher.transport.actions.put.GetWatcherSettingsAction;

Expand All @@ -30,40 +32,52 @@
import static org.elasticsearch.xpack.watcher.transport.actions.TransportUpdateWatcherSettingsAction.WATCHER_INDEX_NAME;
import static org.elasticsearch.xpack.watcher.transport.actions.TransportUpdateWatcherSettingsAction.WATCHER_INDEX_REQUEST;

public class TransportGetWatcherSettingsAction extends TransportMasterNodeAction<
public class TransportGetWatcherSettingsAction extends TransportLocalClusterStateAction<
GetWatcherSettingsAction.Request,
GetWatcherSettingsAction.Response> {

private final IndexNameExpressionResolver indexNameExpressionResolver;

/**
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
* we no longer need to support calling this action remotely.
*/
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
@SuppressWarnings("this-escape")
@Inject
public TransportGetWatcherSettingsAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
) {
super(
GetWatcherSettingsAction.NAME,
transportService,
clusterService,
threadPool,
actionFilters,
GetWatcherSettingsAction.Request::readFrom,
GetWatcherSettingsAction.Response::new,
transportService.getTaskManager(),
clusterService,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.indexNameExpressionResolver = indexNameExpressionResolver;

transportService.registerRequestHandler(
actionName,
executor,
false,
true,
GetWatcherSettingsAction.Request::readFrom,
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
);
}

@Override
protected void masterOperation(
protected void localClusterStateOperation(
Task task,
GetWatcherSettingsAction.Request request,
ClusterState state,
ActionListener<GetWatcherSettingsAction.Response> listener
) {
((CancellableTask) task).ensureNotCancelled();
IndexMetadata metadata = state.metadata().index(WATCHER_INDEX_NAME);
if (metadata == null) {
listener.onResponse(new GetWatcherSettingsAction.Response(Settings.EMPTY));
Expand Down