Skip to content

Commit 79ad42c

Browse files
authored
Add support for dynamic pruning to cardinality aggregations on low-cardinality keyword fields. (#92060)
On low-cardinality keyword fields, the `cardinality` aggregation currently uses the `global_ordinals` execution mode most of the time, which consists of collecting all documents that match the query, reading ordinals of the values that these documents contain, and setting bits in a bitset for these ordinals. This commit introduces a feedback loop between the query and the `cardinality` aggregator, which allows the query to skip documents that only contain values that have already been seen by the `cardinality` aggregator. On the `nyc_taxis` dataset, with a `match_all` query and a `cardinality` aggregation on the `vendor_id` field (2 unique values), the `cardinality` aggregation went from 3s to 3ms. The speedup would certainly not be as good in all cases, but I would still expect it to be very significant in many cases.
1 parent f2d9a3f commit 79ad42c

File tree

6 files changed

+559
-1
lines changed

6 files changed

+559
-1
lines changed

docs/changelog/92060.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 92060
2+
summary: Add support for dynamic pruning to cardinality aggregations on low-cardinality
3+
keyword fields
4+
area: Aggregations
5+
type: enhancement
6+
issues: []

server/src/main/java/org/elasticsearch/common/util/BitArray.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,18 @@ public void set(long index) {
3939
bits.set(wordNum, bits.get(wordNum) | bitmask(index));
4040
}
4141

42+
/**
43+
* Set the {@code index}th bit and return {@code true} if the bit was set already.
44+
*/
45+
public boolean getAndSet(long index) {
46+
long wordNum = wordNum(index);
47+
bits = bigArrays.grow(bits, wordNum + 1);
48+
long word = bits.get(wordNum);
49+
long bitMask = bitmask(index);
50+
bits.set(wordNum, word | bitMask);
51+
return (word & bitMask) != 0;
52+
}
53+
4254
/** this = this OR other */
4355
public void or(BitArray other) {
4456
or(other.bits);

server/src/main/java/org/elasticsearch/search/aggregations/metrics/CardinalityAggregatorFactory.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,17 @@ public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
157157
if (valuesSourceConfig.hasValues()) {
158158
if (valuesSourceConfig.getValuesSource() instanceof final ValuesSource.Bytes.WithOrdinals source) {
159159
if (executionMode.useGlobalOrdinals(context, source, precision)) {
160+
final String field;
161+
if (valuesSourceConfig.alignesWithSearchIndex()) {
162+
field = valuesSourceConfig.fieldType().name();
163+
} else {
164+
field = null;
165+
}
160166
final long maxOrd = source.globalMaxOrd(context.searcher().getIndexReader());
161167
return new GlobalOrdCardinalityAggregator(
162168
name,
163169
source,
170+
field,
164171
precision,
165172
Math.toIntExact(maxOrd),
166173
context,

server/src/main/java/org/elasticsearch/search/aggregations/metrics/GlobalOrdCardinalityAggregator.java

Lines changed: 206 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,16 @@
88

99
package org.elasticsearch.search.aggregations.metrics;
1010

11+
import org.apache.lucene.index.FieldInfo;
12+
import org.apache.lucene.index.IndexOptions;
13+
import org.apache.lucene.index.PostingsEnum;
1114
import org.apache.lucene.index.SortedSetDocValues;
15+
import org.apache.lucene.index.Terms;
16+
import org.apache.lucene.index.TermsEnum;
17+
import org.apache.lucene.search.DocIdSetIterator;
1218
import org.apache.lucene.search.ScoreMode;
1319
import org.apache.lucene.util.BytesRef;
20+
import org.apache.lucene.util.PriorityQueue;
1421
import org.elasticsearch.common.hash.MurmurHash3;
1522
import org.elasticsearch.common.util.BigArrays;
1623
import org.elasticsearch.common.util.BitArray;
@@ -26,18 +33,35 @@
2633
import org.elasticsearch.search.aggregations.support.ValuesSource;
2734

2835
import java.io.IOException;
36+
import java.util.HashMap;
2937
import java.util.Map;
38+
import java.util.Objects;
39+
import java.util.function.BiConsumer;
3040

3141
/**
3242
* An aggregator that computes approximate counts of unique values
3343
* using global ords.
3444
*/
3545
public class GlobalOrdCardinalityAggregator extends NumericMetricsAggregator.SingleValue {
3646

47+
// Don't try to dynamically prune fields that have more than 1024 unique terms, there is a chance we never get to 128 unseen terms, and
48+
// we'd be paying the overhead of dynamic pruning without getting any benefits.
49+
private static final int MAX_FIELD_CARDINALITY_FOR_DYNAMIC_PRUNING = 1024;
50+
51+
// Only start dynamic pruning when 128 ordinals or less have not been seen yet.
52+
private static final int MAX_TERMS_FOR_DYNAMIC_PRUNING = 128;
53+
3754
private final ValuesSource.Bytes.WithOrdinals valuesSource;
55+
// The field that this cardinality aggregation runs on, or null if there is no field, or the field doesn't directly map to an index
56+
// field.
57+
private final String field;
3858
private final BigArrays bigArrays;
3959
private final int maxOrd;
4060
private final int precision;
61+
private int dynamicPruningAttempts;
62+
private int dynamicPruningSuccess;
63+
private int bruteForce;
64+
private int noData;
4165

4266
// Build at post-collection phase
4367
@Nullable
@@ -48,6 +72,7 @@ public class GlobalOrdCardinalityAggregator extends NumericMetricsAggregator.Sin
4872
public GlobalOrdCardinalityAggregator(
4973
String name,
5074
ValuesSource.Bytes.WithOrdinals valuesSource,
75+
String field,
5176
int precision,
5277
int maxOrd,
5378
AggregationContext context,
@@ -56,6 +81,7 @@ public GlobalOrdCardinalityAggregator(
5681
) throws IOException {
5782
super(name, context, parent, metadata);
5883
this.valuesSource = valuesSource;
84+
this.field = field;
5985
this.precision = precision;
6086
this.maxOrd = maxOrd;
6187
this.bigArrays = context.bigArrays();
@@ -64,12 +90,182 @@ public GlobalOrdCardinalityAggregator(
6490

6591
@Override
6692
public ScoreMode scoreMode() {
67-
return valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
93+
if (field != null && valuesSource.needsScores() == false && maxOrd <= MAX_FIELD_CARDINALITY_FOR_DYNAMIC_PRUNING) {
94+
return ScoreMode.TOP_DOCS;
95+
} else if (valuesSource.needsScores()) {
96+
return ScoreMode.COMPLETE;
97+
} else {
98+
return ScoreMode.COMPLETE_NO_SCORES;
99+
}
100+
}
101+
102+
/**
103+
* A competitive iterator that helps only collect values that have not been collected so far.
104+
*/
105+
private class CompetitiveIterator extends DocIdSetIterator {
106+
107+
private final BitArray visitedOrds;
108+
private long numNonVisitedOrds;
109+
private final TermsEnum indexTerms;
110+
private final DocIdSetIterator docsWithField;
111+
112+
CompetitiveIterator(int numNonVisitedOrds, BitArray visitedOrds, Terms indexTerms, DocIdSetIterator docsWithField)
113+
throws IOException {
114+
this.visitedOrds = visitedOrds;
115+
this.numNonVisitedOrds = numNonVisitedOrds;
116+
this.indexTerms = Objects.requireNonNull(indexTerms).iterator();
117+
this.docsWithField = docsWithField;
118+
}
119+
120+
private Map<Long, PostingsEnum> nonVisitedOrds;
121+
private PriorityQueue<PostingsEnum> nonVisitedPostings;
122+
123+
private int doc = -1;
124+
125+
@Override
126+
public int docID() {
127+
return doc;
128+
}
129+
130+
@Override
131+
public int nextDoc() throws IOException {
132+
return advance(doc + 1);
133+
}
134+
135+
@Override
136+
public int advance(int target) throws IOException {
137+
if (nonVisitedPostings == null) {
138+
// We haven't started pruning yet, iterate on docs that have a value. This may already help a lot on sparse fields.
139+
return doc = docsWithField.advance(target);
140+
} else if (nonVisitedPostings.size() == 0) {
141+
return doc = DocIdSetIterator.NO_MORE_DOCS;
142+
} else {
143+
PostingsEnum top = nonVisitedPostings.top();
144+
while (top.docID() < target) {
145+
top.advance(target);
146+
top = nonVisitedPostings.updateTop();
147+
}
148+
return doc = top.docID();
149+
}
150+
}
151+
152+
@Override
153+
public long cost() {
154+
return docsWithField.cost();
155+
}
156+
157+
void startPruning() throws IOException {
158+
dynamicPruningSuccess++;
159+
nonVisitedOrds = new HashMap<>();
160+
// TODO: iterate the bitset using a `nextClearBit` operation?
161+
for (long ord = 0; ord < maxOrd; ++ord) {
162+
if (visitedOrds.get(ord)) {
163+
continue;
164+
}
165+
BytesRef term = values.lookupOrd(ord);
166+
if (indexTerms.seekExact(term) == false) {
167+
// This global ordinal maps to a value that doesn't exist in this segment
168+
continue;
169+
}
170+
nonVisitedOrds.put(ord, indexTerms.postings(null, PostingsEnum.NONE));
171+
}
172+
nonVisitedPostings = new PriorityQueue<>(nonVisitedOrds.size()) {
173+
@Override
174+
protected boolean lessThan(PostingsEnum a, PostingsEnum b) {
175+
return a.docID() < b.docID();
176+
}
177+
};
178+
for (PostingsEnum pe : nonVisitedOrds.values()) {
179+
nonVisitedPostings.add(pe);
180+
}
181+
}
182+
183+
void onVisitedOrdinal(long ordinal) throws IOException {
184+
numNonVisitedOrds--;
185+
if (nonVisitedOrds == null) {
186+
if (numNonVisitedOrds <= MAX_TERMS_FOR_DYNAMIC_PRUNING) {
187+
startPruning();
188+
}
189+
} else {
190+
if (nonVisitedOrds.remove(ordinal) != null) {
191+
// Could we make this more efficient?
192+
nonVisitedPostings.clear();
193+
for (PostingsEnum pe : nonVisitedOrds.values()) {
194+
nonVisitedPostings.add(pe);
195+
}
196+
}
197+
}
198+
}
68199
}
69200

70201
@Override
71202
public LeafBucketCollector getLeafCollector(AggregationExecutionContext aggCtx, final LeafBucketCollector sub) throws IOException {
72203
values = valuesSource.globalOrdinalsValues(aggCtx.getLeafReaderContext());
204+
205+
if (parent == null && field != null) {
206+
// This optimization only applies to top-level cardinality aggregations that apply to fields indexed with an inverted index.
207+
final Terms indexTerms = aggCtx.getLeafReaderContext().reader().terms(field);
208+
if (indexTerms != null) {
209+
BitArray bits = visitedOrds.get(0);
210+
final int numNonVisitedOrds = maxOrd - (bits == null ? 0 : (int) bits.cardinality());
211+
if (maxOrd <= MAX_FIELD_CARDINALITY_FOR_DYNAMIC_PRUNING || numNonVisitedOrds <= MAX_TERMS_FOR_DYNAMIC_PRUNING) {
212+
dynamicPruningAttempts++;
213+
return new LeafBucketCollector() {
214+
215+
final BitArray bits;
216+
final CompetitiveIterator competitiveIterator;
217+
218+
{
219+
// This optimization only works for top-level cardinality aggregations that collect bucket 0, so we can retrieve
220+
// the appropriate BitArray ahead of time.
221+
visitedOrds = bigArrays.grow(visitedOrds, 1);
222+
BitArray bits = visitedOrds.get(0);
223+
if (bits == null) {
224+
bits = new BitArray(maxOrd, bigArrays);
225+
visitedOrds.set(0, bits);
226+
}
227+
this.bits = bits;
228+
final DocIdSetIterator docsWithField = valuesSource.ordinalsValues(aggCtx.getLeafReaderContext());
229+
competitiveIterator = new CompetitiveIterator(numNonVisitedOrds, bits, indexTerms, docsWithField);
230+
if (numNonVisitedOrds <= MAX_TERMS_FOR_DYNAMIC_PRUNING) {
231+
competitiveIterator.startPruning();
232+
}
233+
}
234+
235+
@Override
236+
public void collect(int doc, long bucketOrd) throws IOException {
237+
if (values.advanceExact(doc)) {
238+
for (long ord = values.nextOrd(); ord != SortedSetDocValues.NO_MORE_ORDS; ord = values.nextOrd()) {
239+
if (bits.getAndSet(ord) == false) {
240+
competitiveIterator.onVisitedOrdinal(ord);
241+
}
242+
}
243+
}
244+
}
245+
246+
@Override
247+
public CompetitiveIterator competitiveIterator() {
248+
return competitiveIterator;
249+
}
250+
};
251+
}
252+
} else {
253+
final FieldInfo fi = aggCtx.getLeafReaderContext().reader().getFieldInfos().fieldInfo(field);
254+
if (fi == null) {
255+
// The field doesn't exist at all, we can skip the segment entirely
256+
noData++;
257+
return LeafBucketCollector.NO_OP_COLLECTOR;
258+
} else if (fi.getIndexOptions() != IndexOptions.NONE) {
259+
// The field doesn't have terms while index options are not NONE. This means that this segment doesn't have a single
260+
// value for the field.
261+
noData++;
262+
return LeafBucketCollector.NO_OP_COLLECTOR;
263+
}
264+
// Otherwise we might be aggregating e.g. an IP field, which indexes data using points rather than an inverted index.
265+
}
266+
}
267+
268+
bruteForce++;
73269
return new LeafBucketCollector() {
74270
@Override
75271
public void collect(int doc, long bucketOrd) throws IOException {
@@ -157,4 +353,13 @@ protected void doClose() {
157353
}
158354
Releasables.close(visitedOrds, counts);
159355
}
356+
357+
@Override
358+
public void collectDebugInfo(BiConsumer<String, Object> add) {
359+
super.collectDebugInfo(add);
360+
add.accept("dynamic_pruning_attempted", dynamicPruningAttempts);
361+
add.accept("dynamic_pruning_used", dynamicPruningSuccess);
362+
add.accept("brute_force_used", bruteForce);
363+
add.accept("skipped_due_to_no_data", noData);
364+
}
160365
}

server/src/test/java/org/elasticsearch/common/util/BitArrayTests.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,4 +154,21 @@ public void testCardinality() {
154154
}
155155
}
156156
}
157+
158+
public void testGetAndSet() {
159+
try (BitArray bitArray = new BitArray(1, BigArrays.NON_RECYCLING_INSTANCE)) {
160+
assertFalse(bitArray.getAndSet(100));
161+
assertFalse(bitArray.getAndSet(1000));
162+
assertTrue(bitArray.getAndSet(100));
163+
assertFalse(bitArray.getAndSet(101));
164+
assertFalse(bitArray.getAndSet(999));
165+
assertTrue(bitArray.getAndSet(1000));
166+
assertFalse(bitArray.get(99));
167+
assertTrue(bitArray.get(100));
168+
assertTrue(bitArray.get(101));
169+
assertTrue(bitArray.get(999));
170+
assertTrue(bitArray.get(1000));
171+
assertFalse(bitArray.get(1001));
172+
}
173+
}
157174
}

0 commit comments

Comments
 (0)