Skip to content

Commit 49c768e

Browse files
committed
fix tests
1 parent 514e22d commit 49c768e

File tree

7 files changed

+55
-10
lines changed

7 files changed

+55
-10
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ public void finishSessionEarly(String sessionId, ActionListener<Void> listener)
185185
ExchangeSourceHandler exchangeSource = removeExchangeSourceHandler(sessionId);
186186
if (exchangeSource != null) {
187187
exchangeSource.finishEarly(false, listener);
188+
exchangeSource.onFinishEarly();
188189
} else {
189190
listener.onResponse(null);
190191
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public void fetchPageAsync(boolean sourceFinished, ActionListener<ExchangeRespon
102102
buffer.finish(true);
103103
var subSource = source.get();
104104
if (subSource != null) {
105-
subSource.finishEarly(true, ActionListener.noop());
105+
subSource.finishEarly(false, ActionListener.noop());
106106
}
107107
}
108108
listeners.add(listener);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -317,9 +317,6 @@ public Releasable addEmptySink() {
317317
* @param drainingPages whether to discard pages already fetched in the exchange
318318
*/
319319
public void finishEarly(boolean drainingPages, ActionListener<Void> listener) {
320-
if (finishEarlyHandler != null) {
321-
finishEarlyHandler.run();
322-
}
323320
buffer.finish(drainingPages);
324321
if (remoteSinks.isEmpty()) {
325322
listener.onResponse(null);
@@ -332,6 +329,15 @@ public void finishEarly(boolean drainingPages, ActionListener<Void> listener) {
332329
}
333330
}
334331

332+
/**
333+
* Calls the handler signifying that the request has been completed prematurely.
334+
*/
335+
public void onFinishEarly() {
336+
if (finishEarlyHandler != null) {
337+
finishEarlyHandler.run();
338+
}
339+
}
340+
335341
private static class PendingInstances {
336342
private final AtomicInteger instances = new AtomicInteger();
337343
private final SubscribableListener<Void> completion = new SubscribableListener<>();

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java

Lines changed: 41 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -518,30 +518,65 @@ static EsqlQueryResponse fromXContent(XContentParser parser) {
518518
}
519519
}
520520

521+
public static int clusterDetailsSize(int numClusters) {
522+
/* Example:
523+
"_clusters" : {
524+
"total" : 2,
525+
"successful" : 2,
526+
"running" : 0,
527+
"skipped" : 0,
528+
"partial" : 0,
529+
"failed" : 0,
530+
"is_partial" : false,
531+
"details" : {
532+
"(local)" : {
533+
"status" : "successful",
534+
"indices" : "logs-1",
535+
"took" : 4444,
536+
"_shards" : {
537+
"total" : 10,
538+
"successful" : 10,
539+
"skipped" : 3,
540+
"failed" : 0
541+
}
542+
},
543+
"remote1" : {
544+
"status" : "successful",
545+
"indices" : "remote1:logs-1",
546+
"took" : 4999,
547+
"_shards" : {
548+
"total" : 12,
549+
"successful" : 12,
550+
"skipped" : 5,
551+
"failed" : 0
552+
}
553+
}
554+
*/
555+
return numClusters * 4 + 7;
556+
}
557+
521558
public void testChunkResponseSizeColumnar() {
522-
int sizeClusterDetails = 14;
523559
try (EsqlQueryResponse resp = randomResponse(true, null)) {
524560
int columnCount = resp.pages().get(0).getBlockCount();
525561
int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount() * p.getBlockCount()).sum() + columnCount * 2;
526-
assertChunkCount(resp, r -> 5 + sizeClusterDetails + bodySize);
562+
assertChunkCount(resp, r -> 5 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize);
527563
}
528564

529565
try (EsqlQueryResponse resp = randomResponseAsync(true, null, true)) {
530566
int columnCount = resp.pages().get(0).getBlockCount();
531567
int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount() * p.getBlockCount()).sum() + columnCount * 2;
532-
assertChunkCount(resp, r -> 7 + sizeClusterDetails + bodySize); // is_running
568+
assertChunkCount(resp, r -> 7 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize); // is_running
533569
}
534570
}
535571

536572
public void testChunkResponseSizeRows() {
537-
int sizeClusterDetails = 14;
538573
try (EsqlQueryResponse resp = randomResponse(false, null)) {
539574
int bodySize = resp.pages().stream().mapToInt(Page::getPositionCount).sum();
540-
assertChunkCount(resp, r -> 5 + sizeClusterDetails + bodySize);
575+
assertChunkCount(resp, r -> 5 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize);
541576
}
542577
try (EsqlQueryResponse resp = randomResponseAsync(false, null, true)) {
543578
int bodySize = resp.pages().stream().mapToInt(Page::getPositionCount).sum();
544-
assertChunkCount(resp, r -> 7 + sizeClusterDetails + bodySize);
579+
assertChunkCount(resp, r -> 7 + clusterDetailsSize(resp.getExecutionInfo().clusterInfo.size()) + bodySize);
545580
}
546581
}
547582

x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS1MissingIndicesIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ record ExpectedCluster(String clusterAlias, String indexExpression, String statu
7979
void assertExpectedClustersForMissingIndicesTests(Map<String, Object> responseMap, List<ExpectedCluster> expected) {
8080
Map<String, ?> clusters = (Map<String, ?>) responseMap.get("_clusters");
8181
assertThat((int) responseMap.get("took"), greaterThan(0));
82+
assertThat((boolean) clusters.get("is_partial"), is(false));
8283

8384
Map<String, ?> detailsMap = (Map<String, ?>) clusters.get("details");
8485
assertThat(detailsMap.size(), is(expected.size()));

x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS1UnavailableRemotesIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ private void clusterShutDownWithRandomSkipUnavailable() throws Exception {
101101
assertThat((int) clusters.get("skipped"), is(0));
102102
assertThat((int) clusters.get("partial"), is(0));
103103
assertThat((int) clusters.get("failed"), is(0));
104+
assertThat((boolean) clusters.get("is_partial"), is(false));
104105

105106
assertThat(clusterDetails.size(), is(2));
106107
assertThat((int) localClusterDetails.get("took"), greaterThan(0));

x-pack/plugin/security/qa/multi-cluster/src/javaRestTest/java/org/elasticsearch/xpack/remotecluster/CrossClusterEsqlRCS2UnavailableRemotesIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ private void clusterShutDownWithRandomSkipUnavailable() throws Exception {
119119
assertThat((int) clusters.get("skipped"), is(0));
120120
assertThat((int) clusters.get("partial"), is(0));
121121
assertThat((int) clusters.get("failed"), is(0));
122+
assertThat((boolean) clusters.get("is_partial"), is(false));
122123

123124
assertThat(clusterDetails.size(), is(2));
124125
assertThat((int) localClusterDetails.get("took"), greaterThan(0));

0 commit comments

Comments
 (0)