Skip to content

Commit 9929bc9

Browse files
committed
[CCR] Cleanup pause follow action (elastic#34183)
* Change the `TransportPauseFollowAction` to extend from `TransportMasterNodeAction` instead of `HandledAction`, this removes a sync cluster state api call. * Introduced `ResponseHandler` that removes duplicated code in `TransportPauseFollowAction` and `TransportResumeFollowAction`. * Changed `PauseFollowAction.Request` to not use `readFrom()`.
1 parent aa8b5e8 commit 9929bc9

File tree

7 files changed

+131
-137
lines changed

7 files changed

+131
-137
lines changed
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ccr.action;
7+
8+
import org.elasticsearch.action.ActionListener;
9+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
10+
11+
import java.util.concurrent.atomic.AtomicInteger;
12+
import java.util.concurrent.atomic.AtomicReferenceArray;
13+
14+
final class ResponseHandler {
15+
16+
private final AtomicInteger counter;
17+
private final AtomicReferenceArray<Object> responses;
18+
private final ActionListener<AcknowledgedResponse> listener;
19+
20+
ResponseHandler(int numRequests, ActionListener<AcknowledgedResponse> listener) {
21+
this.counter = new AtomicInteger(numRequests);
22+
this.responses = new AtomicReferenceArray<>(numRequests);
23+
this.listener = listener;
24+
}
25+
26+
<T> ActionListener<T> getActionListener(final int requestId) {
27+
return new ActionListener<T>() {
28+
29+
@Override
30+
public void onResponse(T response) {
31+
responses.set(requestId, response);
32+
finalizeResponse();
33+
}
34+
35+
@Override
36+
public void onFailure(Exception e) {
37+
responses.set(requestId, e);
38+
finalizeResponse();
39+
}
40+
};
41+
}
42+
43+
private void finalizeResponse() {
44+
Exception error = null;
45+
if (counter.decrementAndGet() == 0) {
46+
for (int j = 0; j < responses.length(); j++) {
47+
Object response = responses.get(j);
48+
if (response instanceof Exception) {
49+
if (error == null) {
50+
error = (Exception) response;
51+
} else {
52+
error.addSuppressed((Exception) response);
53+
}
54+
}
55+
}
56+
57+
if (error == null) {
58+
listener.onResponse(new AcknowledgedResponse(true));
59+
} else {
60+
listener.onFailure(error);
61+
}
62+
}
63+
}
64+
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPauseFollowAction.java

Lines changed: 49 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@
77
package org.elasticsearch.xpack.ccr.action;
88

99
import org.elasticsearch.action.ActionListener;
10-
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
1110
import org.elasticsearch.action.support.ActionFilters;
12-
import org.elasticsearch.action.support.HandledTransportAction;
1311
import org.elasticsearch.action.support.master.AcknowledgedResponse;
14-
import org.elasticsearch.client.Client;
12+
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
13+
import org.elasticsearch.cluster.ClusterState;
14+
import org.elasticsearch.cluster.block.ClusterBlockException;
15+
import org.elasticsearch.cluster.block.ClusterBlockLevel;
1516
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
17+
import org.elasticsearch.cluster.service.ClusterService;
1618
import org.elasticsearch.common.inject.Inject;
1719
import org.elasticsearch.common.settings.Settings;
1820
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
@@ -22,13 +24,10 @@
2224
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
2325

2426
import java.util.List;
25-
import java.util.concurrent.atomic.AtomicInteger;
26-
import java.util.concurrent.atomic.AtomicReferenceArray;
2727
import java.util.stream.Collectors;
2828

29-
public class TransportPauseFollowAction extends HandledTransportAction<PauseFollowAction.Request, AcknowledgedResponse> {
29+
public class TransportPauseFollowAction extends TransportMasterNodeAction<PauseFollowAction.Request, AcknowledgedResponse> {
3030

31-
private final Client client;
3231
private final PersistentTasksService persistentTasksService;
3332

3433
@Inject
@@ -38,84 +37,58 @@ public TransportPauseFollowAction(
3837
final TransportService transportService,
3938
final ActionFilters actionFilters,
4039
final IndexNameExpressionResolver indexNameExpressionResolver,
41-
final Client client,
40+
final ClusterService clusterService,
4241
final PersistentTasksService persistentTasksService) {
43-
super(settings, PauseFollowAction.NAME, threadPool, transportService, actionFilters,
42+
super(settings, PauseFollowAction.NAME, transportService, clusterService, threadPool, actionFilters,
4443
indexNameExpressionResolver, PauseFollowAction.Request::new);
45-
this.client = client;
4644
this.persistentTasksService = persistentTasksService;
4745
}
4846

4947
@Override
50-
protected void doExecute(final PauseFollowAction.Request request, final ActionListener<AcknowledgedResponse> listener) {
51-
52-
client.admin().cluster().state(new ClusterStateRequest(), ActionListener.wrap(r -> {
53-
PersistentTasksCustomMetaData persistentTasksMetaData = r.getState().metaData().custom(PersistentTasksCustomMetaData.TYPE);
54-
if (persistentTasksMetaData == null) {
55-
listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]"));
56-
return;
57-
}
58-
59-
List<String> shardFollowTaskIds = persistentTasksMetaData.tasks().stream()
60-
.filter(persistentTask -> ShardFollowTask.NAME.equals(persistentTask.getTaskName()))
61-
.filter(persistentTask -> {
62-
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
63-
return shardFollowTask.getFollowShardId().getIndexName().equals(request.getFollowIndex());
64-
})
65-
.map(PersistentTasksCustomMetaData.PersistentTask::getId)
66-
.collect(Collectors.toList());
67-
68-
if (shardFollowTaskIds.isEmpty()) {
69-
listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]"));
70-
return;
71-
}
72-
73-
final AtomicInteger counter = new AtomicInteger(shardFollowTaskIds.size());
74-
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(shardFollowTaskIds.size());
75-
int i = 0;
76-
77-
for (String taskId : shardFollowTaskIds) {
78-
final int shardId = i++;
79-
persistentTasksService.sendRemoveRequest(taskId,
80-
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
81-
@Override
82-
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> task) {
83-
responses.set(shardId, task);
84-
finalizeResponse();
85-
}
48+
protected String executor() {
49+
return ThreadPool.Names.SAME;
50+
}
8651

87-
@Override
88-
public void onFailure(Exception e) {
89-
responses.set(shardId, e);
90-
finalizeResponse();
91-
}
52+
@Override
53+
protected AcknowledgedResponse newResponse() {
54+
return new AcknowledgedResponse();
55+
}
9256

93-
void finalizeResponse() {
94-
Exception error = null;
95-
if (counter.decrementAndGet() == 0) {
96-
for (int j = 0; j < responses.length(); j++) {
97-
Object response = responses.get(j);
98-
if (response instanceof Exception) {
99-
if (error == null) {
100-
error = (Exception) response;
101-
} else {
102-
error.addSuppressed((Throwable) response);
103-
}
104-
}
105-
}
57+
@Override
58+
protected void masterOperation(PauseFollowAction.Request request,
59+
ClusterState state,
60+
ActionListener<AcknowledgedResponse> listener) throws Exception {
61+
PersistentTasksCustomMetaData persistentTasksMetaData = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
62+
if (persistentTasksMetaData == null) {
63+
listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]"));
64+
return;
65+
}
66+
67+
List<String> shardFollowTaskIds = persistentTasksMetaData.tasks().stream()
68+
.filter(persistentTask -> ShardFollowTask.NAME.equals(persistentTask.getTaskName()))
69+
.filter(persistentTask -> {
70+
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
71+
return shardFollowTask.getFollowShardId().getIndexName().equals(request.getFollowIndex());
72+
})
73+
.map(PersistentTasksCustomMetaData.PersistentTask::getId)
74+
.collect(Collectors.toList());
75+
76+
if (shardFollowTaskIds.isEmpty()) {
77+
listener.onFailure(new IllegalArgumentException("no shard follow tasks for [" + request.getFollowIndex() + "]"));
78+
return;
79+
}
80+
81+
int i = 0;
82+
final ResponseHandler responseHandler = new ResponseHandler(shardFollowTaskIds.size(), listener);
83+
for (String taskId : shardFollowTaskIds) {
84+
final int taskSlot = i++;
85+
persistentTasksService.sendRemoveRequest(taskId, responseHandler.getActionListener(taskSlot));
86+
}
87+
}
10688

107-
if (error == null) {
108-
// include task ids?
109-
listener.onResponse(new AcknowledgedResponse(true));
110-
} else {
111-
// TODO: cancel all started tasks
112-
listener.onFailure(error);
113-
}
114-
}
115-
}
116-
});
117-
}
118-
}, listener::onFailure));
89+
@Override
90+
protected ClusterBlockException checkBlock(PauseFollowAction.Request request, ClusterState state) {
91+
return state.blocks().indexBlockedException(ClusterBlockLevel.METADATA_WRITE, request.getFollowIndex());
11992
}
12093

12194
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java

Lines changed: 4 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.elasticsearch.indices.IndicesRequestCache;
3434
import org.elasticsearch.indices.IndicesService;
3535
import org.elasticsearch.license.LicenseUtils;
36-
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
3736
import org.elasticsearch.persistent.PersistentTasksService;
3837
import org.elasticsearch.threadpool.ThreadPool;
3938
import org.elasticsearch.transport.TransportService;
@@ -49,8 +48,6 @@
4948
import java.util.Map;
5049
import java.util.Objects;
5150
import java.util.Set;
52-
import java.util.concurrent.atomic.AtomicInteger;
53-
import java.util.concurrent.atomic.AtomicReferenceArray;
5451
import java.util.stream.Collectors;
5552

5653
public class TransportResumeFollowAction extends HandledTransportAction<ResumeFollowAction.Request, AcknowledgedResponse> {
@@ -145,62 +142,22 @@ void start(
145142
IndexMetaData leaderIndexMetadata,
146143
IndexMetaData followIndexMetadata,
147144
String[] leaderIndexHistoryUUIDs,
148-
ActionListener<AcknowledgedResponse> handler) throws IOException {
145+
ActionListener<AcknowledgedResponse> listener) throws IOException {
149146

150147
MapperService mapperService = followIndexMetadata != null ? indicesService.createIndexMapperService(followIndexMetadata) : null;
151148
validate(request, leaderIndexMetadata, followIndexMetadata, leaderIndexHistoryUUIDs, mapperService);
152149
final int numShards = followIndexMetadata.getNumberOfShards();
153-
final AtomicInteger counter = new AtomicInteger(numShards);
154-
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards());
150+
final ResponseHandler handler = new ResponseHandler(numShards, listener);
155151
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream()
156152
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey()))
157153
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
158154

