@@ -10,6 +10,7 @@
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.internal.hppc.IntArrayList;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.MatchNoDocsQuery;
 import org.apache.lucene.search.Query;
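The new import above is Lucene's internal fork of the HPPC primitive collections. Unlike a List<Integer>, an IntArrayList stores unboxed ints and exposes its backing array and element count as public fields, which the new collector below relies on. A minimal sketch of that API surface (standalone illustration, not part of the change):

import org.apache.lucene.internal.hppc.IntArrayList;

class IntArrayListDemo {
    public static void main(String[] args) {
        IntArrayList list = new IntArrayList(16); // pre-sized backing int[]
        list.add(42);                   // checked write, resizes on demand
        int first = list.buffer[0];     // the backing array is a public field
        int count = list.elementsCount; // ...and so is the current element count
        System.out.println(first + ", size=" + count);
    }
}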
@@ -80,6 +81,7 @@
 class DownsampleShardIndexer {

     private static final Logger logger = LogManager.getLogger(DownsampleShardIndexer.class);
+    private static final int DOCID_BUFFER_SIZE = 8096;
     public static final int DOWNSAMPLE_BULK_ACTIONS = 10000;
     public static final ByteSizeValue DOWNSAMPLE_BULK_SIZE = ByteSizeValue.of(1, ByteSizeUnit.MB);
     public static final ByteSizeValue DOWNSAMPLE_MAX_BYTES_IN_FLIGHT = ByteSizeValue.of(50, ByteSizeUnit.MB);
@@ -338,6 +340,7 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure)
     private class TimeSeriesBucketCollector extends BucketCollector {
         private final BulkProcessor2 bulkProcessor;
         private final DownsampleBucketBuilder downsampleBucketBuilder;
+        private final List<LeafDownsampleCollector> leafBucketCollectors = new ArrayList<>();
         private long docsProcessed;
         private long bucketsCreated;
         long lastTimestamp = Long.MAX_VALUE;
@@ -365,83 +368,138 @@ public LeafBucketCollector getLeafCollector(final AggregationExecutionContext ag
                 formattedDocValues[i] = fieldValueFetchers.get(i).getLeaf(ctx);
             }

-            return new LeafBucketCollector() {
-                @Override
-                public void collect(int docId, long owningBucketOrd) throws IOException {
-                    task.addNumReceived(1);
-                    final BytesRef tsidHash = aggCtx.getTsidHash();
-                    assert tsidHash != null : "Document without [" + TimeSeriesIdFieldMapper.NAME + "] field was found.";
-                    final int tsidHashOrd = aggCtx.getTsidHashOrd();
-                    final long timestamp = timestampField.resolution().roundDownToMillis(aggCtx.getTimestamp());
-
-                    boolean tsidChanged = tsidHashOrd != downsampleBucketBuilder.tsidOrd();
-                    if (tsidChanged || timestamp < lastHistoTimestamp) {
-                        lastHistoTimestamp = Math.max(
-                            rounding.round(timestamp),
-                            searchExecutionContext.getIndexSettings().getTimestampBounds().startTime()
-                        );
-                    }
-                    task.setLastSourceTimestamp(timestamp);
-                    task.setLastTargetTimestamp(lastHistoTimestamp);
-
-                    if (logger.isTraceEnabled()) {
-                        logger.trace(
-                            "Doc: [{}] - _tsid: [{}], @timestamp: [{}] -> downsample bucket ts: [{}]",
-                            docId,
-                            DocValueFormat.TIME_SERIES_ID.format(tsidHash),
-                            timestampFormat.format(timestamp),
-                            timestampFormat.format(lastHistoTimestamp)
-                        );
-                    }
+            var leafBucketCollector = new LeafDownsampleCollector(aggCtx, docCountProvider, fieldProducers, formattedDocValues);
+            leafBucketCollectors.add(leafBucketCollector);
+            return leafBucketCollector;
+        }
+
+        void bulkCollection() throws IOException {
+            // Leaf bucket collectors with newer timestamps go first, to correctly capture the last value for counters and labels.
+            leafBucketCollectors.sort((o1, o2) -> -Long.compare(o1.firstTimeStampForBulkCollection, o2.firstTimeStampForBulkCollection));
+            for (LeafDownsampleCollector leafBucketCollector : leafBucketCollectors) {
+                leafBucketCollector.leafBulkCollection();
+            }
+        }

-                    /*
-                     * Sanity checks to ensure that we receive documents in the correct order
-                     * - _tsid must be sorted in ascending order
-                     * - @timestamp must be sorted in descending order within the same _tsid
-                     */
-                    BytesRef lastTsid = downsampleBucketBuilder.tsid();
-                    assert lastTsid == null || lastTsid.compareTo(tsidHash) <= 0
-                        : "_tsid is not sorted in ascending order: ["
-                            + DocValueFormat.TIME_SERIES_ID.format(lastTsid)
-                            + "] -> ["
-                            + DocValueFormat.TIME_SERIES_ID.format(tsidHash)
-                            + "]";
-                    assert tsidHash.equals(lastTsid) == false || lastTimestamp >= timestamp
-                        : "@timestamp is not sorted in descending order: ["
-                            + timestampFormat.format(lastTimestamp)
-                            + "] -> ["
-                            + timestampFormat.format(timestamp)
-                            + "]";
-                    lastTimestamp = timestamp;
-
-                    if (tsidChanged || downsampleBucketBuilder.timestamp() != lastHistoTimestamp) {
-                        // Flush downsample doc if not empty
-                        if (downsampleBucketBuilder.isEmpty() == false) {
-                            XContentBuilder doc = downsampleBucketBuilder.buildDownsampleDocument();
-                            indexBucket(doc);
-                        }
-
-                        // Create new downsample bucket
-                        if (tsidChanged) {
-                            downsampleBucketBuilder.resetTsid(tsidHash, tsidHashOrd, lastHistoTimestamp);
-                        } else {
-                            downsampleBucketBuilder.resetTimestamp(lastHistoTimestamp);
-                        }
-                        bucketsCreated++;
+        class LeafDownsampleCollector extends LeafBucketCollector {
+
+            final AggregationExecutionContext aggCtx;
+            final DocCountProvider docCountProvider;
+            final FormattedDocValues[] formattedDocValues;
+            final AbstractDownsampleFieldProducer[] fieldProducers;
+
+            // Capture the first timestamp in order to determine which leaf collector's leafBulkCollection() is invoked first.
+            long firstTimeStampForBulkCollection;
+            final IntArrayList docIdBuffer = new IntArrayList(DOCID_BUFFER_SIZE);
+            final long timestampBoundStartTime = searchExecutionContext.getIndexSettings().getTimestampBounds().startTime();
+
+            LeafDownsampleCollector(
+                AggregationExecutionContext aggCtx,
+                DocCountProvider docCountProvider,
+                AbstractDownsampleFieldProducer[] fieldProducers,
+                FormattedDocValues[] formattedDocValues
+            ) {
+                this.aggCtx = aggCtx;
+                this.docCountProvider = docCountProvider;
+                this.fieldProducers = fieldProducers;
+                this.formattedDocValues = formattedDocValues;
+            }
+
+            @Override
+            public void collect(int docId, long owningBucketOrd) throws IOException {
+                task.addNumReceived(1);
+                final BytesRef tsidHash = aggCtx.getTsidHash();
+                assert tsidHash != null : "Document without [" + TimeSeriesIdFieldMapper.NAME + "] field was found.";
+                final int tsidHashOrd = aggCtx.getTsidHashOrd();
+                final long timestamp = timestampField.resolution().roundDownToMillis(aggCtx.getTimestamp());
+
+                boolean tsidChanged = tsidHashOrd != downsampleBucketBuilder.tsidOrd();
+                if (tsidChanged || timestamp < lastHistoTimestamp) {
+                    lastHistoTimestamp = Math.max(rounding.round(timestamp), timestampBoundStartTime);
+                }
+                task.setLastSourceTimestamp(timestamp);
+                task.setLastTargetTimestamp(lastHistoTimestamp);
+
+                if (logger.isTraceEnabled()) {
+                    logger.trace(
+                        "Doc: [{}] - _tsid: [{}], @timestamp: [{}] -> downsample bucket ts: [{}]",
+                        docId,
+                        DocValueFormat.TIME_SERIES_ID.format(tsidHash),
+                        timestampFormat.format(timestamp),
+                        timestampFormat.format(lastHistoTimestamp)
+                    );
+                }
+
+                /*
+                 * Sanity checks to ensure that we receive documents in the correct order
+                 * - _tsid must be sorted in ascending order
+                 * - @timestamp must be sorted in descending order within the same _tsid
+                 */
+                BytesRef lastTsid = downsampleBucketBuilder.tsid();
+                assert lastTsid == null || lastTsid.compareTo(tsidHash) <= 0
+                    : "_tsid is not sorted in ascending order: ["
+                        + DocValueFormat.TIME_SERIES_ID.format(lastTsid)
+                        + "] -> ["
+                        + DocValueFormat.TIME_SERIES_ID.format(tsidHash)
+                        + "]";
+                assert tsidHash.equals(lastTsid) == false || lastTimestamp >= timestamp
+                    : "@timestamp is not sorted in descending order: ["
+                        + timestampFormat.format(lastTimestamp)
+                        + "] -> ["
+                        + timestampFormat.format(timestamp)
+                        + "]";
+                lastTimestamp = timestamp;

+                if (tsidChanged || downsampleBucketBuilder.timestamp() != lastHistoTimestamp) {
+                    bulkCollection();
+                    // Flush downsample doc if not empty
+                    if (downsampleBucketBuilder.isEmpty() == false) {
+                        XContentBuilder doc = downsampleBucketBuilder.buildDownsampleDocument();
+                        indexBucket(doc);
                     }

-                    final int docCount = docCountProvider.getDocCount(docId);
-                    downsampleBucketBuilder.collectDocCount(docCount);
-                    // Iterate over all field values and collect the doc_values for this docId
-                    for (int i = 0; i < fieldProducers.length; i++) {
-                        AbstractDownsampleFieldProducer fieldProducer = fieldProducers[i];
-                        FormattedDocValues docValues = formattedDocValues[i];
-                        fieldProducer.collect(docValues, docId);
+                    // Create new downsample bucket
+                    if (tsidChanged) {
+                        downsampleBucketBuilder.resetTsid(tsidHash, tsidHashOrd, lastHistoTimestamp);
+                    } else {
+                        downsampleBucketBuilder.resetTimestamp(lastHistoTimestamp);
                     }
-                    docsProcessed++;
-                    task.setDocsProcessed(docsProcessed);
+                    bucketsCreated++;
                 }
-            };
+
+                if (docIdBuffer.isEmpty()) {
+                    firstTimeStampForBulkCollection = aggCtx.getTimestamp();
+                }
+                // Bypass IntArrayList.add(), which checks capacity (and resizes via System.arraycopy()) on every
+                // call; the buffer is pre-sized and always flushed before it can overflow:
+                docIdBuffer.buffer[docIdBuffer.elementsCount++] = docId;
+                if (docIdBuffer.size() == DOCID_BUFFER_SIZE) {
+                    bulkCollection();
+                }
+            }
+
+            void leafBulkCollection() throws IOException {
+                if (docIdBuffer.isEmpty()) {
+                    return;
+                }
+
+                if (logger.isDebugEnabled()) {
+                    logger.debug("buffered {} docids", docIdBuffer.size());
+                }
+
+                downsampleBucketBuilder.collectDocCount(docIdBuffer, docCountProvider);
+                // Iterate over all field values and collect the doc_values for the buffered docIds
+                for (int i = 0; i < fieldProducers.length; i++) {
+                    AbstractDownsampleFieldProducer fieldProducer = fieldProducers[i];
+                    FormattedDocValues docValues = formattedDocValues[i];
+                    fieldProducer.collect(docValues, docIdBuffer);
+                }
+
+                docsProcessed += docIdBuffer.size();
+                task.setDocsProcessed(docsProcessed);
+
+                // Reset the write position directly; buffer.clear() would also overwrite all slots with zeros.
+                docIdBuffer.elementsCount = 0;
+            }
         }

         private void indexBucket(XContentBuilder doc) {
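The core of the change is in collect(): instead of processing each document immediately, docids are appended to a pre-sized per-segment buffer (8096 ints, roughly 32 KB) and processed in bulk once the buffer fills or a bucket boundary is reached. A minimal sketch of just the buffering idiom, assuming only the hppc IntArrayList API used above:

import org.apache.lucene.internal.hppc.IntArrayList;

class DocIdBufferSketch {
    static final int DOCID_BUFFER_SIZE = 8096;

    final IntArrayList docIdBuffer = new IntArrayList(DOCID_BUFFER_SIZE);

    void buffer(int docId) {
        // Raw write: capacity was pre-allocated, so add()'s per-call resize check is unnecessary.
        docIdBuffer.buffer[docIdBuffer.elementsCount++] = docId;
        if (docIdBuffer.size() == DOCID_BUFFER_SIZE) {
            flush();
        }
    }

    void flush() {
        // ... process docIdBuffer.get(0) .. docIdBuffer.get(size - 1) ...
        // Reset the write position without zeroing the array, unlike clear().
        docIdBuffer.elementsCount = 0;
    }
}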
@@ -464,6 +522,7 @@ public void preCollection() {
         @Override
         public void postCollection() throws IOException {
             // Flush downsample doc if not empty
+            bulkCollection();
             if (downsampleBucketBuilder.isEmpty() == false) {
                 XContentBuilder doc = downsampleBucketBuilder.buildDownsampleDocument();
                 indexBucket(doc);
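Because docids are now buffered per segment, postCollection() must drain every buffer before the final document is flushed, and bulkCollection() drains them newest-first (see the comparator in the earlier hunk). A minimal sketch of that ordering idea, using a hypothetical LeafBuffer record rather than the real collector:

import java.util.Comparator;
import java.util.List;

class FlushOrderSketch {
    // Hypothetical stand-in for a leaf collector's buffered state.
    record LeafBuffer(long firstTimestamp, int[] docIds) {}

    // Within one _tsid, @timestamp is sorted descending, so the leaf whose buffered
    // range begins latest must be consumed first for last-value fields (counters, labels).
    static void flushNewestFirst(List<LeafBuffer> buffers) {
        buffers.sort(Comparator.comparingLong(LeafBuffer::firstTimestamp).reversed());
        for (LeafBuffer buffer : buffers) {
            consume(buffer); // would feed docIds to the field producers, as leafBulkCollection() does
        }
    }

    static void consume(LeafBuffer buffer) {
        // no-op in this sketch
    }
}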
@@ -545,8 +604,15 @@ public void resetTimestamp(long timestamp) {
             }
         }

-        public void collectDocCount(int docCount) {
-            this.docCount += docCount;
+        public void collectDocCount(IntArrayList buffer, DocCountProvider docCountProvider) throws IOException {
+            if (docCountProvider.alwaysOne()) {
+                this.docCount += buffer.size();
+            } else {
+                for (int i = 0; i < buffer.size(); i++) {
+                    int docId = buffer.get(i);
+                    this.docCount += docCountProvider.getDocCount(docId);
+                }
+            }
         }

         public XContentBuilder buildDownsampleDocument() throws IOException {
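The doc-count accumulation likewise moves from per-document to per-buffer. When the provider reports that every document counts as one (the alwaysOne() branch above), the sum collapses to the buffer size and the per-document lookup is skipped entirely. A standalone sketch of that fast path, with a hypothetical DocCounts interface standing in for the real DocCountProvider:

import java.io.IOException;

import org.apache.lucene.internal.hppc.IntArrayList;

class DocCountSketch {
    // Hypothetical stand-in for DocCountProvider.
    interface DocCounts {
        boolean alwaysOne();
        int getDocCount(int docId) throws IOException;
    }

    long docCount;

    void collectDocCount(IntArrayList buffer, DocCounts provider) throws IOException {
        if (provider.alwaysOne()) {
            docCount += buffer.size(); // every doc counts as 1: no per-doc lookup needed
        } else {
            for (int i = 0; i < buffer.size(); i++) {
                docCount += provider.getDocCount(buffer.get(i));
            }
        }
    }
}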