@@ -1,7 +1,4 @@
setup:
  - skip:
      awaits_fix: "TODO fix this test, the response with batched execution is not deterministic enough for the available matchers"
Member

can we just get away with unskipping this? I was under the impression that it's going to fail. Or does it run in a controlled scenario where the result is predictable?

Contributor Author
@benchaplin Sep 15, 2025

If you take a look at #121885, the line that would make this test fail was removed. Now it essentially just tests batched_reduce_size validation. I'm not sure if it's worth keeping, what do you think?

Member

I see, thanks. I'd keep it. Can we have some simpler check on num_reduce_phases, like greater than some threshold that's easier to predict?

Contributor Author

Don't think we can assume anything about num_reduce_phases now - if it's 1, it's left out of the response entirely. And it can be 1 if all shards are batched.


  - do:
      indices.create:
        index: test_1
@@ -0,0 +1,92 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.action.search;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.elasticsearch.action.search.SearchType.QUERY_THEN_FETCH;
import static org.elasticsearch.search.aggregations.AggregationBuilders.terms;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailuresAndResponse;
import static org.hamcrest.Matchers.equalTo;

public class BatchedQueryPhaseIT extends ESIntegTestCase {

public void testNumReducePhases() {
String indexName = "test-idx";
assertAcked(
prepareCreate(indexName).setMapping("title", "type=keyword")
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0))
);
for (int i = 0; i < 100; i++) {
prepareIndex(indexName).setId(Integer.toString(i)).setSource("title", "testing" + i).get();
}
refresh();

final String coordinatorNode = internalCluster().getRandomNodeName();
final String coordinatorNodeId = getNodeId(coordinatorNode);
assertNoFailuresAndResponse(
client(coordinatorNode).prepareSearch(indexName)
.setBatchedReduceSize(2)
.addAggregation(terms("terms").field("title"))
.setSearchType(QUERY_THEN_FETCH),
response -> {
Map<String, Integer> shardsPerNode = getNodeToShardCountMap(indexName);
// Shards are not batched if they are already on the coordinating node or if there is only one per data node.
final int coordinatorShards = shardsPerNode.getOrDefault(coordinatorNodeId, 0);
final long otherSingleShardNodes = shardsPerNode.entrySet()
.stream()
.filter(entry -> entry.getKey().equals(coordinatorNodeId) == false)
.filter(entry -> entry.getValue() == 1)
.count();
final int numNotBatchedShards = coordinatorShards + (int) otherSingleShardNodes;

// Because batched_reduce_size = 2, whenever two or more shard results exist on the coordinating node, they will be
// partially reduced (batched queries do not count towards num_reduce_phases).
// Hence, the formula: (# of NOT batched shards) - 1.
final int expectedNumReducePhases = Math.max(1, numNotBatchedShards - 1);
assertThat(response.getNumReducePhases(), equalTo(expectedNumReducePhases));
}
);
}

private Map<String, Integer> getNodeToShardCountMap(String indexName) {
ClusterState clusterState = clusterAdmin().prepareState(TEST_REQUEST_TIMEOUT).get().getState();
IndexRoutingTable indexRoutingTable = clusterState.routingTable(ProjectId.DEFAULT).index(indexName);
if (indexRoutingTable == null) {
return Collections.emptyMap();
}

Map<String, Integer> nodeToShardCount = new HashMap<>();
for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId);
for (int copy = 0; copy < shardRoutingTable.size(); copy++) {
ShardRouting shardRouting = shardRoutingTable.shard(copy);
String nodeId = shardRouting.currentNodeId();
if (nodeId != null) {
nodeToShardCount.merge(nodeId, 1, Integer::sum);
}
}
}

return nodeToShardCount;
}
}
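
To make the assertion in testNumReducePhases easier to follow, here is a small standalone sketch of the "(# of not-batched shards) - 1" arithmetic for one possible shard layout. The node names and shard counts (coord, nodeA, nodeB) are hypothetical and chosen purely for illustration; they are not produced by the test above.

import java.util.Map;

// Worked example of the expectedNumReducePhases formula used in BatchedQueryPhaseIT.
// The shard layout below is made up: the coordinator holds 2 shards, nodeA holds 1, nodeB holds 3.
public class NumReducePhasesExample {
    public static void main(String[] args) {
        String coordinatorNodeId = "coord";
        // node id -> number of shards of the index on that node (hypothetical layout)
        Map<String, Integer> shardsPerNode = Map.of("coord", 2, "nodeA", 1, "nodeB", 3);

        // Shards on the coordinating node are never batched.
        int coordinatorShards = shardsPerNode.getOrDefault(coordinatorNodeId, 0); // 2
        // A data node holding only a single shard of the index also sends its result unbatched.
        long otherSingleShardNodes = shardsPerNode.entrySet()
            .stream()
            .filter(e -> e.getKey().equals(coordinatorNodeId) == false)
            .filter(e -> e.getValue() == 1)
            .count(); // 1 (nodeA)
        int numNotBatchedShards = coordinatorShards + (int) otherSingleShardNodes; // 3

        // With batched_reduce_size = 2, the coordinator partially reduces whenever two or more
        // unbatched results are buffered, so it performs (not-batched shards) - 1 reductions,
        // floored at 1 for the case where every shard result arrives batched.
        int expectedNumReducePhases = Math.max(1, numNotBatchedShards - 1);
        System.out.println(expectedNumReducePhases); // prints 2
    }
}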
@@ -95,6 +95,7 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhas
final TopDocsStats topDocsStats;
private volatile MergeResult mergeResult;
private volatile boolean hasPartialReduce;
// Note: at this time, numReducePhases does not count reductions that occur on the data node as part of batched query execution.
private volatile int numReducePhases;

/**