Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a8f9a4d
Fix inline stats after pruned aggregate by using `CopyingLocalSupplier`
kanoshiou Oct 13, 2025
c989c71
Update docs/changelog/136467.yaml
kanoshiou Oct 13, 2025
d28c47c
Merge branch 'refs/heads/main' into fix-inline-stats-pruned-aggregates
kanoshiou Oct 13, 2025
97a5900
Fix other scenarios
kanoshiou Oct 14, 2025
906435a
Merge branch 'refs/heads/main' into fix-inline-stats-pruned-aggregates
kanoshiou Oct 14, 2025
29c545e
Update docs/changelog/136467.yaml
kanoshiou Oct 14, 2025
58345d1
Update tests
kanoshiou Oct 14, 2025
31bcc3b
Update tests
kanoshiou Oct 14, 2025
40f93f9
Merge branch 'refs/heads/main' into fix-inline-stats-pruned-aggregates
kanoshiou Oct 15, 2025
712a32b
Optimize inline join and local relation handling
kanoshiou Oct 15, 2025
b045514
Merge branch 'refs/heads/main' into fix-inline-stats-pruned-aggregates
kanoshiou Oct 15, 2025
4cc2558
Using astefan's approach
kanoshiou Oct 16, 2025
b2a26da
Merge branch 'refs/heads/main' into fix-inline-stats-pruned-aggregates
kanoshiou Oct 16, 2025
6b55463
Remove tests
kanoshiou Oct 16, 2025
ab57a39
Update capability name
kanoshiou Oct 16, 2025
7259a5b
required_capability: fork_v9
kanoshiou Oct 17, 2025
ee62799
Merge branch 'refs/heads/main' into fix-inline-stats-pruned-aggregates
kanoshiou Oct 17, 2025
85f1808
capabilities clean up
astefan Oct 17, 2025
ae47b5c
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Oct 17, 2025
e5444f3
Move capability
astefan Oct 17, 2025
a52977e
Address reviews
astefan Oct 17, 2025
5ca4666
Merge branch 'main' of https://github.com/elastic/elasticsearch into …
astefan Oct 17, 2025
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/136467.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 136467
summary: "ESQL: Fix double release in inline stats when `LocalRelation` is reused"
area: ES|QL
type: bug
issues:
- 135679
Original file line number Diff line number Diff line change
Expand Up @@ -3990,3 +3990,184 @@ Canada |English |null |null
Mv-Land |Mv-Lang |null |null
Mv-Land2 |Mv-Lang2 |null |null
;

// PruneColumns
inlineStatsAfterPruningAggregate
required_capability: inline_stats
required_capability: inline_stats_after_pruning_aggregate

row a = 1
| stats b = max(a)
| inline stats c = count(*)
| drop b
;

c:long
1
;

// PropagateEmptyRelation
inlineStatsAfterPruningAggregate2
required_capability: inline_stats
required_capability: inline_stats_after_pruning_aggregate

row a = 12
| where false
| stats b = max(a)
| inline stats c = count(b)
;

b:integer | c:long
null | 0
;


// ReplaceStatsFilteredAggWithEval
inlineStatsAfterPruningAggregate3
required_capability: inline_stats
required_capability: inline_stats_after_pruning_aggregate

row a= 12
| stats b = sum(a) where false
| inline stats c = max(b)
;

b:long | c:long
null | null
;


inlineStatsAfterPruningAggregate4
required_capability: inline_stats
required_capability: inline_stats_after_pruning_aggregate

