Skip to content

Commit 6fd6697

Browse files
committed
Return failures in partial response
1 parent 83faaa0 commit 6fd6697

File tree

10 files changed: 86 additions (+86) and 42 deletions (-42) lines changed

10 files changed: 86 additions (+86) and 42 deletions (-42) lines changed

test/external-modules/error-query/src/javaRestTest/java/org/elasticsearch/test/esql/EsqlPartialResultsIT.java

Lines changed: 22 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.client.Response;
1515
import org.elasticsearch.client.ResponseException;
1616
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
1718
import org.elasticsearch.test.cluster.ElasticsearchCluster;
1819
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
1920
import org.elasticsearch.test.rest.ESRestTestCase;
@@ -26,6 +27,7 @@
2627

2728
import static org.hamcrest.Matchers.containsString;
2829
import static org.hamcrest.Matchers.equalTo;
30+
import static org.hamcrest.Matchers.hasSize;
2931
import static org.hamcrest.Matchers.lessThanOrEqualTo;
3032

3133
public class EsqlPartialResultsIT extends ESRestTestCase {
@@ -97,6 +99,7 @@ public Set<String> populateIndices() throws Exception {
9799
return okIds;
98100
}
99101

102+
@SuppressWarnings("unchecked")
100103
public void testPartialResult() throws Exception {
101104
Set<String> okIds = populateIndices();
102105
String query = """
@@ -113,11 +116,30 @@ public void testPartialResult() throws Exception {
113116
}
114117
Response resp = client().performRequest(request);
115118
Map<String, Object> results = entityAsMap(resp);
119+
logger.info("--> results {}", results);
116120
assertThat(results.get("is_partial"), equalTo(true));
117121
List<?> columns = (List<?>) results.get("columns");
118122
assertThat(columns, equalTo(List.of(Map.of("name", "fail_me", "type", "long"), Map.of("name", "v", "type", "long"))));
119123
List<?> values = (List<?>) results.get("values");
120124
assertThat(values.size(), lessThanOrEqualTo(okIds.size()));
125+
Map<String, Object> localInfo = (Map<String, Object>) XContentMapValues.extractValue(
126+
results,
127+
"_clusters",
128+
"details",
129+
"(local)"
130+
);
131+
assertNotNull(localInfo);
132+
assertThat(XContentMapValues.extractValue(localInfo, "_shards", "successful"), equalTo(0));
133+
assertThat(
134+
XContentMapValues.extractValue(localInfo, "_shards", "failed"),
135+
equalTo(XContentMapValues.extractValue(localInfo, "_shards", "total"))
136+
);
137+
List<Map<String, Object>> failures = (List<Map<String, Object>>) XContentMapValues.extractValue(localInfo, "failures");
138+
assertThat(failures, hasSize(1));
139+
assertThat(
140+
failures.get(0).get("reason"),
141+
equalTo(Map.of("type", "illegal_state_exception", "reason", "Accessing failing field"))
142+
);
121143
}
122144
// allow_partial_results = false
123145
{

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java

Lines changed: 0 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,6 @@
1313
import org.elasticsearch.tasks.TaskCancelledException;
1414
import org.elasticsearch.transport.TransportException;
1515

16-
import java.util.ArrayList;
1716
import java.util.EnumMap;
1817
import java.util.List;
1918
import java.util.Map;
@@ -137,27 +136,4 @@ private Exception buildFailure() {
137136
assert first != null;
138137
return first;
139138
}
140-
141-
public List<Exception> getFailures() {
142-
if (hasFailure == false) {
143-
return List.of();
144-
}
145-
synchronized (this) {
146-
List<Exception> failures = new ArrayList<>();
147-
for (Category category : List.of(Category.CLIENT, Category.SERVER, Category.SHARD_UNAVAILABLE)) {
148-
for (Exception e : categories.get(category)) {
149-
if (failures.size() <= maxExceptions) {
150-
failures.add(e);
151-
}
152-
}
153-
}
154-
if (failures.isEmpty()) {
155-
Exception any = categories.get(Category.CANCELLATION).poll();
156-
if (any != null) {
157-
failures.add(any);
158-
}
159-
}
160-
return failures;
161-
}
162-
}
163139
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java

Lines changed: 19 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -9,6 +9,7 @@
99

1010
import org.elasticsearch.ExceptionsHelper;
1111
import org.elasticsearch.ResourceNotFoundException;
12+
import org.elasticsearch.action.search.ShardSearchFailure;
1213
import org.elasticsearch.client.internal.Client;
1314
import org.elasticsearch.common.breaker.CircuitBreaker;
1415
import org.elasticsearch.common.breaker.CircuitBreakingException;
@@ -37,11 +38,13 @@
3738
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
3839
import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
3940
import static org.hamcrest.Matchers.containsString;
41+
import static org.hamcrest.Matchers.empty;
4042
import static org.hamcrest.Matchers.equalTo;
4143
import static org.hamcrest.Matchers.greaterThan;
4244
import static org.hamcrest.Matchers.in;
4345
import static org.hamcrest.Matchers.is;
4446
import static org.hamcrest.Matchers.lessThanOrEqualTo;
47+
import static org.hamcrest.Matchers.not;
4548

4649
public class CrossClusterQueryWithPartialResultsIT extends AbstractCrossClusterTestCase {
4750

@@ -70,6 +73,14 @@ private void assertClusterPartial(EsqlQueryResponse resp, String clusterAlias, C
7073
assertClusterPartial(resp, clusterAlias, cluster.okShards + cluster.failingShards, cluster.okShards);
7174
}
7275

76+
private void assertClusterFailure(EsqlQueryResponse resp, String clusterAlias, String reason) {
77+
EsqlExecutionInfo.Cluster info = resp.getExecutionInfo().getCluster(clusterAlias);
78+
assertThat(info.getFailures(), not(empty()));
79+
for (ShardSearchFailure f : info.getFailures()) {
80+
assertThat(f.reason(), containsString(reason));
81+
}
82+
}
83+
7384
private void assertClusterPartial(EsqlQueryResponse resp, String clusterAlias, int totalShards, int okShards) {
7485
EsqlExecutionInfo.Cluster clusterInfo = resp.getExecutionInfo().getCluster(clusterAlias);
7586
assertThat(clusterInfo.getTotalShards(), equalTo(totalShards));
@@ -83,6 +94,7 @@ private void assertClusterSuccess(EsqlQueryResponse resp, String clusterAlias, i
8394
assertThat(clusterInfo.getSuccessfulShards(), equalTo(numShards));
8495
assertThat(clusterInfo.getFailedShards(), equalTo(0));
8596
assertThat(clusterInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
97+
assertThat(clusterInfo.getFailures(), empty());
8698
}
8799

88100
public void testPartialResults() throws Exception {
@@ -110,10 +122,12 @@ public void testPartialResults() throws Exception {
110122
assertTrue(returnedIds.add(id));
111123
assertThat(id, is(in(allIds)));
112124
}
113-
114125
assertClusterPartial(resp, LOCAL_CLUSTER, local);
115126
assertClusterPartial(resp, REMOTE_CLUSTER_1, remote1);
116127
assertClusterPartial(resp, REMOTE_CLUSTER_2, remote2);
128+
for (String cluster : List.of(LOCAL_CLUSTER, REMOTE_CLUSTER_1, REMOTE_CLUSTER_2)) {
129+
assertClusterFailure(resp, cluster, "Accessing failing field");
130+
}
117131
}
118132
}
119133

@@ -139,6 +153,7 @@ public void testOneRemoteClusterPartial() throws Exception {
139153
assertClusterSuccess(resp, LOCAL_CLUSTER, local.okShards);
140154
assertClusterSuccess(resp, REMOTE_CLUSTER_1, remote1.okShards);
141155
assertClusterPartial(resp, REMOTE_CLUSTER_2, remote2.failingShards, 0);
156+
assertClusterFailure(resp, REMOTE_CLUSTER_2, "Accessing failing field");
142157
}
143158
}
144159

@@ -191,9 +206,9 @@ public void sendResponse(Exception exception) {
191206
}
192207
assertThat(returnedIds, equalTo(Sets.union(local.okIds, remote1.okIds)));
193208
assertClusterSuccess(resp, LOCAL_CLUSTER, local.okShards);
194-
195209
EsqlExecutionInfo.Cluster remoteInfo = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1);
196210
assertThat(remoteInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
211+
assertClusterFailure(resp, REMOTE_CLUSTER_1, simulatedFailure.getMessage());
197212
}
198213
} finally {
199214
for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) {
@@ -239,9 +254,9 @@ public void testFailToStartRequestOnRemoteCluster() throws Exception {
239254
}
240255
assertThat(returnedIds, equalTo(local.okIds));
241256
assertClusterSuccess(resp, LOCAL_CLUSTER, local.okShards);
242-
243257
EsqlExecutionInfo.Cluster remoteInfo = resp.getExecutionInfo().getCluster(REMOTE_CLUSTER_1);
244258
assertThat(remoteInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
259+
assertClusterFailure(resp, REMOTE_CLUSTER_1, simulatedFailure.getMessage());
245260
}
246261
} finally {
247262
for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) {
@@ -286,8 +301,7 @@ public void testFailSearchShardsOnLocalCluster() throws Exception {
286301
assertThat(returnedIds, equalTo(remote1.okIds));
287302
EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(LOCAL_CLUSTER);
288303
assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
289-
290-
assertClusterSuccess(resp, REMOTE_CLUSTER_1, remote1.okShards);
304+
assertClusterFailure(resp, LOCAL_CLUSTER, simulatedFailure.getMessage());
291305
}
292306
} finally {
293307
for (TransportService transportService : cluster(LOCAL_CLUSTER).getInstances(TransportService.class)) {

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.plugins.Plugin;
1515
import org.elasticsearch.test.ESIntegTestCase;
1616
import org.elasticsearch.test.FailingFieldPlugin;
17+
import org.elasticsearch.transport.RemoteClusterService;
1718
import org.elasticsearch.xcontent.XContentBuilder;
1819
import org.elasticsearch.xcontent.json.JsonXContent;
1920
import org.elasticsearch.xpack.esql.EsqlTestUtils;
@@ -26,9 +27,12 @@
2627
import java.util.Set;
2728

2829
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
30+
import static org.hamcrest.Matchers.containsString;
31+
import static org.hamcrest.Matchers.empty;
2932
import static org.hamcrest.Matchers.equalTo;
3033
import static org.hamcrest.Matchers.in;
3134
import static org.hamcrest.Matchers.lessThanOrEqualTo;
35+
import static org.hamcrest.Matchers.not;
3236

3337
/**
3438
* Make sure the failures on the data node come back as failures over the wire.
@@ -121,6 +125,10 @@ public void testPartialResults() throws Exception {
121125
assertThat(id, in(okIds));
122126
assertTrue(actualIds.add(id));
123127
}
128+
EsqlExecutionInfo.Cluster localInfo = resp.getExecutionInfo().getCluster(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY);
129+
assertThat(localInfo.getFailures(), not(empty()));
130+
assertThat(localInfo.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.PARTIAL));
131+
assertThat(localInfo.getFailures().get(0).reason(), containsString("Accessing failing field"));
124132
}
125133
}
126134
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -163,7 +163,8 @@ private void updateExecutionInfo(EsqlExecutionInfo executionInfo, String cluster
163163
builder.setTook(executionInfo.tookSoFar());
164164
}
165165
if (v.getStatus() == EsqlExecutionInfo.Cluster.Status.RUNNING) {
166-
if (executionInfo.isStopped() || resp.failedShards > 0) {
166+
builder.setFailures(resp.failures);
167+
if (executionInfo.isStopped() || resp.failedShards > 0 || resp.failures.isEmpty() == false) {
167168
builder.setStatus(EsqlExecutionInfo.Cluster.Status.PARTIAL);
168169
} else {
169170
builder.setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL);

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@
88
package org.elasticsearch.xpack.esql.plugin;
99

1010
import org.elasticsearch.TransportVersions;
11+
import org.elasticsearch.action.search.ShardSearchFailure;
1112
import org.elasticsearch.common.io.stream.StreamInput;
1213
import org.elasticsearch.common.io.stream.StreamOutput;
1314
import org.elasticsearch.compute.operator.DriverProfile;
@@ -29,7 +30,7 @@ final class ComputeResponse extends TransportResponse {
2930
public final int successfulShards;
3031
public final int skippedShards;
3132
public final int failedShards;
32-
public final List<Exception> failures;
33+
public final List<ShardSearchFailure> failures;
3334

3435
ComputeResponse(List<DriverProfile> profiles) {
3536
this(profiles, null, null, null, null, null, List.of());
@@ -42,7 +43,7 @@ final class ComputeResponse extends TransportResponse {
4243
Integer successfulShards,
4344
Integer skippedShards,
4445
Integer failedShards,
45-
List<Exception> failures
46+
List<ShardSearchFailure> failures
4647
) {
4748
this.profiles = profiles;
4849
this.took = took;
@@ -78,7 +79,7 @@ final class ComputeResponse extends TransportResponse {
7879
this.failedShards = 0;
7980
}
8081
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_FAILURE_FROM_REMOTE)) {
81-
this.failures = in.readCollectionAsImmutableList(StreamInput::readException);
82+
this.failures = in.readCollectionAsImmutableList(ShardSearchFailure::readShardSearchFailure);
8283
} else {
8384
this.failures = List.of();
8485
}
@@ -102,7 +103,7 @@ public void writeTo(StreamOutput out) throws IOException {
102103
out.writeVInt(failedShards);
103104
}
104105
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_FAILURE_FROM_REMOTE)) {
105-
out.writeCollection(failures, StreamOutput::writeException);
106+
out.writeCollection(failures, (o, v) -> v.writeTo(o));
106107
}
107108
}
108109

@@ -130,7 +131,7 @@ public int getFailedShards() {
130131
return failedShards;
131132
}
132133

133-
public List<Exception> getFailures() {
134+
public List<ShardSearchFailure> getFailures() {
134135
return failures;
135136
}
136137
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -268,6 +268,7 @@ public void execute(
268268
.setSuccessfulShards(r.getSuccessfulShards())
269269
.setSkippedShards(r.getSkippedShards())
270270
.setFailedShards(r.getFailedShards())
271+
.setFailures(r.failures)
271272
.build()
272273
);
273274
dataNodesListener.onResponse(r.getProfiles());

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -101,6 +101,7 @@ void startComputeOnDataNodes(
101101
new DataNodeRequestSender(
102102
transportService,
103103
esqlExecutor,
104+
clusterAlias,
104105
parentTask,
105106
configuration.allowPartialResults(),
106107
configuration.pragmas().maxConcurrentNodesPerCluster()

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 21 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.action.search.SearchShardsGroup;
1616
import org.elasticsearch.action.search.SearchShardsRequest;
1717
import org.elasticsearch.action.search.SearchShardsResponse;
18+
import org.elasticsearch.action.search.ShardSearchFailure;
1819
import org.elasticsearch.action.support.TransportActions;
1920
import org.elasticsearch.cluster.node.DiscoveryNode;
2021
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
@@ -26,6 +27,7 @@
2627
import org.elasticsearch.index.Index;
2728
import org.elasticsearch.index.query.QueryBuilder;
2829
import org.elasticsearch.index.shard.ShardId;
30+
import org.elasticsearch.search.SearchShardTarget;
2931
import org.elasticsearch.search.internal.AliasFilter;
3032
import org.elasticsearch.tasks.CancellableTask;
3133
import org.elasticsearch.tasks.Task;
@@ -73,6 +75,7 @@ abstract class DataNodeRequestSender {
7375

7476
private final TransportService transportService;
7577
private final Executor esqlExecutor;
78+
private final String clusterAlias;
7679
private final CancellableTask rootTask;
7780
private final boolean allowPartialResults;
7881
private final Semaphore concurrentRequests;
@@ -87,12 +90,14 @@ abstract class DataNodeRequestSender {
8790
DataNodeRequestSender(
8891
TransportService transportService,
8992
Executor esqlExecutor,
93+
String clusterAlias,
9094
CancellableTask rootTask,
9195
boolean allowPartialResults,
9296
int concurrentRequests
9397
) {
9498
this.transportService = transportService;
9599
this.esqlExecutor = esqlExecutor;
100+
this.clusterAlias = clusterAlias;
96101
this.rootTask = rootTask;
97102
this.allowPartialResults = allowPartialResults;
98103
this.concurrentRequests = concurrentRequests > 0 ? new Semaphore(concurrentRequests) : null;
@@ -209,16 +214,25 @@ private void reportFailures(ComputeListener computeListener) {
209214
}
210215
}
211216

212-
private List<Exception> selectFailures() {
217+
private List<ShardSearchFailure> selectFailures() {
213218
assert reportedFailure == false;
214-
FailureCollector collector = new FailureCollector();
215-
Set<Exception> seen = Collections.newSetFromMap(new IdentityHashMap<>());
216-
for (ShardFailure e : shardFailures.values()) {
217-
if (seen.add(e.failure)) {
218-
collector.unwrapAndCollect(e.failure);
219+
final List<ShardSearchFailure> failures = new ArrayList<>();
220+
final Set<Exception> seen = Collections.newSetFromMap(new IdentityHashMap<>());
221+
for (Map.Entry<ShardId, ShardFailure> e : shardFailures.entrySet()) {
222+
final ShardFailure failure = e.getValue();
223+
if (ExceptionsHelper.unwrap(failure.failure(), TaskCancelledException.class) != null) {
224+
continue;
219225
}
226+
if (seen.add(failure.failure) && failures.size() < 5) {
227+
failures.add(new ShardSearchFailure(failure.failure, new SearchShardTarget(null, e.getKey(), clusterAlias)));
228+
}
229+
}
230+
// pick any cancellation exception
231+
if (failures.isEmpty() && shardFailures.isEmpty() == false) {
232+
final ShardFailure any = shardFailures.values().iterator().next();
233+
failures.add(new ShardSearchFailure(any.failure));
220234
}
221-
return collector.getFailures();
235+
return failures;
222236
}
223237

224238
private void sendOneNodeRequest(TargetShards targetShards, ComputeListener computeListener, NodeRequest request) {

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -62,6 +62,7 @@
6262
import static org.elasticsearch.xpack.esql.plugin.DataNodeRequestSender.NodeRequest;
6363
import static org.hamcrest.Matchers.anyOf;
6464
import static org.hamcrest.Matchers.containsString;
65+
import static org.hamcrest.Matchers.empty;
6566
import static org.hamcrest.Matchers.equalTo;
6667
import static org.hamcrest.Matchers.hasSize;
6768
import static org.hamcrest.Matchers.in;
@@ -145,6 +146,10 @@ public void testMissingShards() {
145146
assertThat(resp.totalShards, equalTo(3));
146147
assertThat(resp.failedShards, equalTo(1));
147148
assertThat(resp.successfulShards, equalTo(2));
149+
assertThat(resp.failures, not(empty()));
150+
assertNotNull(resp.failures.get(0).shard());
151+
assertThat(resp.failures.get(0).shard().getShardId(), equalTo(shard3));
152+
assertThat(resp.failures.get(0).reason(), containsString("no shard copies found"));
148153
}
149154
}
150155

@@ -453,6 +458,7 @@ PlainActionFuture<ComputeResponse> sendRequests(
453458
DataNodeRequestSender requestSender = new DataNodeRequestSender(
454459
transportService,
455460
executor,
461+
"",
456462
task,
457463
allowPartialResults,
458464
concurrentRequests

0 commit comments

Comments
 (0)