@@ -374,7 +374,9 @@ public LeafBucketCollector getLeafCollector(final AggregationExecutionContext ag
        }

        void bulkCollection() throws IOException {
-            for (LeafDownsampleCollector leafBucketCollector : leafBucketCollectors.reversed()) {
+            // The leaf bucket collectors with newer timestamps go first, otherwise we capture the incorrect last value for counters and labels.
+            leafBucketCollectors.sort((o1, o2) -> -Long.compare(o1.firstTimeStampForBulkCollection, o2.firstTimeStampForBulkCollection));
+            for (LeafDownsampleCollector leafBucketCollector : leafBucketCollectors) {
                leafBucketCollector.leafBulkCollection();
            }
        }
@@ -386,6 +388,8 @@ class LeafDownsampleCollector extends LeafBucketCollector {
        final FormattedDocValues[] formattedDocValues;
        final AbstractDownsampleFieldProducer[] fieldProducers;

+        // Capture the first timestamp in order to determine which leaf collector's leafBulkCollection() is invoked first.
+        long firstTimeStampForBulkCollection;
        final IntArrayList buffer = new IntArrayList(DOCID_BUFFER_SIZE);
        final long timestampBoundStartTime = searchExecutionContext.getIndexSettings().getTimestampBounds().startTime();
@@ -462,6 +466,11 @@ public void collect(int docId, long owningBucketOrd) throws IOException {
                }
                bucketsCreated++;
            }
+
+            if (buffer.isEmpty()) {
+                firstTimeStampForBulkCollection = aggCtx.getTimestamp();
+            }
+            // buffer.add() always delegates to System.arraycopy() and checks the buffer size for resizing purposes:
            buffer.buffer[buffer.elementsCount++] = docId;
            if (buffer.size() == DOCID_BUFFER_SIZE) {
                bulkCollection();
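For context, a minimal standalone Java sketch of the ordering introduced in bulkCollection(): only the comparator is taken from the diff, while the Collector stand-in class and the sample timestamps are hypothetical. It shows that negating Long.compare sorts in descending order of firstTimeStampForBulkCollection, so the leaf collector that started with the newest timestamp is bulk-collected first.

import java.util.ArrayList;
import java.util.List;

class CollectorOrderingSketch {

    // Hypothetical stand-in for LeafDownsampleCollector, reduced to the one field the comparator reads.
    static final class Collector {
        final long firstTimeStampForBulkCollection;

        Collector(long firstTimeStampForBulkCollection) {
            this.firstTimeStampForBulkCollection = firstTimeStampForBulkCollection;
        }
    }

    public static void main(String[] args) {
        List<Collector> leafBucketCollectors = new ArrayList<>();
        leafBucketCollectors.add(new Collector(1_000L)); // leaf whose first collected doc has an older timestamp
        leafBucketCollectors.add(new Collector(5_000L)); // leaf whose first collected doc has the newest timestamp
        leafBucketCollectors.add(new Collector(3_000L));

        // Same comparator as in bulkCollection(): negating Long.compare sorts descending,
        // so the collector with the newest first timestamp comes first.
        leafBucketCollectors.sort(
            (o1, o2) -> -Long.compare(o1.firstTimeStampForBulkCollection, o2.firstTimeStampForBulkCollection)
        );

        // Prints 5000, 3000, 1000: the newest leaf is flushed first, so the last value captured
        // for counters and labels comes from the newest data, as the comment in the diff describes.
        leafBucketCollectors.forEach(c -> System.out.println(c.firstTimeStampForBulkCollection));
    }
}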