Skip to content

Commit 23ee3b6

Browse files
JonasKunzkkrik-eselasticsearchmachinefelixbarny
authored
Add mapper for exponential histograms (#132493)
* Add mapper for exponential_histogram type * Fix exception types, remove todos * Fix javadoc * Fix zerobucket comments and visibility * Fix benchmark changes * Fix yaml test name * Review fixes * Split encoding and histogram implementation, general code cleanup * Update x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/EncodedHistogramData.java Co-authored-by: Kostas Krikellas <131142368+kkrik-es@users.noreply.github.com> * Update x-pack/plugin/mapper-exponential-histogram/src/main/java/org/elasticsearch/xpack/exponentialhistogram/EncodedHistogramData.java Co-authored-by: Kostas Krikellas <131142368+kkrik-es@users.noreply.github.com> * [CI] Auto commit changes from spotless * use multi-line comment to make spotless not mess up the formatting * Rename "load" to "decode" * Add comment explaining why invalid mapping test does not apply * Refactor bucket encoding, fix comment * Apply suggestions from code review Co-authored-by: Felix Barnsteiner <felixbarny@users.noreply.github.com> * checkstyle --------- Co-authored-by: Kostas Krikellas <131142368+kkrik-es@users.noreply.github.com> Co-authored-by: elasticsearchmachine <infra-root+elasticsearchmachine@elastic.co> Co-authored-by: Felix Barnsteiner <felixbarny@users.noreply.github.com>
1 parent 984dea7 commit 23ee3b6

File tree

14 files changed

+2133
-24
lines changed

14 files changed

+2133
-24
lines changed

benchmarks/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ dependencies {
4747
api(project(':x-pack:plugin:core'))
4848
api(project(':x-pack:plugin:esql'))
4949
api(project(':x-pack:plugin:esql:compute'))
50+
api(project(':x-pack:plugin:mapper-exponential-histogram'))
5051
implementation project(path: ':libs:native')
5152
implementation project(path: ':libs:simdvec')
5253
implementation project(path: ':libs:exponential-histogram')

benchmarks/src/main/java/org/elasticsearch/benchmark/exponentialhistogram/ExponentialHistogramMergeBench.java

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,15 @@
99

1010
package org.elasticsearch.benchmark.exponentialhistogram;
1111

12+
import org.apache.lucene.util.BytesRef;
13+
import org.elasticsearch.common.io.stream.BytesStreamOutput;
1214
import org.elasticsearch.exponentialhistogram.BucketIterator;
1315
import org.elasticsearch.exponentialhistogram.ExponentialHistogram;
1416
import org.elasticsearch.exponentialhistogram.ExponentialHistogramCircuitBreaker;
1517
import org.elasticsearch.exponentialhistogram.ExponentialHistogramGenerator;
1618
import org.elasticsearch.exponentialhistogram.ExponentialHistogramMerger;
19+
import org.elasticsearch.xpack.exponentialhistogram.CompressedExponentialHistogram;
20+
import org.elasticsearch.xpack.exponentialhistogram.IndexWithCount;
1721
import org.openjdk.jmh.annotations.Benchmark;
1822
import org.openjdk.jmh.annotations.BenchmarkMode;
1923
import org.openjdk.jmh.annotations.Fork;
@@ -27,6 +31,8 @@
2731
import org.openjdk.jmh.annotations.Threads;
2832
import org.openjdk.jmh.annotations.Warmup;
2933

34+
import java.io.IOException;
35+
import java.util.ArrayList;
3036
import java.util.List;
3137
import java.util.Random;
3238
import java.util.concurrent.ThreadLocalRandom;
@@ -47,6 +53,9 @@ public class ExponentialHistogramMergeBench {
4753
@Param({ "0.01", "0.1", "0.25", "0.5", "1.0", "2.0" })
4854
double mergedHistoSizeFactor;
4955

56+
@Param({ "array-backed", "compressed" })
57+
String histoImplementation;
58+
5059
Random random;
5160
ExponentialHistogramMerger histoMerger;
5261

@@ -81,16 +90,54 @@ public void setUp() {
8190
bucketIndex += 1 + random.nextInt(bucketCount) % (Math.max(1, bucketCount / dataPointSize));
8291
generator.add(Math.pow(1.001, bucketIndex));
8392
}
84-
toMerge[i] = generator.getAndClear();
85-
cnt = getBucketCount(toMerge[i]);
93+
ExponentialHistogram histogram = generator.getAndClear();
94+
cnt = getBucketCount(histogram);
8695
if (cnt < dataPointSize) {
87-
throw new IllegalArgumentException("Expected bucket count to be " + dataPointSize + ", but was " + cnt);
96+
throw new IllegalStateException("Expected bucket count to be " + dataPointSize + ", but was " + cnt);
97+
}
98+
99+
if ("array-backed".equals(histoImplementation)) {
100+
toMerge[i] = histogram;
101+
} else if ("compressed".equals(histoImplementation)) {
102+
toMerge[i] = asCompressedHistogram(histogram);
103+
} else {
104+
throw new IllegalArgumentException("Unknown implementation: " + histoImplementation);
88105
}
89106
}
90107

91108
index = 0;
92109
}
93110

111+
private ExponentialHistogram asCompressedHistogram(ExponentialHistogram histogram) {
112+
List<IndexWithCount> negativeBuckets = new ArrayList<>();
113+
List<IndexWithCount> positiveBuckets = new ArrayList<>();
114+
115+
BucketIterator it = histogram.negativeBuckets().iterator();
116+
while (it.hasNext()) {
117+
negativeBuckets.add(new IndexWithCount(it.peekIndex(), it.peekCount()));
118+
it.advance();
119+
}
120+
it = histogram.positiveBuckets().iterator();
121+
while (it.hasNext()) {
122+
positiveBuckets.add(new IndexWithCount(it.peekIndex(), it.peekCount()));
123+
it.advance();
124+
}
125+
126+
long totalCount = histogram.zeroBucket().count() + histogram.negativeBuckets().valueCount() + histogram.positiveBuckets()
127+
.valueCount();
128+
BytesStreamOutput histoBytes = new BytesStreamOutput();
129+
try {
130+
CompressedExponentialHistogram.writeHistogramBytes(histoBytes, histogram.scale(), negativeBuckets, positiveBuckets);
131+
CompressedExponentialHistogram result = new CompressedExponentialHistogram();
132+
BytesRef data = histoBytes.bytes().toBytesRef();
133+
result.reset(histogram.zeroBucket().zeroThreshold(), totalCount, data);
134+
return result;
135+
} catch (IOException e) {
136+
throw new RuntimeException(e);
137+
}
138+
139+
}
140+
94141
private static int getBucketCount(ExponentialHistogram histo) {
95142
int cnt = 0;
96143
for (BucketIterator it : List.of(histo.negativeBuckets().iterator(), histo.positiveBuckets().iterator())) {

libs/exponential-histogram/src/main/java/org/elasticsearch/exponentialhistogram/ZeroBucket.java

Lines changed: 72 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import org.apache.lucene.util.RamUsageEstimator;
2525

26+
import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_INDEX;
2627
import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MAX_SCALE;
2728
import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_INDEX;
2829
import static org.elasticsearch.exponentialhistogram.ExponentialHistogram.MIN_SCALE;
@@ -36,15 +37,28 @@
3637
* To allow efficient comparison with bucket boundaries, this class internally
3738
* represents the zero threshold as an exponential histogram bucket index with a scale,
3839
* computed via {@link ExponentialScaleUtils#computeIndex(double, int)}.
39-
*
40-
* @param index The index used with the scale to determine the zero threshold.
41-
* @param scale The scale used with the index to determine the zero threshold.
42-
* @param count The number of values in the zero bucket.
4340
*/
44-
public record ZeroBucket(long index, int scale, long count) {
41+
public final class ZeroBucket {
4542

4643
public static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(ZeroBucket.class);
4744

45+
/**
46+
* The exponential histogram scale used for {@link #index}
47+
*/
48+
private final int scale;
49+
50+
/**
51+
* The exponential histogram bucket index whose upper boundary corresponds to the zero threshold.
52+
* Might be computed lazily from {@link #realThreshold}, uses {@link Long#MAX_VALUE} as placeholder in this case.
53+
*/
54+
private long index;
55+
56+
/**
57+
* The zero threshold as a double value.
* Might be computed lazily from {@link #index} and {@link #scale}, uses {@link Double#NaN} as placeholder in this case.
58+
*/
59+
private double realThreshold;
60+
61+
private final long count;
4862
// A singleton for an empty zero bucket with the smallest possible threshold.
4963
private static final ZeroBucket MINIMAL_EMPTY = new ZeroBucket(MIN_INDEX, MIN_SCALE, 0);
5064

@@ -55,7 +69,27 @@ public record ZeroBucket(long index, int scale, long count) {
5569
* @param count The number of values in the bucket.
5670
*/
5771
public ZeroBucket(double zeroThreshold, long count) {
58-
this(computeIndex(zeroThreshold, MAX_SCALE) + 1, MAX_SCALE, count);
72+
assert zeroThreshold >= 0.0 : "zeroThreshold must not be negative";
73+
this.index = Long.MAX_VALUE; // compute lazily when needed
74+
this.scale = MAX_SCALE;
75+
this.realThreshold = zeroThreshold;
76+
this.count = count;
77+
}
78+
79+
private ZeroBucket(long index, int scale, long count) {
80+
assert index >= MIN_INDEX && index <= MAX_INDEX : "index must be in range [" + MIN_INDEX + ", " + MAX_INDEX + "]";
81+
assert scale >= MIN_SCALE && scale <= MAX_SCALE : "scale must be in range [" + MIN_SCALE + ", " + MAX_SCALE + "]";
82+
this.index = index;
83+
this.scale = scale;
84+
this.realThreshold = Double.NaN; // compute lazily when needed
85+
this.count = count;
86+
}
87+
88+
private ZeroBucket(double realThreshold, long index, int scale, long count) {
89+
this.realThreshold = realThreshold;
90+
this.index = index;
91+
this.scale = scale;
92+
this.count = count;
5993
}
6094

6195
/**
@@ -75,8 +109,33 @@ public static ZeroBucket minimalWithCount(long count) {
75109
if (count == 0) {
76110
return MINIMAL_EMPTY;
77111
} else {
78-
return new ZeroBucket(MINIMAL_EMPTY.index, MINIMAL_EMPTY.scale(), count);
112+
return new ZeroBucket(MINIMAL_EMPTY.zeroThreshold(), MINIMAL_EMPTY.index(), MINIMAL_EMPTY.scale(), count);
113+
}
114+
}
115+
116+
/**
117+
* @return The value of the zero threshold.
118+
*/
119+
public double zeroThreshold() {
120+
if (Double.isNaN(realThreshold)) {
121+
realThreshold = exponentiallyScaledToDoubleValue(index(), scale());
79122
}
123+
return realThreshold;
124+
}
125+
126+
public long index() {
127+
if (index == Long.MAX_VALUE) {
128+
index = computeIndex(zeroThreshold(), scale()) + 1;
129+
}
130+
return index;
131+
}
132+
133+
public int scale() {
134+
return scale;
135+
}
136+
137+
public long count() {
138+
return count;
80139
}
81140

82141
/**
@@ -99,9 +158,9 @@ public ZeroBucket merge(ZeroBucket other) {
99158
long totalCount = count + other.count;
100159
// Both are populated, so we need to use the higher zero-threshold.
101160
if (this.compareZeroThreshold(other) >= 0) {
102-
return new ZeroBucket(index, scale, totalCount);
161+
return new ZeroBucket(realThreshold, index, scale, totalCount);
103162
} else {
104-
return new ZeroBucket(other.index, other.scale, totalCount);
163+
return new ZeroBucket(other.realThreshold, other.index, other.scale, totalCount);
105164
}
106165
}
107166
}
@@ -133,14 +192,7 @@ public ZeroBucket collapseOverlappingBucketsForAll(BucketIterator... bucketItera
133192
* equal to, or greater than the other's.
134193
*/
135194
public int compareZeroThreshold(ZeroBucket other) {
136-
return compareExponentiallyScaledValues(index, scale, other.index, other.scale);
137-
}
138-
139-
/**
140-
* @return The value of the zero threshold.
141-
*/
142-
public double zeroThreshold() {
143-
return exponentiallyScaledToDoubleValue(index, scale);
195+
return compareExponentiallyScaledValues(index(), scale(), other.index(), other.scale());
144196
}
145197

146198
/**
@@ -154,7 +206,7 @@ public ZeroBucket collapseOverlappingBuckets(BucketIterator buckets) {
154206

155207
long collapsedCount = 0;
156208
long highestCollapsedIndex = 0;
157-
while (buckets.hasNext() && compareExponentiallyScaledValues(buckets.peekIndex(), buckets.scale(), index, scale) < 0) {
209+
while (buckets.hasNext() && compareExponentiallyScaledValues(buckets.peekIndex(), buckets.scale(), index(), scale()) < 0) {
158210
highestCollapsedIndex = buckets.peekIndex();
159211
collapsedCount += buckets.peekCount();
160212
buckets.advance();
@@ -165,9 +217,9 @@ public ZeroBucket collapseOverlappingBuckets(BucketIterator buckets) {
165217
long newZeroCount = count + collapsedCount;
166218
// +1 because we need to adjust the zero threshold to the upper boundary of the collapsed bucket
167219
long collapsedUpperBoundIndex = highestCollapsedIndex + 1;
168-
if (compareExponentiallyScaledValues(index, scale, collapsedUpperBoundIndex, buckets.scale()) >= 0) {
220+
if (compareExponentiallyScaledValues(index(), scale(), collapsedUpperBoundIndex, buckets.scale()) >= 0) {
169221
// Our current zero-threshold is at least as large as the upper boundary of the largest collapsed bucket, so we keep it.
170-
return new ZeroBucket(index, scale, newZeroCount);
222+
return new ZeroBucket(realThreshold, index, scale, newZeroCount);
171223
} else {
172224
return new ZeroBucket(collapsedUpperBoundIndex, buckets.scale(), newZeroCount);
173225
}

libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ExponentialHistogramMergerTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,12 @@ public void testMergeOrderIndependence() {
151151
double[] vals = values.stream().mapToDouble(Double::doubleValue).toArray();
152152
try (ReleasableExponentialHistogram shuffled = ExponentialHistogram.create(20, breaker(), vals)) {
153153
assertThat("Expected same scale", shuffled.scale(), equalTo(reference.scale()));
154-
assertThat("Expected same zero-bucket", shuffled.zeroBucket(), equalTo(reference.zeroBucket()));
154+
assertThat(
155+
"Expected same threshold for zero-bucket",
156+
shuffled.zeroBucket().zeroThreshold(),
157+
equalTo(reference.zeroBucket().zeroThreshold())
158+
);
159+
assertThat("Expected same count for zero-bucket", shuffled.zeroBucket().count(), equalTo(reference.zeroBucket().count()));
155160
assertBucketsEqual(shuffled.negativeBuckets(), reference.negativeBuckets());
156161
assertBucketsEqual(shuffled.positiveBuckets(), reference.positiveBuckets());
157162
}

libs/exponential-histogram/src/test/java/org/elasticsearch/exponentialhistogram/ZeroBucketTests.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,33 @@ public class ZeroBucketTests extends ExponentialHistogramTestCase {
2828
public void testMinimalBucketHasZeroThreshold() {
2929
assertThat(ZeroBucket.minimalWithCount(42).zeroThreshold(), equalTo(0.0));
3030
}
31+
32+
public void testExactThresholdPreserved() {
33+
ZeroBucket bucket = new ZeroBucket(3.0, 10);
34+
assertThat(bucket.zeroThreshold(), equalTo(3.0));
35+
}
36+
37+
public void testMergingPreservesExactThreshold() {
38+
ZeroBucket bucketA = new ZeroBucket(3.0, 10);
39+
ZeroBucket bucketB = new ZeroBucket(3.5, 20);
40+
ZeroBucket merged = bucketA.merge(bucketB);
41+
assertThat(merged.zeroThreshold(), equalTo(3.5));
42+
assertThat(merged.count(), equalTo(30L));
43+
}
44+
45+
public void testBucketCollapsingPreservesExactThreshold() {
46+
FixedCapacityExponentialHistogram histo = createAutoReleasedHistogram(2);
47+
histo.resetBuckets(0);
48+
histo.tryAddBucket(0, 42, true); // bucket [1,2]
49+
50+
ZeroBucket bucketA = new ZeroBucket(3.0, 10);
51+
52+
CopyableBucketIterator iterator = histo.positiveBuckets().iterator();
53+
ZeroBucket merged = bucketA.collapseOverlappingBuckets(iterator);
54+
55+
assertThat(iterator.hasNext(), equalTo(false));
56+
assertThat(merged.zeroThreshold(), equalTo(3.0));
57+
assertThat(merged.count(), equalTo(52L));
58+
}
59+
3160
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
apply plugin: 'elasticsearch.internal-es-plugin'
9+
apply plugin: 'elasticsearch.internal-yaml-rest-test'
10+
11+
esplugin {
12+
name = 'exponential-histogram'
13+
description = 'Module for the exponential_histogram field type'
14+
classname ='org.elasticsearch.xpack.exponentialhistogram.ExponentialHistogramMapperPlugin'
15+
extendedPlugins = ['x-pack-core']
16+
}
17+
base {
18+
archivesName = 'x-pack-exponential-histogram'
19+
}
20+
21+
dependencies {
22+
api project(":libs:exponential-histogram")
23+
compileOnly project(path: xpackModule('core'))
24+
yamlRestTestImplementation(testArtifact(project(xpackModule('core'))))
25+
}
26+
27+
restResources {
28+
restApi {
29+
include '_common', 'indices', 'index', 'get'
30+
}
31+
}

0 commit comments

Comments
 (0)