Skip to content
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
462a8e5
Async request in txt mode returns the async parameters as headers
kanoshiou Jul 19, 2024
33e8b1f
Add changelog
kanoshiou Jul 19, 2024
f0285ea
Typo
kanoshiou Jul 30, 2024
f77b893
Change listener of `/_query/async/{id}` to `EsqlResponseListener` to …
kanoshiou Jul 30, 2024
e81487a
Change HTTP header names access modifiers to private
kanoshiou Jul 31, 2024
c27716f
Revert "Change HTTP header names access modifiers to private"
kanoshiou Jul 31, 2024
9a010a5
Response media type defaults to JSON if there is no specific media ty…
kanoshiou Jul 31, 2024
12ef64d
Add tests for async query in text mode
kanoshiou Jul 31, 2024
b774b61
Get requests do not need checkNonNullMediaType
kanoshiou Jul 31, 2024
c3c1396
Update doc
kanoshiou Jul 31, 2024
e94872a
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou Aug 1, 2024
3f48c01
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou Sep 5, 2024
d6bd2e2
Merge remote-tracking branch 'origin/main' into async-parameters-retu…
kanoshiou Oct 10, 2024
cc9a40b
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou Nov 28, 2024
6f86392
small changes
kanoshiou Nov 28, 2024
e8484c8
Refactor `runEsqlAsync` to support text format
kanoshiou Nov 28, 2024
106b114
Refactor `runEsqlSync` to support text format
kanoshiou Nov 28, 2024
76d3be7
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou Nov 28, 2024
7f9cece
Overload `runEsql` to support text formats
kanoshiou Nov 29, 2024
c01d53b
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou Dec 1, 2024
e0786a7
Replace async headers
kanoshiou Dec 1, 2024
3a3912c
Revert
kanoshiou Dec 1, 2024
4f505fe
Add a private constructor for `EsqlResponseListener`
kanoshiou Dec 1, 2024
ae03113
Check non-null media type
kanoshiou Dec 1, 2024
7fc2bbb
Refactor `runEsqlAsTextWithFormat` to support async mode
kanoshiou Dec 1, 2024
cb50b12
Remove static map `TEXT_FORMATS`
kanoshiou Dec 1, 2024
3bf781d
Update 111104.yaml
kanoshiou Dec 2, 2024
b519248
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
kanoshiou Dec 2, 2024
eb9e49d
Update 111104.yaml
kanoshiou Dec 2, 2024
5ed973f
Update 111104.yaml
kanoshiou Dec 2, 2024
2904aa2
Update
kanoshiou Dec 4, 2024
b0da5b5
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
bpintea Dec 4, 2024
444ed89
Allow an ASYNC GET with no media type spec
bpintea Dec 5, 2024
389dd8a
Merge branch 'main' into fork/kanoshiou/async-parameters-returns-as-h…
bpintea Dec 5, 2024
5b02528
Testcases for request with no Content-type header
kanoshiou Dec 6, 2024
201a45d
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
bpintea Dec 7, 2024
0fb5625
Merge branch 'main' into async-parameters-returns-as-headers-in-text-…
bpintea Dec 9, 2024
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/111104.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 111104
summary: "ESQL: Enable async get to support formatting"
area: ES|QL
type: feature
issues:
- 110926
4 changes: 4 additions & 0 deletions docs/reference/esql/esql-async-query-get-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ parameter is `true`.
[[esql-async-query-get-api-query-params]]
==== {api-query-parms-title}

The API accepts the same parameters as the synchronous
<<esql-query-api-query-params,query API>>, along with the following
parameters:

`wait_for_completion_timeout`::
(Optional, <<time-units,time value>>)
Timeout duration to wait for the request to finish. Defaults to no timeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ private ActionListener<Response> wrapStoringListener(
ActionListener<Response> listener
) {
AtomicReference<ActionListener<Response>> exclusiveListener = new AtomicReference<>(listener);
// This is will performed in case of timeout
// This will be performed in case of timeout
Scheduler.ScheduledCancellable timeoutHandler = threadPool.schedule(() -> {
ActionListener<Response> acquiredListener = exclusiveListener.getAndSet(null);
if (acquiredListener != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.IntFunction;

Expand Down Expand Up @@ -350,21 +351,22 @@ public void testTextMode() throws IOException {
int count = randomIntBetween(0, 100);
bulkLoadTestData(count);
var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
assertEquals(expectedTextBody("txt", count, null), runEsqlAsTextWithFormat(builder, "txt", null));
assertEquals(expectedTextBody("txt", count, null), runEsqlAsTextWithFormat(builder, "txt", null, mode));

}

public void testCSVMode() throws IOException {
int count = randomIntBetween(0, 100);
bulkLoadTestData(count);
var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
assertEquals(expectedTextBody("csv", count, '|'), runEsqlAsTextWithFormat(builder, "csv", '|'));
assertEquals(expectedTextBody("csv", count, '|'), runEsqlAsTextWithFormat(builder, "csv", '|', mode));
}

public void testTSVMode() throws IOException {
int count = randomIntBetween(0, 100);
bulkLoadTestData(count);
var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
assertEquals(expectedTextBody("tsv", count, null), runEsqlAsTextWithFormat(builder, "tsv", null));
assertEquals(expectedTextBody("tsv", count, null), runEsqlAsTextWithFormat(builder, "tsv", null, mode));
}

public void testCSVNoHeaderMode() throws IOException {
Expand Down Expand Up @@ -1167,14 +1169,19 @@ static void deleteNonExistent(Request request) throws IOException {
assertEquals(404, response.getStatusLine().getStatusCode());
}

static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String format, @Nullable Character delimiter) throws IOException {
Request request = prepareRequest(SYNC);
static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String format, @Nullable Character delimiter, Mode mode)
throws IOException {
Request request = prepareRequest(mode);
if (mode == ASYNC) {
addAsyncParameters(builder);
}
String mediaType = attachBody(builder.build(), request);

RequestOptions.Builder options = request.getOptions().toBuilder();
options.addHeader("Content-Type", mediaType);

if (randomBoolean()) {
boolean addParam = randomBoolean();
if (addParam) {
request.addParameter("format", format);
} else {
switch (format) {
Expand All @@ -1188,8 +1195,71 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
}
request.setOptions(options);

HttpEntity entity = performRequest(request, new AssertWarnings.NoWarnings());
return Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
Response response = performRequest(request);
HttpEntity entity = assertWarnings(response, new AssertWarnings.NoWarnings());

// get the content, it could be empty because the request might have not completed
String initialValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
String id = response.getHeader("X-Elasticsearch-Async-Id");

if (mode == SYNC) {
assertThat(id, is(emptyOrNullString()));
return initialValue;
}

if (id == null) {
// no id returned from an async call, must have completed immediately and without keep_on_completion
assertThat(builder.keepOnCompletion(), either(nullValue()).or(is(false)));
assertNull(response.getHeader("is_running"));
// the content cant be empty
assertThat(initialValue, not(emptyOrNullString()));
return initialValue;
} else {
// async may not return results immediately, so may need an async get
assertThat(id, is(not(emptyOrNullString())));
String isRunning = response.getHeader("X-Elasticsearch-Async-Is-Running");
if (Objects.equals(isRunning, "?0")) {
// must have completed immediately so keep_on_completion must be true
assertThat(builder.keepOnCompletion(), is(true));
} else {
// did not return results immediately, so we will need an async get
// Also, different format modes return different results.
switch (format) {
case "txt" -> assertThat(initialValue, emptyOrNullString());
case "csv" -> {
assertEquals(initialValue, "\r\n");
initialValue = "";
}
case "tsv" -> {
assertEquals(initialValue, "\n");
initialValue = "";
}
}
}
// issue a second request to "async get" the results
Request getRequest = prepareAsyncGetRequest(id);
if (delimiter != null) {
getRequest.addParameter("delimiter", String.valueOf(delimiter));
}
// If the `format` parameter is not added, the GET request will return a response
// with the `Content-Type` type due to the lack of an `Accept` header.
if (addParam) {
getRequest.addParameter("format", format);
}
// if `addParam` is false, `options` will already have an `Accept` header
getRequest.setOptions(options);
response = performRequest(getRequest);
entity = assertWarnings(response, new AssertWarnings.NoWarnings());
}
String newValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));

// assert initial contents, if any, are the same as async get contents
if (initialValue != null && initialValue.isEmpty() == false) {
assertEquals(initialValue, newValue);
}

assertDeletable(id);
return newValue;
}

private static Request prepareRequest(Mode mode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public TimeValue stop() {
/**
* Keep the initial query for logging purposes.
*/
private final String esqlQuery;
private final String esqlQueryOrId;
/**
* Stop the time it took to build a response to later log it. Use something thread-safe here because stopping time requires state and
* {@link EsqlResponseListener} might be used from different threads.
Expand All @@ -98,29 +98,23 @@ public TimeValue stop() {
* To correctly time the execution of a request, a {@link EsqlResponseListener} must be constructed immediately before execution begins.
*/
public EsqlResponseListener(RestChannel channel, RestRequest restRequest, EsqlQueryRequest esqlRequest) {
super(channel);
this(channel, restRequest, esqlRequest.query(), EsqlMediaTypeParser.getResponseMediaType(restRequest, esqlRequest));
}

/**
* Async query get API does not have an EsqlQueryRequest, store the async ID as an alternative
*/
public EsqlResponseListener(RestChannel channel, RestRequest restRequest) {
this(channel, restRequest, restRequest.param("id"), EsqlMediaTypeParser.getResponseMediaType(restRequest));
}

private EsqlResponseListener(RestChannel channel, RestRequest restRequest, String esqlQueryOrId, MediaType mediaType) {
super(channel);
this.channel = channel;
this.restRequest = restRequest;
this.esqlQuery = esqlRequest.query();
mediaType = EsqlMediaTypeParser.getResponseMediaType(restRequest, esqlRequest);

/*
* Special handling for the "delimiter" parameter which should only be
* checked for being present or not in the case of CSV format. We cannot
* override {@link BaseRestHandler#responseParams()} because this
* parameter should only be checked for CSV, not other formats.
*/
if (mediaType != CSV && restRequest.hasParam(URL_PARAM_DELIMITER)) {
String message = String.format(
Locale.ROOT,
"parameter: [%s] can only be used with the format [%s] for request [%s]",
URL_PARAM_DELIMITER,
CSV.queryParameter(),
restRequest.path()
);
throw new IllegalArgumentException(message);
}
this.esqlQueryOrId = esqlQueryOrId;
this.mediaType = mediaType;
checkDelimiter();
}

@Override
Expand Down Expand Up @@ -197,14 +191,18 @@ public ActionListener<EsqlQueryResponse> wrapWithLogging() {
listener.onResponse(r);
// At this point, the StopWatch should already have been stopped, so we log a consistent time.
LOGGER.debug(
"Finished execution of ESQL query.\nQuery string: [{}]\nExecution time: [{}]ms",
esqlQuery,
"Finished execution of ESQL query.\nQuery string or async ID: [{}]\nExecution time: [{}]ms",
esqlQueryOrId,
getTook(r, TimeUnit.MILLISECONDS)
);
}, ex -> {
// In case of failure, stop the time manually before sending out the response.
long timeMillis = getTook(null, TimeUnit.MILLISECONDS);
LOGGER.debug("Failed execution of ESQL query.\nQuery string: [{}]\nExecution time: [{}]ms", esqlQuery, timeMillis);
LOGGER.debug(
"Failed execution of ESQL query.\nQuery string or async ID: [{}]\nExecution time: [{}]ms",
esqlQueryOrId,
timeMillis
);
listener.onFailure(ex);
});
}
Expand All @@ -213,4 +211,23 @@ static void logOnFailure(Throwable throwable) {
RestStatus status = ExceptionsHelper.status(throwable);
LOGGER.log(status.getStatus() >= 500 ? Level.WARN : Level.DEBUG, () -> "Request failed with status [" + status + "]: ", throwable);
}

/*
* Special handling for the "delimiter" parameter which should only be
* checked for being present or not in the case of CSV format. We cannot
* override {@link BaseRestHandler#responseParams()} because this
* parameter should only be checked for CSV, not other formats.
*/
private void checkDelimiter() {
if (mediaType != CSV && restRequest.hasParam(URL_PARAM_DELIMITER)) {
String message = String.format(
Locale.ROOT,
"parameter: [%s] can only be used with the format [%s] for request [%s]",
URL_PARAM_DELIMITER,
CSV.queryParameter(),
restRequest.path()
);
throw new IllegalArgumentException(message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;

import java.util.List;
Expand Down Expand Up @@ -43,7 +42,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
if (request.hasParam("keep_alive")) {
get.setKeepAlive(request.paramAsTime("keep_alive", get.getKeepAlive()));
}
return channel -> client.execute(EsqlAsyncGetResultAction.INSTANCE, get, new RestRefCountedChunkedToXContentListener<>(channel));
return channel -> client.execute(EsqlAsyncGetResultAction.INSTANCE, get, new EsqlResponseListener(channel, request));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,20 @@ public class EsqlMediaTypeParser {
* combinations are detected.
*/
public static MediaType getResponseMediaType(RestRequest request, EsqlQueryRequest esqlRequest) {
var mediaType = request.hasParam(URL_PARAM_FORMAT) ? mediaTypeFromParams(request) : mediaTypeFromHeaders(request);
var mediaType = getResponseMediaType(request);
validateColumnarRequest(esqlRequest.columnar(), mediaType);
validateIncludeCCSMetadata(esqlRequest.includeCCSMetadata(), mediaType);
return checkNonNullMediaType(mediaType, request);
}

/*
* Retrieve the mediaType of a request without validating the EsqlQueryRequest.
*/
public static MediaType getResponseMediaType(RestRequest request) {
var mediaType = request.hasParam(URL_PARAM_FORMAT) ? mediaTypeFromParams(request) : mediaTypeFromHeaders(request);
return checkNonNullMediaType(mediaType, request);
}

private static MediaType mediaTypeFromHeaders(RestRequest request) {
ParsedMediaType acceptType = request.getParsedAccept();
MediaType mediaType = acceptType != null ? acceptType.toMediaType(MEDIA_TYPE_REGISTRY) : request.getXContentType();
Expand Down