Skip to content

Conversation

ivancea
Copy link
Contributor

@ivancea ivancea commented Aug 12, 2025

Fixes #128030
Fixes #130296
Fixes #130642
Fixes #131148
Fixes #132554
Fixes #132555
Fixes #132604
Fixes #131563
Fixes #132778

Extracted from #132738

An AsyncOperator listener misordering caused the warnings collection and status metrics updates to be executed after the onSeqNoCompleted()>notifyIfBlocked()>future.onResponse(null), which ends the processing in some cases.

Bug explanation

An example profile of one of the CI fails:

{ planning={took_millis=8, start_millis=1754605547195, stop_millis=1754605547203, took_nanos=8612200}, plans=[ { cluster_name=test-cluster, node_name=test-cluster-1, description=single, plan= OutputExec[org.elasticsearch.xpack.esql.plugin.ComputeService$$Lambda/0x0000000095705068@4387ca3] \_ProjectExec[[language_code{r}#25708, language_name{f}#25719, country{f}#25716]] \_TopNExec[[Order[language_code{r}#25708,ASC,LAST], Order[language_name{f}#25719,ASC,LAST], Order[country{f}#25716,ASC,LAST]],1000[INTEGER],104] \_LookupJoinExec[[language_code{r}#25708],[language_code{f}#25718],[country{f}#25716, language_name{f}#25719]] |_LocalSourceExec[[language_code{r}#25708],org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier@1ef1bc4] \_EsQueryExec[languages_lookup_non_unique_key], indexMode[lookup], query[][_doc{f}#25720], limit[], sort[] estimatedRowSize[104] } ], query={took_millis=14, start_millis=1754605547195, stop_millis=1754605547209, took_nanos=14013972}, drivers=[ { cluster_name=test-cluster, sleeps={last=[], counts={}, first=[]}, cpu_nanos=3189539, operators=[ {operator=LocalSourceOperator}, { operator=LookupOperator[ index=languages_lookup_non_unique_key input_type=INTEGER match_field=language_code load_fields=[country{f}#25716, language_name{f}#25719] inputChannel=0 ], status={received_pages=0, total_terms=4, emitted_pages=1, process_nanos=0, completed_pages=0} }, { operator=TopNOperator[ count=0/1000, elementTypes=[INT, BYTES_REF, BYTES_REF], encoders=[DefaultSortable, UTF8TopNEncoder, UTF8TopNEncoder], sortOrders=[SortOrder[channel=0, asc=true, nullsFirst=false], SortOrder[channel=2, asc=true, nullsFirst=false], SortOrder[channel=1, asc=true, nullsFirst=false]] ], status={receive_nanos=36566, emit_nanos=33132, occupied_rows=0, rows_emitted=1, pages_emitted=1, ram_used=4.2kb, rows_received=1, pages_received=1, ram_bytes_used=4368} }, {operator=ProjectOperator[projection = [0, 2, 1]], status={pages_processed=1, rows_emitted=1, process_nanos=2856, rows_received=1}}, {operator=OutputOperator[columns = [language_code, language_name, country]]} ], node_name=test-cluster-1, description=single, documents_found=0, start_millis=1754605547205, stop_millis=1754605547208, values_loaded=0, iterations=3124, took_nanos=3432530 } ] } 

The LookupOperator has this strange status:
status={received_pages=0, total_terms=4, emitted_pages=1, process_nanos=0, completed_pages=0}

0 received pages, 0 completed pages, 0 prcess_nanos, but 4 total terms and 1 emitted page. All the 0s come from AsyncOperator, while the others come from LookupFromIndexOperator.

After some investigation, the received_pages and completed_pages were actually wrong (Fixed in #132738), but the process_nanos shows the issue: That part of the code didn't get to execute yet.

@ivancea ivancea added >bug Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) :Analytics/ES|QL AKA ESQL v9.2.0 labels Aug 12, 2025
@ivancea ivancea requested a review from nik9000 August 12, 2025 15:30
@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@elasticsearchmachine
Copy link
Collaborator

Hi @ivancea, I've created a changelog YAML for you.

@elasticsearchmachine
Copy link
Collaborator

Hi @ivancea, I've updated the changelog YAML for you.

Copy link
Member

@nik9000 nik9000 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks better to me. Let @dnhatn look too please.

@Override
public final Operator.Status status() {
return status(Math.max(0L, checkpoint.getMaxSeqNo()), Math.max(0L, checkpoint.getProcessedCheckpoint()), processNanos.sum());
return status(checkpoint.getMaxSeqNo() + 1, checkpoint.getProcessedCheckpoint() + 1, processNanos.sum());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dnhatn , could you have a look at this bit?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, this whole PR to be honest. It's the kind of thing I don't trust myself with. Obviously, I broke it in the first place.

Copy link
Contributor Author

@ivancea ivancea Aug 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey! This change feels a bit mystic. It's properly tested in the other PR, where I fix and test multiple things around the status() here.

I'll remove it from here, it doesn't make sense in this PR, and I wanted only the other bugfix to be here!

@dnhatn
Copy link
Member

dnhatn commented Aug 12, 2025

Thanks Ivan!

I think the problematic sequence is: onSeqNoCompleted(seqNo)close()responseHeadersCollector.finish()responseHeadersCollector.collect()driverContext.removeAsyncAction(), where responseHeadersCollector.finish() is called before responseHeadersCollector.collect(). This PR prevents that sequence. However, we should not increase the sequence number in onFailure() (call onSeqNoCompleted), as it may cause a NullPointerException later:

@dnhatn dnhatn self-requested a review August 12, 2025 17:06
@elasticsearchmachine
Copy link
Collaborator

Hi @ivancea, I've updated the changelog YAML for you.

responseHeadersCollector.collect();
driverContext.removeAsyncAction();
processNanos.add(System.nanoTime() - startNanos);
onSeqNoCompleted(seqNo);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Answering you here @dnhatn, as to have a thread)

This PR prevents that sequence. However, we should not increase the sequence number in onFailure() (call onSeqNoCompleted), as it may cause a NullPointerException later

From what I see, there's already an if (fetched != null) there on line 170, so no possible NPE, unless you mean something else or I missed something 👀

private void discardResults() {
long nextCheckpoint;
while ((nextCheckpoint = checkpoint.getPersistedCheckpoint() + 1) <= checkpoint.getProcessedCheckpoint()) {
Fetched result = buffers.remove(nextCheckpoint);
checkpoint.markSeqNoAsPersisted(nextCheckpoint);
if (result != null) {
releaseFetchedOnAnyThread(result);
}
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Ivan. I took another closer look. I think we are all good.

Copy link
Member

@dnhatn dnhatn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks Ivan!

responseHeadersCollector.collect();
driverContext.removeAsyncAction();
processNanos.add(System.nanoTime() - startNanos);
onSeqNoCompleted(seqNo);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Ivan. I took another closer look. I think we are all good.

@ivancea ivancea merged commit 4e3602d into elastic:main Aug 14, 2025
33 checks passed
@ivancea ivancea deleted the esql-fix-async-operator-warnings branch August 14, 2025 10:05
szybia added a commit to szybia/elasticsearch that referenced this pull request Aug 14, 2025
…-stats * upstream/main: (36 commits) ESQL: Fix async operator warnings not always sent when blocking (elastic#132744) Method not needed anymore (elastic#132912) [Test] Excercise shutdown more reliably in snapshot stress IT (elastic#132909) Update Gradle shadow plugin to 9.0.1 (elastic#132637) Mute org.elasticsearch.test.rest.yaml.CcsCommonYamlTestSuiteIT test {p0=search/410_named_queries/named_queries_with_score} elastic#132906 Update docker.elastic.co/wolfi/chainguard-base-fips:latest Docker digest to fa6cb69 (elastic#132735) Remove unnecessary calls to fold() (elastic#131870) Use consistent terminology for transport version resources/references (elastic#132882) Mute org.elasticsearch.test.rest.yaml.CcsCommonYamlTestSuiteIT test {p0=search.vectors/40_knn_search_cosine/kNN search only regular query} elastic#132890 Finalize release notes for v9.1.2 release (elastic#132745) Finalize release notes for v9.0.5 release (elastic#132718) Move inner records out of TransportVersionUtils (elastic#132872) Add support for Lookup Join on Multiple Fields (elastic#131559) Bootstrap PR-based benchmarks (elastic#132717) Refactor MetadataIndexTemplateService to use template maps instead of project metadata (elastic#132662) [Gradle] Update nebula ospackage plugin to 12.1.0 (elastic#132640) Mute org.elasticsearch.xpack.esql.CsvTests test {csv-spec:ip.CdirMatchEqualsInsOrs} elastic#132860 Mute org.elasticsearch.xpack.esql.CsvTests test {csv-spec:floats.InMultivalue} elastic#132859 Revert "Reuse prod code and reduce EsqlSession public surface" (elastic#132843) Mute org.elasticsearch.xpack.esql.CsvTests test {csv-spec:string.LengthOfText} elastic#132857 ...
joshua-adams-1 pushed a commit to joshua-adams-1/elasticsearch that referenced this pull request Aug 14, 2025
…tic#132744) Fixes elastic#128030 Fixes elastic#130296 Fixes elastic#130642 Fixes elastic#131148 Fixes elastic#132554 Fixes elastic#132555 Fixes elastic#132604 Fixes elastic#131563 Fixes elastic#132778 Extracted from elastic#132738 An AsyncOperator listener misordering caused the warnings collection and status metrics updates to be executed after the `onSeqNoCompleted()`>`notifyIfBlocked()`>`future.onResponse(null)`, which ends the processing in some cases.
szybia added a commit to szybia/elasticsearch that referenced this pull request Aug 15, 2025
* upstream/main: (278 commits) ESQL - dense vector support cosine normalization (elastic#132721) [ML] Add support for dimensions in google vertex ai request (elastic#132689) ESQL - Add byte element support for dense_vector data type (elastic#131863) ESQL: Fix async operator warnings not always sent when blocking (elastic#132744) Method not needed anymore (elastic#132912) [Test] Excercise shutdown more reliably in snapshot stress IT (elastic#132909) Update Gradle shadow plugin to 9.0.1 (elastic#132637) Mute org.elasticsearch.test.rest.yaml.CcsCommonYamlTestSuiteIT test {p0=search/410_named_queries/named_queries_with_score} elastic#132906 Update docker.elastic.co/wolfi/chainguard-base-fips:latest Docker digest to fa6cb69 (elastic#132735) Remove unnecessary calls to fold() (elastic#131870) Use consistent terminology for transport version resources/references (elastic#132882) Mute org.elasticsearch.test.rest.yaml.CcsCommonYamlTestSuiteIT test {p0=search.vectors/40_knn_search_cosine/kNN search only regular query} elastic#132890 Finalize release notes for v9.1.2 release (elastic#132745) Finalize release notes for v9.0.5 release (elastic#132718) Move inner records out of TransportVersionUtils (elastic#132872) Add support for Lookup Join on Multiple Fields (elastic#131559) Bootstrap PR-based benchmarks (elastic#132717) Refactor MetadataIndexTemplateService to use template maps instead of project metadata (elastic#132662) [Gradle] Update nebula ospackage plugin to 12.1.0 (elastic#132640) Mute org.elasticsearch.xpack.esql.CsvTests test {csv-spec:ip.CdirMatchEqualsInsOrs} elastic#132860 ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

:Analytics/ES|QL AKA ESQL >bug Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v9.2.0

4 participants