Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
*/
public abstract class Node<T extends Node<T>> implements NamedWriteable {
private static final int TO_STRING_MAX_PROP = 10;
private static final int TO_STRING_MAX_WIDTH = 110;
public static final int TO_STRING_MAX_WIDTH = 110;

private final Source source;
private final List<T> children;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,8 @@ emp_no:integer | job_positions:keyword | _fork:keyword
10087 | Junior Developer | fork2
;

forkBeforeInlineStats
forkBeforeInlineStats-Ignore
// temporarily forbid LIMIT before INLINE STATS
required_capability: fork_v9
required_capability: inline_stats

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,8 @@ emp_no:integer |avg_worked_seconds:long|avg_avg_worked_seconds:double|languages:
10023 |330870342 |3.181719481E8 |null |5 |F
;

three
three-Ignore
// temporarily forbid LIMIT before INLINE STATS
required_capability: inline_stats

FROM employees
Expand Down Expand Up @@ -2567,7 +2568,8 @@ emp_no:integer | gender:keyword | languages:integer | greater_than:double | less
10010 | null | 4 | 6.94921 | 7.12723 | 27921.22074 | 10
;

twoConsecutiveInlinestatsWithFalseFilters
twoConsecutiveInlinestatsWithFalseFilters-Ignore
// temporarily forbid LIMIT before INLINE STATS
required_capability: inline_stats
from employees
| keep emp_no
Expand Down Expand Up @@ -2946,7 +2948,8 @@ mc:l | count:l | event_duration:l
6 |2 |3450233
;

multiIndexIpStringInlinestats_Inline4
multiIndexIpStringInlinestats_Inline4-Ignore
// temporarily forbid LIMIT before INLINE STATS
required_capability: inline_stats

FROM sample_data, sample_data_str
Expand Down Expand Up @@ -4105,7 +4108,8 @@ null | null | null | null
;


inlineStatsAfterPruningAggregate7
inlineStatsAfterPruningAggregate7-Ignore
// temporarily forbid LIMIT before INLINE STATS
// https://github.com/elastic/elasticsearch/issues/135679
required_capability: inline_stats_double_release_fix
required_capability: join_lookup_v12
Expand All @@ -4132,8 +4136,9 @@ VizcTbjMnBlQ:long | wXPJguff:double
;


inlineStatsAfterPruningAggregate8
inlineStatsAfterPruningAggregate8-Ignore
// https://github.com/elastic/elasticsearch/issues/135679
// temporarily forbid LIMIT before INLINE STATS
required_capability: inline_stats_double_release_fix

from dense_vector,airport_city_boundaries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1535,7 +1535,9 @@ public enum Cap {
* Support for pushing down EVAL with SCORE
* https://github.com/elastic/elasticsearch/issues/133462
*/
PUSHING_DOWN_EVAL_WITH_SCORE
PUSHING_DOWN_EVAL_WITH_SCORE,

FORBID_LIMIT_BEFORE_INLINE_STATS(INLINE_STATS.enabled)

;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@
import org.elasticsearch.xpack.esql.core.expression.function.Function;
import org.elasticsearch.xpack.esql.core.expression.predicate.BinaryOperator;
import org.elasticsearch.xpack.esql.core.expression.predicate.operator.comparison.BinaryComparison;
import org.elasticsearch.xpack.esql.core.tree.Location;
import org.elasticsearch.xpack.esql.core.tree.Node;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.expression.function.UnsupportedAttribute;
import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Neg;
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.Equals;
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.EsqlBinaryComparison;
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.NotEquals;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.InlineStats;
import org.elasticsearch.xpack.esql.plan.logical.Insist;
import org.elasticsearch.xpack.esql.plan.logical.Limit;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Lookup;
import org.elasticsearch.xpack.esql.plan.logical.Project;
Expand Down Expand Up @@ -108,6 +112,7 @@ Collection<Failure> verify(LogicalPlan plan, BitSet partialMetrics) {
checkOperationsOnUnsignedLong(p, failures);
checkBinaryComparison(p, failures);
checkInsist(p, failures);
checkLimitBeforeInlineStats(p, failures);
});

if (failures.hasFailures() == false) {
Expand Down Expand Up @@ -256,6 +261,46 @@ private static void checkInsist(LogicalPlan p, Failures failures) {
}
}

/*
* This is a rudimentary check to prevent INLINE STATS after LIMIT. A LIMIT command can be added by other commands by default,
* the best example being FORK. A more robust solution would be to track the commands that add LIMIT and prevent them from doing so
* if INLINE STATS is present in the plan. However, this would require authors of new such commands to be aware of this limitation and
* implement the necessary checks, which is error-prone.
*/
private static void checkLimitBeforeInlineStats(LogicalPlan plan, Failures failures) {
if (plan instanceof InlineStats is) {
Holder<Limit> inlineStatsDescendantLimit = new Holder<>();
is.forEachDownMayReturnEarly((p, breakEarly) -> {
if (p instanceof Limit l) {
inlineStatsDescendantLimit.set(l);
breakEarly.set(true);
return;
}
});

var firstLimit = inlineStatsDescendantLimit.get();
if (firstLimit != null) {
Location l = firstLimit.source().source();
var limitLocation = "line " + l.getLineNumber() + ":" + l.getColumnNumber();
var isString = is.sourceText().length() > Node.TO_STRING_MAX_WIDTH
? is.sourceText().substring(0, Node.TO_STRING_MAX_WIDTH) + "..."
: is.sourceText();
var limitString = firstLimit.sourceText().length() > Node.TO_STRING_MAX_WIDTH
? firstLimit.sourceText().substring(0, Node.TO_STRING_MAX_WIDTH) + "..."
: firstLimit.sourceText();
failures.add(
fail(
is,
"INLINE STATS cannot be used after LIMIT command, but was [{}] after [{}] at [{}]",
isString,
limitString,
limitLocation
)
);
}
}
}

private void licenseCheck(LogicalPlan plan, Failures failures) {
Consumer<Node<?>> licenseCheck = n -> {
if (n instanceof LicenseAware la && la.licenseCheck(licenseState) == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import static org.elasticsearch.xpack.esql.core.type.DataType.LONG;
import static org.elasticsearch.xpack.esql.core.type.DataType.UNSIGNED_LONG;
import static org.elasticsearch.xpack.esql.core.type.DataType.VERSION;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
Expand Down Expand Up @@ -2813,6 +2814,130 @@ STATS max(max_over_time(network.connections)) BY host, time_bucket = bucket(@tim
can only be used after STATS when used with TS command"""));
}

public void testLimitBeforeInlineStats_WithTS() {
assumeTrue("LIMIT before INLINE STATS limitation check", EsqlCapabilities.Cap.FORBID_LIMIT_BEFORE_INLINE_STATS.isEnabled());
assertThat(
error("TS test | STATS m=max(languages) BY s=salary/10000 | LIMIT 5 | INLINE STATS max(s) BY m"),
containsString(
"1:64: INLINE STATS cannot be used after LIMIT command, but was [INLINE STATS max(s) BY m] after [LIMIT 5] at [line 1:54]"
)
);
}

public void testLimitBeforeInlineStats_WithFork() {
assumeTrue("LIMIT before INLINE STATS limitation check", EsqlCapabilities.Cap.FORBID_LIMIT_BEFORE_INLINE_STATS.isEnabled());
assertThat(
error(
"FROM test\n"
+ "| WHERE emp_no == 10048 OR emp_no == 10081\n"
+ "| FORK (EVAL a = CONCAT(first_name, \" \", emp_no::keyword, \" \", last_name)\n"
+ " | GROK a \"%{WORD:x} %{WORD:y} %{WORD:z}\" )\n"
+ " (EVAL b = CONCAT(last_name, \" \", emp_no::keyword, \" \", first_name)\n"
+ " | GROK b \"%{WORD:x} %{WORD:y} %{WORD:z}\" )\n"
+ "| SORT _fork, emp_no"
+ "| INLINE STATS max_lang = MAX(languages) BY gender"
),
containsString(
"7:23: INLINE STATS cannot be used after LIMIT command, but was [INLINE STATS max_lang = MAX(languages) BY gender] "
+ "after [(EVAL a = CONCAT(first_name, \" \", emp_no::keyword, \" \", last_name)\n"
+ " | GROK a \"%{WORD:x} %{WORD:y} %{WOR...] at [line 3:8]"
)
);

assertThat(
error(
"FROM employees\n"
+ "| KEEP emp_no, languages, gender\n"
+ "| FORK (WHERE emp_no == 10048 OR emp_no == 10081)\n"
+ " (WHERE emp_no == 10081 OR emp_no == 10087)\n"
+ "| LIMIT 5"
+ "| INLINE STATS max_lang = MAX(languages) BY gender \n"
+ "| SORT emp_no, gender, _fork\n"
),
containsString(
"5:12: INLINE STATS cannot be used after LIMIT command, "
+ "but was [INLINE STATS max_lang = MAX(languages) BY gender] after [LIMIT 5] at [line 5:3]"
)
);

assertThat(
error(
"FROM employees\n"
+ "| KEEP emp_no, languages, gender\n"
+ "| FORK (WHERE emp_no == 10048 OR emp_no == 10081)\n"
+ " (WHERE emp_no == 10081 OR emp_no == 10087)\n"
+ "| INLINE STATS max_lang = MAX(languages) BY gender \n"
+ "| SORT emp_no, gender, _fork\n"
+ "| LIMIT 5"
),
containsString(
"5:3: INLINE STATS cannot be used after LIMIT command, but was [INLINE STATS max_lang = MAX(languages) BY gender] "
+ "after [(WHERE emp_no == 10048 OR emp_no == 10081)\n"
+ " (WHERE emp_no == 10081 OR emp_no == 10087)] at [line 3:8]"
)
);
}

public void testLimitBeforeInlineStats_WithFrom_And_Row() {
assumeTrue("LIMIT before INLINE STATS limitation check", EsqlCapabilities.Cap.FORBID_LIMIT_BEFORE_INLINE_STATS.isEnabled());
var sourceCommands = new String[] { "FROM test | ", "ROW salary=1,gender=\"M\",languages=1 | " };

assertThat(
error(randomFrom(sourceCommands) + "LIMIT 5 | INLINE STATS max(salary) BY gender"),
containsString(
"INLINE STATS cannot be used after LIMIT command, but was [INLINE STATS max(salary) BY gender] after [LIMIT 5] at [line "
)
);

assertThat(
error(randomFrom(sourceCommands) + "SORT languages | LIMIT 5 | INLINE STATS max(salary) BY gender"),
containsString(
"INLINE STATS cannot be used after LIMIT command, but was [INLINE STATS max(salary) BY gender] after [LIMIT 5] at [line "
)
);

assertThat(
error(randomFrom(sourceCommands) + "INLINE STATS avg(salary) | LIMIT 5 | INLINE STATS max(salary) BY gender"),
containsString(
"INLINE STATS cannot be used after LIMIT command, but was [INLINE STATS max(salary) BY gender] after [LIMIT 5] at [line "
)
);

assertThat(
error(
randomFrom(sourceCommands) + "LIMIT 1 | LIMIT 2 | INLINE STATS avg(salary) | LIMIT 5 | INLINE STATS max(salary) BY gender"
),
allOf(
containsString(
"INLINE STATS cannot be used after LIMIT command, "
+ "but was [INLINE STATS max(salary) BY gender] after [LIMIT 5] at [line "
),
containsString(
"INLINE STATS cannot be used after LIMIT command, but was [INLINE STATS avg(salary)] after [LIMIT 2] at [line "
)
)
);

assertThat(
error(randomFrom(sourceCommands) + """
EVAL x = 1
| LIMIT 1
| INLINE STATS avg(salary)
| STATS m=max(languages) BY s=salary/10000
| LIMIT 5
| INLINE STATS max(s) BY m
"""),
allOf(
containsString(
"INLINE STATS cannot be used after LIMIT command, but was [INLINE STATS max(s) BY m] after [LIMIT 5] at [line "
),
containsString(
"INLINE STATS cannot be used after LIMIT command, but was [INLINE STATS avg(salary)] after [LIMIT 1] at [line "
)
)
);
}

private void checkVectorFunctionsNullArgs(String functionInvocation) throws Exception {
query("from test | eval similarity = " + functionInvocation, fullTextAnalyzer);
}
Expand Down