Aggs: Fix CB on reduction phase #133398

Conversation
server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
 while ((batchedResult = batchedResults.poll()) != null) {
     topDocsStats.add(batchedResult.v1());
     consumePartialMergeResult(batchedResult.v2(), topDocsList, aggsList);
+    addEstimateAndMaybeBreak(batchedResult.v2().estimatedSize);
That `MergeResult` estimation was ignored and lost. I followed it from the deserialization onwards, and it wasn't used anywhere else.
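For illustration, here is a minimal, self-contained sketch of the accounting idea behind the added line. The `MergeResultStub` record, the fixed breaker limit, and the simplified `addEstimateAndMaybeBreak` are assumptions for this sketch, not the real `QueryPhaseResultConsumer` internals:

```java
// Hypothetical sketch: each batched merge result carries an estimated size,
// and that estimate is reserved on the breaker as the result is consumed.
import java.util.ArrayDeque;
import java.util.Queue;

class BreakerSketch {
    record MergeResultStub(long estimatedSize) {}

    static final long BREAKER_LIMIT = 512 * 1024 * 1024; // assumed 512 MB limit
    static long circuitBreakerBytes = 0;                  // bytes reserved so far

    // Mirrors the shape of addEstimateAndMaybeBreak: reserve, or fail fast.
    static long addEstimateAndMaybeBreak(long estimate) {
        if (circuitBreakerBytes + estimate > BREAKER_LIMIT) {
            throw new IllegalStateException("circuit breaker would trip: " + estimate);
        }
        circuitBreakerBytes += estimate;
        return circuitBreakerBytes;
    }

    public static void main(String[] args) {
        Queue<MergeResultStub> batchedResults = new ArrayDeque<>();
        batchedResults.add(new MergeResultStub(10_000));
        batchedResults.add(new MergeResultStub(25_000));

        MergeResultStub batchedResult;
        while ((batchedResult = batchedResults.poll()) != null) {
            // The fix: account the estimate instead of silently dropping it.
            addEstimateAndMaybeBreak(batchedResult.estimatedSize());
        }
        System.out.println("reserved bytes = " + circuitBreakerBytes); // 35000
    }
}
```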
I am wondering if we should add it to the breaker before consuming, or whether we only know the estimated size after consuming?
That `consumePartialMergeResult()` simply adds the aggs to the `aggsList` parameter, nothing else. They're deserialized later here:
elasticsearch/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
Line 250 in 5a3c9e7
aggs = aggregate(buffer.iterator(), new Iterator<>() {
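A rough sketch of that deferred-expansion pattern, using a hypothetical `DelayedAggs` type in place of `DelayableWriteable<InternalAggregations>` (this illustrates the idea only, not the actual implementation):

```java
// Hypothetical sketch: partial merge results are queued in serialized form and
// only expanded (deserialized) when the reduce iterator actually pulls them.
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

class DeferredAggsSketch {
    // Stands in for DelayableWriteable<InternalAggregations>: holds bytes,
    // expands lazily on demand.
    record DelayedAggs(byte[] serialized) {
        String expand() {
            return new String(serialized); // pretend this is deserialization
        }
    }

    public static void main(String[] args) {
        Deque<DelayedAggs> aggsList = new ArrayDeque<>();

        // "consumePartialMergeResult" in this sketch only queues the handle;
        // nothing is deserialized yet, so no large objects exist at this point.
        aggsList.add(new DelayedAggs("partial-aggs-1".getBytes()));
        aggsList.add(new DelayedAggs("partial-aggs-2".getBytes()));

        // The reduce step later drains the queue, expanding one entry at a time.
        Iterator<DelayedAggs> it = new Iterator<>() {
            @Override
            public boolean hasNext() {
                return aggsList.isEmpty() == false;
            }

            @Override
            public DelayedAggs next() {
                return aggsList.pollFirst();
            }
        };
        while (it.hasNext()) {
            System.out.println("reducing " + it.next().expand());
        }
    }
}
```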
ok, then it does not matter 👍
     */
     private static long estimateRamBytesUsedForReduce(long size) {
-        return Math.round(1.5d * size - size);
+        return Math.round(1.5d * size);
Check the javadoc for a better understanding of this method. It's used here:
elasticsearch/server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java
Lines 245 to 264 in e798aa0
long breakerSize = circuitBreakerBytes;
final InternalAggregations aggs;
try {
    if (aggsList != null) {
        // Add an estimate of the final reduce size
        breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(breakerSize));
        aggs = aggregate(buffer.iterator(), new Iterator<>() {
            @Override
            public boolean hasNext() {
                return aggsList.isEmpty() == false;
            }

            @Override
            public DelayableWriteable<InternalAggregations> next() {
                return aggsList.pollFirst();
            }
        },
            resultSize,
            performFinalReduce ? aggReduceContextBuilder.forFinalReduction() : aggReduceContextBuilder.forPartialReduction()
        );
(`circuitBreakerBytes` is the total bytes added to the CB in the consumer; the batched results will be accounted here.)

It's interesting, though, that if we already added the estimated `MergeResult` size before, we're now adding it once again. I'll have to check whether the 1.5x factor was actually "fixing" this problem here. It affects other cases too, however, which makes things more complicated.
Update: I'll review this again; maybe it was considering the original estimated size to be already accounted for, in which case the 0.5x extra would be right.
I'll check the code paths leading to this to see what's happening exactly, and maybe undo this change or update the javadoc if required.
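To make the concern concrete, a quick back-of-the-envelope sketch (with made-up numbers, not taken from the PR): assuming `size` bytes of batched estimates are already reserved on the breaker, the old formula tops the reservation up to roughly 1.5x in total, while the new one ends up at roughly 2.5x:

```java
// Hypothetical comparison of the two factors being discussed.
// Assume "size" bytes of batched estimates were already reserved on the breaker.
class ReduceEstimateSketch {
    public static void main(String[] args) {
        long size = 1_000_000; // bytes already accounted for the batched results

        long extraWithOldFactor = Math.round(1.5d * size - size); // 0.5x extra
        long extraWithNewFactor = Math.round(1.5d * size);        // 1.5x extra

        System.out.println("old: total reserved ~= " + (size + extraWithOldFactor)); // 1_500_000
        System.out.println("new: total reserved ~= " + (size + extraWithNewFactor)); // 2_500_000
    }
}
```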
This seems right, but it's tricky! I'd sleep on it and give it another look before merging.
Hi @ivancea, I've created a changelog YAML for you.
💔 Backport failed
You can use sqren/backport to manually backport by running
Removing backports for 9.0 and before, as this change was made in 9.1 in #121885
Fix an issue with aggs reduction phase where `MergeResult` estimations were ignored on batched results, leading to no CB changes before serialized aggs expansion.

Notes
This comes from an OOM investigation on aggs. Investigating the CB and checking the estimation code, I noticed that the coordinator `QueryPhaseResultConsumer`'s `circuitBreakerBytes` was at 0 before the actual reduction, apparently because those batched `MergeResult`s were being ignored.

Adding them to the accounting here, which automatically applies a 1.5x factor to them later for the serialized->deserialized memory difference.
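As an end-to-end illustration of that flow (hypothetical numbers and simplified helpers, not the actual consumer code): the batched estimates are reserved as they are consumed, and the reduce-time estimate is then layered on top of that reserved total:

```java
// Hypothetical end-to-end sketch of the accounting described in the notes above.
class ReductionAccountingSketch {
    static long circuitBreakerBytes = 0; // total bytes reserved on the breaker so far

    static long addEstimateAndMaybeBreak(long estimate) {
        // A real breaker would compare against its limit here and possibly trip.
        circuitBreakerBytes += estimate;
        return circuitBreakerBytes;
    }

    // Same shape as the method shown in the diff above.
    static long estimateRamBytesUsedForReduce(long size) {
        return Math.round(1.5d * size);
    }

    public static void main(String[] args) {
        // 1. Batched MergeResult estimates are accounted as they are consumed.
        long[] batchedEstimates = { 200_000, 300_000 };
        for (long estimate : batchedEstimates) {
            addEstimateAndMaybeBreak(estimate);
        }

        // 2. Before reducing, extra headroom is reserved on top of what is already
        //    accounted, covering the serialized -> deserialized expansion.
        long breakerSize = circuitBreakerBytes;
        breakerSize = addEstimateAndMaybeBreak(estimateRamBytesUsedForReduce(breakerSize));
        System.out.println("reserved before reduce = " + breakerSize); // 1_250_000
    }
}
```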