- Notifications
You must be signed in to change notification settings - Fork 25.7k
Make MutableSearchResponse ref-counted to prevent use-after-close in async search #134359
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 39 commits
f4a1a81 025e9e3 47f554e 4a2cdc0 e3b7c64 f7db807 beb3f23 0c7e61d b02ce52 966ad7b 987434e 91eac66 f23f78f dd9567d c643cda dd09251 497a736 029c0d1 4f8e55b b504e2a 85ec60f 746caa3 8758981 81a6a55 e1d33dc 99610b2 2cb90cd a2f7f7a ab343f3 aed9d1f b82dbe8 a6a45cb 07825fd 3412045 e4fedfc c26b15d 770a902 fc178a9 2c9d1c7 0359f76 9fa88b9 286b6a9 9b57b99 dab85e4 a452322 89b3f3f ee8fbca 30fe790 26f105c bfcd31e 547a050 adada6a 9ce4cc7 634e698 ce9f54d e552d16 2ac0077 063b5a8 File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| pr: 134359 | ||
| summary: Make `MutableSearchResponse` ref-counted to prevent use-after-close in async | ||
| search | ||
| area: Search | ||
| type: bug | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,266 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
| | ||
| package org.elasticsearch.xpack.search; | ||
| | ||
| import org.elasticsearch.ElasticsearchStatusException; | ||
| import org.elasticsearch.ExceptionsHelper; | ||
| import org.elasticsearch.action.index.IndexRequestBuilder; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.core.TimeValue; | ||
| import org.elasticsearch.rest.RestStatus; | ||
| import org.elasticsearch.search.aggregations.AggregationBuilders; | ||
| import org.elasticsearch.search.builder.SearchSourceBuilder; | ||
| import org.elasticsearch.test.ESIntegTestCase.SuiteScopeTestCase; | ||
| import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; | ||
| | ||
| import java.util.ArrayList; | ||
| import java.util.HashMap; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.concurrent.CancellationException; | ||
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
| import java.util.concurrent.atomic.LongAdder; | ||
| | ||
| @SuiteScopeTestCase | ||
| public class AsyncSearchConcurrentStatusIT extends AsyncSearchIntegTestCase { | ||
| // Suite-level fixtures: all of these are assigned in setupSuiteScopeCluster(). | ||
| // Name of the index the async searches run against. | ||
| private static String indexName; | ||
| // Number of primary shards the index is created with (random, 1..20). | ||
| private static int numShards; | ||
| | ||
| // Number of distinct keywords actually generated; also used to size the terms aggregation. | ||
| private static int numKeywords; | ||
| // Per-keyword document counts recorded while the documents are generated. | ||
| private static Map<String, AtomicInteger> keywordFreqs; | ||
| // Running maximum / minimum of the random "metric" values that were indexed. | ||
| private static float maxMetric = Float.NEGATIVE_INFINITY; | ||
| private static float minMetric = Float.POSITIVE_INFINITY; | ||
drempapis marked this conversation as resolved. Outdated Show resolved Hide resolved | ||
| | ||
| /** | ||
| * Creates the test index with a random shard count and fills it with random documents. | ||
| * Each document carries a keyword in the {@code terms} field and a float in the {@code metric} field; | ||
| * keyword frequencies and the metric bounds are recorded in the static fixture fields. | ||
| */ | ||
| @Override | ||
| public void setupSuiteScopeCluster() { | ||
| indexName = "test-async"; | ||
| numShards = randomIntBetween(1, 20); | ||
| final int docCount = randomIntBetween(100, 1000); | ||
| final Settings indexSettings = Settings.builder().put("index.number_of_shards", numShards).build(); | ||
| createIndex(indexName, indexSettings); | ||
| numKeywords = randomIntBetween(50, 100); | ||
| keywordFreqs = new HashMap<>(); | ||
drempapis marked this conversation as resolved. Outdated Show resolved Hide resolved | ||
| // Random strings may collide, so the set can end up smaller than the requested keyword count. | ||
| final Set<String> distinctKeywords = new HashSet<>(); | ||
| for (int remaining = numKeywords; remaining > 0; remaining--) { | ||
| distinctKeywords.add(randomAlphaOfLengthBetween(10, 20)); | ||
| } | ||
| numKeywords = distinctKeywords.size(); | ||
| final String[] vocabulary = distinctKeywords.toArray(String[]::new); | ||
| final List<IndexRequestBuilder> requests = new ArrayList<>(); | ||
| for (int doc = 0; doc < docCount; doc++) { | ||
| final float metricValue = randomFloat(); | ||
| maxMetric = Math.max(metricValue, maxMetric); | ||
| minMetric = Math.min(metricValue, minMetric); | ||
| final String chosen = vocabulary[randomIntBetween(0, numKeywords - 1)]; | ||
| // Start the counter at zero on first sight, then count this document. | ||
| keywordFreqs.computeIfAbsent(chosen, unused -> new AtomicInteger()).incrementAndGet(); | ||
| requests.add(prepareIndex(indexName).setSource("terms", chosen, "metric", metricValue)); | ||
| } | ||
| indexRandom(true, true, requests); | ||
| } | ||
| | ||
| /** | ||
| * Tests that concurrent async search status requests behave correctly | ||
| * while the underlying async search task is still executing and during its close/cleanup. | ||
| */ | ||
| public void testConcurrentStatusFetchWhileTaskCloses() throws Exception { | ||
drempapis marked this conversation as resolved. Show resolved Hide resolved | ||
| final String aggName = "terms"; | ||
| final SearchSourceBuilder source = new SearchSourceBuilder().aggregation( | ||
| AggregationBuilders.terms(aggName).field("terms.keyword").size(Math.max(1, numKeywords)) | ||
| ); | ||
| | ||
| final int progressStep = (numShards > 2) ? randomIntBetween(2, numShards) : 2; | ||
| try (SearchResponseIterator it = assertBlockingIterator(indexName, numShards, source, 0, progressStep)) { | ||
| String id = getAsyncId(it); | ||
| | ||
| PollStats stats = new PollStats(); | ||
| | ||
| int statusThreads = randomIntBetween(1, Math.max(2, 4 * numShards)); | ||
drempapis marked this conversation as resolved. Outdated Show resolved Hide resolved | ||
| StartableThreadGroup pollers = startGetStatusThreadsHot(id, statusThreads, aggName, stats); | ||
| pollers.startHotThreads.countDown(); // release pollers | ||
| | ||
| // Finish consumption on a separate thread and capture errors | ||
| var consumerExec = Executors.newSingleThreadExecutor(); | ||
| AtomicReference<Throwable> consumerError = new AtomicReference<>(); | ||
| Future<?> consumer = consumerExec.submit(() -> { | ||
| try { | ||
| consumeAllResponses(it, aggName); | ||
| } catch (Throwable t) { | ||
| consumerError.set(t); | ||
| } | ||
| }); | ||
| | ||
| Thread.sleep(randomIntBetween(100, 200)); | ||
drempapis marked this conversation as resolved. Outdated Show resolved Hide resolved | ||
| pollers.stopAndAwait(TimeValue.timeValueMillis(randomIntBetween(500, 1000))); | ||
drempapis marked this conversation as resolved. Outdated Show resolved Hide resolved | ||
| | ||
| // Join consumer & surface errors | ||
| try { | ||
| consumer.get(); | ||
drempapis marked this conversation as resolved. Outdated Show resolved Hide resolved | ||
| } catch (Exception ignored) {} finally { | ||
drempapis marked this conversation as resolved. Outdated Show resolved Hide resolved | ||
| consumerExec.shutdown(); | ||
drempapis marked this conversation as resolved. Show resolved Hide resolved | ||
| } | ||
| | ||
| assertNull(consumerError.get()); | ||
| assertNoWorkerFailures(pollers); | ||
| assertStats(stats); | ||
| } | ||
| } | ||
| | ||
| private void assertNoWorkerFailures(StartableThreadGroup pollers) { | ||
| List<Throwable> failures = pollers.getFailures(); | ||
| assertTrue( | ||
| "Unexpected worker failures:\n" + failures.stream().map(ExceptionsHelper::stackTrace).reduce("", (a, b) -> a + "\n---\n" + b), | ||
| failures.isEmpty() | ||
| ); | ||
| } | ||
| | ||
| private void assertStats(PollStats stats) { | ||
| assertEquals(stats.totalCalls.sum(), stats.runningResponses.sum() + stats.completedResponses.sum()); | ||
| assertEquals("There should be no exceptions other than GONE", 0, stats.exceptions.sum()); | ||
| } | ||
| | ||
| private String getAsyncId(SearchResponseIterator it) { | ||
| AsyncSearchResponse response = it.next(); | ||
| try { | ||
| assertNotNull(response.getId()); | ||
| return response.getId(); | ||
| } finally { | ||
| response.decRef(); | ||
| } | ||
| } | ||
| | ||
| private void consumeAllResponses(SearchResponseIterator it, String aggName) throws Exception { | ||
| while (it.hasNext()) { | ||
| AsyncSearchResponse response = it.next(); | ||
| try { | ||
| if (response.getSearchResponse() != null && response.getSearchResponse().getAggregations() != null) { | ||
| assertNotNull(response.getSearchResponse().getAggregations().get(aggName)); | ||
| } | ||
| } finally { | ||
| response.decRef(); | ||
| } | ||
| } | ||
| } | ||
| | ||
| /** | ||
| * Submits {@code threads} poller tasks that wait on a shared start latch and then fetch the async | ||
| * search by id in a tight loop until the group is stopped. | ||
| * <p> | ||
| * Each successful fetch is counted and classified as running or completed; a completed response must | ||
| * carry no failure and must contain the aggregation {@code aggName}. A failure counts as {@code gone410} | ||
| * only when it is an {@link ElasticsearchStatusException} with status GONE. Every other exception, | ||
| * including status exceptions with a different code, is counted in {@code exceptions}, so it can no | ||
| * longer slip through unnoticed. | ||
| * | ||
| * @param id async search id to poll | ||
| * @param threads number of poller tasks (and pool threads) to create | ||
| * @param aggName aggregation expected in completed responses | ||
| * @param stats counters updated by the pollers | ||
| * @return the group handle; callers must count down its start latch to release the pollers | ||
| */ | ||
| private StartableThreadGroup startGetStatusThreadsHot(String id, int threads, String aggName, PollStats stats) { | ||
drempapis marked this conversation as resolved. Outdated Show resolved Hide resolved | ||
| final ExecutorService exec = Executors.newFixedThreadPool(threads); | ||
| final List<Future<?>> futures = new ArrayList<>(threads); | ||
| final AtomicBoolean running = new AtomicBoolean(true); | ||
drempapis marked this conversation as resolved. Outdated Show resolved Hide resolved | ||
| final CountDownLatch start = new CountDownLatch(1); | ||
| | ||
| for (int i = 0; i < threads; i++) { | ||
| futures.add(exec.submit(() -> { | ||
| start.await(); | ||
| while (running.get()) { | ||
| AsyncSearchResponse resp = null; | ||
| try { | ||
| resp = getAsyncSearch(id); | ||
| stats.totalCalls.increment(); | ||
| | ||
| if (resp.isRunning()) { | ||
| stats.runningResponses.increment(); | ||
| } else { | ||
| // Success-only assertions: if reported completed, we must have a proper search response | ||
| assertNull("Async search reported completed with failure", resp.getFailure()); | ||
| assertNotNull("Completed async search must carry a SearchResponse", resp.getSearchResponse()); | ||
| assertNotNull("Completed async search must have aggregations", resp.getSearchResponse().getAggregations()); | ||
| assertNotNull( | ||
| "Completed async search must contain the expected aggregation", | ||
| resp.getSearchResponse().getAggregations().get(aggName) | ||
| ); | ||
| stats.completedResponses.increment(); | ||
| } | ||
| } catch (Exception e) { | ||
| Throwable cause = ExceptionsHelper.unwrapCause(e); | ||
| if (cause instanceof ElasticsearchStatusException && ExceptionsHelper.status(cause) == RestStatus.GONE) { | ||
| stats.gone410.increment(); | ||
| } else { | ||
| // Not an expected 410: count it, including status exceptions that carry another code | ||
| stats.exceptions.increment(); | ||
| } | ||
| } finally { | ||
| if (resp != null) { | ||
| resp.decRef(); | ||
| } | ||
| } | ||
| } | ||
| return null; | ||
| })); | ||
| } | ||
| return new StartableThreadGroup(exec, futures, running, start); | ||
| } | ||
| | ||
| /** | ||
| * Counters shared by all poller threads. {@link LongAdder} is used so the threads can | ||
| * increment concurrently; the totals are read with {@code sum()} once polling has stopped. | ||
| */ | ||
| static final class PollStats { | ||
| // Number of status fetches that returned a response. | ||
| final LongAdder totalCalls = new LongAdder(); | ||
| // Responses that reported the search as still running. | ||
| final LongAdder runningResponses = new LongAdder(); | ||
| // Responses that reported completion and passed the completed-response assertions. | ||
| final LongAdder completedResponses = new LongAdder(); | ||
| // Fetches that failed with an unexpected exception; asserted to be zero by assertStats. | ||
| final LongAdder exceptions = new LongAdder(); | ||
| // Fetches rejected with status 410 GONE. | ||
| final LongAdder gone410 = new LongAdder(); | ||
Contributor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we use keep_alive values that prevent us from ever seeing Contributor Author There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The test itself completes quickly, so the default The class Contributor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we randomise the keep_alive value to sometimes be:
Contributor Author There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The minimum This ensures the async search document is deleted before pollers stop. In practice, though, we’ll usually see Practically both 410 GONE and ResourceNotFoundException represent the same expired state through different APIs (REST vs transport). Handling both wouldn’t test new logic, it would only duplicate coverage of the same lifecycle behavior, adding complexity without improving the test’s value. Contributor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My point was mostly around the test being prescriptive around what it is indeed expecting. It's currently, I believe, mixing success and some 410 as "everything is cool". I was suggesting we separate the cases where 410s are not even an option. I appreciate you don't consider this valuable, and I'm happy to go forward as this is now (hence why I approved the PR) | ||
| } | ||
| | ||
| /** | ||
| * A {@link ThreadGroup} (the nested test helper, not {@code java.lang.ThreadGroup}) that also | ||
| * carries the latch its tasks wait on before they start polling. | ||
| */ | ||
| static class StartableThreadGroup extends ThreadGroup { | ||
drempapis marked this conversation as resolved. Outdated Show resolved Hide resolved | ||
| // Counted down by the test to release all poller tasks at once. | ||
| private final CountDownLatch startHotThreads; | ||
| | ||
| StartableThreadGroup(ExecutorService exec, List<Future<?>> futures, AtomicBoolean running, CountDownLatch startHotThreads) { | ||
| super(exec, futures, running); | ||
| this.startHotThreads = startHotThreads; | ||
| } | ||
| } | ||
| | ||
| static class ThreadGroup { | ||
| private final ExecutorService exec; | ||
| private final List<Future<?>> futures; | ||
| private final AtomicBoolean running; | ||
| | ||
| private ThreadGroup(ExecutorService exec, List<Future<?>> futures, AtomicBoolean running) { | ||
| this.exec = exec; | ||
| this.futures = futures; | ||
| this.running = running; | ||
| } | ||
| | ||
| void stopAndAwait(TimeValue timeout) throws InterruptedException { | ||
| running.set(false); | ||
| exec.shutdown(); | ||
| if (exec.awaitTermination(timeout.millis(), TimeUnit.MILLISECONDS) == false) { | ||
| exec.shutdownNow(); | ||
| exec.awaitTermination(timeout.millis(), TimeUnit.MILLISECONDS); | ||
| } | ||
| } | ||
| | ||
| List<Throwable> getFailures() { | ||
| List<Throwable> failures = new ArrayList<>(); | ||
| for (Future<?> f : futures) { | ||
| try { | ||
| f.get(); | ||
| } catch (CancellationException ignored) {} catch (ExecutionException ee) { | ||
| failures.add(ExceptionsHelper.unwrapCause(ee.getCause())); | ||
| } catch (InterruptedException ie) { | ||
| Thread.currentThread().interrupt(); | ||
| if (failures.isEmpty()) failures.add(ie); | ||
| } | ||
| } | ||
| return failures; | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.