Skip to content
8 changes: 8 additions & 0 deletions docs/changelog/133683.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
pr: 133683
summary: Avoid running asynchronous ILM actions while ILM is stopped
area: ILM+SLM
type: bug
issues:
- 99859
- 81234
- 85097
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.rollover.RolloverConditions;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand All @@ -19,18 +20,22 @@
import org.elasticsearch.xpack.core.ilm.DeleteAction;
import org.elasticsearch.xpack.core.ilm.ForceMergeAction;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ilm.Phase;
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
import org.elasticsearch.xpack.core.ilm.RolloverAction;
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
import org.junit.Before;

import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.xpack.TimeSeriesRestDriver.createFullPolicy;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createIndexWithSettings;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createNewSingletonPolicy;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.createPolicy;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.getStepKeyForIndex;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.index;
import static org.elasticsearch.xpack.TimeSeriesRestDriver.indexDocument;
Expand All @@ -51,6 +56,7 @@ public void refreshIndex() {
index = "index-" + randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
policy = "policy-" + randomAlphaOfLength(5);
alias = "alias-" + randomAlphaOfLength(5);
logger.info("--> running [{}] with index [{}], alias [{}] and policy [{}]", getTestName(), index, alias, policy);
}

public void testMoveToAllocateStep() throws Exception {
Expand Down Expand Up @@ -245,6 +251,66 @@ public void testMoveToStepRereadsPolicy() throws Exception {
assertBusy(() -> { indexExists("test-000002"); });
}

/**
* Test that an async action does not execute when the Move To Step API is used while ILM is stopped.
* Unfortunately, this test doesn't prove that the async action never executes, as it's hard to prove that an asynchronous process
* never happens - waiting for a certain period would only increase our confidence but not actually prove it, and it would increase the
* runtime of the test significantly. We also assert that the remainder of the policy executes after ILM is started again to ensure that
* the index is not stuck in the async action step.
*/
public void testAsyncActionDoesNotExecuteAfterILMStop() throws Exception {
String originalIndex = index + "-000001";
// Create a simply policy with the most important aspect being the readonly action, which contains the ReadOnlyStep AsyncActionStep.
var actions = Map.of(
"rollover",
new RolloverAction(RolloverConditions.newBuilder().addMaxIndexAgeCondition(TimeValue.timeValueHours(1)).build()),
"readonly",
new ReadOnlyAction()
);
Phase phase = new Phase("hot", TimeValue.ZERO, actions);
createPolicy(client(), policy, phase, null, null, null, null);

createIndexWithSettings(
client(),
originalIndex,
alias,
Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, policy).put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, alias)
);

// Wait for ILM to do everything it can for this index
assertBusy(() -> assertEquals(new StepKey("hot", "rollover", "check-rollover-ready"), getStepKeyForIndex(client(), originalIndex)));

// Stop ILM
client().performRequest(new Request("POST", "/_ilm/stop"));

// Move ILM to the readonly step, which is an async action step.
Request moveToStepRequest = new Request("POST", "_ilm/move/" + originalIndex);
moveToStepRequest.setJsonEntity("""
{
"current_step": {
"phase": "hot",
"action": "rollover",
"name": "check-rollover-ready"
},
"next_step": {
"phase": "hot",
"action": "readonly",
"name": "readonly"
}
}""");
client().performRequest(moveToStepRequest);

// Since ILM is stopped, the async action should not execute and the index should remain in the readonly step.
// This is the tricky part of the test, as we can't really verify that the async action will never happen.
assertEquals(new StepKey("hot", "readonly", "readonly"), getStepKeyForIndex(client(), originalIndex));
Comment on lines +303 to +305
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any suggestions for improved assertions are welcome. Adding a Thread.sleep here would increase our confidence, but still wouldn't make any guarantee.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we write a test that wraps the Client passed to IndexLifecycleService with a wrapper that asserts that we never perform some particular ILM action?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a test to IndexLifecycleRunnerTests in 370d966 that verifies the action isn't executed when ILM is stopped. Let me know if that's what you had in mind. I chose for testing IndexLifecycleRunner#maybeRunAsyncAction instead of IndexLifecycleService#maybeRunAsyncAction, as the latter is only used by APIs and the former is also used internally by ILM.


// Restart ILM
client().performRequest(new Request("POST", "/_ilm/start"));

// Make sure we actually complete the remainder of the policy after ILM is started again.
assertBusy(() -> assertEquals(new StepKey("hot", "complete", "complete"), getStepKeyForIndex(client(), originalIndex)));
}