159-
for (int i = 0; i < numShards; i++) {
160-
final int shardId = i;
155+
for (int shardId = 0; shardId < numShards; shardId++) {
161156
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId;
162157

163158
final ShardFollowTask shardFollowTask = createShardFollowTask(shardId, clusterNameAlias, request,
164159
leaderIndexMetadata, followIndexMetadata, filteredHeaders);
165-
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,
166-
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() {
167-
@Override
168-
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask> task) {
169-
responses.set(shardId, task);
170-
finalizeResponse();
171-
}
172-
173-
@Override
174-
public void onFailure(Exception e) {
175-
responses.set(shardId, e);
176-
finalizeResponse();
177-
}
178-
179-
void finalizeResponse() {
180-
Exception error = null;
181-
if (counter.decrementAndGet() == 0) {
182-
for (int j = 0; j < responses.length(); j++) {
183-
Object response = responses.get(j);
184-
if (response instanceof Exception) {
185-
if (error == null) {
186-
error = (Exception) response;
187-
} else {
188-
error.addSuppressed((Throwable) response);
189-
}
190-
}
191-
}
192-
193-
if (error == null) {
194-
// include task ids?
195-
handler.onResponse(new AcknowledgedResponse(true));
196-
} else {
197-
// TODO: cancel all started tasks
198-
handler.onFailure(error);
199-
}
200-
}
201-
}
202-
}
203-
);
160+
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, handler.getActionListener(shardId));
204161
}
205162
}
206163

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPauseFollowAction.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@ public String getName() {
3131

3232
@Override
3333
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
34-
Request request = new Request();
35-
request.setFollowIndex(restRequest.param("index"));
34+
Request request = new Request(restRequest.param("index"));
3635
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
3736
}
3837
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -359,8 +359,7 @@ public void testFollowIndexWithNestedField() throws Exception {
359359
}
360360

