Skip to content

Commit d98b9cb

Browse files
authored
Introduce a filtered collector manager (#96824)
In order to add support for inter-segment search concurrency, we need to implement collector managers for all of our custom collectors. This PR introduces a collector manager that is based on FilteredCollector, used when a post_filter is provided as part of a search request. Note that the collector manager is not yet integrated in the query phase.
1 parent 8a2ff09 commit d98b9cb

File tree

3 files changed

+61
-4
lines changed

3 files changed

+61
-4
lines changed

docs/changelog/96824.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 96824
2+
summary: Introduce a filtered collector manager
3+
area: Search
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/common/lucene/search/FilteredCollector.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.apache.lucene.index.LeafReaderContext;
1111
import org.apache.lucene.search.Collector;
12+
import org.apache.lucene.search.CollectorManager;
1213
import org.apache.lucene.search.FilterLeafCollector;
1314
import org.apache.lucene.search.LeafCollector;
1415
import org.apache.lucene.search.ScoreMode;
@@ -18,6 +19,8 @@
1819
import org.elasticsearch.common.lucene.Lucene;
1920

2021
import java.io.IOException;
22+
import java.util.Collection;
23+
import java.util.List;
2124

2225
/**
2326
* Collector that wraps another collector and collects only documents that match the provided filter.
@@ -60,4 +63,27 @@ public void collect(int doc) throws IOException {
6063
public ScoreMode scoreMode() {
6164
return collector.scoreMode();
6265
}
66+
67+
/**
68+
* Creates a {@link CollectorManager} for {@link FilteredCollector}, which enables inter-segment search concurrency
69+
* when a <code>post_filter</code> is provided as part of a search request.
70+
*/
71+
public static <C extends Collector, T> CollectorManager<FilteredCollector, T> createManager(
72+
CollectorManager<C, T> collectorManager,
73+
Weight filter
74+
) {
75+
return new CollectorManager<>() {
76+
@Override
77+
public FilteredCollector newCollector() throws IOException {
78+
return new FilteredCollector(collectorManager.newCollector(), filter);
79+
}
80+
81+
@Override
82+
public T reduce(Collection<FilteredCollector> collectors) throws IOException {
83+
@SuppressWarnings("unchecked")
84+
List<C> innerCollectors = collectors.stream().map(filteredCollector -> (C) filteredCollector.collector).toList();
85+
return collectorManager.reduce(innerCollectors);
86+
}
87+
};
88+
}
6389
}

server/src/test/java/org/elasticsearch/common/lucene/search/FilteredCollectorTests.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313
import org.apache.lucene.document.StringField;
1414
import org.apache.lucene.index.IndexReader;
1515
import org.apache.lucene.index.Term;
16+
import org.apache.lucene.search.CollectorManager;
1617
import org.apache.lucene.search.IndexSearcher;
1718
import org.apache.lucene.search.MatchAllDocsQuery;
1819
import org.apache.lucene.search.ScoreMode;
1920
import org.apache.lucene.search.TermQuery;
21+
import org.apache.lucene.search.TopDocs;
2022
import org.apache.lucene.search.TopScoreDocCollector;
2123
import org.apache.lucene.search.TotalHitCountCollector;
2224
import org.apache.lucene.search.Weight;
@@ -39,7 +41,7 @@ public void setUp() throws Exception {
3941
super.setUp();
4042
directory = newDirectory();
4143
RandomIndexWriter writer = new RandomIndexWriter(random(), directory, newIndexWriterConfig());
42-
numDocs = randomIntBetween(10, 100);
44+
numDocs = randomIntBetween(900, 1000);
4345
for (int i = 0; i < numDocs; i++) {
4446
Document doc = new Document();
4547
doc.add(new StringField("field1", "value", Field.Store.NO));
@@ -62,19 +64,19 @@ public void tearDown() throws Exception {
6264

6365
public void testFiltering() throws IOException {
6466
{
65-
TopScoreDocCollector topScoreDocCollector = TopScoreDocCollector.create(1, 100);
67+
TopScoreDocCollector topScoreDocCollector = TopScoreDocCollector.create(1, 1000);
6668
searcher.search(new MatchAllDocsQuery(), topScoreDocCollector);
6769
assertEquals(numDocs, topScoreDocCollector.topDocs().totalHits.value);
6870
}
6971
{
70-
TopScoreDocCollector topScoreDocCollector = TopScoreDocCollector.create(1, 100);
72+
TopScoreDocCollector topScoreDocCollector = TopScoreDocCollector.create(1, 1000);
7173
TermQuery termQuery = new TermQuery(new Term("field2", "value"));
7274
Weight filterWeight = termQuery.createWeight(searcher, ScoreMode.TOP_DOCS, 1f);
7375
searcher.search(new MatchAllDocsQuery(), new FilteredCollector(topScoreDocCollector, filterWeight));
7476
assertEquals(1, topScoreDocCollector.topDocs().totalHits.value);
7577
}
7678
{
77-
TopScoreDocCollector topScoreDocCollector = TopScoreDocCollector.create(1, 100);
79+
TopScoreDocCollector topScoreDocCollector = TopScoreDocCollector.create(1, 1000);
7880
TermQuery termQuery = new TermQuery(new Term("field1", "value"));
7981
Weight filterWeight = termQuery.createWeight(searcher, ScoreMode.TOP_DOCS, 1f);
8082
searcher.search(new MatchAllDocsQuery(), new FilteredCollector(topScoreDocCollector, filterWeight));
@@ -96,4 +98,28 @@ public void testWeightIsNotPropagated() throws IOException {
9698
assertEquals(1, totalHitCountCollector.getTotalHits());
9799
}
98100
}
101+
102+
public void testManager() throws IOException {
103+
{
104+
CollectorManager<TopScoreDocCollector, TopDocs> topDocsManager = TopScoreDocCollector.createSharedManager(1, null, 1000);
105+
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), topDocsManager);
106+
assertEquals(numDocs, topDocs.totalHits.value);
107+
}
108+
{
109+
CollectorManager<TopScoreDocCollector, TopDocs> topDocsManager = TopScoreDocCollector.createSharedManager(1, null, 1000);
110+
TermQuery termQuery = new TermQuery(new Term("field2", "value"));
111+
Weight filterWeight = termQuery.createWeight(searcher, ScoreMode.TOP_DOCS, 1f);
112+
CollectorManager<FilteredCollector, TopDocs> filteredManager = FilteredCollector.createManager(topDocsManager, filterWeight);
113+
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), filteredManager);
114+
assertEquals(1, topDocs.totalHits.value);
115+
}
116+
{
117+
CollectorManager<TopScoreDocCollector, TopDocs> topDocsManager = TopScoreDocCollector.createSharedManager(1, null, 1000);
118+
TermQuery termQuery = new TermQuery(new Term("field1", "value"));
119+
Weight filterWeight = termQuery.createWeight(searcher, ScoreMode.TOP_DOCS, 1f);
120+
CollectorManager<FilteredCollector, TopDocs> filteredManager = FilteredCollector.createManager(topDocsManager, filterWeight);
121+
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), filteredManager);
122+
assertEquals(numDocs, topDocs.totalHits.value);
123+
}
124+
}
99125
}

0 commit comments

Comments
 (0)