Skip to content

Commit 78a531b

Browse files
Catch and handle disconnect exceptions in search (#115836)
Getting a connection can throw an exception for a disconnected node. We failed to handle these in the adjusted spots, leading to a phase failure (and possible memory leaks for outstanding operations) instead of correctly recording a per-shard failure.
1 parent 6742147 commit 78a531b

File tree

6 files changed

+111
-65
lines changed

6 files changed

+111
-65
lines changed

docs/changelog/115836.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 115836
2+
summary: Catch and handle disconnect exceptions in search
3+
area: Search
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,15 +84,20 @@ public void run() {
8484

8585
for (final DfsSearchResult dfsResult : searchResults) {
8686
final SearchShardTarget shardTarget = dfsResult.getSearchShardTarget();
87-
Transport.Connection connection = context.getConnection(shardTarget.getClusterAlias(), shardTarget.getNodeId());
88-
ShardSearchRequest shardRequest = rewriteShardSearchRequest(dfsResult.getShardSearchRequest());
87+
final int shardIndex = dfsResult.getShardIndex();
8988
QuerySearchRequest querySearchRequest = new QuerySearchRequest(
90-
context.getOriginalIndices(dfsResult.getShardIndex()),
89+
context.getOriginalIndices(shardIndex),
9190
dfsResult.getContextId(),
92-
shardRequest,
91+
rewriteShardSearchRequest(dfsResult.getShardSearchRequest()),
9392
dfs
9493
);
95-
final int shardIndex = dfsResult.getShardIndex();
94+
final Transport.Connection connection;
95+
try {
96+
connection = context.getConnection(shardTarget.getClusterAlias(), shardTarget.getNodeId());
97+
} catch (Exception e) {
98+
shardFailure(e, querySearchRequest, shardIndex, shardTarget, counter);
99+
return;
100+
}
96101
searchTransportService.sendExecuteQuery(
97102
connection,
98103
querySearchRequest,
@@ -112,10 +117,7 @@ protected void innerOnResponse(QuerySearchResult response) {
112117
@Override
113118
public void onFailure(Exception exception) {
114119
try {
115-
context.getLogger()
116-
.debug(() -> "[" + querySearchRequest.contextId() + "] Failed to execute query phase", exception);
117-
progressListener.notifyQueryFailure(shardIndex, shardTarget, exception);
118-
counter.onFailure(shardIndex, shardTarget, exception);
120+
shardFailure(exception, querySearchRequest, shardIndex, shardTarget, counter);
119121
} finally {
120122
if (context.isPartOfPointInTime(querySearchRequest.contextId()) == false) {
121123
// the query might not have been executed at all (for example because thread pool rejected
@@ -134,6 +136,18 @@ public void onFailure(Exception exception) {
134136
}
135137
}
136138

139+
private void shardFailure(
140+
Exception exception,
141+
QuerySearchRequest querySearchRequest,
142+
int shardIndex,
143+
SearchShardTarget shardTarget,
144+
CountedCollector<SearchPhaseResult> counter
145+
) {
146+
context.getLogger().debug(() -> "[" + querySearchRequest.contextId() + "] Failed to execute query phase", exception);
147+
progressListener.notifyQueryFailure(shardIndex, shardTarget, exception);
148+
counter.onFailure(shardIndex, shardTarget, exception);
149+
}
150+
137151
// package private for testing
138152
ShardSearchRequest rewriteShardSearchRequest(ShardSearchRequest request) {
139153
SearchSourceBuilder source = request.source();

server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.search.internal.ShardSearchContextId;
2222
import org.elasticsearch.search.rank.RankDoc;
2323
import org.elasticsearch.search.rank.RankDocShardInfo;
24+
import org.elasticsearch.transport.Transport;
2425

2526
import java.util.ArrayList;
2627
import java.util.HashMap;
@@ -214,9 +215,41 @@ private void executeFetch(
214215
final ShardSearchContextId contextId = shardPhaseResult.queryResult() != null
215216
? shardPhaseResult.queryResult().getContextId()
216217
: shardPhaseResult.rankFeatureResult().getContextId();
218+
var listener = new SearchActionListener<FetchSearchResult>(shardTarget, shardIndex) {
219+
@Override
220+
public void innerOnResponse(FetchSearchResult result) {
221+
try {
222+
progressListener.notifyFetchResult(shardIndex);
223+
counter.onResult(result);
224+
} catch (Exception e) {
225+
context.onPhaseFailure(FetchSearchPhase.this, "", e);
226+
}
227+
}
228+
229+
@Override
230+
public void onFailure(Exception e) {
231+
try {
232+
logger.debug(() -> "[" + contextId + "] Failed to execute fetch phase", e);
233+
progressListener.notifyFetchFailure(shardIndex, shardTarget, e);
234+
counter.onFailure(shardIndex, shardTarget, e);
235+
} finally {
236+
// the search context might not be cleared on the node where the fetch was executed for example
237+
// because the action was rejected by the thread pool. in this case we need to send a dedicated
238+
// request to clear the search context.
239+
releaseIrrelevantSearchContext(shardPhaseResult, context);
240+
}
241+
}
242+
};
243+
final Transport.Connection connection;
244+
try {
245+
connection = context.getConnection(shardTarget.getClusterAlias(), shardTarget.getNodeId());
246+
} catch (Exception e) {
247+
listener.onFailure(e);
248+
return;
249+
}
217250
context.getSearchTransport()
218251
.sendExecuteFetch(
219-
context.getConnection(shardTarget.getClusterAlias(), shardTarget.getNodeId()),
252+
connection,
220253
new ShardFetchSearchRequest(
221254
context.getOriginalIndices(shardPhaseResult.getShardIndex()),
222255
contextId,
@@ -228,31 +261,7 @@ private void executeFetch(
228261
aggregatedDfs
229262
),
230263
context.getTask(),
231-
new SearchActionListener<>(shardTarget, shardIndex) {
232-
@Override
233-
public void innerOnResponse(FetchSearchResult result) {
234-
try {
235-
progressListener.notifyFetchResult(shardIndex);
236-
counter.onResult(result);
237-
} catch (Exception e) {
238-
context.onPhaseFailure(FetchSearchPhase.this, "", e);
239-
}
240-
}
241-
242-
@Override
243-
public void onFailure(Exception e) {
244-
try {
245-
logger.debug(() -> "[" + contextId + "] Failed to execute fetch phase", e);
246-
progressListener.notifyFetchFailure(shardIndex, shardTarget, e);
247-
counter.onFailure(shardIndex, shardTarget, e);
248-
} finally {
249-
// the search context might not be cleared on the node where the fetch was executed for example
250-
// because the action was rejected by the thread pool. in this case we need to send a dedicated
251-
// request to clear the search context.
252-
releaseIrrelevantSearchContext(shardPhaseResult, context);
253-
}
254-
}
255-
}
264+
listener
256265
);
257266
}
258267

server/src/main/java/org/elasticsearch/action/search/RankFeaturePhase.java

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.search.rank.feature.RankFeatureDoc;
2525
import org.elasticsearch.search.rank.feature.RankFeatureResult;
2626
import org.elasticsearch.search.rank.feature.RankFeatureShardRequest;
27+
import org.elasticsearch.transport.Transport;
2728

2829
import java.util.List;
2930

@@ -131,38 +132,46 @@ private void executeRankFeatureShardPhase(
131132
final SearchShardTarget shardTarget = queryResult.queryResult().getSearchShardTarget();
132133
final ShardSearchContextId contextId = queryResult.queryResult().getContextId();
133134
final int shardIndex = queryResult.getShardIndex();
135+
var listener = new SearchActionListener<RankFeatureResult>(shardTarget, shardIndex) {
136+
@Override
137+
protected void innerOnResponse(RankFeatureResult response) {
138+
try {
139+
progressListener.notifyRankFeatureResult(shardIndex);
140+
rankRequestCounter.onResult(response);
141+
} catch (Exception e) {
142+
context.onPhaseFailure(RankFeaturePhase.this, "", e);
143+
}
144+
}
145+
146+
@Override
147+
public void onFailure(Exception e) {
148+
try {
149+
logger.debug(() -> "[" + contextId + "] Failed to execute rank phase", e);
150+
progressListener.notifyRankFeatureFailure(shardIndex, shardTarget, e);
151+
rankRequestCounter.onFailure(shardIndex, shardTarget, e);
152+
} finally {
153+
releaseIrrelevantSearchContext(queryResult, context);
154+
}
155+
}
156+
};
157+
final Transport.Connection connection;
158+
try {
159+
connection = context.getConnection(shardTarget.getClusterAlias(), shardTarget.getNodeId());
160+
} catch (Exception e) {
161+
listener.onFailure(e);
162+
return;
163+
}
134164
context.getSearchTransport()
135165
.sendExecuteRankFeature(
136-
context.getConnection(shardTarget.getClusterAlias(), shardTarget.getNodeId()),
166+
connection,
137167
new RankFeatureShardRequest(
138168
context.getOriginalIndices(queryResult.getShardIndex()),
139169
queryResult.getContextId(),
140170
queryResult.getShardSearchRequest(),
141171
entry
142172
),
143173
context.getTask(),
144-
new SearchActionListener<>(shardTarget, shardIndex) {
145-
@Override
146-
protected void innerOnResponse(RankFeatureResult response) {
147-
try {
148-
progressListener.notifyRankFeatureResult(shardIndex);
149-
rankRequestCounter.onResult(response);
150-
} catch (Exception e) {
151-
context.onPhaseFailure(RankFeaturePhase.this, "", e);
152-
}
153-
}
154-
155-
@Override
156-
public void onFailure(Exception e) {
157-
try {
158-
logger.debug(() -> "[" + contextId + "] Failed to execute rank phase", e);
159-
progressListener.notifyRankFeatureFailure(shardIndex, shardTarget, e);
160-
rankRequestCounter.onFailure(shardIndex, shardTarget, e);
161-
} finally {
162-
releaseIrrelevantSearchContext(queryResult, context);
163-
}
164-
}
165-
}
174+
listener
166175
);
167176
}
168177

server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,12 +87,14 @@ protected void executePhaseOnShard(
8787
final SearchShardTarget shard,
8888
final SearchActionListener<DfsSearchResult> listener
8989
) {
90-
getSearchTransport().sendExecuteDfs(
91-
getConnection(shard.getClusterAlias(), shard.getNodeId()),
92-
buildShardSearchRequest(shardIt, listener.requestIndex),
93-
getTask(),
94-
listener
95-
);
90+
final Transport.Connection connection;
91+
try {
92+
connection = getConnection(shard.getClusterAlias(), shard.getNodeId());
93+
} catch (Exception e) {
94+
listener.onFailure(e);
95+
return;
96+
}
97+
getSearchTransport().sendExecuteDfs(connection, buildShardSearchRequest(shardIt, listener.requestIndex), getTask(), listener);
9698
}
9799

98100
@Override

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,15 @@ protected void executePhaseOnShard(
9494
final SearchShardTarget shard,
9595
final SearchActionListener<SearchPhaseResult> listener
9696
) {
97+
final Transport.Connection connection;
98+
try {
99+
connection = getConnection(shard.getClusterAlias(), shard.getNodeId());
100+
} catch (Exception e) {
101+
listener.onFailure(e);
102+
return;
103+
}
97104
ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt, listener.requestIndex));
98-
getSearchTransport().sendExecuteQuery(getConnection(shard.getClusterAlias(), shard.getNodeId()), request, getTask(), listener);
105+
getSearchTransport().sendExecuteQuery(connection, request, getTask(), listener);
99106
}
100107

101108
@Override

0 commit comments

Comments
 (0)