Skip to content

Commit dbdad5f

Browse files
committed
[Transform] Make transform _preview request cancellable (elastic#91313)
(cherry picked from commit a8a684e) # Conflicts: # x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/preview_transforms.yml # x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java # x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestPreviewTransformAction.java # x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java # x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java
1 parent 72f37bc commit dbdad5f

File tree

10 files changed

+105
-13
lines changed

10 files changed

+105
-13
lines changed

docs/changelog/91313.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 91313
2+
summary: Make transform `_preview` request cancellable
3+
area: Transform
4+
type: bug
5+
issues:
6+
- 91286

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformAction.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
import org.elasticsearch.common.io.stream.StreamOutput;
1919
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
2020
import org.elasticsearch.core.TimeValue;
21+
import org.elasticsearch.tasks.CancellableTask;
22+
import org.elasticsearch.tasks.Task;
23+
import org.elasticsearch.tasks.TaskId;
2124
import org.elasticsearch.xcontent.ConstructingObjectParser;
2225
import org.elasticsearch.xcontent.ParseField;
2326
import org.elasticsearch.xcontent.ToXContentObject;
@@ -136,6 +139,12 @@ public boolean equals(Object obj) {
136139
Request other = (Request) obj;
137140
return Objects.equals(config, other.config);
138141
}
142+
143+
@Override
144+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
145+
String description = "preview_transform[" + config.getId() + "]";
146+
return new CancellableTask(id, type, action, description, parentTaskId, headers);
147+
}
139148
}
140149

141150
public static class Response extends ActionResponse implements ToXContentObject {

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/PreviewTransformActionRequestTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
import org.elasticsearch.common.bytes.BytesArray;
1212
import org.elasticsearch.common.io.stream.Writeable;
1313
import org.elasticsearch.core.TimeValue;
14+
import org.elasticsearch.tasks.CancellableTask;
15+
import org.elasticsearch.tasks.Task;
16+
import org.elasticsearch.tasks.TaskId;
1417
import org.elasticsearch.xcontent.DeprecationHandler;
1518
import org.elasticsearch.xcontent.XContentParser;
1619
import org.elasticsearch.xcontent.json.JsonXContent;
@@ -22,9 +25,11 @@
2225
import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfigTests;
2326

2427
import java.io.IOException;
28+
import java.util.Collections;
2529

2630
import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig;
2731
import static org.hamcrest.Matchers.equalTo;
32+
import static org.hamcrest.Matchers.instanceOf;
2833
import static org.hamcrest.Matchers.is;
2934

3035
public class PreviewTransformActionRequestTests extends AbstractSerializingTransformTestCase<Request> {
@@ -128,4 +133,11 @@ private void testParsingOverwrites(
128133
assertThat(request.getConfig().getDestination().getPipeline(), is(equalTo(expectedDestPipeline)));
129134
}
130135
}
136+
137+
public void testCreateTask() {
138+
Request request = createTestInstance();
139+
Task task = request.createTask(123, "type", "action", TaskId.EMPTY_TASK_ID, Collections.emptyMap());
140+
assertThat(task, is(instanceOf(CancellableTask.class)));
141+
assertThat(task.getDescription(), is(equalTo("preview_transform[transform-preview]")));
142+
}
131143
}

x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/preview_transforms.yml

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,49 @@ setup:
150150
- match: { generated_dest_index.mappings.properties.by-hour.type: "date" }
151151
- match: { generated_dest_index.mappings.properties.avg_response.type: "double" }
152152

