Skip to content

Commit afdd453

Browse files
authored
Run TransportGetEnrichPolicyAction on local node (#121124)
This action solely needs the cluster state, it can run on any node. Additionally, it needs to be cancellable to avoid doing unnecessary work after a client failure or timeout.
1 parent d88ddca commit afdd453

File tree

10 files changed

+222
-154
lines changed

10 files changed

+222
-154
lines changed

docs/changelog/121124.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 121124
2+
summary: Run `TransportGetEnrichPolicyAction` on local node
3+
area: Ingest Node
4+
type: enhancement
5+
issues: []

rest-api-spec/src/main/resources/rest-api-spec/api/enrich.get_policy.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
"params": {
3131
"master_timeout":{
3232
"type":"time",
33-
"description":"Timeout for processing on master node"
33+
"description":"Timeout for waiting for new cluster state in case it is blocked"
3434
}
3535
}
3636
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/enrich/action/GetEnrichPolicyAction.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,14 @@
99
import org.elasticsearch.action.ActionRequestValidationException;
1010
import org.elasticsearch.action.ActionResponse;
1111
import org.elasticsearch.action.ActionType;
12-
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
12+
import org.elasticsearch.action.support.local.LocalClusterStateRequest;
1313
import org.elasticsearch.common.io.stream.StreamInput;
1414
import org.elasticsearch.common.io.stream.StreamOutput;
1515
import org.elasticsearch.core.TimeValue;
16+
import org.elasticsearch.core.UpdateForV10;
17+
import org.elasticsearch.tasks.CancellableTask;
18+
import org.elasticsearch.tasks.Task;
19+
import org.elasticsearch.tasks.TaskId;
1620
import org.elasticsearch.xcontent.ToXContentObject;
1721
import org.elasticsearch.xcontent.XContentBuilder;
1822
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
@@ -33,7 +37,7 @@ private GetEnrichPolicyAction() {
3337
super(NAME);
3438
}
3539

36-
public static class Request extends MasterNodeReadRequest<Request> {
40+
public static class Request extends LocalClusterStateRequest {
3741

3842
private final List<String> names;
3943

@@ -42,6 +46,11 @@ public Request(TimeValue masterNodeTimeout, String... names) {
4246
this.names = List.of(names);
4347
}
4448

49+
/**
50+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to read these requests until
51+
* we no longer need to support calling this action remotely.
52+
*/
53+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
4554
public Request(StreamInput in) throws IOException {
4655
super(in);
4756
this.names = in.readStringCollectionAsImmutableList();
@@ -52,14 +61,13 @@ public ActionRequestValidationException validate() {
5261
return null;
5362
}
5463

55-
public List<String> getNames() {
56-
return names;
64+
@Override
65+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
66+
return new CancellableTask(id, type, action, "", parentTaskId, headers);
5767
}
5868

59-
@Override
60-
public void writeTo(StreamOutput out) throws IOException {
61-
super.writeTo(out);
62-
out.writeStringCollection(names);
69+
public List<String> getNames() {
70+
return names;
6371
}
6472

6573
@Override
@@ -89,10 +97,11 @@ public Response(Map<String, EnrichPolicy> policies) {
8997
.collect(Collectors.toList());
9098
}
9199

92-
public Response(StreamInput in) throws IOException {
93-
policies = in.readCollectionAsList(EnrichPolicy.NamedPolicy::new);
94-
}
95-
100+
/**
101+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC we must remain able to write these responses until
102+
* we no longer need to support calling this action remotely.
103+
*/
104+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
96105
@Override
97106
public void writeTo(StreamOutput out) throws IOException {
98107
out.writeCollection(policies);

x-pack/plugin/enrich/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ dependencies {
1919
testImplementation project(path: ':modules:legacy-geo')
2020
testImplementation project(xpackModule('spatial'))
2121
testImplementation(testArtifact(project(xpackModule('monitoring'))))
22+
internalClusterTestImplementation project(':modules:rest-root')
2223
}
2324

2425
addQaCheckDependencies(project)
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.enrich;
9+
10+
import org.apache.http.client.methods.HttpGet;
11+
import org.apache.http.client.methods.HttpPost;
12+
import org.elasticsearch.action.support.CancellableActionTestPlugin;
13+
import org.elasticsearch.action.support.PlainActionFuture;
14+
import org.elasticsearch.action.support.RefCountingListener;
15+
import org.elasticsearch.action.support.SubscribableListener;
16+
import org.elasticsearch.client.Request;
17+
import org.elasticsearch.client.Response;
18+
import org.elasticsearch.common.Strings;
19+
import org.elasticsearch.common.network.NetworkModule;
20+
import org.elasticsearch.common.settings.Settings;
21+
import org.elasticsearch.plugins.Plugin;
22+
import org.elasticsearch.rest.root.MainRestPlugin;
23+
import org.elasticsearch.test.ESIntegTestCase;
24+
import org.elasticsearch.test.rest.ObjectPath;
25+
import org.elasticsearch.transport.netty4.Netty4Plugin;
26+
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
27+
28+
import java.io.IOException;
29+
import java.util.Collection;
30+
import java.util.List;
31+
import java.util.concurrent.CancellationException;
32+
import java.util.concurrent.ExecutionException;
33+
import java.util.concurrent.TimeUnit;
34+
35+
import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener;
36+
import static org.elasticsearch.test.TaskAssertions.assertAllTasksHaveFinished;
37+
import static org.hamcrest.Matchers.greaterThan;
38+
import static org.hamcrest.Matchers.oneOf;
39+
40+
public class EnrichRestActionCancellationIT extends ESIntegTestCase {
41+
42+
@Override
43+
protected boolean addMockHttpTransport() {
44+
return false; // enable http
45+
}
46+
47+
@Override
48+
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
49+
return Settings.builder()
50+
.put(super.nodeSettings(nodeOrdinal, otherSettings))
51+
.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME)
52+
.put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
53+
.build();
54+
}
55+
56+
@Override
57+
protected Collection<Class<? extends Plugin>> nodePlugins() {
58+
return List.of(getTestTransportPlugin(), MainRestPlugin.class, CancellableActionTestPlugin.class, EnrichPlugin.class);
59+
}
60+
61+
public void testGetEnrichPolicyCancellation() throws IOException {
62+
runRestActionCancellationTest(new Request(HttpGet.METHOD_NAME, "/_enrich/policy"), GetEnrichPolicyAction.NAME);
63+
}
64+
65+
private void runRestActionCancellationTest(Request request, String actionName) {
66+
final var node = usually() ? internalCluster().getRandomNodeName() : internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
67+
68+
try (
69+
var restClient = createRestClient(node);
70+
var capturingAction = CancellableActionTestPlugin.capturingActionOnNode(actionName, node)
71+
) {
72+
final var responseFuture = new PlainActionFuture<Response>();
73+
final var restInvocation = restClient.performRequestAsync(request, wrapAsRestResponseListener(responseFuture));
74+
75+
if (randomBoolean()) {
76+
// cancel by aborting the REST request
77+
capturingAction.captureAndCancel(restInvocation::cancel);
78+
expectThrows(ExecutionException.class, CancellationException.class, () -> responseFuture.get(10, TimeUnit.SECONDS));
79+
} else {
80+
// cancel via the task management API
81+
final var cancelFuture = new PlainActionFuture<Void>();
82+
capturingAction.captureAndCancel(
83+
() -> SubscribableListener
84+
85+
.<ObjectPath>newForked(
86+
l -> restClient.performRequestAsync(
87+
getListTasksRequest(node, actionName),
88+
wrapAsRestResponseListener(l.map(ObjectPath::createFromResponse))
89+
)
90+
)
91+
92+
.<Void>andThen((l, listTasksResponse) -> {
93+
final var taskCount = listTasksResponse.evaluateArraySize("tasks");
94+
assertThat(taskCount, greaterThan(0));
95+
try (var listeners = new RefCountingListener(l)) {
96+
for (int i = 0; i < taskCount; i++) {
97+
final var taskPrefix = "tasks." + i + ".";
98+
assertTrue(listTasksResponse.evaluate(taskPrefix + "cancellable"));
99+
assertFalse(listTasksResponse.evaluate(taskPrefix + "cancelled"));
100+
restClient.performRequestAsync(
101+
getCancelTaskRequest(
102+
listTasksResponse.evaluate(taskPrefix + "node"),
103+
listTasksResponse.evaluate(taskPrefix + "id")
104+
),
105+
wrapAsRestResponseListener(listeners.acquire(EnrichRestActionCancellationIT::assertOK))
106+
);
107+
}
108+
}
109+
})
110+
111+
.addListener(cancelFuture)
112+
);
113+
cancelFuture.get(10, TimeUnit.SECONDS);
114+
expectThrows(Exception.class, () -> responseFuture.get(10, TimeUnit.SECONDS));
115+
}
116+
117+
assertAllTasksHaveFinished(actionName);
118+
} catch (Exception e) {
119+
fail(e);
120+
}
121+
}
122+
123+
private static Request getListTasksRequest(String taskNode, String actionName) {
124+
final var listTasksRequest = new Request(HttpGet.METHOD_NAME, "/_tasks");
125+
listTasksRequest.addParameter("nodes", taskNode);
126+
listTasksRequest.addParameter("actions", actionName);
127+
listTasksRequest.addParameter("group_by", "none");
128+
return listTasksRequest;
129+
}
130+
131+
private static Request getCancelTaskRequest(String taskNode, int taskId) {
132+
final var cancelTaskRequest = new Request(HttpPost.METHOD_NAME, Strings.format("/_tasks/%s:%d/_cancel", taskNode, taskId));
133+
cancelTaskRequest.addParameter("wait_for_completion", null);
134+
return cancelTaskRequest;
135+
}
136+
137+
public static void assertOK(Response response) {
138+
assertThat(response.getStatusLine().getStatusCode(), oneOf(200, 201));
139+
}
140+
141+
}

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyAction.java

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,17 @@
88

99
import org.elasticsearch.action.ActionListener;
1010
import org.elasticsearch.action.support.ActionFilters;
11-
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
11+
import org.elasticsearch.action.support.ChannelActionListener;
12+
import org.elasticsearch.action.support.local.TransportLocalClusterStateAction;
1213
import org.elasticsearch.cluster.ClusterState;
1314
import org.elasticsearch.cluster.block.ClusterBlockException;
1415
import org.elasticsearch.cluster.block.ClusterBlockLevel;
15-
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1616
import org.elasticsearch.cluster.service.ClusterService;
1717
import org.elasticsearch.common.util.concurrent.EsExecutors;
18+
import org.elasticsearch.core.UpdateForV10;
1819
import org.elasticsearch.injection.guice.Inject;
20+
import org.elasticsearch.tasks.CancellableTask;
1921
import org.elasticsearch.tasks.Task;
20-
import org.elasticsearch.threadpool.ThreadPool;
2122
import org.elasticsearch.transport.TransportService;
2223
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
2324
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
@@ -26,32 +27,38 @@
2627
import java.util.HashMap;
2728
import java.util.Map;
2829

29-
public class TransportGetEnrichPolicyAction extends TransportMasterNodeReadAction<
30+
public class TransportGetEnrichPolicyAction extends TransportLocalClusterStateAction<
3031
GetEnrichPolicyAction.Request,
3132
GetEnrichPolicyAction.Response> {
3233

34+
/**
35+
* NB prior to 9.0 this was a TransportMasterNodeReadAction so for BwC it must be registered with the TransportService until
36+
* we no longer need to support calling this action remotely.
37+
*/
38+
@UpdateForV10(owner = UpdateForV10.Owner.DATA_MANAGEMENT)
39+
@SuppressWarnings("this-escape")
3340
@Inject
34-
public TransportGetEnrichPolicyAction(
35-
TransportService transportService,
36-
ClusterService clusterService,
37-
ThreadPool threadPool,
38-
ActionFilters actionFilters,
39-
IndexNameExpressionResolver indexNameExpressionResolver
40-
) {
41+
public TransportGetEnrichPolicyAction(TransportService transportService, ClusterService clusterService, ActionFilters actionFilters) {
4142
super(
4243
GetEnrichPolicyAction.NAME,
43-
transportService,
44-
clusterService,
45-
threadPool,
4644
actionFilters,
47-
GetEnrichPolicyAction.Request::new,
48-
GetEnrichPolicyAction.Response::new,
45+
transportService.getTaskManager(),
46+
clusterService,
4947
EsExecutors.DIRECT_EXECUTOR_SERVICE
5048
);
49+
50+
transportService.registerRequestHandler(
51+
actionName,
52+
executor,
53+
false,
54+
true,
55+
GetEnrichPolicyAction.Request::new,
56+
(request, channel, task) -> executeDirect(task, request, new ChannelActionListener<>(channel))
57+
);
5158
}
5259

5360
@Override
54-
protected void masterOperation(
61+
protected void localClusterStateOperation(
5562
Task task,
5663
GetEnrichPolicyAction.Request request,
5764
ClusterState state,
@@ -71,6 +78,7 @@ protected void masterOperation(
7178
}
7279
}
7380
}
81+
((CancellableTask) task).ensureNotCancelled();
7482
listener.onResponse(new GetEnrichPolicyAction.Response(policies));
7583
}
7684

x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/rest/RestGetEnrichPolicyAction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.rest.RestUtils;
1414
import org.elasticsearch.rest.Scope;
1515
import org.elasticsearch.rest.ServerlessScope;
16+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
1617
import org.elasticsearch.rest.action.RestToXContentListener;
1718
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
1819

@@ -39,6 +40,10 @@ protected RestChannelConsumer prepareRequest(final RestRequest restRequest, fina
3940
RestUtils.getMasterNodeTimeout(restRequest),
4041
Strings.splitStringByCommaToArray(restRequest.param("name"))
4142
);
42-
return channel -> client.execute(GetEnrichPolicyAction.INSTANCE, request, new RestToXContentListener<>(channel));
43+
return channel -> new RestCancellableNodeClient(client, restRequest.getHttpChannel()).execute(
44+
GetEnrichPolicyAction.INSTANCE,
45+
request,
46+
new RestToXContentListener<>(channel)
47+
);
4348
}
4449
}

x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/GetEnrichPolicyActionRequestTests.java

Lines changed: 0 additions & 29 deletions
This file was deleted.

0 commit comments

Comments
 (0)