Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,21 @@

package org.elasticsearch.xpack.enrich;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
Expand All @@ -30,6 +36,8 @@

public class EnrichPolicyExecutor {

private static final Logger logger = LogManager.getLogger(EnrichPolicyExecutor.class);

public static final String TASK_ACTION = "policy_execution";

private final ClusterService clusterService;
Expand Down Expand Up @@ -69,6 +77,7 @@ public void coordinatePolicyExecution(
ActionListener<ExecuteEnrichPolicyAction.Response> listener
) {
long nowTimestamp = nowSupplier.getAsLong();
String policyName = request.getName();
String enrichIndexName = EnrichPolicy.getIndexName(request.getName(), nowTimestamp);
Releasable policyLock = tryLockingPolicy(request.getName(), enrichIndexName);
try {
Expand All @@ -77,14 +86,19 @@ public void coordinatePolicyExecution(
internalRequest.setParentTask(request.getParentTask());
client.execute(InternalExecutePolicyAction.INSTANCE, internalRequest, ActionListener.wrap(response -> {
if (response.getStatus() != null) {
logger.debug("Unlocking enrich policy [{}:{}] on complete with no task scheduled", policyName, enrichIndexName);
policyLock.close();
listener.onResponse(response);
} else {
assert response.getTaskId() != null : "If the execute response does not have a status it must return a task id";
awaitTaskCompletionAndThenRelease(response.getTaskId(), policyLock);
awaitTaskCompletionAndThenRelease(response.getTaskId(), () -> {
logger.debug("Unlocking enrich policy [{}:{}] on completion of task status", policyName, enrichIndexName);
policyLock.close();
}, policyName, enrichIndexName);
listener.onResponse(response);
}
}, e -> {
logger.debug("Unlocking enrich policy [{}:{}] on failure to execute internal action", policyName, enrichIndexName);
policyLock.close();
listener.onFailure(e);
}));
Expand Down Expand Up @@ -138,11 +152,54 @@ private Releasable tryLockingPolicy(String policyName, String enrichIndexName) {
};
}

private void awaitTaskCompletionAndThenRelease(TaskId taskId, Releasable policyLock) {
GetTaskRequest getTaskRequest = new GetTaskRequest();
getTaskRequest.setTaskId(taskId);
getTaskRequest.setWaitForCompletion(true);
client.admin().cluster().getTask(getTaskRequest, ActionListener.running(policyLock::close));
/**
 * Asynchronously waits for the given policy execution task to finish and releases the policy lock afterwards.
 * <p>
 * A Get Task request is issued with wait-for-completion enabled and the maximum timeout. The lock is released when the request
 * returns, when the task can no longer be found, or when an unexpected failure occurs. A timeout does not release the lock;
 * the wait is simply issued again.
 *
 * @param taskId          id of the task to wait on
 * @param policyLock      lock to close once the task is considered finished
 * @param policyName      policy name, used in log messages only
 * @param enrichIndexName enrich index name, used in log messages only
 */
private void awaitTaskCompletionAndThenRelease(
    TaskId taskId,
    Releasable policyLock,
    final String policyName,
    final String enrichIndexName
) {
    final GetTaskRequest waitRequest = new GetTaskRequest();
    waitRequest.setTaskId(taskId);
    waitRequest.setWaitForCompletion(true);
    waitRequest.setTimeout(TimeValue.MAX_VALUE);

    client.admin().cluster().getTask(waitRequest, new ActionListener<>() {
        @Override
        public void onResponse(GetTaskResponse ignored) {
            // The wait returned normally, so the lock is no longer needed.
            policyLock.close();
        }

        @Override
        public void onFailure(Exception failure) {
            final boolean taskNotFound = ExceptionsHelper.unwrap(failure, ResourceNotFoundException.class) != null;
            if (taskNotFound == false) {
                if (ExceptionsHelper.unwrap(failure, ElasticsearchTimeoutException.class) != null) {
                    // The wait itself timed out. Keep the lock and start waiting again.
                    logger.debug(
                        "Retrying task wait after encountering timeout during async policy execution result [{}:{}]",
                        policyName,
                        enrichIndexName
                    );
                    awaitTaskCompletionAndThenRelease(taskId, policyLock, policyName, enrichIndexName);
                    return;
                }
                // Anything else (network trouble or otherwise) is unexpected. Rather than leaving the policy locked forever and
                // potentially jamming the enrich feature while the cluster is unstable, release the lock and accept that the
                // execution may be inconsistent.
                logger.error(
                    "Emergency unlock for enrich policy ["
                        + policyName
                        + ":"
                        + enrichIndexName
                        + "] on failure to determine task status caused by unhandled exception",
                    failure
                );
            } else {
                // The task cannot be found: it completed, failed, or its node is gone. Either way the lock can be cleaned up.
                logger.debug(
                    "Assuming async policy [{}:{}] execution task [{}] has ended after not being able to retrieve it from remote host",
                    policyName,
                    enrichIndexName,
                    taskId
                );
            }
            // Single release point for every non-retry failure path.
            policyLock.close();
        }
    });
}

private Runnable createPolicyRunner(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.Releasable;

import java.util.HashSet;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
Expand Down Expand Up @@ -108,11 +108,13 @@ public EnrichPolicyLock lockPolicy(String policyName, String enrichIndexName) {
}

/**
 * Returns the keys currently present in {@code policyLocks}.
 * <p>
 * The result is a read-only wrapper around the key set rather than a copy: callers cannot modify it, and it is not a
 * point-in-time snapshot (assuming {@code policyLocks} follows the usual {@link java.util.Map#keySet()} view contract).
 *
 * @return an unmodifiable set of the keys of {@code policyLocks}
 */
public Set<String> lockedPolices() {
    // Wrap as unmodifiable instead of copying
    return Collections.unmodifiableSet(policyLocks.keySet());
}

/**
 * Returns the keys currently present in {@code workingIndices}.
 * <p>
 * The result is a read-only wrapper around the key set rather than a copy: callers cannot modify it, and it is not a
 * point-in-time snapshot (assuming {@code workingIndices} follows the usual {@link java.util.Map#keySet()} view contract).
 *
 * @return an unmodifiable set of the keys of {@code workingIndices}
 */
public Set<String> inflightPolicyIndices() {
    // Wrap as unmodifiable instead of copying
    return Collections.unmodifiableSet(workingIndices.keySet());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@

package org.elasticsearch.xpack.enrich;

import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.get.GetTaskAction;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -21,23 +24,30 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.action.InternalExecutePolicyAction;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.empty;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -178,6 +188,152 @@ public void testMaximumPolicyExecutionLimit() throws InterruptedException {
finalTaskComplete.await();
}

/**
 * Verifies that a policy stays locked when the Get Task wait issued by the executor fails with a timeout, and that the lock is
 * released once a later Get Task call completes normally.
 * <p>
 * The fake client runs every action on another thread. All actions are held back by a latch, and each Get Task response is
 * additionally gated by a two-party barrier so the test decides exactly when it is delivered. The first Get Task call fails
 * with an {@link ElasticsearchTimeoutException}; every later call responds normally.
 */
public void testWaitForCompletionConditionRemainsLocked() throws Exception {
    String testPolicyName = "test_policy";
    String testTaskId = randomAlphaOfLength(10) + ":" + randomIntBetween(100, 300);

    // Client calls are forked to a different thread which will await on this latch before actually running anything
    CountDownLatch clientBlockingLatch = new CountDownLatch(1);
    // A barrier to repeatedly control when the async client will respond with Get Task API results.
    CyclicBarrier getTaskActionBlockingBarrier = new CyclicBarrier(2);
    // State flag to ensure first Get Task API call will fail.
    AtomicBoolean shouldGetTaskApiReturnTimeout = new AtomicBoolean(true);

    // Create the async testing client
    Client client = new NoOpClient(testThreadPool) {
        @Override
        protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
            ActionType<Response> action,
            Request request,
            ActionListener<Response> listener
        ) {
            // Execute all client operations on another thread.
            testThreadPool.generic().execute(() -> {
                try {
                    // All client operations should wait until we're ready in the test.
                    clientBlockingLatch.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }

                if (GetTaskAction.INSTANCE.equals(action)) {
                    // Enrich uses GetTaskAction to detect when the task completes during wait_for_completion. The first call will
                    // throw a timeout, and all remaining calls will return normally.
                    try {
                        // Wait until the signal is given to respond to the get task action
                        getTaskActionBlockingBarrier.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        throw new RuntimeException(e);
                    }
                    // First call is a timeout to test the recovery logic. Remaining calls will no-op which should complete
                    // the execution.
                    if (shouldGetTaskApiReturnTimeout.getAndSet(false)) {
                        listener.onFailure(new ElasticsearchTimeoutException("Test call has timed out"));
                    } else {
                        listener.onResponse(null);
                    }
                } else if (InternalExecutePolicyAction.INSTANCE.equals(action)) {
                    // Return a fake task id for the run
                    @SuppressWarnings("unchecked")
                    Response response = (Response) new ExecuteEnrichPolicyAction.Response(new TaskId(testTaskId));
                    listener.onResponse(response);
                } else {
                    listener.onResponse(null);
                }
            });
        }
    };

    // Set up
    final EnrichPolicyLocks enrichPolicyLocks = new EnrichPolicyLocks();
    final EnrichPolicyExecutor testExecutor = new EnrichPolicyExecutor(
        Settings.EMPTY,
        null,
        client,
        testThreadPool,
        TestIndexNameExpressionResolver.newInstance(testThreadPool.getThreadContext()),
        enrichPolicyLocks,
        ESTestCase::randomNonNegativeLong
    );

    // Launch a fake policy run that will block until clientBlockingLatch is counted down.
    PlainActionFuture<ExecuteEnrichPolicyAction.Response> firstTaskResult = PlainActionFuture.newFuture();
    testExecutor.coordinatePolicyExecution(
        new ExecuteEnrichPolicyAction.Request(testPolicyName).setWaitForCompletion(true),
        firstTaskResult
    );

    // Check to make sure the policy is locked
    if (enrichPolicyLocks.lockedPolices().contains(testPolicyName) == false) {
        // If this fails, be a good citizen and conclude the fake runs to keep the logs clean from interrupted exceptions during cleanup
        clientBlockingLatch.countDown();
        try {
            firstTaskResult.get(3, TimeUnit.SECONDS);
        } catch (Exception e) {
            // Deliberately ignored: the fail(...) below is the failure that matters.
            logger.error("Encountered ignorable exception during test cleanup");
        }
        concludeFakeGetTaskCall(getTaskActionBlockingBarrier);
        fail("Enrich policy was not locked when it should have been");
    }

    // Free the client to execute
    clientBlockingLatch.countDown();

    // Wait for task id to be returned
    try {
        ExecuteEnrichPolicyAction.Response response = firstTaskResult.actionGet();
        assertThat(response.getStatus(), is(nullValue()));
        assertThat(response.getTaskId(), is(notNullValue()));
    } catch (AssertionError e) {
        // Conclude the fake runs, then rethrow the original assertion failure.
        concludeFakeGetTaskCall(getTaskActionBlockingBarrier);
        throw e;
    }

    // Check to make sure the policy is locked still
    if (enrichPolicyLocks.lockedPolices().contains(testPolicyName) == false) {
        // keep the logs clean
        concludeFakeGetTaskCall(getTaskActionBlockingBarrier);
        fail("Enrich policy was not locked when it should have been");
    }

    // Now let's return a timeout response on the getTaskAPI
    try {
        getTaskActionBlockingBarrier.await(3, TimeUnit.SECONDS);
    } catch (BrokenBarrierException e) {
        throw new RuntimeException("Unexpected broken barrier exception", e);
    }

    // Ensure that the policy remains locked during this period of uncertainty
    expectThrows(
        AssertionError.class,
        () -> assertBusy(() -> assertFalse(enrichPolicyLocks.lockedPolices().contains(testPolicyName)), 3, TimeUnit.SECONDS)
    );

    // If the lock has remained, then the client should have resubmitted the task wait operation. Signal a new response that will
    // complete the task wait
    try {
        getTaskActionBlockingBarrier.await(3, TimeUnit.SECONDS);
    } catch (BrokenBarrierException e) {
        throw new RuntimeException("Unexpected broken barrier exception", e);
    }

    // At this point the task should complete and unlock the policy correctly
    assertBusy(() -> assertFalse(enrichPolicyLocks.lockedPolices().contains(testPolicyName)), 3, TimeUnit.SECONDS);
}

/**
 * Cleanup helper used just before the test reports a failure: meets the fake client thread at the Get Task barrier so that it
 * is not left parked there.
 * <p>
 * A broken barrier or a timeout while waiting is only logged, so that the failure the caller is about to report is not
 * replaced by a cleanup exception.
 *
 * @param getTaskBarrier the barrier the fake client awaits before answering a Get Task call
 * @throws InterruptedException if the current thread is interrupted while waiting at the barrier
 */
private void concludeFakeGetTaskCall(CyclicBarrier getTaskBarrier) throws InterruptedException {
    try {
        getTaskBarrier.await(3, TimeUnit.SECONDS);
    } catch (BrokenBarrierException e) {
        logger.error("Encountered ignorable barrier broken exception during test cleanup");
    } catch (TimeoutException e) {
        // No fake Get Task call arrived within the wait; nothing to release.
        logger.error("Encountered ignorable barrier timeout exception during test cleanup");
    }
}

public void testRunPolicyLocallyMissingPolicy() {
EnrichPolicy enrichPolicy = EnrichPolicyTests.randomEnrichPolicy(XContentType.JSON);
ClusterState clusterState = ClusterState.builder(new ClusterName("_name"))
Expand Down