Skip to content

Commit 0064bc5

Browse files
committed
addressed most comments minus tests
1 parent ae16694 commit 0064bc5

File tree

11 files changed

+83
-92
lines changed

11 files changed

+83
-92
lines changed

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ public enum DataType {
309309
*/
310310
SEMANTIC_TEXT(builder().esType("semantic_text").unknownSize()),
311311

312-
AGGREGATE_METRIC_DOUBLE(builder().esType("aggregate_metric_double").unknownSize());
312+
AGGREGATE_METRIC_DOUBLE(builder().esType("aggregate_metric_double").estimatedSize(Double.BYTES * 3 + Integer.BYTES));
313313

314314
/**
315315
* Types that are actively being built. These types are not returned

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/AggregateDoubleMetricBlockBuilder.java

Lines changed: 54 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,36 @@
77

88
package org.elasticsearch.compute.data;
99

10+
import org.elasticsearch.common.breaker.CircuitBreakingException;
11+
import org.elasticsearch.core.Releasables;
1012
import org.elasticsearch.index.mapper.BlockLoader;
1113

1214
public class AggregateDoubleMetricBlockBuilder extends AbstractBlockBuilder implements BlockLoader.AggregateDoubleMetricBuilder {
1315

14-
private final DoubleBlockBuilder minBuilder;
15-
private final DoubleBlockBuilder maxBuilder;
16-
private final DoubleBlockBuilder sumBuilder;
17-
private final IntBlockBuilder countBuilder;
16+
private DoubleBlockBuilder minBuilder;
17+
private DoubleBlockBuilder maxBuilder;
18+
private DoubleBlockBuilder sumBuilder;
19+
private IntBlockBuilder countBuilder;
1820

1921
public AggregateDoubleMetricBlockBuilder(int estimatedSize, BlockFactory blockFactory) {
2022
super(blockFactory);
21-
minBuilder = new DoubleBlockBuilder(estimatedSize, blockFactory);
22-
maxBuilder = new DoubleBlockBuilder(estimatedSize, blockFactory);
23-
sumBuilder = new DoubleBlockBuilder(estimatedSize, blockFactory);
24-
countBuilder = new IntBlockBuilder(estimatedSize, blockFactory);
23+
minBuilder = null;
24+
maxBuilder = null;
25+
sumBuilder = null;
26+
countBuilder = null;
27+
try {
28+
minBuilder = new DoubleBlockBuilder(estimatedSize, blockFactory);
29+
maxBuilder = new DoubleBlockBuilder(estimatedSize, blockFactory);
30+
sumBuilder = new DoubleBlockBuilder(estimatedSize, blockFactory);
31+
countBuilder = new IntBlockBuilder(estimatedSize, blockFactory);
32+
} finally {
33+
if (countBuilder == null) {
34+
Releasables.closeWhileHandlingException(minBuilder);
35+
Releasables.closeWhileHandlingException(maxBuilder);
36+
Releasables.closeWhileHandlingException(sumBuilder);
37+
Releasables.closeWhileHandlingException(countBuilder);
38+
}
39+
}
2540
}
2641

2742
@Override
@@ -45,10 +60,10 @@ protected int elementSize() {
4560
@Override
4661
public Block.Builder copyFrom(Block block, int beginInclusive, int endExclusive) {
4762
CompositeBlock composite = (CompositeBlock) block;
48-
minBuilder.copyFrom(composite.getBlock(Metric.MIN.ordinal()), beginInclusive, endExclusive);
49-
maxBuilder.copyFrom(composite.getBlock(Metric.MAX.ordinal()), beginInclusive, endExclusive);
50-
sumBuilder.copyFrom(composite.getBlock(Metric.SUM.ordinal()), beginInclusive, endExclusive);
51-
countBuilder.copyFrom(composite.getBlock(Metric.COUNT.ordinal()), beginInclusive, endExclusive);
63+
minBuilder.copyFrom(composite.getBlock(Metric.MIN.getIndex()), beginInclusive, endExclusive);
64+
maxBuilder.copyFrom(composite.getBlock(Metric.MAX.getIndex()), beginInclusive, endExclusive);
65+
sumBuilder.copyFrom(composite.getBlock(Metric.SUM.getIndex()), beginInclusive, endExclusive);
66+
countBuilder.copyFrom(composite.getBlock(Metric.COUNT.getIndex()), beginInclusive, endExclusive);
5267
return this;
5368
}
5469

@@ -64,11 +79,19 @@ public Block.Builder mvOrdering(Block.MvOrdering mvOrdering) {
6479
@Override
6580
public Block build() {
6681
Block[] blocks = new Block[4];
67-
blocks[Metric.MIN.ordinal()] = minBuilder.build();
68-
blocks[Metric.MAX.ordinal()] = maxBuilder.build();
69-
blocks[Metric.SUM.ordinal()] = sumBuilder.build();
70-
blocks[Metric.COUNT.ordinal()] = countBuilder.build();
71-
return new CompositeBlock(blocks);
82+
try {
83+
finish();
84+
blocks[Metric.MIN.getIndex()] = minBuilder.build();
85+
blocks[Metric.MAX.getIndex()] = maxBuilder.build();
86+
blocks[Metric.SUM.getIndex()] = sumBuilder.build();
87+
blocks[Metric.COUNT.getIndex()] = countBuilder.build();
88+
return new CompositeBlock(blocks);
89+
} catch (CircuitBreakingException e) {
90+
for (Block block : blocks) {
91+
block.close();
92+
}
93+
throw e;
94+
}
7295
}
7396

7497
@Override
@@ -81,9 +104,19 @@ public BlockLoader.AggregateDoubleMetricBuilder append(double min, double max, d
81104
}
82105

83106
public enum Metric {
84-
MIN,
85-
MAX,
86-
SUM,
87-
COUNT;
107+
MIN(0),
108+
MAX(1),
109+
SUM(2),
110+
COUNT(3);
111+
112+
private final int index;
113+
114+
Metric(int index) {
115+
this.index = index;
116+
}
117+
118+
public int getIndex() {
119+
return index;
120+
}
88121
}
89122
}

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/CompositeBlock.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -86,29 +86,21 @@ public int getPositionCount() {
8686

8787
@Override
8888
public int getTotalValueCount() {
89-
// TODO: this works for aggregate metric double fields, because the four blocks are guarenteed to have the same length / positions.
90-
// So check just for one sub block is sufficient.
9189
return blocks[0].getTotalValueCount();
9290
}
9391

9492
@Override
9593
public int getFirstValueIndex(int position) {
96-
// TODO: this works for aggregate metric double fields, because the four blocks are guarenteed to have the same length / positions.
97-
// So check just for one sub block is sufficient.
9894
return blocks[0].getFirstValueIndex(position);
9995
}
10096

10197
@Override
10298
public int getValueCount(int position) {
103-
// TODO: this works for aggregate metric double fields, because the four blocks are guarenteed to have the same length / positions.
104-
// So check just for one sub block is sufficient.
10599
return blocks[0].getValueCount(position);
106100
}
107101

108102
@Override
109103
public boolean isNull(int position) {
110-
// TODO: this works for aggregate metric double fields, because the four blocks are guarenteed to have the same length / positions.
111-
// So check just for one sub block is sufficient.
112104
return blocks[0].isNull(position);
113105
}
114106

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Count.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import static java.util.Collections.emptyList;
3636
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
3737
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
38-
import static org.elasticsearch.xpack.esql.core.type.DataType.INTEGER;
3938

4039
public class Count extends AggregateFunction implements ToAggregator, SurrogateExpression {
4140
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Count", Count::new);
@@ -146,14 +145,7 @@ public Expression surrogate() {
146145
var s = source();
147146
var field = field();
148147
if (field.dataType() == DataType.AGGREGATE_METRIC_DOUBLE) {
149-
return new Sum(
150-
s,
151-
new FromAggregateDoubleMetric(
152-
source(),
153-
field,
154-
new Literal(s, AggregateDoubleMetricBlockBuilder.Metric.COUNT.ordinal(), INTEGER)
155-
)
156-
);
148+
return new Sum(s, new FromAggregateDoubleMetric(source(), field, AggregateDoubleMetricBlockBuilder.Metric.COUNT));
157149
}
158150

159151
if (field.foldable()) {

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Max.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -153,14 +153,7 @@ public final AggregatorFunctionSupplier supplier(List<Integer> inputChannels) {
153153
@Override
154154
public Expression surrogate() {
155155
if (field().dataType() == DataType.AGGREGATE_METRIC_DOUBLE) {
156-
return new Max(
157-
source(),
158-
new FromAggregateDoubleMetric(
159-
source(),
160-
field(),
161-
new Literal(source(), AggregateDoubleMetricBlockBuilder.Metric.MAX.ordinal(), DataType.INTEGER)
162-
)
163-
);
156+
return new Max(source(), new FromAggregateDoubleMetric(source(), field(), AggregateDoubleMetricBlockBuilder.Metric.MAX));
164157
}
165158
return field().foldable() ? new MvMax(source(), field()) : null;
166159
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Min.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -153,14 +153,7 @@ public final AggregatorFunctionSupplier supplier(List<Integer> inputChannels) {
153153
@Override
154154
public Expression surrogate() {
155155
if (field().dataType() == DataType.AGGREGATE_METRIC_DOUBLE) {
156-
return new Min(
157-
source(),
158-
new FromAggregateDoubleMetric(
159-
source(),
160-
field(),
161-
new Literal(source(), AggregateDoubleMetricBlockBuilder.Metric.MIN.ordinal(), DataType.INTEGER)
162-
)
163-
);
156+
return new Min(source(), new FromAggregateDoubleMetric(source(), field(), AggregateDoubleMetricBlockBuilder.Metric.MIN));
164157
}
165158
return field().foldable() ? new MvMin(source(), field()) : null;
166159
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Sum.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
3737
import static org.elasticsearch.xpack.esql.core.type.DataType.AGGREGATE_METRIC_DOUBLE;
3838
import static org.elasticsearch.xpack.esql.core.type.DataType.DOUBLE;
39-
import static org.elasticsearch.xpack.esql.core.type.DataType.INTEGER;
4039
import static org.elasticsearch.xpack.esql.core.type.DataType.LONG;
4140
import static org.elasticsearch.xpack.esql.core.type.DataType.UNSIGNED_LONG;
4241

@@ -139,14 +138,7 @@ public Expression surrogate() {
139138
var s = source();
140139
var field = field();
141140
if (field.dataType() == AGGREGATE_METRIC_DOUBLE) {
142-
return new Sum(
143-
s,
144-
new FromAggregateDoubleMetric(
145-
source(),
146-
field,
147-
new Literal(s, AggregateDoubleMetricBlockBuilder.Metric.SUM.ordinal(), INTEGER)
148-
)
149-
);
141+
return new Sum(s, new FromAggregateDoubleMetric(source(), field, AggregateDoubleMetricBlockBuilder.Metric.SUM));
150142
}
151143

152144
// SUM(const) is equivalent to MV_SUM(const)*COUNT(*).

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/convert/FromAggregateDoubleMetric.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.xpack.esql.core.expression.Expression;
2121
import org.elasticsearch.xpack.esql.core.expression.Expressions;
2222
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
23+
import org.elasticsearch.xpack.esql.core.expression.Literal;
2324
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
2425
import org.elasticsearch.xpack.esql.core.tree.Source;
2526
import org.elasticsearch.xpack.esql.core.type.DataType;
@@ -61,6 +62,10 @@ public FromAggregateDoubleMetric(
6162
this.subfieldIndex = subfieldIndex;
6263
}
6364

65+
public FromAggregateDoubleMetric(Source source, Expression field, AggregateDoubleMetricBlockBuilder.Metric metric) {
66+
this(source, field, new Literal(source, metric.getIndex(), INTEGER));
67+
}
68+
6469
private FromAggregateDoubleMetric(StreamInput in) throws IOException {
6570
this(Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(Expression.class), in.readNamedWriteable(Expression.class));
6671
}
@@ -84,7 +89,7 @@ public DataType dataType() {
8489
}
8590

8691
var subfield = ((Number) subfieldIndex.fold(FoldContext.small())).intValue();
87-
if (subfield == AggregateDoubleMetricBlockBuilder.Metric.COUNT.ordinal()) {
92+
if (subfield == AggregateDoubleMetricBlockBuilder.Metric.COUNT.getIndex()) {
8893
return INTEGER;
8994
}
9095
return DOUBLE;
@@ -121,13 +126,10 @@ public EvalOperator.ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvalua
121126
return new EvalOperator.ExpressionEvaluator() {
122127
@Override
123128
public Block eval(Page page) {
124-
CompositeBlock compositeBlock = (CompositeBlock) eval.eval(page);
125-
try {
129+
try (CompositeBlock compositeBlock = (CompositeBlock) eval.eval(page)) {
126130
Block block = compositeBlock.getBlock(((Number) subfieldIndex.fold(FoldContext.small())).intValue());
127131
block.incRef();
128132
return block;
129-
} finally {
130-
compositeBlock.close();
131133
}
132134
}
133135

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AggregateMapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ private static Stream<AggDef> groupingAndNonGrouping(Tuple<Class<?>, Tuple<Strin
213213
// rate doesn't support non-grouping aggregations
214214
return Stream.of(new AggDef(tuple.v1(), tuple.v2().v1(), tuple.v2().v2(), true));
215215
} else if (tuple.v2().v1().equals("AggregatedMetricDouble")) {
216-
// not supporting grouping aggregations yet
216+
// TODO: support grouping aggregations for aggregate metric double
217217
return Stream.of(new AggDef(tuple.v1(), tuple.v2().v1(), tuple.v2().v2(), false));
218218
} else {
219219
return Stream.of(
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package org.elasticsearch.xpack.esql.expression.function.scalar.convert;
2+
3+
public class FromAggregateDoubleMetricTests {
4+
}

0 commit comments

Comments
 (0)