ESQL: Fix async operator warnings not always sent when blocking #132744
Pinging @elastic/es-analytical-engine (Team:Analytics)
Hi @ivancea, I've created a changelog YAML for you.
Hi @ivancea, I've updated the changelog YAML for you.
Looks better to me. Let @dnhatn look too please.
@Override
public final Operator.Status status() {
    return status(Math.max(0L, checkpoint.getMaxSeqNo()), Math.max(0L, checkpoint.getProcessedCheckpoint()), processNanos.sum());
    return status(checkpoint.getMaxSeqNo() + 1, checkpoint.getProcessedCheckpoint() + 1, processNanos.sum());
@dnhatn , could you have a look at this bit?
Actually, this whole PR to be honest. It's the kind of thing I don't trust myself with. Obviously, I broke it in the first place.
Hey! This change feels a bit mystic. It's properly tested in the other PR, where I fix and test multiple things around the status() here.
I'll remove it from here, it doesn't make sense in this PR, and I wanted only the other bugfix to be here!
Thanks Ivan! I think the problematic sequence is: Line 168 in b504514
Hi @ivancea, I've updated the changelog YAML for you.
# Conflicts: # muted-tests.yml
responseHeadersCollector.collect();
driverContext.removeAsyncAction();
processNanos.add(System.nanoTime() - startNanos);
onSeqNoCompleted(seqNo);
(Answering you here @dnhatn, as to have a thread)
This PR prevents that sequence. However, we should not increase the sequence number in onFailure() (call onSeqNoCompleted), as it may cause a NullPointerException later
From what I see, there's already an `if (fetched != null)` there on line 170, so no possible NPE, unless you mean something else or I missed something 👀
Lines 165 to 174 in 9f927b2
private void discardResults() {
    long nextCheckpoint;
    while ((nextCheckpoint = checkpoint.getPersistedCheckpoint() + 1) <= checkpoint.getProcessedCheckpoint()) {
        Fetched result = buffers.remove(nextCheckpoint);
        checkpoint.markSeqNoAsPersisted(nextCheckpoint);
        if (result != null) {
            releaseFetchedOnAnyThread(result);
        }
    }
}
Thanks Ivan. I took another closer look. I think we are all good.
LGTM. Thanks Ivan!
Fixes #128030
Fixes #130296
Fixes #130642
Fixes #131148
Fixes #132554
Fixes #132555
Fixes #132604
Fixes #131563
Fixes #132778
Extracted from #132738
An AsyncOperator listener misordering caused the warnings collection and status metrics updates to be executed after the `onSeqNoCompleted()` > `notifyIfBlocked()` > `future.onResponse(null)`, which ends the processing in some cases.

Bug explanation
An example profile from one of the CI failures:
The LookupOperator has this strange status:
status={received_pages=0, total_terms=4, emitted_pages=1, process_nanos=0, completed_pages=0}
0 received pages, 0 completed pages, 0 process_nanos, but 4 total terms and 1 emitted page. All the 0s come from `AsyncOperator`, while the others come from `LookupFromIndexOperator`.

After some investigation, the `received_pages` and `completed_pages` were actually wrong (fixed in #132738), but the `process_nanos` shows the issue: that part of the code hadn't executed yet when the status was read.
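
To make the ordering problem concrete, here is a minimal, self-contained sketch. It is not the actual `AsyncOperator` code: `ListenerOrderingSketch`, `observedByWaiter` and the made-up `elapsedNanos` value are hypothetical, and a plain `CompletableFuture` stands in for the blocked future. It only shows that whatever a listener does after unblocking the waiter may not be visible to that waiter.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch: none of these names are the real AsyncOperator code.
final class ListenerOrderingSketch {

    /** Returns the process_nanos value that the unblocked waiter observes. */
    static long observedByWaiter(boolean recordBeforeUnblocking) {
        LongAdder processNanos = new LongAdder();
        CompletableFuture<Void> blockedFuture = new CompletableFuture<>();
        long[] observed = new long[1];

        // The waiter reads the status as soon as it is unblocked. A dependent
        // registered with thenRun() on an incomplete future runs on the thread
        // that completes it, inside complete().
        blockedFuture.thenRun(() -> observed[0] = processNanos.sum());

        long elapsedNanos = 1_000L; // made-up duration of the async action
        if (recordBeforeUnblocking) {
            processNanos.add(elapsedNanos); // bookkeeping first
            blockedFuture.complete(null);   // then unblock the waiter
        } else {
            blockedFuture.complete(null);   // unblocks first: waiter reads 0
            processNanos.add(elapsedNanos); // too late for the waiter
        }
        return observed[0];
    }

    public static void main(String[] args) {
        System.out.println("unblock first: process_nanos=" + observedByWaiter(false));
        System.out.println("record first:  process_nanos=" + observedByWaiter(true));
    }
}
```

With the "unblock first" ordering the waiter reads `process_nanos=0`, matching the odd status above. With the bookkeeping done before unblocking it reads the recorded value. Warnings collection would be affected in the same way as the metric in this sketch.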