Skip to content

Commit d9c169f

Browse files
authored
[Transform] Improve reporting status of the transform that is about to finish (#95672)
1 parent 4602570 commit d9c169f

File tree

5 files changed

+64
-25
lines changed

5 files changed

+64
-25
lines changed

docs/changelog/95672.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 95672
2+
summary: Improve reporting status of the transform that is about to finish
3+
area: Transform
4+
type: bug
5+
issues: []

x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -473,10 +473,17 @@ protected void startAndWaitForTransform(
473473
// start the transform
474474
startTransform(transformId, authHeader, secondaryAuthHeader, null, warnings);
475475
assertTrue(indexExists(transformIndex));
476-
// wait until the transform has been created and all data is available
477-
waitForTransformCheckpoint(transformId);
478-
479-
waitForTransformStopped(transformId);
476+
assertBusy(() -> {
477+
Map<?, ?> transformStatsAsMap = getTransformStateAndStats(transformId);
478+
// wait until the transform has been created and all data is available
479+
assertEquals(
480+
"Stats were: " + transformStatsAsMap,
481+
1,
482+
XContentMapValues.extractValue("checkpointing.last.checkpoint", transformStatsAsMap)
483+
);
484+
// wait until the transform is stopped
485+
assertEquals("Stats were: " + transformStatsAsMap, "stopped", XContentMapValues.extractValue("state", transformStatsAsMap));
486+
}, 30, TimeUnit.SECONDS);
480487
refreshIndex(transformIndex);
481488
}
482489

@@ -539,16 +546,16 @@ protected Request createRequestWithAuth(final String method, final String endpoi
539546
return createRequestWithSecondaryAuth(method, endpoint, authHeader, null);
540547
}
541548

542-
void waitForTransformStopped(String transformId) throws Exception {
543-
assertBusy(() -> { assertEquals("stopped", getTransformState(transformId)); }, 15, TimeUnit.SECONDS);
544-
}
545-
546-
void waitForTransformCheckpoint(String transformId) throws Exception {
547-
waitForTransformCheckpoint(transformId, 1L);
548-
}
549-
550549
void waitForTransformCheckpoint(String transformId, long checkpoint) throws Exception {
551-
assertBusy(() -> assertEquals(checkpoint, getTransformCheckpoint(transformId)), 30, TimeUnit.SECONDS);
550+
assertBusy(() -> {
551+
Map<?, ?> transformStatsAsMap = getTransformStateAndStats(transformId);
552+
assertNotEquals("Stats were: " + transformStatsAsMap, "failed", XContentMapValues.extractValue("state", transformStatsAsMap));
553+
assertEquals(
554+
"Stats were: " + transformStatsAsMap,
555+
(int) checkpoint,
556+
XContentMapValues.extractValue("checkpointing.last.checkpoint", transformStatsAsMap)
557+
);
558+
}, 30, TimeUnit.SECONDS);
552559
}
553560

554561
void refreshIndex(String index) throws IOException {
@@ -632,16 +639,6 @@ public static void removeIndices() throws Exception {
632639
wipeAllIndices();
633640
}
634641

635-
static int getTransformCheckpoint(String transformId) throws IOException {
636-
Response statsResponse = client().performRequest(new Request("GET", getTransformEndpoint() + transformId + "/_stats"));
637-
638-
Map<?, ?> transformStatsAsMap = (Map<?, ?>) ((List<?>) entityAsMap(statsResponse).get("transforms")).get(0);
639-
640-
// assert that the transform did not fail
641-
assertNotEquals("Stats were: " + transformStatsAsMap, "failed", XContentMapValues.extractValue("state", transformStatsAsMap));
642-
return (int) XContentMapValues.extractValue("checkpointing.last.checkpoint", transformStatsAsMap);
643-
}
644-
645642
protected void setupDataAccessRole(String role, String... indices) throws IOException {
646643
String indicesStr = Arrays.stream(indices).collect(Collectors.joining("\",\"", "\"", "\""));
647644
Request request = new Request("PUT", "/_security/role/" + role);

x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRobustnessIT.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,27 @@ public void testTaskRemovalAfterInternalIndexGotDeleted() throws Exception {
113113
deleteTransform(transformId);
114114
}
115115

116+
public void testCreateAndDeleteTransformInALoop() throws IOException {
117+
createReviewsIndex();
118+
119+
String transformId = "test_create_and_delete_in_a_loop";
120+
String destIndex = transformId + "-dest";
121+
for (int i = 0; i < 100; ++i) {
122+
try {
123+
// Create the batch transform
124+
createPivotReviewsTransform(transformId, destIndex, null);
125+
// Wait until the transform finishes
126+
startAndWaitForTransform(transformId, destIndex);
127+
// After the transform finishes, there should be no transform task left
128+
assertEquals(0, getNumberOfTransformTasks());
129+
// Delete the transform
130+
deleteTransform(transformId);
131+
} catch (AssertionError | Exception e) {
132+
fail("Failure at iteration " + i + ": " + e.getMessage());
133+
}
134+
}
135+
}
136+
116137
@SuppressWarnings("unchecked")
117138
private int getNumberOfTransformTasks() throws IOException {
118139
final Request tasksRequest = new Request("GET", "/_tasks");

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportGetTransformStatsAction.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,6 @@ private void addCheckpointingInfoForTransformsWithoutTasks(
335335
ClusterState clusterState,
336336
ActionListener<Void> listener
337337
) {
338-
339338
if (statsForTransformsWithoutTasks.isEmpty()) {
340339
// No work to do, but we must respond to the listener
341340
listener.onResponse(null);
@@ -366,10 +365,14 @@ private void addCheckpointingInfoForTransformsWithoutTasks(
366365
)
367366
);
368367
} else {
368+
final boolean transformPersistentTaskIsStillRunning = TransformTask.getTransformTask(
369+
stat.getId(),
370+
clusterState
371+
) != null;
369372
allStateAndStats.add(
370373
new TransformStats(
371374
stat.getId(),
372-
TransformStats.State.STOPPED,
375+
transformPersistentTaskIsStillRunning ? TransformStats.State.STOPPING : TransformStats.State.STOPPED,
373376
null,
374377
null,
375378
stat.getTransformStats(),

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.elasticsearch.common.Strings;
2020
import org.elasticsearch.common.regex.Regex;
2121
import org.elasticsearch.persistent.AllocatedPersistentTask;
22+
import org.elasticsearch.persistent.PersistentTaskParams;
2223
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
2324
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.PersistentTask;
2425
import org.elasticsearch.persistent.PersistentTasksService;
@@ -47,6 +48,7 @@
4748
import java.util.Collection;
4849
import java.util.Collections;
4950
import java.util.Map;
51+
import java.util.Objects;
5052
import java.util.Set;
5153
import java.util.function.Predicate;
5254

@@ -464,6 +466,17 @@ public void shutdown() {
464466
logger.debug("[{}] shutdown of transform requested", transform.getId());
465467
transformScheduler.deregisterTransform(getTransformId());
466468
markAsCompleted();
469+
waitForPersistentTask(Objects::isNull, null, new PersistentTasksService.WaitForPersistentTaskListener<>() {
470+
@Override
471+
public void onResponse(PersistentTask<PersistentTaskParams> persistentTask) {
472+
logger.trace("[{}] successfully finished waiting for persistent task to disappear.", transform.getId());
473+
}
474+
475+
@Override
476+
public void onFailure(Exception e) {
477+
logger.error(() -> "[" + transform.getId() + "] failure while waiting for persistent task to disappear.", e);
478+
}
479+
});
467480
}
468481

469482
void persistStateToClusterState(TransformState state, ActionListener<PersistentTask<?>> listener) {

0 commit comments

Comments
 (0)