- Notifications
You must be signed in to change notification settings - Fork 25.7k
Make MutableSearchResponse ref-counted to prevent use-after-close in async search #134359
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f4a1a81 025e9e3 47f554e 4a2cdc0 e3b7c64 f7db807 beb3f23 0c7e61d b02ce52 966ad7b 987434e 91eac66 f23f78f dd9567d c643cda dd09251 497a736 029c0d1 4f8e55b b504e2a 85ec60f 746caa3 8758981 81a6a55 e1d33dc 99610b2 2cb90cd a2f7f7a ab343f3 aed9d1f b82dbe8 a6a45cb 07825fd 3412045 e4fedfc c26b15d 770a902 fc178a9 2c9d1c7 0359f76 9fa88b9 286b6a9 9b57b99 dab85e4 a452322 89b3f3f ee8fbca 30fe790 26f105c bfcd31e 547a050 adada6a 9ce4cc7 634e698 ce9f54d e552d16 2ac0077 063b5a8 File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| pr: 134359 | ||
| summary: Make `MutableSearchResponse` ref-counted to prevent use-after-close in async | ||
| search | ||
| area: Search | ||
| type: bug | ||
| issues: [] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,271 @@ | ||
| /* | ||
| * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
| * or more contributor license agreements. Licensed under the Elastic License | ||
| * 2.0; you may not use this file except in compliance with the Elastic License | ||
| * 2.0. | ||
| */ | ||
| | ||
| package org.elasticsearch.xpack.search; | ||
| | ||
| import org.elasticsearch.ElasticsearchStatusException; | ||
| import org.elasticsearch.ExceptionsHelper; | ||
| import org.elasticsearch.action.index.IndexRequestBuilder; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.core.TimeValue; | ||
| import org.elasticsearch.rest.RestStatus; | ||
| import org.elasticsearch.search.aggregations.AggregationBuilders; | ||
| import org.elasticsearch.search.builder.SearchSourceBuilder; | ||
| import org.elasticsearch.test.ESIntegTestCase.SuiteScopeTestCase; | ||
| import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse; | ||
| | ||
| import java.util.ArrayList; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Queue; | ||
| import java.util.Set; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.ConcurrentLinkedQueue; | ||
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.ExecutionException; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.TimeoutException; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import java.util.concurrent.atomic.AtomicReference; | ||
| import java.util.concurrent.atomic.LongAdder; | ||
| import java.util.stream.IntStream; | ||
| | ||
| @SuiteScopeTestCase | ||
| public class AsyncSearchConcurrentStatusIT extends AsyncSearchIntegTestCase { | ||
| private static String indexName; | ||
| private static int numShards; | ||
| | ||
| private static int numKeywords; | ||
| | ||
| @Override | ||
| public void setupSuiteScopeCluster() { | ||
| indexName = "test-async"; | ||
| numShards = randomIntBetween(1, 20); | ||
| int numDocs = randomIntBetween(100, 1000); | ||
| createIndex(indexName, Settings.builder().put("index.number_of_shards", numShards).build()); | ||
| numKeywords = randomIntBetween(50, 100); | ||
| Set<String> keywordSet = new HashSet<>(); | ||
| for (int i = 0; i < numKeywords; i++) { | ||
| keywordSet.add(randomAlphaOfLengthBetween(10, 20)); | ||
| } | ||
| numKeywords = keywordSet.size(); | ||
| String[] keywords = keywordSet.toArray(String[]::new); | ||
| List<IndexRequestBuilder> reqs = new ArrayList<>(); | ||
| for (int i = 0; i < numDocs; i++) { | ||
| float metric = randomFloat(); | ||
| String keyword = keywords[randomIntBetween(0, numKeywords - 1)]; | ||
| reqs.add(prepareIndex(indexName).setSource("terms", keyword, "metric", metric)); | ||
| } | ||
| indexRandom(true, true, reqs); | ||
| } | ||
| | ||
| /** | ||
| * This test spins up a set of poller threads that repeatedly call | ||
| * {@code _async_search/{id}}. Each poller starts immediately, and once enough | ||
| * requests have been issued they signal a latch to indicate the group is "warmed up". | ||
| * The test waits on this latch to deterministically ensure pollers are active. | ||
| * In parallel, a consumer thread drives the async search to completion using the | ||
| * blocking iterator. This coordinated overlap exercises the window where the task | ||
| * is closing and some status calls may return {@code 410 GONE}. | ||
| */ | ||
| public void testConcurrentStatusFetchWhileTaskCloses() throws Exception { | ||
drempapis marked this conversation as resolved. Show resolved Hide resolved | ||
| final TimeValue timeout = TimeValue.timeValueSeconds(3); | ||
| final String aggName = "terms"; | ||
| final SearchSourceBuilder source = new SearchSourceBuilder().aggregation( | ||
| AggregationBuilders.terms(aggName).field("terms.keyword").size(Math.max(1, numKeywords)) | ||
| ); | ||
| | ||
| final int progressStep = (numShards > 2) ? randomIntBetween(2, numShards) : 2; | ||
| try (SearchResponseIterator it = assertBlockingIterator(indexName, numShards, source, 0, progressStep)) { | ||
| String id = getAsyncId(it); | ||
| | ||
| PollStats stats = new PollStats(); | ||
| | ||
| // Pick a random number of status-poller threads, at least 1, up to (4×numShards) | ||
| int pollerThreads = randomIntBetween(1, 4 * numShards); | ||
| | ||
| // Wait for pollers to be active | ||
| CountDownLatch warmed = new CountDownLatch(1); | ||
| | ||
| // Executor and coordination for pollers | ||
| ExecutorService pollerExec = Executors.newFixedThreadPool(pollerThreads); | ||
| AtomicBoolean running = new AtomicBoolean(true); | ||
| Queue<Throwable> failures = new ConcurrentLinkedQueue<>(); | ||
| | ||
| CompletableFuture<Void> pollers = createPollers(id, pollerThreads, stats, warmed, pollerExec, running, failures); | ||
| | ||
| // Wait until pollers are issuing requests (warming period) | ||
| assertTrue("pollers did not warm up in time", warmed.await(timeout.millis(), TimeUnit.MILLISECONDS)); | ||
| | ||
| // Start consumer on a separate thread and capture errors | ||
| var consumerExec = Executors.newSingleThreadExecutor(); | ||
| AtomicReference<Throwable> consumerError = new AtomicReference<>(); | ||
| Future<?> consumer = consumerExec.submit(() -> { | ||
| try { | ||
| consumeAllResponses(it, aggName); | ||
| } catch (Throwable t) { | ||
| consumerError.set(t); | ||
| } | ||
| }); | ||
| | ||
| // Join consumer & surface errors | ||
| try { | ||
| consumer.get(timeout.millis(), TimeUnit.MILLISECONDS); | ||
| | ||
| if (consumerError.get() != null) { | ||
| fail("consumeAllResponses failed: " + consumerError.get()); | ||
| } | ||
| } catch (TimeoutException e) { | ||
| consumer.cancel(true); | ||
| fail(e, "Consumer thread did not finish within timeout"); | ||
| } catch (Exception ignored) { | ||
| // ignored | ||
| } finally { | ||
| // Stop pollers | ||
| running.set(false); | ||
| try { | ||
| pollers.get(timeout.millis(), TimeUnit.MILLISECONDS); | ||
| } catch (TimeoutException te) { | ||
| // The finally block will shut down the pollers forcibly | ||
| } catch (ExecutionException ee) { | ||
| failures.add(ExceptionsHelper.unwrapCause(ee.getCause())); | ||
| } catch (InterruptedException ie) { | ||
| Thread.currentThread().interrupt(); | ||
| } finally { | ||
| pollerExec.shutdownNow(); | ||
| try { | ||
| pollerExec.awaitTermination(timeout.millis(), TimeUnit.MILLISECONDS); | ||
| } catch (InterruptedException ie) { | ||
| Thread.currentThread().interrupt(); | ||
| fail("Interrupted while stopping pollers: " + ie.getMessage()); | ||
| } | ||
| } | ||
| | ||
| // Shut down the consumer executor | ||
| consumerExec.shutdown(); | ||
drempapis marked this conversation as resolved. Show resolved Hide resolved | ||
| try { | ||
| consumerExec.awaitTermination(timeout.millis(), TimeUnit.MILLISECONDS); | ||
| } catch (InterruptedException ie) { | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| } | ||
| | ||
| assertNoWorkerFailures(failures); | ||
| assertStats(stats); | ||
| } | ||
| } | ||
| | ||
| private void assertNoWorkerFailures(Queue<Throwable> failures) { | ||
| assertTrue( | ||
| "Unexpected worker failures:\n" + failures.stream().map(ExceptionsHelper::stackTrace).reduce("", (a, b) -> a + "\n---\n" + b), | ||
| failures.isEmpty() | ||
| ); | ||
| } | ||
| | ||
| private void assertStats(PollStats stats) { | ||
| assertEquals(stats.totalCalls.sum(), stats.runningResponses.sum() + stats.completedResponses.sum()); | ||
| assertEquals("There should be no exceptions other than GONE", 0, stats.exceptions.sum()); | ||
| } | ||
| | ||
| private String getAsyncId(SearchResponseIterator it) { | ||
| AsyncSearchResponse response = it.next(); | ||
| try { | ||
| assertNotNull(response.getId()); | ||
| return response.getId(); | ||
| } finally { | ||
| response.decRef(); | ||
| } | ||
| } | ||
| | ||
| private void consumeAllResponses(SearchResponseIterator it, String aggName) throws Exception { | ||
| while (it.hasNext()) { | ||
| AsyncSearchResponse response = it.next(); | ||
| try { | ||
| if (response.getSearchResponse() != null && response.getSearchResponse().getAggregations() != null) { | ||
| assertNotNull(response.getSearchResponse().getAggregations().get(aggName)); | ||
| } | ||
| } finally { | ||
| response.decRef(); | ||
| } | ||
| } | ||
| } | ||
| | ||
    /**
     * Starts {@code threads} poller tasks on {@code pollerExec}, each looping on
     * {@code GET _async_search/{id}} until {@code running} is cleared. Per-call outcomes
     * are tallied into {@code stats}; {@code warmed} is counted down once at least
     * {@code threads} calls have succeeded; unexpected throwables go into {@code failures}.
     *
     * @return a future that completes when all poller tasks have exited their loops
     */
    private CompletableFuture<Void> createPollers(
        String id,
        int threads,
        PollStats stats,
        CountDownLatch warmed,
        ExecutorService pollerExec,
        AtomicBoolean running,
        Queue<Throwable> failures
    ) {
        // Safe: the array only ever holds the CompletableFuture<Void> instances built below.
        @SuppressWarnings("unchecked")
        final CompletableFuture<Void>[] tasks = IntStream.range(0, threads).mapToObj(i -> CompletableFuture.runAsync(() -> {
            while (running.get()) {
                AsyncSearchResponse resp = null;
                try {
                    resp = getAsyncSearch(id);
                    // Only incremented on a successful call, so totalCalls == running + completed
                    // (GONE and other failures throw before reaching this line).
                    stats.totalCalls.increment();

                    // Once enough requests have been sent, consider pollers "warmed".
                    if (stats.totalCalls.sum() >= threads) {
                        warmed.countDown();
                    }

                    if (resp.isRunning()) {
                        stats.runningResponses.increment();
                    } else {
                        // Success-only assertions: if reported completed, we must have a proper search response
                        assertNull("Async search reported completed with failure", resp.getFailure());
                        assertNotNull("Completed async search must carry a SearchResponse", resp.getSearchResponse());
                        assertNotNull("Completed async search must have aggregations", resp.getSearchResponse().getAggregations());
                        assertNotNull(
                            "Completed async search must contain the expected aggregation",
                            // NOTE(review): hard-coded agg name; keep in sync with aggName in the test method.
                            resp.getSearchResponse().getAggregations().get("terms")
                        );
                        stats.completedResponses.increment();
                    }
                } catch (Exception e) {
                    Throwable cause = ExceptionsHelper.unwrapCause(e);
                    if (cause instanceof ElasticsearchStatusException) {
                        RestStatus status = ExceptionsHelper.status(cause);
                        if (status == RestStatus.GONE) {
                            // Expected while the task/document is being removed.
                            stats.gone410.increment();
                        } else {
                            stats.exceptions.increment();
                            failures.add(cause);
                        }
                    } else {
                        stats.exceptions.increment();
                        failures.add(cause);
                    }
                } finally {
                    // Release the response's ref count regardless of outcome.
                    if (resp != null) {
                        resp.decRef();
                    }
                }
            }
        }, pollerExec).whenComplete((v, ex) -> {
            // Capture any throwable that escaped the loop (e.g. an AssertionError from the checks above).
            if (ex != null) {
                failures.add(ExceptionsHelper.unwrapCause(ex));
            }
        })).toArray(CompletableFuture[]::new);

        return CompletableFuture.allOf(tasks);
    }
| | ||
    /**
     * Counters shared across poller threads. {@link LongAdder} keeps increments
     * cheap under high contention from many concurrent pollers.
     */
    static final class PollStats {
        // Status calls that returned without throwing.
        final LongAdder totalCalls = new LongAdder();
        // Responses still reporting isRunning() == true.
        final LongAdder runningResponses = new LongAdder();
        // Responses reporting completion.
        final LongAdder completedResponses = new LongAdder();
        // Unexpected exceptions (anything other than 410 GONE).
        final LongAdder exceptions = new LongAdder();
        // Expected 410 GONE responses seen while the async search is going away.
        final LongAdder gone410 = new LongAdder();
    }
| } | ||
Uh oh!
There was an error while loading. Please reload this page.