GalLalouche
Contributor

@GalLalouche GalLalouche commented Jun 15, 2025

Keep better track of shard contexts using RefCounted, so they can be released more aggressively during operator processing. For example, during TopN, we can potentially release some contexts if they don't pass the limit filter.
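
To make the TopN example concrete, here is a minimal, self-contained sketch. It is not the code from this PR: `TopNShardTracker`, `Row`, and `shardsWithoutSurvivors` are hypothetical names, and the sketch counts surviving rows per shard instead of using `RefCounted`. It assumes all input has already been consumed, so a shard with no surviving rows can never regain one.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: keeps the N largest values and tracks which shards still own a surviving row.
final class TopNShardTracker {
    private static final class Row {
        final int shard;
        final long value;

        Row(int shard, long value) {
            this.shard = shard;
            this.value = value;
        }
    }

    private final int limit;
    // Min-heap: the head is the weakest row, i.e. the first one to be evicted.
    private final PriorityQueue<Row> heap = new PriorityQueue<>(Comparator.comparingLong((Row r) -> r.value));
    // Number of rows currently in the heap, per shard.
    private final Map<Integer, Integer> liveRows = new HashMap<>();
    private final Set<Integer> seenShards = new TreeSet<>();

    TopNShardTracker(int limit) {
        this.limit = limit;
    }

    void add(int shard, long value) {
        seenShards.add(shard);
        heap.add(new Row(shard, value));
        liveRows.merge(shard, 1, Integer::sum);
        if (heap.size() > limit) {
            Row evicted = heap.poll();
            // Returning null from the remapping function removes the shard's entry entirely.
            liveRows.computeIfPresent(evicted.shard, (s, n) -> n == 1 ? null : n - 1);
        }
    }

    // Shards that contributed rows but have none left in the top N; their contexts could be released.
    Set<Integer> shardsWithoutSurvivors() {
        Set<Integer> result = new TreeSet<>(seenShards);
        result.removeAll(liveRows.keySet());
        return result;
    }
}
```

With a limit of 2 and rows `(shard 0, 10)`, `(shard 1, 5)`, `(shard 0, 20)`, `(shard 2, 1)`, only shard 0 keeps rows in the top N, so shards 1 and 2 are the ones whose contexts would be candidates for early release.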

This is done in preparation for the TopN fetch optimization, which will delay fetching additional columns until the data node coordinator instead of doing it in each individual worker, thereby reducing IO. Since the node coordinator would need to hold the shard contexts for a potentially longer duration, it is important that we release what we can earlier.

An even more advanced optimization is to delay fetching to the main cluster coordinator, but that would be more involved, since we need to first figure out how to transport the shard contexts between nodes.

Summary of main changes:

  • DocVector now maintains a RefCounted instance per shard.
  • Operators that can build or release DocVectors (e.g., LuceneSourceOperator, TopNOperator) can also hold RefCounted instances, so they can pass them to DocVector and also ensure contexts aren't released while they may still be used later.
  • Driver's main loop iteration (runSingleLoopIteration) now closes finished operators even in between processing different operators. This is extra aggressive, and was mostly done to improve testability.
  • Added a couple of tests to TopNOperator and a new integration test, EsqlTopNShardManagementIT, which uses the pausable plugin framework to check that TopNOperator releases things as early as possible.
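
The ownership model in the list above can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the actual classes: `ShardContextRef` and `DocVectorSketch` are hypothetical stand-ins for a ref-counted shard context and for DocVector, and the release callback stands in for whatever closing a shard context really involves.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a ref-counted shard context; not the Elasticsearch RefCounted type.
final class ShardContextRef {
    // The creator (e.g. a source operator) holds the first reference.
    private final AtomicInteger refs = new AtomicInteger(1);
    private final Runnable onRelease;

    ShardContextRef(Runnable onRelease) {
        this.onRelease = onRelease;
    }

    void incRef() {
        // Refuse to resurrect a context that has already been released.
        refs.updateAndGet(current -> {
            if (current <= 0) {
                throw new IllegalStateException("already released");
            }
            return current + 1;
        });
    }

    // Returns true if this call dropped the last reference and ran the release callback.
    boolean decRef() {
        int remaining = refs.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("released too many times");
        }
        if (remaining == 0) {
            onRelease.run();
            return true;
        }
        return false;
    }

    boolean hasReferences() {
        return refs.get() > 0;
    }
}

// Hypothetical DocVector-like holder: takes one reference per shard it points at.
final class DocVectorSketch implements AutoCloseable {
    private final ShardContextRef[] shards;

    DocVectorSketch(ShardContextRef... shards) {
        this.shards = shards;
        for (ShardContextRef shard : shards) {
            shard.incRef();
        }
    }

    @Override
    public void close() {
        // Closing the vector gives back exactly the references it took.
        for (ShardContextRef shard : shards) {
            shard.decRef();
        }
    }
}
```

In this sketch, an operator that creates a `ShardContextRef` and hands it to a `DocVectorSketch` can call `decRef()` as soon as it is done; the context is only released once the vector is closed as well, whichever happens last.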
@GalLalouche GalLalouche added >enhancement Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) :Analytics/ES|QL AKA ESQL labels Jun 15, 2025
@elasticsearchmachine
Collaborator

Hi @GalLalouche, I've created a changelog YAML for you.

public SearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) throws IOException {
    SearchContext searchContext = super.createSearchContext(request, timeout);
    onPutContext.accept(searchContext.readerContext());
    try {
Contributor Author

This was done after confirming with @dnhatn that onPutContext here can be replaced with onCreateSearchContext. The try/catch clause was copy-pasted from above.

* A source operator whose output is the given tuple values. This operator produces pages
* with two Blocks. The returned pages preserve the order of values as given in the initial list.
*/
public abstract class TupleAbstractBlockSourceOperator<T, S> extends AbstractBlockSourceOperator {
Contributor Author

I've generalized the already existing TupleBlockSourceOperator to support more than just a tuple of two longs.

Member

@nik9000 nik9000 left a comment

I left a few comments, but it feels right to me!

@nik9000
Member

nik9000 commented Jun 16, 2025

If you want me to review more let me know. Or I can wait until you remove draft.

@GalLalouche
Contributor Author

> If you want me to review more let me know. Or I can wait until you remove draft.

Thanks @nik9000! I'm chasing down test failures right now, and will go over your comments in the meantime. I'll re-request a review when I'm done.

@GalLalouche GalLalouche force-pushed the feature/shard_ref_count branch 2 times, most recently from 0591cf6 to b5f56f8 Compare June 17, 2025 11:00
@GalLalouche GalLalouche force-pushed the feature/shard_ref_count branch from b5f56f8 to 1ad8eb4 Compare June 17, 2025 12:49
@GalLalouche GalLalouche marked this pull request as ready for review June 17, 2025 20:02
@GalLalouche GalLalouche requested a review from a team as a code owner June 17, 2025 20:02
@elasticsearchmachine
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@GalLalouche GalLalouche requested a review from nik9000 June 18, 2025 09:31
@GalLalouche GalLalouche requested a review from nik9000 June 23, 2025 19:52
@GalLalouche GalLalouche enabled auto-merge (squash) June 24, 2025 11:48
@GalLalouche GalLalouche disabled auto-merge June 24, 2025 14:59
@GalLalouche GalLalouche enabled auto-merge (squash) June 24, 2025 15:55
@GalLalouche GalLalouche force-pushed the feature/shard_ref_count branch from c9bedcd to c8af2c6 Compare June 25, 2025 12:47
@GalLalouche GalLalouche merged commit 6970bd2 into elastic:main Jun 25, 2025
32 checks passed

Labels

:Analytics/ES|QL AKA ESQL >enhancement Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v9.1.0

4 participants