361361
public void testUnfollowNonExistingIndex() {
362-
PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request();
363-
unfollowRequest.setFollowIndex("non-existing-index");
362+
PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request("non-existing-index");
364363
expectThrows(IllegalArgumentException.class,
365364
() -> followerClient().execute(PauseFollowAction.INSTANCE, unfollowRequest).actionGet());
366365
}
@@ -753,8 +752,7 @@ private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, f
753752

754753
private void pauseFollow(String... indices) throws Exception {
755754
for (String index : indices) {
756-
final PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request();
757-
unfollowRequest.setFollowIndex(index);
755+
final PauseFollowAction.Request unfollowRequest = new PauseFollowAction.Request(index);
758756
followerClient().execute(PauseFollowAction.INSTANCE, unfollowRequest).get();
759757
}
760758
ensureNoCcrTasks();

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/LocalIndexFollowingIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,7 @@ public void testFollowIndex() throws Exception {
5252
assertThat(client().prepareSearch("follower").get().getHits().totalHits, equalTo(firstBatchNumDocs + secondBatchNumDocs));
5353
});
5454

55-
PauseFollowAction.Request pauseRequest = new PauseFollowAction.Request();
56-
pauseRequest.setFollowIndex("follower");
55+
PauseFollowAction.Request pauseRequest = new PauseFollowAction.Request("follower");
5756
client().execute(PauseFollowAction.INSTANCE, pauseRequest);
5857

