Skip to content

Commit 67ee034

Browse files
authored
ESQL: Enable async get to support formatting (#111104)
I've updated the listener for GET /_query/async/{id} to EsqlResponseListener, so it now accepts parameters (delimiter, drop_null_columns and format) like the POST /_query API. Additionally, I have added tests to verify the correctness of the code. You can now set the format in the request parameters to specify the return style. Closes #110926
1 parent b4e852a commit 67ee034

File tree

8 files changed

+236
-74
lines changed

8 files changed

+236
-74
lines changed

docs/changelog/111104.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 111104
2+
summary: "ESQL: Enable async get to support formatting"
3+
area: ES|QL
4+
type: feature
5+
issues:
6+
- 110926

docs/reference/esql/esql-async-query-get-api.asciidoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ parameter is `true`.
3939
[[esql-async-query-get-api-query-params]]
4040
==== {api-query-parms-title}
4141

42+
The API accepts the same parameters as the synchronous
43+
<<esql-query-api-query-params,query API>>, along with the following
44+
parameters:
45+
4246
`wait_for_completion_timeout`::
4347
(Optional, <<time-units,time value>>)
4448
Timeout duration to wait for the request to finish. Defaults to no timeout,

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/async/AsyncTaskManagementService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ private ActionListener<Response> wrapStoringListener(
208208
ActionListener<Response> listener
209209
) {
210210
AtomicReference<ActionListener<Response>> exclusiveListener = new AtomicReference<>(listener);
211-
// This is will performed in case of timeout
211+
// This will be performed in case of timeout
212212
Scheduler.ScheduledCancellable timeoutHandler = threadPool.schedule(() -> {
213213
ActionListener<Response> acquiredListener = exclusiveListener.getAndSet(null);
214214
if (acquiredListener != null) {

x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/RestEsqlTestCase.java

Lines changed: 160 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -350,21 +350,21 @@ public void testTextMode() throws IOException {
350350
int count = randomIntBetween(0, 100);
351351
bulkLoadTestData(count);
352352
var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
353-
assertEquals(expectedTextBody("txt", count, null), runEsqlAsTextWithFormat(builder, "txt", null));
353+
assertEquals(expectedTextBody("txt", count, null), runEsqlAsTextWithFormat(builder, "txt", null, mode));
354354
}
355355

356356
public void testCSVMode() throws IOException {
357357
int count = randomIntBetween(0, 100);
358358
bulkLoadTestData(count);
359359
var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
360-
assertEquals(expectedTextBody("csv", count, '|'), runEsqlAsTextWithFormat(builder, "csv", '|'));
360+
assertEquals(expectedTextBody("csv", count, '|'), runEsqlAsTextWithFormat(builder, "csv", '|', mode));
361361
}
362362

363363
public void testTSVMode() throws IOException {
364364
int count = randomIntBetween(0, 100);
365365
bulkLoadTestData(count);
366366
var builder = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
367-
assertEquals(expectedTextBody("tsv", count, null), runEsqlAsTextWithFormat(builder, "tsv", null));
367+
assertEquals(expectedTextBody("tsv", count, null), runEsqlAsTextWithFormat(builder, "tsv", null, mode));
368368
}
369369

370370
public void testCSVNoHeaderMode() throws IOException {
@@ -1003,53 +1003,35 @@ public static Map<String, Object> runEsqlSync(RequestObjectBuilder requestObject
10031003
}
10041004

10051005
public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObject) throws IOException {
1006-
return runEsqlAsync(requestObject, new AssertWarnings.NoWarnings());
1006+
return runEsqlAsync(requestObject, randomBoolean(), new AssertWarnings.NoWarnings());
10071007
}
10081008

10091009
static Map<String, Object> runEsql(RequestObjectBuilder requestObject, AssertWarnings assertWarnings, Mode mode) throws IOException {
10101010
if (mode == ASYNC) {
1011-
return runEsqlAsync(requestObject, assertWarnings);
1011+
return runEsqlAsync(requestObject, randomBoolean(), assertWarnings);
10121012
} else {
10131013
return runEsqlSync(requestObject, assertWarnings);
10141014
}
10151015
}
10161016

10171017
public static Map<String, Object> runEsqlSync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
1018-
requestObject.build();
1019-
Request request = prepareRequest(SYNC);
1020-
String mediaType = attachBody(requestObject, request);
1021-
1022-
RequestOptions.Builder options = request.getOptions().toBuilder();
1023-
options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves
1024-
options.addHeader("Content-Type", mediaType);
1025-
1026-
if (randomBoolean()) {
1027-
options.addHeader("Accept", mediaType);
1028-
} else {
1029-
request.addParameter("format", requestObject.contentType().queryParameter());
1030-
}
1031-
request.setOptions(options);
1018+
Request request = prepareRequestWithOptions(requestObject, SYNC);
10321019

10331020
HttpEntity entity = performRequest(request, assertWarnings);
10341021
return entityToMap(entity, requestObject.contentType());
10351022
}
10361023

10371024
public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObject, AssertWarnings assertWarnings) throws IOException {
1038-
addAsyncParameters(requestObject);
1039-
requestObject.build();
1040-
Request request = prepareRequest(ASYNC);
1041-
String mediaType = attachBody(requestObject, request);
1042-
1043-
RequestOptions.Builder options = request.getOptions().toBuilder();
1044-
options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves
1045-
options.addHeader("Content-Type", mediaType);
1025+
return runEsqlAsync(requestObject, randomBoolean(), assertWarnings);
1026+
}
10461027

1047-
if (randomBoolean()) {
1048-
options.addHeader("Accept", mediaType);
1049-
} else {
1050-
request.addParameter("format", requestObject.contentType().queryParameter());
1051-
}
1052-
request.setOptions(options);
1028+
public static Map<String, Object> runEsqlAsync(
1029+
RequestObjectBuilder requestObject,
1030+
boolean keepOnCompletion,
1031+
AssertWarnings assertWarnings
1032+
) throws IOException {
1033+
addAsyncParameters(requestObject, keepOnCompletion);
1034+
Request request = prepareRequestWithOptions(requestObject, ASYNC);
10531035

10541036
if (shouldLog()) {
10551037
LOGGER.info("REQUEST={}", request);
@@ -1061,7 +1043,7 @@ public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObjec
10611043
Object initialColumns = null;
10621044
Object initialValues = null;
10631045
var json = entityToMap(entity, requestObject.contentType());
1064-
checkKeepOnCompletion(requestObject, json);
1046+
checkKeepOnCompletion(requestObject, json, keepOnCompletion);
10651047
String id = (String) json.get("id");
10661048

10671049
var supportsAsyncHeaders = clusterHasCapability("POST", "/_query", List.of(), List.of("async_query_status_headers")).orElse(false);
@@ -1101,7 +1083,7 @@ public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObjec
11011083

11021084
// issue a second request to "async get" the results
11031085
Request getRequest = prepareAsyncGetRequest(id);
1104-
getRequest.setOptions(options);
1086+
getRequest.setOptions(request.getOptions());
11051087
response = performRequest(getRequest);
11061088
entity = response.getEntity();
11071089
}
@@ -1119,6 +1101,66 @@ public static Map<String, Object> runEsqlAsync(RequestObjectBuilder requestObjec
11191101
return removeAsyncProperties(result);
11201102
}
11211103

1104+
public void testAsyncGetWithoutContentType() throws IOException {
1105+
int count = randomIntBetween(0, 100);
1106+
bulkLoadTestData(count);
1107+
var requestObject = requestObjectBuilder().query(fromIndex() + " | keep keyword, integer | sort integer asc | limit 100");
1108+
1109+
addAsyncParameters(requestObject, true);
1110+
Request request = prepareRequestWithOptions(requestObject, ASYNC);
1111+
1112+
if (shouldLog()) {
1113+
LOGGER.info("REQUEST={}", request);
1114+
}
1115+
1116+
Response response = performRequest(request);
1117+
HttpEntity entity = response.getEntity();
1118+
1119+
var json = entityToMap(entity, requestObject.contentType());
1120+
checkKeepOnCompletion(requestObject, json, true);
1121+
String id = (String) json.get("id");
1122+
// results won't be returned since keepOnCompletion is true
1123+
assertThat(id, is(not(emptyOrNullString())));
1124+
1125+
// issue an "async get" request with no Content-Type
1126+
Request getRequest = prepareAsyncGetRequest(id);
1127+
response = performRequest(getRequest);
1128+
entity = response.getEntity();
1129+
var result = entityToMap(entity, XContentType.JSON);
1130+
1131+
ListMatcher values = matchesList();
1132+
for (int i = 0; i < count; i++) {
1133+
values = values.item(matchesList().item("keyword" + i).item(i));
1134+
}
1135+
assertMap(
1136+
result,
1137+
matchesMap().entry(
1138+
"columns",
1139+
matchesList().item(matchesMap().entry("name", "keyword").entry("type", "keyword"))
1140+
.item(matchesMap().entry("name", "integer").entry("type", "integer"))
1141+
).entry("values", values).entry("took", greaterThanOrEqualTo(0)).entry("id", id).entry("is_running", false)
1142+
);
1143+
1144+
}
1145+
1146+
static Request prepareRequestWithOptions(RequestObjectBuilder requestObject, Mode mode) throws IOException {
1147+
requestObject.build();
1148+
Request request = prepareRequest(mode);
1149+
String mediaType = attachBody(requestObject, request);
1150+
1151+
RequestOptions.Builder options = request.getOptions().toBuilder();
1152+
options.setWarningsHandler(WarningsHandler.PERMISSIVE); // We assert the warnings ourselves
1153+
options.addHeader("Content-Type", mediaType);
1154+
1155+
if (randomBoolean()) {
1156+
options.addHeader("Accept", mediaType);
1157+
} else {
1158+
request.addParameter("format", requestObject.contentType().queryParameter());
1159+
}
1160+
request.setOptions(options);
1161+
return request;
1162+
}
1163+
11221164
// Removes async properties, otherwise consuming assertions would need to handle sync and async differences
11231165
static Map<String, Object> removeAsyncProperties(Map<String, Object> map) {
11241166
Map<String, Object> copy = new HashMap<>(map);
@@ -1139,17 +1181,20 @@ protected static Map<String, Object> entityToMap(HttpEntity entity, XContentType
11391181
}
11401182
}
11411183

1142-
static void addAsyncParameters(RequestObjectBuilder requestObject) throws IOException {
1184+
static void addAsyncParameters(RequestObjectBuilder requestObject, boolean keepOnCompletion) throws IOException {
11431185
// deliberately short in order to frequently trigger return without results
11441186
requestObject.waitForCompletion(TimeValue.timeValueNanos(randomIntBetween(1, 100)));
1145-
requestObject.keepOnCompletion(randomBoolean());
1187+
requestObject.keepOnCompletion(keepOnCompletion);
11461188
requestObject.keepAlive(TimeValue.timeValueDays(randomIntBetween(1, 10)));
11471189
}
11481190

11491191
// If keep_on_completion is set then an id must always be present, regardless of the value of any other property.
1150-
static void checkKeepOnCompletion(RequestObjectBuilder requestObject, Map<String, Object> json) {
1192+
static void checkKeepOnCompletion(RequestObjectBuilder requestObject, Map<String, Object> json, boolean keepOnCompletion) {
11511193
if (requestObject.keepOnCompletion()) {
1194+
assertTrue(keepOnCompletion);
11521195
assertThat((String) json.get("id"), not(emptyOrNullString()));
1196+
} else {
1197+
assertFalse(keepOnCompletion);
11531198
}
11541199
}
11551200

@@ -1167,14 +1212,19 @@ static void deleteNonExistent(Request request) throws IOException {
11671212
assertEquals(404, response.getStatusLine().getStatusCode());
11681213
}
11691214

1170-
static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String format, @Nullable Character delimiter) throws IOException {
1171-
Request request = prepareRequest(SYNC);
1215+
static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String format, @Nullable Character delimiter, Mode mode)
1216+
throws IOException {
1217+
Request request = prepareRequest(mode);
1218+
if (mode == ASYNC) {
1219+
addAsyncParameters(builder, randomBoolean());
1220+
}
11721221
String mediaType = attachBody(builder.build(), request);
11731222

11741223
RequestOptions.Builder options = request.getOptions().toBuilder();
11751224
options.addHeader("Content-Type", mediaType);
11761225

1177-
if (randomBoolean()) {
1226+
boolean addParam = randomBoolean();
1227+
if (addParam) {
11781228
request.addParameter("format", format);
11791229
} else {
11801230
switch (format) {
@@ -1188,8 +1238,75 @@ static String runEsqlAsTextWithFormat(RequestObjectBuilder builder, String forma
11881238
}
11891239
request.setOptions(options);
11901240

1191-
HttpEntity entity = performRequest(request, new AssertWarnings.NoWarnings());
1192-
return Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
1241+
if (shouldLog()) {
1242+
LOGGER.info("REQUEST={}", request);
1243+
}
1244+
1245+
Response response = performRequest(request);
1246+
HttpEntity entity = assertWarnings(response, new AssertWarnings.NoWarnings());
1247+
1248+
// get the content, it could be empty because the request might have not completed
1249+
String initialValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
1250+
String id = response.getHeader("X-Elasticsearch-Async-Id");
1251+
1252+
if (mode == SYNC) {
1253+
assertThat(id, is(emptyOrNullString()));
1254+
return initialValue;
1255+
}
1256+
1257+
if (id == null) {
1258+
// no id returned from an async call, must have completed immediately and without keep_on_completion
1259+
assertThat(builder.keepOnCompletion(), either(nullValue()).or(is(false)));
1260+
assertNull(response.getHeader("is_running"));
1261+
// the content cant be empty
1262+
assertThat(initialValue, not(emptyOrNullString()));
1263+
return initialValue;
1264+
} else {
1265+
// async may not return results immediately, so may need an async get
1266+
assertThat(id, is(not(emptyOrNullString())));
1267+
String isRunning = response.getHeader("X-Elasticsearch-Async-Is-Running");
1268+
if ("?0".equals(isRunning)) {
1269+
// must have completed immediately so keep_on_completion must be true
1270+
assertThat(builder.keepOnCompletion(), is(true));
1271+
} else {
1272+
// did not return results immediately, so we will need an async get
1273+
// Also, different format modes return different results.
1274+
switch (format) {
1275+
case "txt" -> assertThat(initialValue, emptyOrNullString());
1276+
case "csv" -> {
1277+
assertEquals(initialValue, "\r\n");
1278+
initialValue = "";
1279+
}
1280+
case "tsv" -> {
1281+
assertEquals(initialValue, "\n");
1282+
initialValue = "";
1283+
}
1284+
}
1285+
}
1286+
// issue a second request to "async get" the results
1287+
Request getRequest = prepareAsyncGetRequest(id);
1288+
if (delimiter != null) {
1289+
getRequest.addParameter("delimiter", String.valueOf(delimiter));
1290+
}
1291+
// If the `format` parameter is not added, the GET request will return a response
1292+
// with the `Content-Type` type due to the lack of an `Accept` header.
1293+
if (addParam) {
1294+
getRequest.addParameter("format", format);
1295+
}
1296+
// if `addParam` is false, `options` will already have an `Accept` header
1297+
getRequest.setOptions(options);
1298+
response = performRequest(getRequest);
1299+
entity = assertWarnings(response, new AssertWarnings.NoWarnings());
1300+
}
1301+
String newValue = Streams.copyToString(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8));
1302+
1303+
// assert initial contents, if any, are the same as async get contents
1304+
if (initialValue != null && initialValue.isEmpty() == false) {
1305+
assertEquals(initialValue, newValue);
1306+
}
1307+
1308+
assertDeletable(id);
1309+
return newValue;
11931310
}
11941311

11951312
private static Request prepareRequest(Mode mode) {

0 commit comments

Comments
 (0)