Skip to content

Commit e8ffd92

Browse files
committed
ESQL: Push down filter passed lookup join
Improve the planner to detect filters that can be pushed down 'through' a LOOKUP JOIN by determining the conditions scoped to the left/main side and moving them closer to the source. Relates #118305
1 parent 47be542 commit e8ffd92

File tree

2 files changed

+78
-2
lines changed

2 files changed

+78
-2
lines changed

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/PushDownAndCombineFilters.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.xpack.esql.core.expression.Expressions;
1616
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
1717
import org.elasticsearch.xpack.esql.core.expression.predicate.Predicates;
18+
import org.elasticsearch.xpack.esql.core.util.CollectionUtils;
1819
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
1920
import org.elasticsearch.xpack.esql.plan.logical.Eval;
2021
import org.elasticsearch.xpack.esql.plan.logical.Filter;
@@ -23,6 +24,8 @@
2324
import org.elasticsearch.xpack.esql.plan.logical.Project;
2425
import org.elasticsearch.xpack.esql.plan.logical.RegexExtract;
2526
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
27+
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
28+
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
2629

2730
import java.util.ArrayList;
2831
import java.util.List;
@@ -76,11 +79,66 @@ protected LogicalPlan rule(Filter filter) {
7679
} else if (child instanceof OrderBy orderBy) {
7780
// swap the filter with its child
7881
plan = orderBy.replaceChild(filter.with(orderBy.child(), condition));
82+
} else if (child instanceof Join join) {
83+
return pushDownPastJoin(filter, join);
7984
}
8085
// cannot push past a Limit, this could change the tailing result set returned
8186
return plan;
8287
}
8388

89+
private record ScopedFilter(List<Expression> leftFilters, List<Expression> rightFilters, List<Expression> commonFilters) {}
90+
91+
// split the filter condition in 3 parts:
92+
// 1. filter scoped to the left
93+
// 2. filter scoped to the right
94+
// 3. filter that requires both sides to be evaluated
95+
private static ScopedFilter scopeFilter(List<Expression> filters, LogicalPlan left, LogicalPlan right) {
96+
List<Expression> rest = new ArrayList<>(filters);
97+
List<Expression> leftFilters = new ArrayList<>();
98+
List<Expression> rightFilters = new ArrayList<>();
99+
100+
AttributeSet leftOutput = left.outputSet();
101+
AttributeSet rightOutput = right.outputSet();
102+
103+
// first remove things that left scoped only
104+
rest.removeIf(f -> f.references().subsetOf(leftOutput) && leftFilters.add(f));
105+
// followed by right scoped only
106+
rest.removeIf(f -> f.references().subsetOf(rightOutput) && rightFilters.add(f));
107+
return new ScopedFilter(leftFilters, rightFilters, rest);
108+
}
109+
110+
private static LogicalPlan pushDownPastJoin(Filter filter, Join join) {
111+
LogicalPlan plan = filter;
112+
// pushdown only through LEFT joins
113+
// TODO: generalize this for other join types
114+
if (join.config().type() == JoinTypes.LEFT) {
115+
LogicalPlan left = join.left();
116+
LogicalPlan right = join.right();
117+
118+
// split the filter condition in 3 parts:
119+
// 1. filter scoped to the left
120+
// 2. filter scoped to the right
121+
// 3. filter that requires both sides to be evaluated
122+
ScopedFilter scoped = scopeFilter(Predicates.splitAnd(filter.condition()), left, right);
123+
// push the left scoped filter down to the left child, keep the rest intact
124+
if (scoped.leftFilters.size() > 0) {
125+
// push the filter down to the left child
126+
left = new Filter(left.source(), left, Predicates.combineAnd(scoped.leftFilters));
127+
// update the join with the new left child
128+
join = (Join) join.replaceLeft(left);
129+
130+
// keep the remaining filters in place, otherwise return the new join
131+
if (scoped.commonFilters.size() > 0 || scoped.rightFilters.size() > 0) {
132+
plan = filter.with(join, Predicates.combineAnd(CollectionUtils.combine(scoped.commonFilters, scoped.rightFilters)));
133+
} else {
134+
plan = join;
135+
}
136+
}
137+
}
138+
// ignore the rest of the join
139+
return plan;
140+
}
141+
84142
private static Function<Expression, Expression> NO_OP = expression -> expression;
85143

86144
private static LogicalPlan maybePushDownPastUnary(

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizerTests.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5695,7 +5695,7 @@ public void testLookupSimple() {
56955695
String query = """
56965696
FROM test
56975697
| RENAME languages AS int
5698-
| LOOKUP int_number_names ON int""";
5698+
| LOOKUP_🐔 int_number_names ON int""";
56995699
if (Build.current().isSnapshot() == false) {
57005700
var e = expectThrows(ParsingException.class, () -> analyze(query));
57015701
assertThat(e.getMessage(), containsString("line 3:3: mismatched input 'LOOKUP' expecting {"));
@@ -5775,7 +5775,7 @@ public void testLookupStats() {
57755775
String query = """
57765776
FROM test
57775777
| RENAME languages AS int
5778-
| LOOKUP int_number_names ON int
5778+
| LOOKUP_🐔 int_number_names ON int
57795779
| STATS MIN(emp_no) BY name""";
57805780
if (Build.current().isSnapshot() == false) {
57815781
var e = expectThrows(ParsingException.class, () -> analyze(query));
@@ -5844,6 +5844,24 @@ public void testLookupStats() {
58445844
);
58455845
}
58465846

5847+
//
5848+
// Lookup JOIN
5849+
//
5850+
public void testLookupJoinPushDown() {
5851+
String query = """
5852+
FROM test
5853+
| RENAME languages AS int
5854+
| LOOKUP JOIN int_number_names ON int
5855+
| WHERE int > 10
5856+
""";
5857+
var plan = optimizedPlan(query);
5858+
System.out.println(plan);
5859+
}
5860+
5861+
//
5862+
//
5863+
//
5864+
58475865
public void testTranslateMetricsWithoutGrouping() {
58485866
assumeTrue("requires snapshot builds", Build.current().isSnapshot());
58495867
var query = "METRICS k8s max(rate(network.total_bytes_in))";

0 commit comments

Comments
 (0)