5958
final long thirdBatchNumDocs = randomIntBetween(2, 64);

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/PauseFollowAction.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,16 @@
77
package org.elasticsearch.xpack.core.ccr.action;
88

99
import org.elasticsearch.action.Action;
10-
import org.elasticsearch.action.ActionRequest;
1110
import org.elasticsearch.action.ActionRequestBuilder;
1211
import org.elasticsearch.action.ActionRequestValidationException;
1312
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1413
import org.elasticsearch.client.ElasticsearchClient;
14+
import org.elasticsearch.action.support.master.MasterNodeRequest;
1515
import org.elasticsearch.common.io.stream.StreamInput;
1616
import org.elasticsearch.common.io.stream.StreamOutput;
1717

1818
import java.io.IOException;
19+
import java.util.Objects;
1920

2021
public class PauseFollowAction extends Action<PauseFollowAction.Request, AcknowledgedResponse, PauseFollowAction.RequestBuilder> {
2122

@@ -31,16 +32,19 @@ public AcknowledgedResponse newResponse() {
3132
return new AcknowledgedResponse();
3233
}
3334

34-
public static class Request extends ActionRequest {
35+
public static class Request extends MasterNodeRequest<Request> {
3536

3637
private String followIndex;
3738

38-
public String getFollowIndex() {
39-
return followIndex;
39+
public Request(String followIndex) {
40+
this.followIndex = Objects.requireNonNull(followIndex, "followIndex");
41+
}
42+
43+
public Request() {
4044
}
4145

42-
public void setFollowIndex(final String followIndex) {
43-
this.followIndex = followIndex;
46+
public String getFollowIndex() {
47+
return followIndex;
4448
}
4549

4650
@Override

0 commit comments

Comments
 (0)