Skip to content

Commit bde7828

Browse files
Catch and handle disconnect exceptions in search (#115836) (#117373)
Getting a connection can throw an exception when the target node is disconnected. The call sites adjusted in this commit did not handle these exceptions, leading to a phase failure (and possible memory leaks for outstanding operations) instead of correctly recording a per-shard failure.
1 parent 3699811 commit bde7828

File tree

7 files changed

+121
-71
lines changed

7 files changed

+121
-71
lines changed

docs/changelog/115836.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 115836
2+
summary: Catch and handle disconnect exceptions in search
3+
area: Search
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,15 +84,20 @@ public void run() {
8484

8585
for (final DfsSearchResult dfsResult : searchResults) {
8686
final SearchShardTarget shardTarget = dfsResult.getSearchShardTarget();
87-
Transport.Connection connection = context.getConnection(shardTarget.getClusterAlias(), shardTarget.getNodeId());
88-
ShardSearchRequest shardRequest = rewriteShardSearchRequest(dfsResult.getShardSearchRequest());
87+
final int shardIndex = dfsResult.getShardIndex();
8988
QuerySearchRequest querySearchRequest = new QuerySearchRequest(
90-
context.getOriginalIndices(dfsResult.getShardIndex()),
89+
context.getOriginalIndices(shardIndex),
9190
dfsResult.getContextId(),
92-
shardRequest,
91+
rewriteShardSearchRequest(dfsResult.getShardSearchRequest()),
9392
dfs
9493
);
95-
final int shardIndex = dfsResult.getShardIndex();
94+
final Transport.Connection connection;
95+
try {
96+
connection = context.getConnection(shardTarget.getClusterAlias(), shardTarget.getNodeId());
97+
} catch (Exception e) {
98+
shardFailure(e, querySearchRequest, shardIndex, shardTarget, counter);
99+
return;
100+
}
96101
searchTransportService.sendExecuteQuery(
97102
connection,
98103
querySearchRequest,
@@ -112,10 +117,7 @@ protected void innerOnResponse(QuerySearchResult response) {
112117
@Override
113118
public void onFailure(Exception exception) {
114119
try {
115-
context.getLogger()
116-
.debug(() -> "[" + querySearchRequest.contextId() + "] Failed to execute query phase", exception);
117-
progressListener.notifyQueryFailure(shardIndex, shardTarget, exception);
118-
counter.onFailure(shardIndex, shardTarget, exception);
120+
shardFailure(exception, querySearchRequest, shardIndex, shardTarget, counter);
119121
} finally {
120122
if (context.isPartOfPointInTime(querySearchRequest.contextId()) == false) {
121123
// the query might not have been executed at all (for example because thread pool rejected
@@ -134,6 +136,18 @@ public void onFailure(Exception exception) {
134136
}
135137
}
136138

139+
private void shardFailure(
140+
Exception exception,
141+
QuerySearchRequest querySearchRequest,
142+
int shardIndex,
143+
SearchShardTarget shardTarget,
144+
CountedCollector<SearchPhaseResult> counter
145+
) {
146+
context.getLogger().debug(() -> "[" + querySearchRequest.contextId() + "] Failed to execute query phase", exception);
147+
progressListener.notifyQueryFailure(shardIndex, shardTarget, exception);
148+
counter.onFailure(shardIndex, shardTarget, exception);
149+
}
150+
137151
// package private for testing
138152
ShardSearchRequest rewriteShardSearchRequest(ShardSearchRequest request) {
139153
SearchSourceBuilder source = request.source();

server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.search.internal.ShardSearchContextId;
2222
import org.elasticsearch.search.rank.RankDoc;
2323
import org.elasticsearch.search.rank.RankDocShardInfo;
24+
import org.elasticsearch.transport.Transport;
2425

2526
import java.util.ArrayList;
2627
import java.util.HashMap;
@@ -214,9 +215,41 @@ private void executeFetch(
214215
final ShardSearchContextId contextId = shardPhaseResult.queryResult() != null
215216
? shardPhaseResult.queryResult().getContextId()
216217
: shardPhaseResult.rankFeatureResult().getContextId();
218+
var listener = new SearchActionListener<FetchSearchResult>(shardTarget, shardIndex) {
219+
@Override
220+
public void innerOnResponse(FetchSearchResult result) {
221+
try {
222+
progressListener.notifyFetchResult(shardIndex);
223+
counter.onResult(result);
224+
} catch (Exception e) {
225+
context.onPhaseFailure(FetchSearchPhase.this, "", e);
226+
}
227+
}
228+
229+
@Override
230+
public void onFailure(Exception e) {
231+
try {
232+
logger.debug(() -> "[" + contextId + "] Failed to execute fetch phase", e);
233+
progressListener.notifyFetchFailure(shardIndex, shardTarget, e);
234+
counter.onFailure(shardIndex, shardTarget, e);
235+
} finally {
236+
// the search context might not be cleared on the node where the fetch was executed for example
237+
// because the action was rejected by the thread pool. in this case we need to send a dedicated
238+
// request to clear the search context.
239+
releaseIrrelevantSearchContext(shardPhaseResult, context);
240+
}
241+
}
242+
};
243+
final Transport.Connection connection;
244+
try {
245+
connection = context.getConnection(shardTarget.getClusterAlias(), shardTarget.getNodeId());
246+
} catch (Exception e) {
247+
listener.onFailure(e);
248+
return;
249+
}
217250
context.getSearchTransport()
218251
.sendExecuteFetch(
219-
context.getConnection(shardTarget.getClusterAlias(), shardTarget.getNodeId()),
252+
connection,
220253
new ShardFetchSearchRequest(
221254
context.getOriginalIndices(shardPhaseResult.getShardIndex()),
222255
contextId,
@@ -228,31 +261,7 @@ private void executeFetch(
228261
aggregatedDfs
229262
),
230263
context.getTask(),
231-
new SearchActionListener<>(shardTarget, shardIndex) {
232-
@Override
233-
public void innerOnResponse(FetchSearchResult result) {
234-
try {
235-
progressListener.notifyFetchResult(shardIndex);
236-
counter.onResult(result);
237-
} catch (Exception e) {
238-
context.onPhaseFailure(FetchSearchPhase.this, "", e);
239-
}
240-
}
241-
242-
@Override
243-
public void onFailure(Exception e) {
244-
try {
245-
logger.debug(() -> "[" + contextId + "] Failed to execute fetch phase", e);
246-
progressListener.notifyFetchFailure(shardIndex, shardTarget, e);
247-
counter.onFailure(shardIndex, shardTarget, e);
248-
} finally {
249-
// the search context might not be cleared on the node where the fetch was executed for example
250-
// because the action was rejected by the thread pool. in this case we need to send a dedicated
251-
// request to clear the search context.
252-
releaseIrrelevantSearchContext(shardPhaseResult, context);
253-
}
254-
}
255-
}
264+
listener
256265
);
257266
}
258267

server/src/main/java/org/elasticsearch/action/search/RankFeaturePhase.java

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.search.rank.feature.RankFeatureDoc;
2525
import org.elasticsearch.search.rank.feature.RankFeatureResult;
2626
import org.elasticsearch.search.rank.feature.RankFeatureShardRequest;
27+
import org.elasticsearch.transport.Transport;
2728

2829
import java.util.List;
2930

@@ -136,38 +137,46 @@ private void executeRankFeatureShardPhase(
136137
final SearchShardTarget shardTarget = queryResult.queryResult().getSearchShardTarget();
137138
final ShardSearchContextId contextId = queryResult.queryResult().getContextId();
138139
final int shardIndex = queryResult.getShardIndex();
140+
var listener = new SearchActionListener<RankFeatureResult>(shardTarget, shardIndex) {
141+
@Override
142+
protected void innerOnResponse(RankFeatureResult response) {
143+
try {
144+
progressListener.notifyRankFeatureResult(shardIndex);
145+
rankRequestCounter.onResult(response);
146+
} catch (Exception e) {
147+
context.onPhaseFailure(RankFeaturePhase.this, "", e);
148+
}
149+
}
150+
151+
@Override
152+
public void onFailure(Exception e) {
153+
try {
154+
logger.debug(() -> "[" + contextId + "] Failed to execute rank phase", e);
155+
progressListener.notifyRankFeatureFailure(shardIndex, shardTarget, e);
156+
rankRequestCounter.onFailure(shardIndex, shardTarget, e);
157+
} finally {
158+
releaseIrrelevantSearchContext(queryResult, context);
159+
}
160+
}
161+
};
162+
final Transport.Connection connection;
163+
try {
164+
connection = context.getConnection(shardTarget.getClusterAlias(), shardTarget.getNodeId());
165+
} catch (Exception e) {
166+
listener.onFailure(e);
167+
return;
168+
}
139169
context.getSearchTransport()
140170
.sendExecuteRankFeature(
141-
context.getConnection(shardTarget.getClusterAlias(), shardTarget.getNodeId()),
171+
connection,
142172
new RankFeatureShardRequest(
143173
context.getOriginalIndices(queryResult.getShardIndex()),
144174
queryResult.getContextId(),
145175
queryResult.getShardSearchRequest(),
146176
entry
147177
),
148178
context.getTask(),
149-
new SearchActionListener<>(shardTarget, shardIndex) {
150-
@Override
151-
protected void innerOnResponse(RankFeatureResult response) {
152-
try {
153-
progressListener.notifyRankFeatureResult(shardIndex);
154-
rankRequestCounter.onResult(response);
155-
} catch (Exception e) {
156-
context.onPhaseFailure(RankFeaturePhase.this, "", e);
157-
}
158-
}
159-
160-
@Override
161-
public void onFailure(Exception e) {
162-
try {
163-
logger.debug(() -> "[" + contextId + "] Failed to execute rank phase", e);
164-
progressListener.notifyRankFeatureFailure(shardIndex, shardTarget, e);
165-
rankRequestCounter.onFailure(shardIndex, shardTarget, e);
166-
} finally {
167-
releaseIrrelevantSearchContext(queryResult, context);
168-
}
169-
}
170-
}
179+
listener
171180
);
172181
}
173182

server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,14 @@ protected void executePhaseOnShard(
8787
final SearchShardTarget shard,
8888
final SearchActionListener<DfsSearchResult> listener
8989
) {
90-
getSearchTransport().sendExecuteDfs(
91-
getConnection(shard.getClusterAlias(), shard.getNodeId()),
92-
buildShardSearchRequest(shardIt, listener.requestIndex),
93-
getTask(),
94-
listener
95-
);
90+
final Transport.Connection connection;
91+
try {
92+
connection = getConnection(shard.getClusterAlias(), shard.getNodeId());
93+
} catch (Exception e) {
94+
listener.onFailure(e);
95+
return;
96+
}
97+
getSearchTransport().sendExecuteDfs(connection, buildShardSearchRequest(shardIt, listener.requestIndex), getTask(), listener);
9698
}
9799

98100
@Override

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,15 @@ protected void executePhaseOnShard(
9393
final SearchShardTarget shard,
9494
final SearchActionListener<SearchPhaseResult> listener
9595
) {
96+
final Transport.Connection connection;
97+
try {
98+
connection = getConnection(shard.getClusterAlias(), shard.getNodeId());
99+
} catch (Exception e) {
100+
listener.onFailure(e);
101+
return;
102+
}
96103
ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt, listener.requestIndex));
97-
getSearchTransport().sendExecuteQuery(getConnection(shard.getClusterAlias(), shard.getNodeId()), request, getTask(), listener);
104+
getSearchTransport().sendExecuteQuery(connection, request, getTask(), listener);
98105
}
99106

100107
@Override

server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.Version;
1717
import org.elasticsearch.action.ActionListener;
1818
import org.elasticsearch.action.OriginalIndices;
19+
import org.elasticsearch.action.support.PlainActionFuture;
1920
import org.elasticsearch.cluster.ClusterName;
2021
import org.elasticsearch.cluster.ClusterState;
2122
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -733,17 +734,20 @@ public void run() {
733734
assertThat(phase.totalHits().relation, equalTo(TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO));
734735

735736
SearchShardTarget searchShardTarget = new SearchShardTarget("node3", shardIt.shardId(), null);
737+
final PlainActionFuture<Void> f = new PlainActionFuture<>();
736738
SearchActionListener<SearchPhaseResult> listener = new SearchActionListener<SearchPhaseResult>(searchShardTarget, 0) {
737739
@Override
738-
public void onFailure(Exception e) {}
740+
public void onFailure(Exception e) {
741+
f.onFailure(e);
742+
}
739743

740744
@Override
741-
protected void innerOnResponse(SearchPhaseResult response) {}
745+
protected void innerOnResponse(SearchPhaseResult response) {
746+
fail("should not be called");
747+
}
742748
};
743-
Exception e = expectThrows(
744-
VersionMismatchException.class,
745-
() -> action.executePhaseOnShard(shardIt, searchShardTarget, listener)
746-
);
749+
action.executePhaseOnShard(shardIt, searchShardTarget, listener);
750+
Exception e = expectThrows(VersionMismatchException.class, f::actionGet);
747751
assertThat(e.getMessage(), equalTo("One of the shards is incompatible with the required minimum version [" + minVersion + "]"));
748752
}
749753
}

0 commit comments

Comments
 (0)