Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import static com.mongodb.internal.operation.CommandBatchCursorHelper.getMoreCommandDocument;
import static com.mongodb.internal.operation.CommandBatchCursorHelper.logCommandCursorResult;
import static com.mongodb.internal.operation.CommandBatchCursorHelper.translateCommandException;
import static com.mongodb.internal.operation.CommandCursorResult.withEmptyResults;
import static java.util.Collections.emptyList;

class AsyncCommandBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
Expand Down Expand Up @@ -117,6 +118,7 @@ public void next(final SingleResultCallback<List<T>> callback) {
}

if (serverCursorIsNull || !batchResults.isEmpty()) {
commandCursorResult = withEmptyResults(commandCursorResult);
funcCallback.onResult(batchResults, null);
} else {
getMore(localServerCursor, funcCallback);
Expand Down Expand Up @@ -206,6 +208,7 @@ private void getMoreLoop(final AsyncConnection connection, final ServerCursor se
resourceManager.setServerCursor(nextServerCursor);
List<T> nextBatch = commandCursorResult.getResults();
if (nextServerCursor == null || !nextBatch.isEmpty()) {
commandCursorResult = withEmptyResults(commandCursorResult);
callback.onResult(nextBatch, null);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public interface BatchCursor<T> extends Iterator<List<T>>, Closeable {
boolean hasNext();

/**
* Returns the next batch of results. A tailable cursor will block until another batch exists.
* Returns the next batch of results as a mutable list. Modifications to the list will not affect the cursor state.
* A tailable cursor will block until another batch exists.
*
* @return the next batch of results
* @throws java.util.NoSuchElementException if no next batch exists
Expand Down Expand Up @@ -89,7 +90,8 @@ public interface BatchCursor<T> extends Iterator<List<T>>, Closeable {
int getBatchSize();

/**
* A special {@code next()} case that returns the next batch if available or null.
* A special {@code next()} case that returns the next batch as a mutable list if available or null.
* Modifications to the list will not affect the cursor state.
*
* <p>Tailable cursors are an example where this is useful. A call to {@code tryNext()} may return null, but in the future calling
* {@code tryNext()} would return a new batch if a document had been added to the capped collection.</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ private List<T> doNext() {

List<T> retVal = nextBatch;
nextBatch = null;
commandCursorResult = CommandCursorResult.withEmptyResults(commandCursorResult);
return retVal;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.bson.BsonDocument;
import org.bson.BsonTimestamp;

import java.util.Collections;
import java.util.List;

import static com.mongodb.assertions.Assertions.isTrue;
Expand Down Expand Up @@ -60,6 +61,31 @@ public CommandCursorResult(
this.postBatchResumeToken = cursorDocument.getDocument(POST_BATCH_RESUME_TOKEN, null);
}

private CommandCursorResult(
final ServerAddress serverAddress,
final List<T> results,
final MongoNamespace namespace,
final long cursorId,
@Nullable final BsonTimestamp operationTime,
@Nullable final BsonDocument postBatchResumeToken) {
this.serverAddress = serverAddress;
this.results = results;
this.namespace = namespace;
this.cursorId = cursorId;
this.operationTime = operationTime;
this.postBatchResumeToken = postBatchResumeToken;
}

public static <T> CommandCursorResult<T> withEmptyResults(final CommandCursorResult<T> commandCursorResult) {
return new CommandCursorResult<>(
commandCursorResult.getServerAddress(),
Collections.emptyList(),
commandCursorResult.getNamespace(),
commandCursorResult.getCursorId(),
commandCursorResult.getOperationTime(),
commandCursorResult.getPostBatchResumeToken());
}

/**
* Gets the namespace.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public ServerAddress getServerAddress() {

private T getNextInBatch() {
T nextInBatch = curBatch.get(curPos);
curBatch.set(curPos, null);
if (curPos < curBatch.size() - 1) {
curPos++;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public ServerAddress getServerAddress() {

private T getNextInBatch() {
RawBsonDocument nextInBatch = curBatch.get(curPos);
curBatch.set(curPos, null);
resumeToken = nextInBatch.getDocument("_id");
if (curPos < curBatch.size() - 1) {
curPos++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,13 @@ class GridFSFindIterableSpecification extends Specification {
, null),
]
def cursor = {
def batchToReturn = cannedResults.collect();
Stub(BatchCursor) {
def count = 0
def results
def getResult = {
count++
results = count == 1 ? cannedResults : null
results = count == 1 ? batchToReturn : null
results
}
next() >> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,12 +395,13 @@ class AggregateIterableSpecification extends Specification {
given:
def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)]
def cursor = {
def batchToReturn = cannedResults.collect()
Stub(BatchCursor) {
def count = 0
def results
def getResult = {
count++
results = count == 1 ? cannedResults : null
results = count == 1 ? batchToReturn : null
results
}
next() >> {
Expand Down Expand Up @@ -591,12 +592,13 @@ class AggregateIterableSpecification extends Specification {
given:
def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)]
def cursor = {
def batchToReturn = cannedResults.collect()
Stub(BatchCursor) {
def count = 0
def results
def getResult = {
count++
results = count == 1 ? cannedResults : null
results = count == 1 ? batchToReturn : null
results
}
next() >> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ class ChangeStreamIterableSpecification extends Specification {
def cannedResults = ['{_id: {_data: 1}}', '{_id: {_data: 2}}', '{_id: {_data: 3}}'].collect {
RawBsonDocument.parse(it)
}
def executor = new TestOperationExecutor([cursor(cannedResults), cursor(cannedResults), cursor(cannedResults),
cursor(cannedResults)])
def executor = new TestOperationExecutor([cursor(cannedResults.collect()), cursor(cannedResults.collect()),
cursor(cannedResults.collect()), cursor(cannedResults.collect())])
def mongoIterable = new ChangeStreamIterableImpl(null, namespace, codecRegistry, readPreference, readConcern, executor, [],
Document, ChangeStreamLevel.COLLECTION, true, TIMEOUT_SETTINGS)

Expand Down Expand Up @@ -208,8 +208,9 @@ class ChangeStreamIterableSpecification extends Specification {
given:
def count = 0
def cannedResults = ['{_id: { _data: 1}}', '{_id: {_data: 2}}', '{_id: {_data: 3}}'].collect { RawBsonDocument.parse(it) }
def executor = new TestOperationExecutor([cursor(cannedResults), cursor(cannedResults), cursor(cannedResults),
cursor(cannedResults)])
def executor = new TestOperationExecutor([cursor(cannedResults.collect()), cursor(cannedResults.collect()),
cursor(cannedResults.collect()), cursor(cannedResults.collect()),
cursor(cannedResults.collect())])
def mongoIterable = new ChangeStreamIterableImpl(null, namespace, codecRegistry, readPreference, readConcern, executor, [],
Document, ChangeStreamLevel.COLLECTION, true, TIMEOUT_SETTINGS).withDocumentClass(RawBsonDocument)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,13 @@ class DistinctIterableSpecification extends Specification {
given:
def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)]
def cursor = {
def batchToReturn = cannedResults.collect()
Stub(BatchCursor) {
def count = 0
def results
def getResult = {
count++
results = count == 1 ? cannedResults : null
results = count == 1 ? batchToReturn : null
results
}
next() >> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,12 @@ class FindIterableSpecification extends Specification {
def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)]
def cursor = {
Stub(BatchCursor) {
def batchToReturn = cannedResults.collect()
def count = 0
def results
def getResult = {
count++
results = count == 1 ? cannedResults : null
results = count == 1 ? batchToReturn : null
results
}
next() >> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,13 @@ class ListCollectionsIterableSpecification extends Specification {
given:
def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)]
def cursor = {
def batchToReturn = cannedResults.collect()
Stub(BatchCursor) {
def count = 0
def results
def getResult = {
count++
results = count == 1 ? cannedResults : null
results = count == 1 ? batchToReturn : null
results
}
next() >> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,13 @@ class ListDatabasesIterableSpecification extends Specification {
given:
def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)]
def cursor = {
def batchToReturn = cannedResults.collect()
Stub(BatchCursor) {
def count = 0
def results
def getResult = {
count++
results = count == 1 ? cannedResults : null
results = count == 1 ? batchToReturn : null
results
}
next() >> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,12 @@ class ListIndexesIterableSpecification extends Specification {
def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)]
def cursor = {
Stub(BatchCursor) {
def batchToReturn = cannedResults.collect()
def count = 0
def results
def getResult = {
count++
results = count == 1 ? cannedResults : null
results = count == 1 ? batchToReturn : null
results
}
next() >> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,13 @@ class MapReduceIterableSpecification extends Specification {
given:
def cannedResults = [new Document('_id', 1), new Document('_id', 2), new Document('_id', 3)]
def cursor = {
def batchToReturn = cannedResults.collect()
Stub(BatchCursor) {
def count = 0
def results
def getResult = {
count++
results = count == 1 ? cannedResults : null
results = count == 1 ? batchToReturn : null
results
}
next() >> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,42 +80,46 @@ class MongoBatchCursorAdapterSpecification extends Specification {

def 'should get next from batch cursor'() {
given:
def firstBatch = [new Document('x', 1), new Document('x', 1)]
def secondBatch = [new Document('x', 2)]
def firstBatchFromBatchCursor = [new Document('x', 1), new Document('x', 1)]
def expectedFirstBatch = firstBatchFromBatchCursor.collect()
def secondBatchFromBatchCursor = [new Document('x', 2)]
def expectedSecondBatch = secondBatchFromBatchCursor.collect()

def batchCursor = Stub(BatchCursor)

batchCursor.hasNext() >>> [true, true, true, true, false]
batchCursor.next() >>> [firstBatch, secondBatch]
batchCursor.next() >>> [firstBatchFromBatchCursor, secondBatchFromBatchCursor]

def cursor = new MongoBatchCursorAdapter(batchCursor)

expect:
cursor.hasNext()
cursor.next() == firstBatch[0]
cursor.next() == expectedFirstBatch[0]
cursor.hasNext()
cursor.next() == firstBatch[1]
cursor.next() == expectedFirstBatch[1]
cursor.hasNext()
cursor.next() == secondBatch[0]
cursor.next() == expectedSecondBatch[0]
!cursor.hasNext()
}

def 'should try next from batch cursor'() {
given:
def firstBatch = [new Document('x', 1), new Document('x', 1)]
def secondBatch = [new Document('x', 2)]
def firstBatchFromBatchCursor = [new Document('x', 1), new Document('x', 1)]
def expectedFirstBatch = firstBatchFromBatchCursor.collect()
def secondBatchFromBatchCursor = [new Document('x', 2)]
def expectedSecondBatch = secondBatchFromBatchCursor.collect()

def batchCursor = Stub(BatchCursor)

batchCursor.tryNext() >>> [firstBatch, null, secondBatch, null]
batchCursor.tryNext() >>> [firstBatchFromBatchCursor, null, secondBatchFromBatchCursor, null]

def cursor = new MongoBatchCursorAdapter(batchCursor)

expect:
cursor.tryNext() == firstBatch[0]
cursor.tryNext() == firstBatch[1]
cursor.tryNext() == expectedFirstBatch[0]
cursor.tryNext() == expectedFirstBatch[1]
cursor.tryNext() == null
cursor.tryNext() == secondBatch[0]
cursor.tryNext() == expectedSecondBatch[0]
cursor.tryNext() == null
}

Expand Down
Loading