Skip to content
5 changes: 5 additions & 0 deletions docs/changelog/94510.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 94510
summary: Reduce WaitForNoFollowersStep requests indices shard stats
area: ILM+SLM
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
import org.elasticsearch.protocol.xpack.XPackInfoResponse;
import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
import org.elasticsearch.xpack.core.ilm.step.info.SingleMessageFieldInfo;

import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Optional;

/**
Expand Down Expand Up @@ -49,10 +53,24 @@ public boolean isRetryable() {

@Override
public void evaluateCondition(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
XPackInfoRequest xPackInfoRequest = new XPackInfoRequest();
xPackInfoRequest.setCategories(EnumSet.of(XPackInfoRequest.Category.FEATURES));
getClient().execute(XPackInfoFeatureAction.CCR, xPackInfoRequest, ActionListener.wrap((xPackInfoResponse) -> {
XPackInfoResponse.FeatureSetsInfo.FeatureSet featureSet = xPackInfoResponse.getInfo();
if (featureSet != null && featureSet.enabled() == false) {
listener.onResponse(true, null);
return;
}
leaderIndexCheck(metadata, index, listener, masterTimeout);
}, listener::onFailure));
}

private void leaderIndexCheck(Metadata metadata, Index index, Listener listener, TimeValue masterTimeout) {
IndicesStatsRequest request = new IndicesStatsRequest();
request.clear();
String indexName = index.getName();
request.indices(indexName);

getClient().admin().indices().stats(request, ActionListener.wrap((response) -> {
IndexStats indexStats = response.getIndex(indexName);
if (indexStats == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.protocol.xpack.XPackInfoResponse;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
import org.elasticsearch.xpack.core.action.XPackInfoFeatureResponse;
import org.mockito.Mockito;

import java.nio.file.Path;
Expand Down Expand Up @@ -64,7 +67,7 @@ protected WaitForNoFollowersStep copyInstance(WaitForNoFollowersStep instance) {
return new WaitForNoFollowersStep(instance.getKey(), instance.getNextStepKey(), instance.getClient());
}

public void testConditionMet() {
public void testConditionMetWhenCCREnabled() {
WaitForNoFollowersStep step = createRandomInstance();

String indexName = randomAlphaOfLengthBetween(5, 10);
Expand All @@ -76,6 +79,41 @@ public void testConditionMet() {
.numberOfReplicas(randomIntBetween(1, 10))
.build();

mockXPackInfo(true, true);
mockIndexStatsCall(indexName, randomIndexStats(false, numberOfShards));

final SetOnce<Boolean> conditionMetHolder = new SetOnce<>();
final SetOnce<ToXContentObject> stepInfoHolder = new SetOnce<>();
step.evaluateCondition(Metadata.builder().put(indexMetadata, true).build(), indexMetadata.getIndex(), new AsyncWaitStep.Listener() {
@Override
public void onResponse(boolean conditionMet, ToXContentObject infomationContext) {
conditionMetHolder.set(conditionMet);
stepInfoHolder.set(infomationContext);
}

@Override
public void onFailure(Exception e) {
fail("onFailure should not be called in this test, called with exception: " + e.getMessage());
}
}, MASTER_TIMEOUT);

assertTrue(conditionMetHolder.get());
assertNull(stepInfoHolder.get());
}

public void testConditionMetWhenCCRDisabled() {
WaitForNoFollowersStep step = createRandomInstance();

String indexName = randomAlphaOfLengthBetween(5, 10);

int numberOfShards = randomIntBetween(1, 100);
final IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
.settings(settings(Version.CURRENT))
.numberOfShards(numberOfShards)
.numberOfReplicas(randomIntBetween(1, 10))
.build();

mockXPackInfo(false, false);
mockIndexStatsCall(indexName, randomIndexStats(false, numberOfShards));

final SetOnce<Boolean> conditionMetHolder = new SetOnce<>();
Expand Down Expand Up @@ -109,6 +147,7 @@ public void testConditionNotMet() {
.numberOfReplicas(randomIntBetween(1, 10))
.build();

mockXPackInfo(true, true);
mockIndexStatsCall(indexName, randomIndexStats(true, numberOfShards));

final SetOnce<Boolean> conditionMetHolder = new SetOnce<>();
Expand Down Expand Up @@ -148,6 +187,8 @@ public void testNoShardStats() {
ShardStats sStats = new ShardStats(null, mockShardPath(), null, null, null, null);
ShardStats[] shardStats = new ShardStats[1];
shardStats[0] = sStats;

mockXPackInfo(true, true);
mockIndexStatsCall(indexName, new IndexStats(indexName, "uuid", ClusterHealthStatus.GREEN, IndexMetadata.State.OPEN, shardStats));

final SetOnce<Boolean> conditionMetHolder = new SetOnce<>();
Expand Down Expand Up @@ -183,6 +224,7 @@ public void testFailure() {

final Exception expectedException = new RuntimeException(randomAlphaOfLength(5));

mockXPackInfo(true, true);
Mockito.doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<IndicesStatsResponse> listener = (ActionListener<IndicesStatsResponse>) invocationOnMock.getArguments()[1];
Expand Down Expand Up @@ -211,6 +253,19 @@ public void onFailure(Exception e) {
assertThat(exceptionHolder.get(), equalTo(expectedException));
}

private void mockXPackInfo(boolean available, boolean enabled) {
Mockito.doAnswer(invocationOnMock -> {

@SuppressWarnings("unchecked")
ActionListener<XPackInfoFeatureResponse> listener = (ActionListener<XPackInfoFeatureResponse>) invocationOnMock
.getArguments()[2];
var featureSet = new XPackInfoResponse.FeatureSetsInfo.FeatureSet("ccr", available, enabled);
XPackInfoFeatureResponse xPackInfoFeatureResponse = new XPackInfoFeatureResponse(featureSet);
listener.onResponse(xPackInfoFeatureResponse);
return null;
}).when(client).execute(Mockito.same(XPackInfoFeatureAction.CCR), Mockito.any(), Mockito.any());
}

private void mockIndexStatsCall(String expectedIndexName, IndexStats indexStats) {
Mockito.doAnswer(invocationOnMock -> {
IndicesStatsRequest request = (IndicesStatsRequest) invocationOnMock.getArguments()[0];
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugin/ilm/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ archivesBaseName = 'x-pack-ilm'
dependencies {
compileOnly project(path: xpackModule('core'))
testImplementation(testArtifact(project(xpackModule('core'))))
testImplementation project(xpackModule('ccr'))
testImplementation project(':modules:data-streams')
testImplementation project(':modules:dlm')
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ilm.ExplainLifecycleRequest;
Expand Down Expand Up @@ -68,7 +69,7 @@ protected boolean ignoreExternalCluster() {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(LocalStateCompositeXPackPlugin.class, IndexLifecycle.class);
return Arrays.asList(LocalStateCompositeXPackPlugin.class, IndexLifecycle.class, Ccr.class);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ilm.ExplainLifecycleRequest;
import org.elasticsearch.xpack.core.ilm.ExplainLifecycleResponse;
import org.elasticsearch.xpack.core.ilm.IndexLifecycleExplainResponse;
Expand Down Expand Up @@ -46,7 +48,7 @@ public class ILMMultiNodeIT extends ESIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(LocalStateCompositeXPackPlugin.class, DataStreamsPlugin.class, IndexLifecycle.class);
return Arrays.asList(LocalStateCompositeXPackPlugin.class, DataStreamsPlugin.class, IndexLifecycle.class, Ccr.class);
}

@Override
Expand All @@ -56,6 +58,7 @@ protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
.put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s")
// This just generates less churn and makes it easier to read the log file if needed
.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED, false)
.put(XPackSettings.CCR_ENABLED_SETTING.getKey(), true)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.ilm;

import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.ilm.ExplainLifecycleRequest;
import org.elasticsearch.xpack.core.ilm.ExplainLifecycleResponse;
import org.elasticsearch.xpack.core.ilm.IndexLifecycleExplainResponse;
import org.elasticsearch.xpack.core.ilm.LifecycleAction;
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ilm.Phase;
import org.elasticsearch.xpack.core.ilm.RolloverAction;
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
import org.elasticsearch.xpack.core.ilm.action.ExplainLifecycleAction;
import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.xpack.core.ilm.ShrinkIndexNameSupplier.SHRUNKEN_INDEX_PREFIX;
import static org.hamcrest.Matchers.equalTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
public class ILMMultiNodeWithCCRDisabledIT extends ESIntegTestCase {
private static final String index = "myindex";

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(LocalStateCompositeXPackPlugin.class, DataStreamsPlugin.class, IndexLifecycle.class, Ccr.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s")
// This just generates less churn and makes it easier to read the log file if needed
.put(LifecycleSettings.LIFECYCLE_HISTORY_INDEX_ENABLED, false)
.put(XPackSettings.CCR_ENABLED_SETTING.getKey(), false)
.build();
}

public void testShrinkOnTiers() throws Exception {
startHotOnlyNode();
startWarmOnlyNode();
ensureGreen();
Map<String, LifecycleAction> actions = new HashMap<>();
RolloverAction rolloverAction = new RolloverAction(null, null, null, 1L, null, null, null, null, null, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can skip the hot phase completely to save a few cycles in this test

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can skip the hot phase completely to save a few cycles in this test

I need ShrinkAction to trigger WaitForNoFollowersStep and shrink action need a read-only index. so i combine the RolloverAction which is for enable read-only index AND ShrinkAction into one hot phase. and skip the warm phase

ShrinkAction shrinkAction = new ShrinkAction(1, null);
actions.put(rolloverAction.getWriteableName(), rolloverAction);
actions.put(shrinkAction.getWriteableName(), shrinkAction);
Phase hotPhase = new Phase("hot", TimeValue.ZERO, actions);

LifecyclePolicy lifecyclePolicy = new LifecyclePolicy("shrink-policy", Collections.singletonMap(hotPhase.getName(), hotPhase));
client().execute(PutLifecycleAction.INSTANCE, new PutLifecycleAction.Request(lifecyclePolicy)).get();

Template t = new Template(
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(LifecycleSettings.LIFECYCLE_NAME, "shrink-policy")
.build(),
null,
null
);

ComposableIndexTemplate template = new ComposableIndexTemplate(
Collections.singletonList(index),
t,
null,
null,
null,
null,
new ComposableIndexTemplate.DataStreamTemplate(),
null
);
client().execute(
PutComposableIndexTemplateAction.INSTANCE,
new PutComposableIndexTemplateAction.Request("template").indexTemplate(template)
).actionGet();
client().prepareIndex(index).setCreate(true).setId("1").setSource("@timestamp", "2020-09-09").get();

assertBusy(() -> {
ExplainLifecycleResponse explain = client().execute(ExplainLifecycleAction.INSTANCE, new ExplainLifecycleRequest().indices("*"))
.get();
logger.info("--> explain: {}", Strings.toString(explain));

String backingIndexName = DataStream.getDefaultBackingIndexName(index, 1);
IndexLifecycleExplainResponse indexResp = null;
for (Map.Entry<String, IndexLifecycleExplainResponse> indexNameAndResp : explain.getIndexResponses().entrySet()) {
if (indexNameAndResp.getKey().startsWith(SHRUNKEN_INDEX_PREFIX) && indexNameAndResp.getKey().contains(backingIndexName)) {
indexResp = indexNameAndResp.getValue();
assertNotNull(indexResp);
assertThat(indexResp.getPhase(), equalTo("hot"));
assertThat(indexResp.getStep(), equalTo("complete"));
break;
}
}

assertNotNull("Unable to find an ilm explain output for the shrunk index of " + index, indexResp);
}, 30, TimeUnit.SECONDS);
}

public void startHotOnlyNode() {
Settings nodeSettings = Settings.builder().putList("node.roles", Arrays.asList("master", "data_hot", "ingest")).build();
internalCluster().startNode(nodeSettings);
}

public void startWarmOnlyNode() {
Settings nodeSettings = Settings.builder().putList("node.roles", Arrays.asList("master", "data_warm", "ingest")).build();
internalCluster().startNode(nodeSettings);
}
}