Skip to content

Commit 25b5f7e

Browse files
committed
Pack dimension values in time-series aggregation
1 parent 7339800 commit 25b5f7e

File tree

16 files changed

+1144
-253
lines changed

16 files changed

+1144
-253
lines changed

x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/data/ConstantBytesRefVector.java

Lines changed: 5 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/X-ConstantVector.java.st

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,15 @@ $endif$
4040

4141
@Override
4242
$if(BytesRef)$
43-
public BytesRef getBytesRef(int position, BytesRef ignore) {
43+
public BytesRef getBytesRef(int position, BytesRef scratch) {
44+
scratch.bytes = value.bytes;
45+
scratch.offset = value.offset;
46+
scratch.length = value.length;
47+
return scratch;
4448
$else$
4549
public $type$ get$Type$(int position) {
46-
$endif$
4750
return value;
51+
$endif$
4852
}
4953

5054
@Override

x-pack/plugin/esql/qa/testFixtures/src/main/resources/data/k8s.csv

Lines changed: 201 additions & 201 deletions
Large diffs are not rendered by default.

x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-mappings.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818
}
1919
}
2020
},
21+
"region": {
22+
"type": "keyword",
23+
"time_series_dimension": true
24+
},
2125
"event": {
2226
"type": "keyword"
2327
},

x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-avg-over-time.csv-spec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ events:double | pod:keyword | time_bucket:datetime
185185
15.746543778801843 | one | 2024-05-10T00:00:00.000Z
186186
15.396284829721363 | three | 2024-05-10T00:00:00.000Z
187187
15.045454545454547 | three | 2024-05-10T00:10:00.000Z
188-
14.199942045783832 | two | 2024-05-10T00:00:00.000Z
188+
14.19994204578383 | two | 2024-05-10T00:00:00.000Z
189189
11.5 | two | 2024-05-10T00:20:00.000Z
190190
;
191191

x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-irate.csv-spec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ TS k8s
5151

5252
irate_cost:double | time_bucket:datetime
5353
null | 2024-05-10T00:01:00.000Z
54-
7.836832264957264 | 2024-05-10T00:09:00.000Z
54+
7.836832264957265 | 2024-05-10T00:09:00.000Z
5555
3.590324074074074 | 2024-05-10T00:17:00.000Z
5656
2.6708333333333334 | 2024-05-10T00:02:00.000Z
5757
2.2916666666666665 | 2024-05-10T00:08:00.000Z

x-pack/plugin/esql/qa/testFixtures/src/main/resources/k8s-timeseries-rate.csv-spec

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,3 +223,22 @@ TS k8s
223223

224224
;
225225

226+
sum_rate_per_region
227+
required_capability: ts_command_v0
228+
required_capability: pack_dimensions_in_ts
229+
TS k8s
230+
| STATS rate_bytes_in=avg(rate(network.total_bytes_in)) BY cluster, region, time_bucket = bucket(@timestamp,5minute)
231+
| SORT rate_bytes_in DESC, time_bucket, cluster | LIMIT 10;
232+
233+
rate_bytes_in:double | cluster:keyword | region:keyword | time_bucket:datetime
234+
15.025267167998313 | qa | null | 2024-05-10T00:15:00.000Z
235+
13.638384356589611 | qa | null | 2024-05-10T00:05:00.000Z
236+
11.761724575728252 | prod | [eu, us] | 2024-05-10T00:15:00.000Z
237+
7.453275209904956 | qa | null | 2024-05-10T00:10:00.000Z
238+
7.307225056633641 | staging | us | 2024-05-10T00:05:00.000Z
239+
7.203958127639015 | prod | [eu, us] | 2024-05-10T00:10:00.000Z
240+
6.34494062999877 | staging | us | 2024-05-10T00:10:00.000Z
241+
5.700488689624205 | prod | [eu, us] | 2024-05-10T00:20:00.000Z
242+
5.4539153439153445 | prod | [eu, us] | 2024-05-10T00:00:00.000Z
243+
5.241187469367376 | staging | us | 2024-05-10T00:00:00.000Z
244+
;