153+
---
154+
"Test preview transform with timeout":
155+
- do:
156+
transform.preview_transform:
157+
timeout: "10s"
158+
body: >
159+
{
160+
"source": { "index": "airline-data" },
161+
"pivot": {
162+
"group_by": {
163+
"airline": {"terms": {"field": "airline"}},
164+
"by-hour": {"date_histogram": {"fixed_interval": "1h", "field": "time"}}},
165+
"aggs": {
166+
"avg_response": {"avg": {"field": "responsetime"}},
167+
"time.max": {"max": {"field": "time"}},
168+
"time.min": {"min": {"field": "time"}}
169+
}
170+
}
171+
}
172+
173+
---
174+
"Test preview transform with disabled mapping deduction":
175+
- do:
176+
transform.preview_transform:
177+
body: >
178+
{
179+
"source": { "index": "airline-data" },
180+
"pivot": {
181+
"group_by": {
182+
"airline": {"terms": {"field": "airline"}},
183+
"by-hour": {"date_histogram": {"fixed_interval": "1h", "field": "time"}}},
184+
"aggs": {
185+
"avg_response": {"avg": {"field": "responsetime"}},
186+
"time.max": {"max": {"field": "time"}},
187+
"time.min": {"min": {"field": "time"}}
188+
}
189+
},
190+
"settings": {
191+
"deduce_mappings": false
192+
}
193+
}
194+
- match: { generated_dest_index.mappings.properties: {} }
195+
153196
---
154197
"Test preview transform by id":
155198
- do:

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportPreviewTransformAction.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.elasticsearch.action.support.ActionFilters;
1818
import org.elasticsearch.action.support.HandledTransportAction;
1919
import org.elasticsearch.client.Client;
20+
import org.elasticsearch.client.ParentTaskAssigningClient;
2021
import org.elasticsearch.cluster.ClusterState;
2122
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2223
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -27,11 +28,13 @@
2728
import org.elasticsearch.common.settings.Settings;
2829
import org.elasticsearch.common.xcontent.XContentHelper;
2930
import org.elasticsearch.common.xcontent.support.XContentMapValues;
31+
import org.elasticsearch.core.TimeValue;
3032
import org.elasticsearch.ingest.IngestService;
3133
import org.elasticsearch.license.License;
3234
import org.elasticsearch.license.RemoteClusterLicenseChecker;
3335
import org.elasticsearch.license.XPackLicenseState;
3436
import org.elasticsearch.tasks.Task;
37+
import org.elasticsearch.tasks.TaskId;
3538
import org.elasticsearch.threadpool.ThreadPool;
3639
import org.elasticsearch.transport.TransportService;
3740
import org.elasticsearch.xcontent.ToXContent;
@@ -143,6 +146,7 @@ protected TransportPreviewTransformAction(
143146

144147
@Override
145148
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
149+
TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), task.getId());
146150
final ClusterState clusterState = clusterService.state();
147151
TransformNodes.throwIfNoTransformNodes(clusterState);
148152

