Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
import org.elasticsearch.protocol.xpack.XPackInfoResponse;
import org.elasticsearch.xpack.core.action.XPackInfoAction;
import org.elasticsearch.xpack.core.ilm.step.info.SingleMessageFieldInfo;

import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Optional;

/**
Expand Down Expand Up @@ -49,10 +53,31 @@ public boolean isRetryable() {

@Override
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
XPackInfoRequest xPackInfoRequest = new XPackInfoRequest();
xPackInfoRequest.setCategories(EnumSet.of(XPackInfoRequest.Category.FEATURES));
getClient().execute(
XPackInfoAction.INSTANCE,
xPackInfoRequest,
ActionListener.wrap((xPackInfoResponse) -> {
XPackInfoResponse.FeatureSetsInfo featureSetsInfo = xPackInfoResponse.getFeatureSetsInfo();
if (featureSetsInfo != null) {
XPackInfoResponse.FeatureSetsInfo.FeatureSet featureSet = featureSetsInfo.getFeatureSets().get(CCR_LEASE_KEY);
if (featureSet != null && (featureSet.available() == false || featureSet.enabled() == false)){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's please just check if the featureSet is enabled here.
e.g.

if (featureSet != null && featureSet.enabled() == false) 

We'd like to maintain the same behaviour for this step even if CCR is unavailable (e.g. license is expired)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd like to maintain the same behaviour for this step even if CCR is unavailable (e.g. license is expired)

LGTM

listener.onResponse(true, null);
return;
}
}
leaderIndexCheck(metadata, index, listener, masterTimeout);
}, listener::onFailure)
);
}

private void leaderIndexCheck(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
IndicesStatsRequest request = new IndicesStatsRequest();
request.clear();
String indexName = index.getName();
request.indices(indexName);

getClient().admin().indices().stats(request, ActionListener.wrap((response) -> {
IndexStats indexStats = response.getIndex(indexName);
if (indexStats == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.protocol.xpack.XPackInfoResponse;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xpack.core.action.XPackInfoAction;
import org.mockito.Mockito;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;

import static org.elasticsearch.xpack.core.ilm.WaitForNoFollowersStep.CCR_LEASE_KEY;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -64,7 +68,7 @@ protected WaitForNoFollowersStep copyInstance(WaitForNoFollowersStep instance) {
return new WaitForNoFollowersStep(instance.getKey(), instance.getNextStepKey(), instance.getClient());
}

public void testConditionMet() {
public void testConditionMetWhenEnable() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we name this testConditionMetWhenCCREnabled ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++

WaitForNoFollowersStep step = createRandomInstance();

String indexName = randomAlphaOfLengthBetween(5, 10);
Expand All @@ -76,6 +80,41 @@ public void testConditionMet() {
.numberOfReplicas(randomIntBetween(1, 10))
.build();

mockXPackInfo(true, true);
mockIndexStatsCall(indexName, randomIndexStats(false, numberOfShards));

final SetOnce<Boolean> conditionMetHolder = new SetOnce<>();
final SetOnce<ToXContentObject> stepInfoHolder = new SetOnce<>();
step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
@Override
public void onResponse(boolean conditionMet, ToXContentObject infomationContext) {
conditionMetHolder.set(conditionMet);
stepInfoHolder.set(infomationContext);
}

@Override
public void onFailure(Exception e) {
fail("onFailure should not be called in this test, called with exception: " + e.getMessage());
}
}, MASTER_TIMEOUT);

assertTrue(conditionMetHolder.get());
assertNull(stepInfoHolder.get());
}

public void testConditionMetWhenDisabled() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, shall we name this test to include CCRDisabled ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++

WaitForNoFollowersStep step = createRandomInstance();

String indexName = randomAlphaOfLengthBetween(5, 10);

int numberOfShards = randomIntBetween(1, 100);
final IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
.settings(settings(Version.CURRENT))
.numberOfShards(numberOfShards)
.numberOfReplicas(randomIntBetween(1, 10))
.build();

mockXPackInfo(false, false);
mockIndexStatsCall(indexName, randomIndexStats(false, numberOfShards));

final SetOnce<Boolean> conditionMetHolder = new SetOnce<>();
Expand Down Expand Up @@ -109,6 +148,7 @@ public void testConditionNotMet() {
.numberOfReplicas(randomIntBetween(1, 10))
.build();

mockXPackInfo(true, true);
mockIndexStatsCall(indexName, randomIndexStats(true, numberOfShards));

final SetOnce<Boolean> conditionMetHolder = new SetOnce<>();
Expand Down Expand Up @@ -148,6 +188,8 @@ public void testNoShardStats() {
ShardStats sStats = new ShardStats(null, mockShardPath(), null, null, null, null);
ShardStats[] shardStats = new ShardStats[1];
shardStats[0] = sStats;

mockXPackInfo(true, true);
mockIndexStatsCall(indexName, new IndexStats(indexName, "uuid", ClusterHealthStatus.GREEN, IndexMetadata.State.OPEN, shardStats));

final SetOnce<Boolean> conditionMetHolder = new SetOnce<>();
Expand Down Expand Up @@ -183,6 +225,7 @@ public void testFailure() {

final Exception expectedException = new RuntimeException(randomAlphaOfLength(5));

mockXPackInfo(true, true);
Mockito.doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<IndicesStatsResponse> listener = (ActionListener<IndicesStatsResponse>) invocationOnMock.getArguments()[1];
Expand Down Expand Up @@ -211,6 +254,22 @@ public void onFailure(Exception e) {
assertThat(exceptionHolder.get(), equalTo(expectedException));
}

private void mockXPackInfo(boolean available, boolean enabled) {
Mockito.doAnswer(invocationOnMock -> {

@SuppressWarnings("unchecked")
ActionListener<XPackInfoResponse> listener = (ActionListener<XPackInfoResponse>) invocationOnMock.getArguments()[2];

Set<XPackInfoResponse.FeatureSetsInfo.FeatureSet> featureSets = new HashSet<>();
featureSets.add(new XPackInfoResponse.FeatureSetsInfo.FeatureSet("ccr", available, enabled));
XPackInfoResponse.FeatureSetsInfo featureInfos = new XPackInfoResponse.FeatureSetsInfo(featureSets);
XPackInfoResponse xpackInfo = new XPackInfoResponse(null, null, featureInfos);

listener.onResponse(xpackInfo);
return null;
}).when(client).execute(Mockito.same(XPackInfoAction.INSTANCE), Mockito.any(), Mockito.any());
}

private void mockIndexStatsCall(String expectedIndexName, IndexStats indexStats) {
Mockito.doAnswer(invocationOnMock -> {
IndicesStatsRequest request = (IndicesStatsRequest) invocationOnMock.getArguments()[0];
Expand Down