Skip to content

Commit 438903f

Browse files
committed
Ensure LuceneChangesSnapshot reads in leaf order (elastic#31246)
Today we re-initialize DV instances while we read docs for the snapshot. This is caused by the fact that we sort the docs by seqID, which causes them to be out of order. This change sorts documents temporarily by docID, fetches the metadata (not source) into an in-memory data structure, and sorts it back. This allows efficient reuse of DV instances.
1 parent 97743f3 commit 438903f

File tree

1 file changed

+88
-47
lines changed

1 file changed

+88
-47
lines changed

server/src/main/java/org/elasticsearch/index/engine/LuceneChangesSnapshot.java

Lines changed: 88 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,15 @@
2323
import org.apache.lucene.index.LeafReader;
2424
import org.apache.lucene.index.LeafReaderContext;
2525
import org.apache.lucene.index.NumericDocValues;
26-
import org.apache.lucene.index.ReaderUtil;
2726
import org.apache.lucene.index.Term;
28-
import org.apache.lucene.search.DocIdSetIterator;
2927
import org.apache.lucene.search.IndexSearcher;
3028
import org.apache.lucene.search.Query;
3129
import org.apache.lucene.search.ScoreDoc;
3230
import org.apache.lucene.search.Sort;
3331
import org.apache.lucene.search.SortField;
3432
import org.apache.lucene.search.SortedNumericSortField;
3533
import org.apache.lucene.search.TopDocs;
34+
import org.apache.lucene.util.ArrayUtil;
3635
import org.elasticsearch.common.bytes.BytesReference;
3736
import org.elasticsearch.common.lucene.Lucene;
3837
import org.elasticsearch.index.VersionType;
@@ -47,6 +46,7 @@
4746

4847
import java.io.Closeable;
4948
import java.io.IOException;
49+
import java.util.Comparator;
5050
import java.util.List;
5151
import java.util.Objects;
5252

@@ -67,9 +67,8 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
6767
private int docIndex = 0;
6868
private final int totalHits;
6969
private ScoreDoc[] scoreDocs;
70-
70+
private final ParallelArray parallelArray;
7171
private final Closeable onClose;
72-
private final CombinedDocValues[] docValues; // Cache of DocValues
7372

7473
/**
7574
* Creates a new "translog" snapshot from Lucene for reading operations whose seq# in the specified range.
@@ -97,15 +96,13 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
9796
this.requiredFullRange = requiredFullRange;
9897
this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
9998
this.indexSearcher.setQueryCache(null);
99+
this.parallelArray = new ParallelArray(searchBatchSize);
100100
final TopDocs topDocs = searchOperations(null);
101+
101102
this.totalHits = Math.toIntExact(topDocs.totalHits);
102103
this.scoreDocs = topDocs.scoreDocs;
103-
final List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
104-
this.docValues = new CombinedDocValues[leaves.size()];
105-
for (LeafReaderContext leaf : leaves) {
106-
this.docValues[leaf.ord] = new CombinedDocValues(leaf.reader());
107-
}
108104
this.onClose = engineSearcher;
105+
fillParallelArray(scoreDocs, parallelArray);
109106
}
110107

111108
@Override
@@ -126,8 +123,8 @@ public int overriddenOperations() {
126123
@Override
127124
public Translog.Operation next() throws IOException {
128125
Translog.Operation op = null;
129-
for (int docId = nextDocId(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = nextDocId()) {
130-
op = readDocAsOp(docId);
126+
for (int idx = nextDocIndex(); idx != -1; idx = nextDocIndex()) {
127+
op = readDocAsOp(idx);
131128
if (op != null) {
132129
break;
133130
}
@@ -156,19 +153,58 @@ private void rangeCheck(Translog.Operation op) {
156153
}
157154
}
158155

159-
private int nextDocId() throws IOException {
156+
private int nextDocIndex() throws IOException {
160157
// we have processed all docs in the current search - fetch the next batch
161158
if (docIndex == scoreDocs.length && docIndex > 0) {
162159
final ScoreDoc prev = scoreDocs[scoreDocs.length - 1];
163160
scoreDocs = searchOperations(prev).scoreDocs;
161+
fillParallelArray(scoreDocs, parallelArray);
164162
docIndex = 0;
165163
}
166164
if (docIndex < scoreDocs.length) {
167-
int docId = scoreDocs[docIndex].doc;
165+
int idx = docIndex;
168166
docIndex++;
169-
return docId;
167+
return idx;
168+
}
169+
return -1;
170+
}
171+
172+
private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray) throws IOException {
173+
if (scoreDocs.length > 0) {
174+
for (int i = 0; i < scoreDocs.length; i++) {
175+
scoreDocs[i].shardIndex = i;
176+
}
177+
// for better loading performance we sort the array by docID and
178+
// then visit all leaves in order.
179+
ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.doc));
180+
int docBase = -1;
181+
int maxDoc = 0;
182+
List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
183+
int readerIndex = 0;
184+
CombinedDocValues combinedDocValues = null;
185+
LeafReaderContext leaf = null;
186+
for (int i = 0; i < scoreDocs.length; i++) {
187+
ScoreDoc scoreDoc = scoreDocs[i];
188+
if (scoreDoc.doc >= docBase + maxDoc) {
189+
do {
190+
leaf = leaves.get(readerIndex++);
191+
docBase = leaf.docBase;
192+
maxDoc = leaf.reader().maxDoc();
193+
} while (scoreDoc.doc >= docBase + maxDoc);
194+
combinedDocValues = new CombinedDocValues(leaf.reader());
195+
}
196+
final int segmentDocID = scoreDoc.doc - docBase;
197+
final int index = scoreDoc.shardIndex;
198+
parallelArray.leafReaderContexts[index] = leaf;
199+
parallelArray.seqNo[index] = combinedDocValues.docSeqNo(segmentDocID);
200+
parallelArray.primaryTerm[index] = combinedDocValues.docPrimaryTerm(segmentDocID);
201+
parallelArray.version[index] = combinedDocValues.docVersion(segmentDocID);
202+
parallelArray.isTombStone[index] = combinedDocValues.isTombstone(segmentDocID);
203+
parallelArray.hasRecoverySource[index] = combinedDocValues.hasRecoverySource(segmentDocID);
204+
}
205+
// now sort back based on the shardIndex. we use this to store the previous index
206+
ArrayUtil.introSort(scoreDocs, Comparator.comparingInt(i -> i.shardIndex));
170207
}
171-
return DocIdSetIterator.NO_MORE_DOCS;
172208
}
173209

174210
private TopDocs searchOperations(ScoreDoc after) throws IOException {
@@ -180,31 +216,30 @@ private TopDocs searchOperations(ScoreDoc after) throws IOException {
180216
return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNoThenByTerm);
181217
}
182218

183-
private Translog.Operation readDocAsOp(int docID) throws IOException {
184-
final List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
185-
final LeafReaderContext leaf = leaves.get(ReaderUtil.subIndex(docID, leaves));
186-
final int segmentDocID = docID - leaf.docBase;
187-
final long primaryTerm = docValues[leaf.ord].docPrimaryTerm(segmentDocID);
219+
private Translog.Operation readDocAsOp(int docIndex) throws IOException {
220+
final LeafReaderContext leaf = parallelArray.leafReaderContexts[docIndex];
221+
final int segmentDocID = scoreDocs[docIndex].doc - leaf.docBase;
222+
final long primaryTerm = parallelArray.primaryTerm[docIndex];
188223
// We don't have to read the nested child documents - those docs don't have primary terms.
189224
if (primaryTerm == -1) {
190225
skippedOperations++;
191226
return null;
192227
}
193-
final long seqNo = docValues[leaf.ord].docSeqNo(segmentDocID);
228+
final long seqNo = parallelArray.seqNo[docIndex];
194229
// Only pick the first seen seq#
195230
if (seqNo == lastSeenSeqNo) {
196231
skippedOperations++;
197232
return null;
198233
}
199-
final long version = docValues[leaf.ord].docVersion(segmentDocID);
200-
final String sourceField = docValues[leaf.ord].hasRecoverySource(segmentDocID) ? SourceFieldMapper.RECOVERY_SOURCE_NAME :
234+
final long version = parallelArray.version[docIndex];
235+
final String sourceField = parallelArray.hasRecoverySource[docIndex] ? SourceFieldMapper.RECOVERY_SOURCE_NAME :
201236
SourceFieldMapper.NAME;
202237
final FieldsVisitor fields = new FieldsVisitor(true, sourceField);
203-
indexSearcher.doc(docID, fields);
238+
leaf.reader().document(segmentDocID, fields);
204239
fields.postProcess(mapperService);
205240

206241
final Translog.Operation op;
207-
final boolean isTombstone = docValues[leaf.ord].isTombstone(segmentDocID);
242+
final boolean isTombstone = parallelArray.isTombStone[docIndex];
208243
if (isTombstone && fields.uid() == null) {
209244
op = new Translog.NoOp(seqNo, primaryTerm, fields.source().utf8ToString());
210245
assert version == 1L : "Noop tombstone should have version 1L; actual version [" + version + "]";
@@ -237,16 +272,32 @@ private boolean assertDocSoftDeleted(LeafReader leafReader, int segmentDocId) th
237272
return ndv.longValue() == 1;
238273
}
239274

275+
private static final class ParallelArray {
276+
final LeafReaderContext[] leafReaderContexts;
277+
final long[] version;
278+
final long[] seqNo;
279+
final long[] primaryTerm;
280+
final boolean[] isTombStone;
281+
final boolean[] hasRecoverySource;
282+
283+
ParallelArray(int size) {
284+
version = new long[size];
285+
seqNo = new long[size];
286+
primaryTerm = new long[size];
287+
isTombStone = new boolean[size];
288+
hasRecoverySource = new boolean[size];
289+
leafReaderContexts = new LeafReaderContext[size];
290+
}
291+
}
292+
240293
private static final class CombinedDocValues {
241-
private final LeafReader leafReader;
242-
private NumericDocValues versionDV;
243-
private NumericDocValues seqNoDV;
244-
private NumericDocValues primaryTermDV;
245-
private NumericDocValues tombstoneDV;
246-
private NumericDocValues recoverySource;
294+
private final NumericDocValues versionDV;
295+
private final NumericDocValues seqNoDV;
296+
private final NumericDocValues primaryTermDV;
297+
private final NumericDocValues tombstoneDV;
298+
private final NumericDocValues recoverySource;
247299

248300
CombinedDocValues(LeafReader leafReader) throws IOException {
249-
this.leafReader = leafReader;
250301
this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing");
251302
this.seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing");
252303
this.primaryTermDV = Objects.requireNonNull(
@@ -256,19 +307,15 @@ private static final class CombinedDocValues {
256307
}
257308

258309
long docVersion(int segmentDocId) throws IOException {
259-
if (versionDV.docID() > segmentDocId) {
260-
versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing");
261-
}
310+
assert versionDV.docID() < segmentDocId;
262311
if (versionDV.advanceExact(segmentDocId) == false) {
263312
throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found");
264313
}
265314
return versionDV.longValue();
266315
}
267316

268317
long docSeqNo(int segmentDocId) throws IOException {
269-
if (seqNoDV.docID() > segmentDocId) {
270-
seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing");
271-
}
318+
assert seqNoDV.docID() < segmentDocId;
272319
if (seqNoDV.advanceExact(segmentDocId) == false) {
273320
throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found");
274321
}
@@ -279,9 +326,7 @@ long docPrimaryTerm(int segmentDocId) throws IOException {
279326
if (primaryTermDV == null) {
280327
return -1L;
281328
}
282-
if (primaryTermDV.docID() > segmentDocId) {
283-
primaryTermDV = leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
284-
}
329+
assert primaryTermDV.docID() < segmentDocId;
285330
// Use -1 for docs which don't have primary term. The caller considers those docs as nested docs.
286331
if (primaryTermDV.advanceExact(segmentDocId) == false) {
287332
return -1;
@@ -293,19 +338,15 @@ boolean isTombstone(int segmentDocId) throws IOException {
293338
if (tombstoneDV == null) {
294339
return false;
295340
}
296-
if (tombstoneDV.docID() > segmentDocId) {
297-
tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
298-
}
341+
assert tombstoneDV.docID() < segmentDocId;
299342
return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0;
300343
}
301344

302345
boolean hasRecoverySource(int segmentDocId) throws IOException {
303346
if (recoverySource == null) {
304347
return false;
305348
}
306-
if (recoverySource.docID() > segmentDocId) {
307-
recoverySource = leafReader.getNumericDocValues(SourceFieldMapper.RECOVERY_SOURCE_NAME);
308-
}
349+
assert recoverySource.docID() < segmentDocId;
309350
return recoverySource.advanceExact(segmentDocId);
310351
}
311352
}

0 commit comments

Comments
 (0)