Skip to content

Commit e076653

Browse files
committed
Return failures in response
1 parent aa2db74 commit e076653

File tree

5 files changed

+57
-4
lines changed

5 files changed

+57
-4
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ static TransportVersion def(int id) {
185185
public static final TransportVersion ESQL_THREAD_NAME_IN_DRIVER_PROFILE = def(9_027_0_00);
186186
public static final TransportVersion INFERENCE_CONTEXT = def(9_028_0_00);
187187
public static final TransportVersion ML_INFERENCE_DEEPSEEK = def(9_029_00_0);
188+
// Gates reading/writing of the failures list carried by the ESQL ComputeResponse.
public static final TransportVersion ESQL_FAILURE_FROM_REMOTE = def(9_030_00_0);
188189

189190
/*
190191
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/FailureCollector.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.tasks.TaskCancelledException;
1414
import org.elasticsearch.transport.TransportException;
1515

16+
import java.util.ArrayList;
1617
import java.util.EnumMap;
1718
import java.util.List;
1819
import java.util.Map;
@@ -136,4 +137,27 @@ private Exception buildFailure() {
136137
assert first != null;
137138
return first;
138139
}
140+
141+
public List<Exception> getFailures() {
142+
if (hasFailure == false) {
143+
return List.of();
144+
}
145+
synchronized (this) {
146+
List<Exception> failures = new ArrayList<>();
147+
for (Category category : List.of(Category.CLIENT, Category.SERVER, Category.SHARD_UNAVAILABLE)) {
148+
for (Exception e : categories.get(category)) {
149+
if (failures.size() <= maxExceptions) {
150+
failures.add(e);
151+
}
152+
}
153+
}
154+
if (failures.isEmpty()) {
155+
Exception any = categories.get(Category.CANCELLATION).poll();
156+
if (any != null) {
157+
failures.add(any);
158+
}
159+
}
160+
return failures;
161+
}
162+
}
139163
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ void runComputeOnRemoteCluster(
251251
try (var computeListener = new ComputeListener(transportService.getThreadPool(), cancelQueryOnFailure, listener.map(profiles -> {
252252
final TimeValue took = TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos);
253253
final ComputeResponse r = finalResponse.get();
254-
return new ComputeResponse(profiles, took, r.totalShards, r.successfulShards, r.skippedShards, r.failedShards);
254+
return new ComputeResponse(profiles, took, r.totalShards, r.successfulShards, r.skippedShards, r.failedShards, r.failures);
255255
}))) {
256256
var exchangeSource = new ExchangeSourceHandler(
257257
configuration.pragmas().exchangeBufferSize(),

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeResponse.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,10 @@ final class ComputeResponse extends TransportResponse {
2929
public final int successfulShards;
3030
public final int skippedShards;
3131
public final int failedShards;
32+
public final List<Exception> failures;
3233

3334
ComputeResponse(List<DriverProfile> profiles) {
34-
this(profiles, null, null, null, null, null);
35+
this(profiles, null, null, null, null, null, List.of());
3536
}
3637

3738
ComputeResponse(
@@ -40,14 +41,16 @@ final class ComputeResponse extends TransportResponse {
4041
Integer totalShards,
4142
Integer successfulShards,
4243
Integer skippedShards,
43-
Integer failedShards
44+
Integer failedShards,
45+
List<Exception> failures
4446
) {
4547
this.profiles = profiles;
4648
this.took = took;
4749
this.totalShards = totalShards == null ? 0 : totalShards.intValue();
4850
this.successfulShards = successfulShards == null ? 0 : successfulShards.intValue();
4951
this.skippedShards = skippedShards == null ? 0 : skippedShards.intValue();
5052
this.failedShards = failedShards == null ? 0 : failedShards.intValue();
53+
this.failures = failures;
5154
}
5255

5356
ComputeResponse(StreamInput in) throws IOException {
@@ -74,6 +77,11 @@ final class ComputeResponse extends TransportResponse {
7477
this.skippedShards = 0;
7578
this.failedShards = 0;
7679
}
80+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_FAILURE_FROM_REMOTE)) {
81+
this.failures = in.readCollectionAsImmutableList(StreamInput::readException);
82+
} else {
83+
this.failures = List.of();
84+
}
7785
}
7886

7987
@Override
@@ -93,6 +101,9 @@ public void writeTo(StreamOutput out) throws IOException {
93101
out.writeVInt(skippedShards);
94102
out.writeVInt(failedShards);
95103
}
104+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_FAILURE_FROM_REMOTE)) {
105+
out.writeCollection(failures, StreamOutput::writeException);
106+
}
96107
}
97108

98109
public List<DriverProfile> getProfiles() {
@@ -118,4 +129,8 @@ public int getSkippedShards() {
118129
public int getFailedShards() {
119130
return failedShards;
120131
}
132+
133+
public List<Exception> getFailures() {
134+
return failures;
135+
}
121136
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ final void startComputeOnDataNodes(
115115
targetShards.totalShards(),
116116
targetShards.totalShards() - shardFailures.size() - skippedShards.get(),
117117
targetShards.skippedShards() + skippedShards.get(),
118-
shardFailures.size()
118+
shardFailures.size(),
119+
selectFailures()
119120
);
120121
}))) {
121122
for (TargetShard shard : targetShards.shards.values()) {
@@ -208,6 +209,18 @@ private void reportFailures(ComputeListener computeListener) {
208209
}
209210
}
210211

212+
private List<Exception> selectFailures() {
213+
assert reportedFailure == false;
214+
FailureCollector collector = new FailureCollector();
215+
Set<Exception> seen = Collections.newSetFromMap(new IdentityHashMap<>());
216+
for (ShardFailure e : shardFailures.values()) {
217+
if (seen.add(e.failure)) {
218+
collector.unwrapAndCollect(e.failure);
219+
}
220+
}
221+
return collector.getFailures();
222+
}
223+
211224
private void sendOneNodeRequest(TargetShards targetShards, ComputeListener computeListener, NodeRequest request) {
212225
final ActionListener<List<DriverProfile>> listener = computeListener.acquireCompute();
213226
sendRequest(request.node, request.shardIds, request.aliasFilters, new NodeListener() {

0 commit comments

Comments
 (0)