from k8s
| stats PRezTcny = median(network.cost), `network.cost` = count(network.cost)
| drop network.cost
| inline stats QaoRGYyf = count(*)
| drop `PRezTcny`
Comment on lines +4041 to +4044
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These queries are generated. This one in particular is relatively clean, but those below are randomised and their identifiers are hard to read (most of the backtick quoting isn't needed either). Usually, they can also be shrunk down to a "minimal" query that reproduces the issue.
Ideally, we'd minimise the queries and use "human-friendly" identifiers, but at a minimum, maybe indicate their source by adding a comment with the originating issue.

;

QaoRGYyf:long
1
;


// Query produced by a randomised query generator (per PR review), hence the opaque identifiers.
// The second WHERE is constant false, so the following STATS aggregates over no rows
// before the final INLINE STATS runs on its output.
// TODO(review): reference the originating issue here.
inlineStatsAfterPruningAggregate5
required_capability: inline_stats
required_capability: inline_stats_after_pruning_aggregate
required_capability: join_lookup_v12

from colors,airports_web,boo*
| rename scalerank as language_code
| lookup join languages_lookup on language_code
| keep `ratings`, `rgb_vector`
| mv_expand ratings
| keep `ratings`, `rgb_vector`
| inline stats rycaPYFzpAId = absent(ratings), WkdxfjwuR = count(ratings) + avg(ratings)
| rename ratings AS `WkdxfjwuR`
| inline stats WkdxfjwuR = sum(WkdxfjwuR) + median(WkdxfjwuR), sjivSBvV = count(*)
| sort sjivSBvV NULLS FIRST
| where false OR NOT true OR false OR NOT false
| where false OR false AND false AND NOT true AND false
| stats rycaPYFzpAId = median(sjivSBvV) + count(sjivSBvV) + sum(sjivSBvV), NUiLBJXQoMXd = present(sjivSBvV)
| mv_expand `NUiLBJXQoMXd`
| inline stats NUiLBJXQoMXd = count(rycaPYFzpAId)
;

rycaPYFzpAId:double | NUiLBJXQoMXd:long
null | 0
;


// Query produced by a randomised query generator (per PR review), hence the opaque identifiers.
// The WHERE starts with `false AND ...`, i.e. it is constant false, so the STATS further down
// aggregates over no rows before the final INLINE STATS runs on its output.
// TODO(review): reference the originating issue here.
inlineStatsAfterPruningAggregate6
required_capability: inline_stats
required_capability: inline_stats_after_pruning_aggregate
required_capability: join_lookup_v12

from d*,*
| rename scalerank as other2
| rename author.keyword as name_str
| rename intersects as is_active_bool
| rename year as id_int
| rename status as other1
| lookup join multi_column_joinable_lookup on other2, name_str, is_active_bool, id_int, other1
| rename street AS `huZQQIKVli`, `network.eth0.currently_connected_clients` AS is_rehired
| eval GhMzAVXivNI = message.raw, VDJKNsopeKQq = substring(threat_level, 1, 3), xeUVJNra = length(huZQQIKVli), kOQjzWgZNhwI = false
| where false AND NOT false AND true AND NOT false
| CHANGE_POINT salary_change.int ON birth_date AS UpYfyPcYFkes, EwnNgKIrDEKS
| sort owner.name DESC NULLS LAST, message.raw, country.keyword DESC NULLS LAST, UpYfyPcYFkes ASC NULLS FIRST, name_str ASC NULLS FIRST, num, event_duration DESC NULLS LAST, extra2 DESC, languages.byte ASC NULLS FIRST, client.ip NULLS LAST, ip1, collection NULLS FIRST, city.country.name DESC, destination.IP DESC NULLS LAST, host.version DESC, hex_code ASC NULLS LAST, language.code ASC NULLS LAST, language.name ASC, details DESC, language_name DESC NULLS FIRST, event ASC, salary_change.keyword DESC NULLS FIRST, lk ASC
| stats CLnBmEGor = sum(height.double) + sum(height.double) + avg(height.double), JLSxMYld = count(extra2) + max(extra2) + avg(extra2), `network.total_bytes_out` = min(language_code_integer), `version` = values(env), contains = min(languages.short) + min(languages.short)
| mv_expand `CLnBmEGor`
| dissect version "%{JLSxMYld} %{LTMJrburyt}"
| enrich languages_policy on LTMJrburyt
| inline stats QyCBayGHtAF = count(CLnBmEGor), yUMkeLezSN = present(contains + network.total_bytes_out), `JLSxMYld` = max(contains), GYbfUvbf = count(*)
;

CLnBmEGor:double | network.total_bytes_out:integer | version:keyword | contains:integer | LTMJrburyt:keyword | language_name:keyword | QyCBayGHtAF:long | yUMkeLezSN:boolean | JLSxMYld:integer | GYbfUvbf:long
null | null | null | null | null | null | 0 | false | null | 1
;


// Query produced by a randomised query generator (per PR review), hence the opaque identifiers.
// Chains INLINE STATS -> STATS -> EVAL -> STATS -> INLINE STATS -> KEEP -> INLINE STATS;
// only `VizcTbjMnBlQ` (a count(*) from the second STATS) is kept for the last INLINE STATS.
// TODO(review): reference the originating issue here.
inlineStatsAfterPruningAggregate7
required_capability: inline_stats
required_capability: inline_stats_after_pruning_aggregate
required_capability: join_lookup_v12

from app_log*
| FORK (where false OR NOT true) (where true) (where true) (where false OR true) (where false) (where true
| mv_expand @timestamp)
| WHERE _fork == "fork2"
| DROP _fork
| rename service_id as name_str
| lookup join multi_column_joinable_lookup on name_str
| inline stats `is_active_bool` = count(id_int) + max(id_int), siofAMlpoHvh = max(id_int) + max(id_int) + max(id_int), `message` = present(other1), `id_int` = max(id_int)
| stats `message` = count(is_active_bool), id_int2 = median(is_active_bool), `siofAMlpoHvh` = count(*), `id_int` = max(id_int)
| eval NUtENNcs = message, message = "zaYFcCTy", id_int2 = null, VizcTbjMnBlQ = siofAMlpoHvh, siofAMlpoHvh = siofAMlpoHvh + siofAMlpoHvh + siofAMlpoHvh, gPzLgKdLfZ = null, `message` = 1099438720, sdsrveQOoi = -2373351299779434224, hpxNweCm = false, PwyOmHforRKI = -2230544247357512169
| stats VizcTbjMnBlQ = count(*), siofAMlpoHvh2 = median(sdsrveQOoi) + max(sdsrveQOoi), DUuKdsGgU = avg(PwyOmHforRKI) + count(PwyOmHforRKI), `gPzLgKdLfZ` = present(sdsrveQOoi + VizcTbjMnBlQ + PwyOmHforRKI), `siofAMlpoHvh` = count(sdsrveQOoi) + max(sdsrveQOoi)
| limit 61
| inline stats gPzLgKdLfZ = sum(VizcTbjMnBlQ), `siofAMlpoHvh2` = count(*), zvgqSiVGqli = max(VizcTbjMnBlQ), WFjgBzYRXwa = median(VizcTbjMnBlQ) + count(VizcTbjMnBlQ) + avg(VizcTbjMnBlQ)
| keep `VizcTbjMnBlQ`
| inline stats wXPJguff = median(VizcTbjMnBlQ) + avg(VizcTbjMnBlQ) + max(VizcTbjMnBlQ)
;

VizcTbjMnBlQ:long | wXPJguff:double
1 | 3.0
;


// Query produced by a randomised query generator (per PR review), hence the opaque identifiers.
// After the STATS, only `zdxdQdVIyFh` (a constant assigned by EVAL) survives the KEEP,
// so none of the STATS outputs are consumed by the final INLINE STATS.
// TODO(review): reference the originating issue here.
inlineStatsAfterPruningAggregate8
required_capability: inline_stats
required_capability: inline_stats_after_pruning_aggregate

from dense_vector,airport_city_boundaries
| eval kMvNZVQSH = -4048759096317256968, GzAYNHaHuIS = 4262770411405329967, EwPpQFmOPoB = "lkdgnbTrBT", `abbrev` = "LiXI"
| keep `city`, `kMvNZVQSH`, GzAYNHaHuIS, city_boundary, EwPpQFmOPoB, `city_boundary`, `id`, kMvNZVQSH, id, airport
| limit 6888
| keep `kMvNZVQSH`, EwPpQFmOPoB, city, `id`, kMvNZVQSH
| stats `EwPpQFmOPoB2` = min(id), `EwPpQFmOPoB3` = count(*), kZiJPhNZ = count(id), ErGhdKhdv = count_distinct(city), EwPpQFmOPoB = min(id)
| eval ErGhdKhdv = null, fHSyxniFCS = -710891486, ErGhdKhdv = -1104488048, zdxdQdVIyFh = -1579062572, `kZiJPhNZ` = null, SzmOySyUh = null, eWVVRiwP = 1092261898855405071, nUxVhGhcpkiV = true, RvqGioBAAzYs = 169356242
| eval `kZiJPhNZ` = "gWJHDTcDMjRVSnlzSX", lxLgaagX = 4700531997851493340, `kZiJPhNZ` = false
| keep `zdxdQdVIyFh`
| inline stats `zdxdQdVIyFh` = count(*), WbWVJnYJ = count(*), zdxdQdVIyFh2 = max(zdxdQdVIyFh), uKjfeKTfaH = max(zdxdQdVIyFh), QpovVxjMWSjh = min(zdxdQdVIyFh)
;

zdxdQdVIyFh:long | WbWVJnYJ:long | zdxdQdVIyFh2:integer | uKjfeKTfaH:integer | QpovVxjMWSjh:integer
1 | 1 | -1579062572 | -1579062572 | -1579062572
;


// The STATS output `m` is discarded by KEEP and `c` is a constant from EVAL,
// so no STATS output is consumed by the INLINE STATS that follows.
inlineStatsAfterPruningAggregate9
required_capability: inline_stats
required_capability: inline_stats_after_pruning_aggregate

from employees
| stats m = count(emp_no)
| eval c = 0
| keep c
| inline stats c = count(*)
;

c:long
1
;
Original file line number Diff line number Diff line change
Expand Up @@ -1490,6 +1490,11 @@ public enum Cap {
*/
TS_PERMIT_TEXT_BECOMING_KEYWORD_WHEN_GROUPED_ON,

/**
* INLINE STATS support after aggregate pruning
*/
INLINE_STATS_AFTER_PRUNING_AGGREGATE,

/**
* Fix management of plans with no columns
* https://github.com/elastic/elasticsearch/issues/120272
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
Expand All @@ -43,7 +44,7 @@ protected LogicalPlan rule(UnaryPlan plan, LogicalOptimizerContext ctx) {
List<Block> emptyBlocks = aggsFromEmpty(ctx.foldCtx(), agg.aggregates());
p = replacePlanByRelation(
plan,
LocalSupplier.of(emptyBlocks.isEmpty() ? new Page(0) : new Page(emptyBlocks.toArray(Block[]::new)))
new CopyingLocalSupplier(emptyBlocks.isEmpty() ? new Page(0) : new Page(emptyBlocks.toArray(Block[]::new)))
);
} else {
p = PruneEmptyPlans.skipPlan(plan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.elasticsearch.xpack.esql.plan.logical.Sample;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.rule.Rule;

Expand Down Expand Up @@ -111,7 +111,7 @@ private static LogicalPlan pruneColumnsInAggregate(Aggregate aggregate, Attribut
p = new LocalRelation(
aggregate.source(),
List.of(Expressions.attribute(aggregate.aggregates().getFirst())),
LocalSupplier.of(new Page(BlockUtils.constantBlock(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, null, 1)))
new CopyingLocalSupplier(new Page(BlockUtils.constantBlock(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, null, 1)))
);
} else {
// Aggs cannot produce pages with 0 columns, so retain one grouping.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.join.InlineJoin;
import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;

import java.util.ArrayList;
Expand Down Expand Up @@ -137,6 +137,6 @@ private static LocalRelation localRelation(Source source, List<Alias> newEvals)
attributes.add(alias.toAttribute());
blocks[i] = BlockUtils.constantBlock(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, ((Literal) alias.child()).value(), 1);
}
return new LocalRelation(source, attributes, LocalSupplier.of(new Page(blocks)));
return new LocalRelation(source, attributes, new CopyingLocalSupplier(new Page(blocks)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.compute.aggregation.QuantileStates;
import org.elasticsearch.compute.data.ConstantNullBlock;
import org.elasticsearch.compute.data.IntArrayBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.test.TestBlockFactory;
import org.elasticsearch.core.Nullable;
Expand Down Expand Up @@ -134,6 +136,7 @@
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.EmptyLocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
Expand Down Expand Up @@ -2628,6 +2631,91 @@ public void testEvalAfterStats() {
assertThat(Expressions.names(eval.output()), contains("x"));
}

/**
* Test for <code>PruneColumns</code>
* <pre>{@code
* EsqlProject[[c{r}#8]]
* \_Limit[1000[INTEGER],false]
* \_InlineJoin[LEFT,[],[]]
* |_LocalRelation[[b{r}#6],org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier@3fe]
* \_Aggregate[[],[COUNT(*[KEYWORD],true[BOOLEAN]) AS c#8]]
* \_StubRelation[[b{r}#6]]
* }</pre>
*/
public void testInlineStatsAfterPruningAggregate() {
    // `b` is dropped after the INLINE STATS, so the STATS output is never consumed downstream.
    var plan = optimizedPlan("""
        row a = 1
        | stats b = max(a)
        | inline stats c = count(*)
        | drop b
        """);
    var project = as(plan, Project.class);
    var limit = asLimit(project.child(), 1000, false);
    var inlineJoin = as(limit.child(), InlineJoin.class);

    // Left side: a LocalRelation whose supplier must be a CopyingLocalSupplier
    // holding a single constant-null block.
    var localRelation = as(inlineJoin.left(), LocalRelation.class);
    var copyingLocalSupplier = as(localRelation.supplier(), CopyingLocalSupplier.class);
    Page page = copyingLocalSupplier.get();
    assertEquals(1, page.getBlockCount());
    as(page.getBlock(0), ConstantNullBlock.class);

    // Right side: the INLINE STATS aggregate sitting on top of a StubRelation.
    var right = as(inlineJoin.right(), Aggregate.class);
    // Lower-camel-case local, matching the sibling tests; the previous name `StubRelation`
    // obscured the class of the same name.
    var stubRelation = as(right.child(), StubRelation.class);
}

/**
* Test for <code>PropagateEmptyRelation</code>
* <pre>{@code
* Limit[1000[INTEGER],false]
* \_InlineJoin[LEFT,[],[]]
* |_LocalRelation[[b{r}#6],org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier@3fe]
* \_Aggregate[[],[COUNT(b{r}#6,true[BOOLEAN]) AS c#9]]
* \_StubRelation[[b{r}#6]]
* }</pre>
*/
public void testInlineStatsAfterPruningAggregate2() {
    // STATS over a `where false` input, followed by an INLINE STATS on its output.
    String query = """
        row a = 12
        | where false
        | stats b = max(a)
        | inline stats c = count(b)
        """;

    // Expected shape: Limit -> InlineJoin
    var topLimit = asLimit(optimizedPlan(query), 1000, false);
    InlineJoin join = as(topLimit.child(), InlineJoin.class);

    // Left branch: LocalRelation backed by a CopyingLocalSupplier with one int block.
    LocalRelation leftRelation = as(join.left(), LocalRelation.class);
    CopyingLocalSupplier supplier = as(leftRelation.supplier(), CopyingLocalSupplier.class);
    Page suppliedPage = supplier.get();
    assertEquals(1, suppliedPage.getBlockCount());
    as(suppliedPage.getBlock(0), IntArrayBlock.class);

    // Right branch: Aggregate -> StubRelation
    Aggregate rightAgg = as(join.right(), Aggregate.class);
    as(rightAgg.child(), StubRelation.class);
}

/**
* Test for <code>ReplaceStatsFilteredAggWithEval</code>
* <pre>{@code
* Limit[1000[INTEGER],false]
* \_InlineJoin[LEFT,[],[]]
* |_LocalRelation[[b{r}#6],org.elasticsearch.xpack.esql.plan.logical.local.CopyingLocalSupplier@3fe]
* \_Aggregate[[],[MAX(b{r}#6,true[BOOLEAN]) AS c#9]]
* \_StubRelation[[b{r}#6]]
* }</pre>
*/
public void testInlineStatsAfterPruningAggregate3() {
    // A STATS whose only aggregate is filtered by `where false`, followed by an INLINE STATS.
    String esql = """
        row a= 12
        | stats b = sum(a) where false
        | inline stats c = max(b)
        """;

    // Expected shape: Limit -> InlineJoin
    InlineJoin join = as(asLimit(optimizedPlan(esql), 1000, false).child(), InlineJoin.class);

    // Left branch: LocalRelation -> CopyingLocalSupplier -> page with one constant-null block.
    CopyingLocalSupplier supplier = as(as(join.left(), LocalRelation.class).supplier(), CopyingLocalSupplier.class);
    Page leftPage = supplier.get();
    assertEquals(1, leftPage.getBlockCount());
    as(leftPage.getBlock(0), ConstantNullBlock.class);

    // Right branch: Aggregate -> StubRelation
    as(as(join.right(), Aggregate.class).child(), StubRelation.class);
}

/**
* Expects
* <pre>{@code
Expand Down