public void testMoveToStepWithInvalidNextStep() throws Exception {
createNewSingletonPolicy(client(), policy, "delete", DeleteAction.WITH_SNAPSHOT_DELETE, TimeValue.timeValueDays(100));
createIndexWithSettings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.xpack.core.ilm.ClusterStateWaitStep;
import org.elasticsearch.xpack.core.ilm.ErrorStep;
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
import org.elasticsearch.xpack.core.ilm.OperationMode;
import org.elasticsearch.xpack.core.ilm.PhaseCompleteStep;
import org.elasticsearch.xpack.core.ilm.Step;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;
Expand All @@ -48,6 +49,7 @@

import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE;
import static org.elasticsearch.xpack.core.ilm.LifecycleOperationMetadata.currentILMMode;

class IndexLifecycleRunner {
private static final Logger logger = LogManager.getLogger(IndexLifecycleRunner.class);
Expand Down Expand Up @@ -308,6 +310,12 @@ void onErrorMaybeRetryFailedStep(ProjectId projectId, String policy, StepKey cur
void maybeRunAsyncAction(ProjectState state, IndexMetadata indexMetadata, String policy, StepKey expectedStepKey) {
final var projectId = state.projectId();
String index = indexMetadata.getIndex().getName();
OperationMode currentMode = currentILMMode(state.metadata());
if (OperationMode.RUNNING.equals(currentMode) == false) {
logger.info("[{}] not running async action in policy [{}] because ILM is [{}]", index, policy, currentMode);
return;
}

if (LifecycleSettings.LIFECYCLE_SKIP_SETTING.get(indexMetadata.getSettings())) {
logger.info("[{}] skipping policy [{}] because [{}] is true", index, policy, LifecycleSettings.LIFECYCLE_SKIP);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,88 +184,78 @@ void onMaster(ClusterState clusterState) {
maybeScheduleJob();

for (var projectId : clusterState.metadata().projects().keySet()) {
onMaster(clusterState.projectState(projectId));
maybeRunAsyncActions(clusterState.projectState(projectId));
}
}

void onMaster(ProjectState state) {
/**
* Kicks off any async actions that may not have been run due to either master failover or ILM being manually stopped.
*/
private void maybeRunAsyncActions(ProjectState state) {
final ProjectMetadata projectMetadata = state.metadata();
final IndexLifecycleMetadata currentMetadata = projectMetadata.custom(IndexLifecycleMetadata.TYPE);
if (currentMetadata != null) {
OperationMode currentMode = currentILMMode(projectMetadata);
if (OperationMode.STOPPED.equals(currentMode)) {
return;
}

boolean safeToStop = true; // true until proven false by a run policy

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes in this method aren't stricly necessary, I just took this opportunity to clean this method up a bit, as it was becoming hard to read. If there are concerns with this refactor, I can revert these stylistic changes and stick to the bug fix.

// If we just became master, we need to kick off any async actions that
// may have not been run due to master rollover
for (IndexMetadata idxMeta : projectMetadata.indices().values()) {
if (projectMetadata.isIndexManagedByILM(idxMeta)) {
String policyName = idxMeta.getLifecyclePolicyName();
final LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
StepKey stepKey = Step.getCurrentStepKey(lifecycleState);

try {
if (OperationMode.STOPPING == currentMode) {
if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.name())) {
logger.info(
"waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]",
idxMeta.getIndex().getName(),
policyName,
stepKey.name()
);
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
// ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop
safeToStop = false;
} else {
logger.info(
"skipping policy execution of step [{}] for index [{}] with policy [{}]" + " because ILM is stopping",
stepKey == null ? "n/a" : stepKey.name(),
idxMeta.getIndex().getName(),
policyName
);
}
} else {
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
}
} catch (Exception e) {
if (logger.isTraceEnabled()) {
logger.warn(
() -> format(
"async action execution failed during master election trigger"
+ " for index [%s] with policy [%s] in step [%s], lifecycle state: [%s]",
idxMeta.getIndex().getName(),
policyName,
stepKey,
lifecycleState.asMap()
),
e
);
} else {
logger.warn(
() -> format(
"async action execution failed during master election trigger"
+ " for index [%s] with policy [%s] in step [%s]",
idxMeta.getIndex().getName(),
policyName,
stepKey
),
e
);
if (currentMetadata == null) {
return;
}
OperationMode currentMode = currentILMMode(projectMetadata);
if (OperationMode.STOPPED.equals(currentMode)) {
return;
}

}
// Don't rethrow the exception, we don't want a failure for one index to be
// called to cause actions not to be triggered for further indices
}
}
boolean safeToStop = true; // true until proven false by a run policy
for (IndexMetadata idxMeta : projectMetadata.indices().values()) {
if (projectMetadata.isIndexManagedByILM(idxMeta) == false) {
continue;
}
String policyName = idxMeta.getLifecyclePolicyName();
final LifecycleExecutionState lifecycleState = idxMeta.getLifecycleExecutionState();
StepKey stepKey = Step.getCurrentStepKey(lifecycleState);

try {
if (currentMode == OperationMode.RUNNING) {
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
continue;
}
// We only get here if ILM is in STOPPING mode. In that case, we need to check if there is any index that is in a step
// that we can't stop ILM in. If there is, we don't stop ILM yet.
if (stepKey != null && IGNORE_STEPS_MAINTENANCE_REQUESTED.contains(stepKey.name())) {
logger.info(
"waiting to stop ILM because index [{}] with policy [{}] is currently in step [{}]",
idxMeta.getIndex().getName(),
policyName,
stepKey.name()
);
lifecycleRunner.maybeRunAsyncAction(state, idxMeta, policyName, stepKey);
// ILM is trying to stop, but this index is in a Shrink step (or other dangerous step) so we can't stop
safeToStop = false;
} else {
logger.info(
"skipping policy execution of step [{}] for index [{}] with policy [{}]" + " because ILM is stopping",
stepKey == null ? "n/a" : stepKey.name(),
idxMeta.getIndex().getName(),
policyName
);
}
} catch (Exception e) {
String logMessage = format(
"async action execution failed during master election trigger for index [%s] with policy [%s] in step [%s]",
idxMeta.getIndex().getName(),
policyName,
stepKey
);
if (logger.isTraceEnabled()) {
logMessage += format(", lifecycle state: [%s]", lifecycleState.asMap());
}
logger.warn(logMessage, e);

if (safeToStop && OperationMode.STOPPING == currentMode) {
stopILM(state.projectId());
// Don't rethrow the exception, we don't want a failure for one index to be
// called to cause actions not to be triggered for further indices
}
}

if (safeToStop && OperationMode.STOPPING == currentMode) {
stopILM(state.projectId());
}
}

private void stopILM(ProjectId projectId) {
Expand Down Expand Up @@ -333,6 +323,20 @@ public void clusterChanged(ClusterChangedEvent event) {
cancelJob();
policyRegistry.clear();
}
} else if (this.isMaster) {
// If we are the master and we were before, check if any projects changed their ILM mode from non-RUNNING to RUNNING.
// If so, kick off any async actions that may not have run while not in RUNNING mode.
for (ProjectMetadata project : event.state().metadata().projects().values()) {
final var previousProject = event.previousState().metadata().projects().get(project.id());
if (previousProject == null || project == previousProject) {
continue;
}
final OperationMode currentMode = currentILMMode(project);
final OperationMode previousMode = currentILMMode(previousProject);
if (currentMode == OperationMode.RUNNING && previousMode != OperationMode.RUNNING) {
maybeRunAsyncActions(event.state().projectState(project.id()));
}
}
}

// if we're the master, then process deleted indices and trigger policies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,34 @@ public void testSkip_asyncAction() {
Mockito.verifyNoMoreInteractions(clusterService);
}

/**
* Test that an async action step is not executed when ILM is stopped.
*/
public void testNotRunningAsyncActionWhenILMIsStopped() {
String policyName = "stopped_policy";
Step.StepKey stepKey = new Step.StepKey("phase", "action", "async_action_step");

MockAsyncActionStep step = new MockAsyncActionStep(stepKey, null);

PolicyStepsRegistry stepRegistry = createOneStepPolicyStepRegistry(policyName, step);
ClusterService clusterService = mock(ClusterService.class);
newMockTaskQueue(clusterService); // ensure constructor call to createTaskQueue is satisfied
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);

IndexMetadata indexMetadata = IndexMetadata.builder("test")
.settings(randomIndexSettings().put(LifecycleSettings.LIFECYCLE_NAME, policyName))
.build();

IndexLifecycleMetadata ilm = new IndexLifecycleMetadata(Map.of(), OperationMode.STOPPED);
final var project = ProjectMetadata.builder(randomProjectIdOrDefault())
.put(indexMetadata, true)
.putCustom(IndexLifecycleMetadata.TYPE, ilm)
.build();
runner.maybeRunAsyncAction(projectStateFromProject(project), indexMetadata, policyName, stepKey);

assertThat(step.getExecuteCount(), equalTo(0L));
}

public void testRunPolicyErrorStepOnRetryableFailedStep() {
String policyName = "rollover_policy";
String phaseName = "hot";
Expand Down Expand Up @@ -586,7 +614,6 @@ public void testRunStateChangePolicyWithAsyncActionNextStep() throws Exception {
.putProjectMetadata(project)
.nodes(DiscoveryNodes.builder().add(node).masterNodeId(node.getId()).localNodeId(node.getId()))
.build();
logger.info("--> state: {}", state);
ClusterServiceUtils.setState(clusterService, state);
IndexLifecycleRunner runner = new IndexLifecycleRunner(stepRegistry, historyStore, clusterService, threadPool, () -> 0L);

Expand Down