x-pack/plugin/esql/qa/testFixtures/src/main/resources/lookup-join.csv-spec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4742,7 +4742,7 @@ FROM k8s
47424742
| RENAME network.bytes_in AS language_code
47434743
| WHERE language_code < 10
47444744
| LOOKUP JOIN languages_lookup ON language_code
4745-
| DROP network*, event*
4745+
| DROP network*, event*, region
47464746
| SORT language_code, @timestamp
47474747
;
47484748

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1499,7 +1499,14 @@ public enum Cap {
14991499
/**
15001500
* Support for dots in FUSE attributes
15011501
*/
1502-
DOTS_IN_FUSE;
1502+
DOTS_IN_FUSE,
1503+
1504+
/**
1505+
* Pack dimension values in TS command
1506+
*/
1507+
PACK_DIMENSIONS_IN_TS
1508+
1509+
;
15031510

15041511
private final boolean enabled;
15051512

Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.expression.function.scalar.internal;
9+
10+
import org.apache.lucene.util.BytesRef;
11+
import org.elasticsearch.compute.data.BooleanBlock;
12+
import org.elasticsearch.compute.data.BytesRefBlock;
13+
import org.elasticsearch.compute.data.BytesRefVector;
14+
import org.elasticsearch.compute.data.IntBlock;
15+
import org.elasticsearch.compute.data.LongBlock;
16+
import org.elasticsearch.compute.data.OrdinalBytesRefVector;
17+
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;
18+
import org.elasticsearch.compute.operator.DriverContext;
19+
import org.elasticsearch.compute.operator.topn.TopNEncoder;
20+
21+
final class InternalPacks {
22+
// Shared encoder: every pack* helper in this class writes values through it and every unpack* helper reads them back with it.
private static final TopNEncoder ENCODER = TopNEncoder.DEFAULT_UNSORTABLE;
23+
// Floor (6 * 1024) that estimateForBytesBuilder applies to the size estimate it returns.
public static final int INITIAL_SIZE_IN_BYTES = 6 * 1024;
24+
25+
static int estimateForBytesBuilder(int positionCount) {
26+
// allocate at least one page for the bytes block builder to avoid copying during resizing
27+
return Math.max(INITIAL_SIZE_IN_BYTES, positionCount);
28+
}
29+
30+
static BytesRefBlock packBytesValues(DriverContext driverContext, BytesRefBlock raw) {
31+
BytesRefVector vector = raw.asVector();
32+
if (vector != null) {
33+
OrdinalBytesRefVector ordinals = vector.asOrdinals();
34+
if (ordinals != null) {
35+
var encoded = packBytesVector(driverContext, ordinals.getDictionaryVector());
36+
ordinals.getOrdinalsVector().incRef();
37+
return new OrdinalBytesRefVector(ordinals.getOrdinalsVector(), encoded).asBlock();
38+
} else {
39+
return packBytesVector(driverContext, vector).asBlock();
40+
}
41+
}
42+
int positionCount = raw.getPositionCount();
43+
try (
44+
var builder = driverContext.blockFactory().newBytesRefBlockBuilder(estimateForBytesBuilder(positionCount));
45+
var work = new BreakingBytesRefBuilder(driverContext.breaker(), "pack_dimensions", 1024)
46+
) {
47+
BytesRef scratch = new BytesRef();
48+
for (int p = 0; p < positionCount; p++) {
49+
int valueCount = raw.getValueCount(p);
50+
if (valueCount == 0) {
51+
builder.appendNull();
52+
continue;
53+
}
54+
work.clear();
55+
int first = raw.getFirstValueIndex(p);
56+
int end = first + valueCount;
57+
for (int i = first; i < end; i++) {
58+
raw.getBytesRef(i, scratch);
59+
ENCODER.encodeBytesRef(scratch, work);
60+
}
61+
builder.appendBytesRef(work.bytesRefView());
62+
}
63+
return builder.build();
64+
}
65+
}
66+
67+
static BytesRefVector packBytesVector(DriverContext driverContext, BytesRefVector encode) {
68+
int positionCount = encode.getPositionCount();
69+
try (
70+
var builder = driverContext.blockFactory().newBytesRefVectorBuilder(estimateForBytesBuilder(positionCount));
71+
var work = new BreakingBytesRefBuilder(driverContext.breaker(), "pack_values", 1024)
72+
) {
73+
BytesRef scratch = new BytesRef();
74+
for (int p = 0; p < positionCount; p++) {
75+
ENCODER.encodeBytesRef(encode.getBytesRef(p, scratch), work);
76+
builder.appendBytesRef(work.bytesRefView());
77+
work.clear();
78+
}
79+
return builder.build();
80+
}
81+
}
82+
83+
static BytesRefBlock unpackBytesValues(DriverContext driverContext, BytesRefBlock encoded) {
84+
int positionCount = encoded.getPositionCount();
85+
try (var builder = driverContext.blockFactory().newBytesRefBlockBuilder(estimateForBytesBuilder(positionCount));) {
86+
BytesRef inScratch = new BytesRef();
87+
BytesRef outScratch = new BytesRef();
88+
for (int p = 0; p < positionCount; p++) {
89+
if (encoded.isNull(p)) {
90+
builder.appendNull();
91+
continue;
92+
}
93+
BytesRef row = encoded.getBytesRef(p, inScratch);
94+
var v = ENCODER.decodeBytesRef(row, outScratch);
95+
if (row.length == 0) {
96+
builder.appendBytesRef(v);
97+
} else {
98+
builder.beginPositionEntry();
99+
builder.appendBytesRef(v);
100+
while (row.length > 0) {
101+
v = ENCODER.decodeBytesRef(row, outScratch);
102+
builder.appendBytesRef(v);
103+
}
104+
builder.endPositionEntry();
105+
}
106+
}
107+
return builder.build();
108+
}
109+
}
110+
111+
static BytesRefBlock packLongValues(DriverContext driverContext, LongBlock raw) {
112+
int positionCount = raw.getPositionCount();
113+
try (
114+
var builder = driverContext.blockFactory().newBytesRefBlockBuilder(estimateForBytesBuilder(positionCount));
115+
var work = new BreakingBytesRefBuilder(driverContext.breaker(), "pack_values", 32)
116+
) {
117+
for (int p = 0; p < positionCount; p++) {
118+
int valueCount = raw.getValueCount(p);
119+
if (valueCount == 0) {
120+
builder.appendNull();
121+
continue;
122+
}
123+
work.clear();
124+
int first = raw.getFirstValueIndex(p);
125+
if (valueCount == 1) {
126+
ENCODER.encodeLong(raw.getLong(first), work);
127+
} else {
128+
int end = first + valueCount;
129+
for (int i = first; i < end; i++) {
130+
ENCODER.encodeLong(raw.getLong(i), work);
131+
}
132+
}
133+
builder.appendBytesRef(work.bytesRefView());
134+
}
135+
return builder.build();
136+
}
137+
}
138+
139+
static LongBlock unpackLongValues(DriverContext driverContext, BytesRefBlock encoded) {
140+
int positionCount = encoded.getPositionCount();
141+
try (var builder = driverContext.blockFactory().newLongBlockBuilder(positionCount)) {
142+
BytesRef inScratch = new BytesRef();
143+
for (int p = 0; p < positionCount; p++) {
144+
if (encoded.isNull(p)) {
145+
builder.appendNull();
146+
continue;
147+
}
148+
BytesRef row = encoded.getBytesRef(p, inScratch);
149+
var v = ENCODER.decodeLong(row);
150+
if (row.length == 0) {
151+
builder.appendLong(v);
152+
} else {
153+
builder.beginPositionEntry();
154+
builder.appendLong(v);
155+
while (row.length > 0) {
156+
builder.appendLong(ENCODER.decodeLong(row));
157+
}
158+
builder.endPositionEntry();
159+
}
160+
}
161+
return builder.build();
162+
}
163+
}
164+
165+
static BytesRefBlock packIntValues(DriverContext driverContext, IntBlock raw) {
166+
int positionCount = raw.getPositionCount();
167+
try (
168+
var builder = driverContext.blockFactory().newBytesRefBlockBuilder(estimateForBytesBuilder(positionCount));
169+
var work = new BreakingBytesRefBuilder(driverContext.breaker(), "pack_values", 32)
170+
) {
171+
for (int p = 0; p < positionCount; p++) {
172+
int valueCount = raw.getValueCount(p);
173+
if (valueCount == 0) {
174+
builder.appendNull();
175+
continue;
176+
}
177+
work.clear();
178+
int first = raw.getFirstValueIndex(p);
179+
if (valueCount == 1) {
180+
ENCODER.encodeInt(raw.getInt(first), work);
181+
} else {
182+
int end = first + valueCount;
183+
for (int i = first; i < end; i++) {
184+
ENCODER.encodeInt(raw.getInt(i), work);
185+
}
186+
}
187+
builder.appendBytesRef(work.bytesRefView());
188+
}
189+
return builder.build();
190+
}
191+
}
192+
193+
static IntBlock unpackIntValues(DriverContext driverContext, BytesRefBlock encoded) {
194+
int positionCount = encoded.getPositionCount();
195+
try (IntBlock.Builder builder = driverContext.blockFactory().newIntBlockBuilder(positionCount)) {
196+
BytesRef inScratch = new BytesRef();
197+
for (int p = 0; p < positionCount; p++) {
198+
if (encoded.isNull(p)) {
199+
builder.appendNull();
200+
continue;
201+
}
202+
BytesRef row = encoded.getBytesRef(p, inScratch);
203+
int v = ENCODER.decodeInt(row);
204+
if (row.length == 0) {
205+
builder.appendInt(v);
206+
} else {
207+
builder.beginPositionEntry();
208+
builder.appendInt(v);
209+
while (row.length > 0) {
210+
builder.appendInt(ENCODER.decodeInt(row));
211+
}
212+
builder.endPositionEntry();
213+
}
214+
}
215+
return builder.build();
216+
}
217+
}
218+
219+
static BytesRefBlock packBooleanValues(DriverContext driverContext, BooleanBlock raw) {
220+
int positionCount = raw.getPositionCount();
221+
try (
222+
var builder = driverContext.blockFactory().newBytesRefBlockBuilder(positionCount);
223+
var work = new BreakingBytesRefBuilder(driverContext.breaker(), "pack_values", 32)
224+
) {
225+
for (int p = 0; p < positionCount; p++) {
226+
work.clear();
227+
int valueCount = raw.getValueCount(p);
228+
if (valueCount == 0) {
229+
builder.appendNull();
230+
continue;
231+
}
232+
int first = raw.getFirstValueIndex(p);
233+
if (valueCount == 1) {
234+
ENCODER.encodeBoolean(raw.getBoolean(first), work);
235+
} else {
236+
int end = first + valueCount;
237+
for (int i = first; i < end; i++) {
238+
ENCODER.encodeBoolean(raw.getBoolean(i), work);
239+
}
240+
}
241+
builder.appendBytesRef(work.bytesRefView());
242+
}
243+
return builder.build();
244+
}
245+
}
246+
247+
static BooleanBlock unpackBooleanValues(DriverContext driverContext, BytesRefBlock encoded) {
248+
int positionCount = encoded.getPositionCount();
249+
try (var builder = driverContext.blockFactory().newBooleanBlockBuilder(positionCount)) {
250+
BytesRef inScratch = new BytesRef();
251+
for (int p = 0; p < positionCount; p++) {
252+
if (encoded.isNull(p)) {
253+
builder.appendNull();
254+
continue;
255+
}
256+
BytesRef row = encoded.getBytesRef(p, inScratch);
257+
boolean v = ENCODER.decodeBoolean(row);
258+
if (row.length == 0) {
259+
builder.appendBoolean(v);
260+
} else {
261+
builder.beginPositionEntry();
262+
builder.appendBoolean(v);
263+
while (row.length > 0) {
264+
builder.appendBoolean(ENCODER.decodeBoolean(row));
265+
}
266+
builder.endPositionEntry();
267+
}
268+
}
269+
return builder.build();
270+
}
271+
}
272+
}

0 commit comments

Comments
 (0)