@@ -169,6 +173,8 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
169173
// <4> Validate transform query
170174
ActionListener<Boolean> validateConfigListener = ActionListener.wrap(validateConfigResponse -> {
171175
getPreview(
176+
parentTaskId,
177+
request.timeout(),
172178
config.getId(), // note: @link{PreviewTransformAction} sets an id, so this is never null
173179
function,
174180
config.getSource(),
@@ -204,7 +210,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
204210
securityContext,
205211
indexNameExpressionResolver,
206212
clusterState,
207-
client,
213+
new ParentTaskAssigningClient(client, parentTaskId),
208214
config,
209215
// We don't want to check privileges for a dummy (placeholder) index and the placeholder is inserted as config.dest.index
210216
// early in the REST action so the only possibility we have here is string comparison.
@@ -218,6 +224,8 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
218224

219225
@SuppressWarnings("unchecked")
220226
private void getPreview(
227+
TaskId parentTaskId,
228+
TimeValue timeout,
221229
String transformId,
222230
Function function,
223231
SourceConfig source,
@@ -226,6 +234,8 @@ private void getPreview(
226234
SyncConfig syncConfig,
227235
ActionListener<Response> listener
228236
) {
237+
Client parentTaskAssigningClient = new ParentTaskAssigningClient(client, parentTaskId);
238+
229239
final SetOnce<Map<String, String>> mappings = new SetOnce<>();
230240

231241
ActionListener<SimulatePipelineResponse> pipelineResponseActionListener = ActionListener.wrap(simulatePipelineResponse -> {
@@ -285,15 +295,16 @@ private void getPreview(
285295
builder.endObject();
286296
SimulatePipelineRequest pipelineRequest = new SimulatePipelineRequest(BytesReference.bytes(builder), XContentType.JSON);
287297
pipelineRequest.setId(pipeline);
288-
client.execute(SimulatePipelineAction.INSTANCE, pipelineRequest, pipelineResponseActionListener);
298+
parentTaskAssigningClient.execute(SimulatePipelineAction.INSTANCE, pipelineRequest, pipelineResponseActionListener);
289299
}
290300
}
291301
}, listener::onFailure);
292302

293303
ActionListener<Map<String, String>> deduceMappingsListener = ActionListener.wrap(deducedMappings -> {
294304
mappings.set(deducedMappings);
295305
function.preview(
296-
client,
306+
parentTaskAssigningClient,
307+
timeout,
297308
ClientHelper.filterSecurityHeaders(threadPool.getThreadContext().getHeaders()),
298309
source,
299310
deducedMappings,
@@ -302,6 +313,6 @@ private void getPreview(
302313
);
303314
}, listener::onFailure);
304315

305-
function.deduceMappings(client, source, deduceMappingsListener);
316+
function.deduceMappings(parentTaskAssigningClient, source, deduceMappingsListener);
306317
}
307318
}

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportValidateTransformAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
140140
if (request.isDeferValidation()) {
141141
validateQueryListener.onResponse(true);
142142
} else {
143-
function.validateQuery(client, config.getSource(), validateQueryListener);
143+
function.validateQuery(client, config.getSource(), request.timeout(), validateQueryListener);
144144
}
145145
}, listener::onFailure);
146146

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestPreviewTransformAction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@
1010
import org.apache.lucene.util.SetOnce;
1111
import org.elasticsearch.action.ActionListener;
1212
import org.elasticsearch.action.support.master.AcknowledgedRequest;
13+
import org.elasticsearch.client.Client;
1314
import org.elasticsearch.client.node.NodeClient;
1415
import org.elasticsearch.common.Strings;
1516
import org.elasticsearch.core.TimeValue;
1617
import org.elasticsearch.rest.BaseRestHandler;
1718
import org.elasticsearch.rest.RestRequest;
19+
import org.elasticsearch.rest.action.RestCancellableNodeClient;
1820
import org.elasticsearch.rest.action.RestToXContentListener;
1921
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
2022
import org.elasticsearch.xpack.core.transform.TransformField;
@@ -48,7 +50,7 @@ public String getName() {
4850
}
4951

5052
@Override
51-
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
53+
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient nodeClient) throws IOException {
5254
String transformId = restRequest.param(TransformField.ID.getPreferredName());
5355

5456
if (Strings.isNullOrEmpty(transformId) && restRequest.hasContentOrSourceParam() == false) {
@@ -73,6 +75,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
7375
previewRequestHolder.set(PreviewTransformAction.Request.fromXContent(restRequest.contentOrSourceParamParser(), timeout));
7476
}
7577

78+
Client client = new RestCancellableNodeClient(nodeClient, restRequest.getHttpChannel());
7679
return channel -> {
7780
RestToXContentListener<PreviewTransformAction.Response> listener = new RestToXContentListener<>(channel);
7881

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/Function.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import org.elasticsearch.action.index.IndexRequest;
1212
import org.elasticsearch.action.search.SearchResponse;
1313
import org.elasticsearch.client.Client;
14+
import org.elasticsearch.core.Nullable;
15+
import org.elasticsearch.core.TimeValue;
1416
import org.elasticsearch.core.Tuple;
1517
import org.elasticsearch.index.query.QueryBuilder;
1618
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -124,6 +126,7 @@ interface ChangeCollector {
124126
* Create a preview of the function.
125127
*
126128
* @param client a client instance for querying
129+
* @param timeout search query timeout
127130
* @param headers headers to be used to query only for what the caller is allowed to
128131
* @param sourceConfig the source configuration
129132
* @param fieldTypeMap mapping of field types
@@ -132,6 +135,7 @@ interface ChangeCollector {
132135
*/
133136
void preview(
134137
Client client,
138+
@Nullable TimeValue timeout,
135139
Map<String, String> headers,
136140
SourceConfig sourceConfig,
137141
Map<String, String> fieldTypeMap,
@@ -175,9 +179,10 @@ void preview(
175179
*
176180
* @param client a client instance for querying the source
177181
* @param sourceConfig the source configuration
182+
* @param timeout search query timeout
178183
* @param listener the result listener
179184
*/
180-
void validateQuery(Client client, SourceConfig sourceConfig, ActionListener<Boolean> listener);
185+
void validateQuery(Client client, SourceConfig sourceConfig, @Nullable TimeValue timeout, ActionListener<Boolean> listener);
181186

182187
/**
183188
* Create a change collector instance and return it

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/common/AbstractCompositeAggFunction.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.action.support.IndicesOptions;
1919
import org.elasticsearch.client.Client;
2020
import org.elasticsearch.common.ValidationException;
21+
import org.elasticsearch.core.TimeValue;
2122
import org.elasticsearch.core.Tuple;
2223
import org.elasticsearch.rest.RestStatus;
2324
import org.elasticsearch.search.aggregations.Aggregations;
@@ -62,6 +63,7 @@ public SearchSourceBuilder buildSearchQuery(SearchSourceBuilder builder, Map<Str
6263
@Override
6364
public void preview(
6465
Client client,
66+
TimeValue timeout,
6567
Map<String, String> headers,
6668
SourceConfig sourceConfig,
6769
Map<String, String> fieldTypeMap,
@@ -75,7 +77,7 @@ public void preview(
7577
client,
7678
SearchAction.INSTANCE,
7779
true,
78-
buildSearchRequest(sourceConfig, null, numberOfBuckets),
80+
buildSearchRequest(sourceConfig, timeout, numberOfBuckets),
7981
ActionListener.wrap(r -> {
8082
try {
8183
final Aggregations aggregations = r.getAggregations();
@@ -102,8 +104,8 @@ public void preview(
102104
}
103105

104106
@Override
105-
public void validateQuery(Client client, SourceConfig sourceConfig, ActionListener<Boolean> listener) {
106-
SearchRequest searchRequest = buildSearchRequest(sourceConfig, null, TEST_QUERY_PAGE_SIZE);
107+
public void validateQuery(Client client, SourceConfig sourceConfig, TimeValue timeout, ActionListener<Boolean> listener) {
108+
SearchRequest searchRequest = buildSearchRequest(sourceConfig, timeout, TEST_QUERY_PAGE_SIZE);
107109
client.execute(SearchAction.INSTANCE, searchRequest, ActionListener.wrap(response -> {
108110
if (response == null) {
109111
listener.onFailure(new ValidationException().addValidationError("Unexpected null response from test query"));
@@ -176,9 +178,10 @@ protected abstract Stream<Map<String, Object>> extractResults(
176178
TransformProgress progress
177179
);
178180

179-
private SearchRequest buildSearchRequest(SourceConfig sourceConfig, Map<String, Object> position, int pageSize) {
181+
private SearchRequest buildSearchRequest(SourceConfig sourceConfig, TimeValue timeout, int pageSize) {
180182
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(sourceConfig.getQueryConfig().getQuery())
181-
.runtimeMappings(sourceConfig.getRuntimeMappings());
183+
.runtimeMappings(sourceConfig.getRuntimeMappings())
184+
.timeout(timeout);
182185
buildSearchQuery(sourceBuilder, null, pageSize);
183186
return new SearchRequest(sourceConfig.getIndex()).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
184187
.source(sourceBuilder)

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/pivot/PivotTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ private static void assertInvalidTransform(Client client, SourceConfig source, F
354354
private static void validate(Client client, SourceConfig source, Function pivot, boolean expectValid) throws Exception {
355355
CountDownLatch latch = new CountDownLatch(1);
356356
final AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
357-
pivot.validateQuery(client, source, ActionListener.wrap(validity -> {
357+
pivot.validateQuery(client, source, null, ActionListener.wrap(validity -> {
358358
assertEquals(expectValid, validity);
359359
latch.countDown();
360360
}, e -> {

0 commit comments

Comments
 (0)