Skip to content

Commit 1f83fe6

Browse files
committed
[CCR] Change resume follow api to be a master node action (elastic#35249)
In order to start shard follow tasks, the resume follow api already needs execute N requests to the elected master node. The pause follow API is also a master node action, which would make how both APIs execute more consistent.
1 parent 98c8d12 commit 1f83fe6

File tree

2 files changed

+25
-10
lines changed

2 files changed

+25
-10
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportResumeFollowAction.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88

99
import org.elasticsearch.action.ActionListener;
1010
import org.elasticsearch.action.support.ActionFilters;
11-
import org.elasticsearch.action.support.HandledTransportAction;
1211
import org.elasticsearch.action.support.master.AcknowledgedResponse;
12+
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
1313
import org.elasticsearch.client.Client;
1414
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.block.ClusterBlockException;
16+
import org.elasticsearch.cluster.block.ClusterBlockLevel;
1517
import org.elasticsearch.cluster.metadata.IndexMetaData;
1618
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1719
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@@ -50,7 +52,7 @@
5052
import java.util.Set;
5153
import java.util.stream.Collectors;
5254

53-
public class TransportResumeFollowAction extends HandledTransportAction<ResumeFollowAction.Request, AcknowledgedResponse> {
55+
public class TransportResumeFollowAction extends TransportMasterNodeAction<ResumeFollowAction.Request, AcknowledgedResponse> {
5456

5557
static final ByteSizeValue DEFAULT_MAX_READ_REQUEST_SIZE = new ByteSizeValue(32, ByteSizeUnit.MB);
5658
static final ByteSizeValue DEFAULT_MAX_WRITE_REQUEST_SIZE = new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES);
@@ -64,7 +66,6 @@ public class TransportResumeFollowAction extends HandledTransportAction<ResumeFo
6466
static final TimeValue DEFAULT_READ_POLL_TIMEOUT = TimeValue.timeValueMinutes(1);
6567

6668
private final Client client;
67-
private final ClusterService clusterService;
6869
private final PersistentTasksService persistentTasksService;
6970
private final IndicesService indicesService;
7071
private final CcrLicenseChecker ccrLicenseChecker;
@@ -81,24 +82,38 @@ public TransportResumeFollowAction(
8182
final PersistentTasksService persistentTasksService,
8283
final IndicesService indicesService,
8384
final CcrLicenseChecker ccrLicenseChecker) {
84-
super(settings, ResumeFollowAction.NAME, threadPool, transportService, actionFilters,
85+
super(settings, ResumeFollowAction.NAME, true, transportService, clusterService, threadPool, actionFilters,
8586
ResumeFollowAction.Request::new, indexNameExpressionResolver);
8687
this.client = client;
87-
this.clusterService = clusterService;
8888
this.persistentTasksService = persistentTasksService;
8989
this.indicesService = indicesService;
9090
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker);
9191
}
9292

9393
@Override
94-
protected void doExecute(final ResumeFollowAction.Request request,
95-
final ActionListener<AcknowledgedResponse> listener) {
94+
protected String executor() {
95+
return ThreadPool.Names.SAME;
96+
}
97+
98+
@Override
99+
protected AcknowledgedResponse newResponse() {
100+
return new AcknowledgedResponse();
101+
}
102+
103+
@Override
104+
protected ClusterBlockException checkBlock(ResumeFollowAction.Request request, ClusterState state) {
105+
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
106+
}
107+
108+
@Override
109+
protected void masterOperation(final ResumeFollowAction.Request request,
110+
ClusterState state,
111+
final ActionListener<AcknowledgedResponse> listener) throws Exception {
96112
if (ccrLicenseChecker.isCcrAllowed() == false) {
97113
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
98114
return;
99115
}
100116

101-
final ClusterState state = clusterService.state();
102117
final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex());
103118
if (followerIndexMetadata == null) {
104119
listener.onFailure(new IndexNotFoundException(request.getFollowerIndex()));

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ResumeFollowAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@
77
package org.elasticsearch.xpack.core.ccr.action;
88

99
import org.elasticsearch.action.Action;
10-
import org.elasticsearch.action.ActionRequest;
1110
import org.elasticsearch.action.ActionRequestBuilder;
1211
import org.elasticsearch.action.ActionRequestValidationException;
1312
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1413
import org.elasticsearch.client.ElasticsearchClient;
14+
import org.elasticsearch.action.support.master.MasterNodeRequest;
1515
import org.elasticsearch.common.ParseField;
1616
import org.elasticsearch.common.io.stream.StreamInput;
1717
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -46,7 +46,7 @@ public AcknowledgedResponse newResponse() {
4646
return new AcknowledgedResponse();
4747
}
4848

49-
public static class Request extends ActionRequest implements ToXContentObject {
49+
public static class Request extends MasterNodeRequest<Request> implements ToXContentObject {
5050

5151
static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index");
5252
static final ParseField MAX_READ_REQUEST_OPERATION_COUNT = new ParseField("max_read_request_operation_count");

0 commit comments

Comments
 (0)