@@ -335,7 +335,6 @@ ACTOR static Future<Void> toDocInfo(PlanCheckpoint* checkpoint,
335335inputLock->release ();
336336Void _ = wait (outputLock->take ());
337337lastKey = Key (kv.key , kv.arena ());
338- // fprintf(stderr, "lastkey: %s\n", printable(lastKey).c_str());
339338Standalone<StringRef> last (DataKey::decode_item_rev (kv.key , 0 ), kv.arena ());
340339Reference<ScanReturnedContext> output (new ScanReturnedContext (base->getSubContext (last), scanID, lastKey));
341340dis.send (output);
@@ -373,9 +372,6 @@ ACTOR static Future<bool> simpleWouldBeLast(Reference<ScanReturnedContext> doc,
373372break ;
374373}
375374}
376- // fprintf(stderr, "SWBL last: %s\n", printable(last).c_str());
377- // fprintf(stderr, "SWBL scankey: %s\n", printable(doc->scanKey()).c_str());
378- // fprintf(stderr, "SWBL upperBound: %s\n", printable(indexUpperBound).c_str());
379375if (doc->scanKey ().startsWith (last))
380376return true ;
381377}
@@ -416,9 +412,6 @@ ACTOR static Future<bool> compoundWouldBeLast(Reference<ScanReturnedContext> doc
416412break ;
417413}
418414}
419- // fprintf(stderr, "CWBL last: %s\n", printable(last).c_str());
420- // fprintf(stderr, "CWBL scankey: %s\n", printable(doc->scanKey()).c_str());
421- // fprintf(stderr, "CWBL upperBound: %s\n", printable(indexUpperBound).c_str());
422415if (doc->scanKey ().startsWith (last))
423416return true ;
424417}
@@ -558,7 +551,6 @@ ACTOR static Future<Void> doPKScan(PlanCheckpoint* checkpoint,
558551lastKey = Key (kv.key , kv.arena ());
559552}
560553} catch (Error& e) {
561- // fprintf(stderr, "doPKScan: %s %d\n", e.what(), checkpoint->splitBoundWanted());
562554if (e.code () == error_code_actor_cancelled) {
563555if (checkpoint->splitBoundWanted ()) {
564556DataKey splitKey = DataKey::decode_bytes (lastKey);
@@ -577,8 +569,8 @@ FutureStream<Reference<ScanReturnedContext>> PrimaryKeyLookupPlan::execute(PlanC
577569Reference<CollectionContext> bcx = cx->bindCollectionContext (tr);
578570if (begin.present () && end.present () && begin.get () == end.get ()) {
579571PromiseStream<Reference<ScanReturnedContext>> p;
580- checkpoint-> addOperation ( doSinglePKLookup (checkpoint, p, bcx, begin. get (), scanID),
581- p); // ??? Can we skip this overhead?
572+ // ??? Can we skip this overhead?
573+ checkpoint-> addOperation ( doSinglePKLookup (checkpoint, p, bcx, begin. get (), scanID), p);
582574return p.getFuture ();
583575} else {
584576PromiseStream<Reference<ScanReturnedContext>> p;
@@ -590,7 +582,6 @@ FutureStream<Reference<ScanReturnedContext>> PrimaryKeyLookupPlan::execute(PlanC
590582Standalone<StringRef> endKey = std::max<Standalone<StringRef>>(
591583 beginKey, std::min (end.present () ? strinc (end.get ().encode_key_part ()) : LiteralStringRef (" \xff " ),
592584 checkpoint->getBounds (scanID).end ));
593- // fprintf(stderr, "PK scan executing from %s to %s\n", printable(beginKey).c_str(), printable(endKey).c_str());
594585
595586GenFutureStream<KeyValue> kvs = bcx->cx ->getDescendants (beginKey, endKey, descendantFlowControlLock);
596587checkpoint->addOperation (doPKScan (checkpoint, bcx, scanID, kvs, p, descendantFlowControlLock),
@@ -704,17 +695,14 @@ ACTOR static Future<Void> doNonIsolatedRO(PlanCheckpoint* outerCheckpoint,
704695try {
705696state uint64_t metadataVersion = wait (cx->bindCollectionContext (dtr)->getMetadataVersion ());
706697loop {
707- // printf("Trying nonIsolatedRO with %d outputs and checkpoint '%s'-'%s'\n", nResults,
708- // printable(innerCheckpoint->getBounds(0).begin).c_str(),
709- // printable(innerCheckpoint->getBounds(0).end).c_str());
710698state FutureStream<Reference<ScanReturnedContext>> docs = subPlan->execute (innerCheckpoint.getPtr (), dtr);
711699state FlowLock* innerLock = innerCheckpoint->getDocumentFinishedLock ();
712700state bool first = true ;
713701state Future<Void> timeout = delay (3.0 );
714702
715703loop choose {
716- when (state Reference<ScanReturnedContext> doc =
717- waitNext (docs)) { // throws end_of_stream when totally finished
704+ when (state Reference<ScanReturnedContext> doc = waitNext (docs)) {
705+ // throws end_of_stream when totally finished
718706Void _ = wait (outerLock->take ());
719707innerLock->release ();
720708output.send (doc);
@@ -723,7 +711,6 @@ ACTOR static Future<Void> doNonIsolatedRO(PlanCheckpoint* outerCheckpoint,
723711timeout = delay (DOCLAYER_KNOBS->NONISOLATED_INTERNAL_TIMEOUT );
724712first = false ;
725713}
726- // if (oCount == 3) timeout = delay(0);
727714}
728715when (Void _ = wait (timeout)) { break ; }
729716}
@@ -785,9 +772,10 @@ ACTOR static Future<Void> doNonIsolatedRW(PlanCheckpoint* outerCheckpoint,
785772loop {
786773if (bufferedDocs.size () + committingDocs.size () >=
787774 DOCLAYER_KNOBS->NONISOLATED_RW_INTERNAL_BUFFER_MAX )
788- timeout = delay (0 ); // We do this instead of breaking so that when stopAndCheckpoint() gets
789- // called below, the actor for the plan immediately inside us is never
790- // on the call stack, so gets its actor_cancelled delivered immediately.
775+ // We do this instead of breaking so that when stopAndCheckpoint() gets
776+ // called below, the actor for the plan immediately inside us is never
777+ // on the call stack, so gets its actor_cancelled delivered immediately.
778+ timeout = delay (0 );
791779choose {
792780when (state Reference<ScanReturnedContext> doc =
793781 waitNext (docs)) { // throws end_of_stream when totally finished
@@ -1045,10 +1033,10 @@ ACTOR static Future<Void> doFlushChanges(PlanCheckpoint* checkpoint,
10451033try {
10461034choose {
10471035when (Reference<ScanReturnedContext> nextInput = waitNext (input)) {
1036+ // FIXME: this will be unsafe with unique indexes. Something has to happen here that doesn't
1037+ // kill performance.
10481038futures.push_back (std::pair<Reference<ScanReturnedContext>, Future<Void>>(
1049- nextInput,
1050- nextInput->commitChanges ())); // FIXME: this will be unsafe with unique indexes. Something
1051- // has to happen here that doesn't kill performance.
1039+ nextInput, nextInput->commitChanges ()));
10521040}
10531041when (Void _ = wait (futures.empty () ? Never () : futures.front ().second )) {
10541042output.send (futures.front ().first );
@@ -1127,7 +1115,6 @@ ACTOR static Future<Void> doUpdate(PlanCheckpoint* checkpoint,
11271115
11281116throw end_of_stream ();
11291117} catch (Error& e) {
1130- // printf("doUpdate: %s\n", e.what());
11311118if (e.code () == error_code_actor_cancelled) {
11321119if (checkpoint->splitBoundWanted ()) {
11331120for (int i = futures.size () - 1 ; i >= 0 ; i--)
@@ -1172,18 +1159,15 @@ ACTOR static Future<Void> findAndModify(PlanCheckpoint* outerCheckpoint,
11721159try {
11731160state uint64_t metadataVersion = wait (cx->bindCollectionContext (dtr)->getMetadataVersion ());
11741161loop {
1175- // printf("Trying nonIsolatedRO with %d outputs and checkpoint '%s'-'%s'\n", nResults,
1176- // printable(innerCheckpoint->getBounds(0).begin).c_str(),
1177- // printable(innerCheckpoint->getBounds(0).end).c_str());
11781162state FutureStream<Reference<ScanReturnedContext>> docs = subPlan->execute (innerCheckpoint.getPtr (), dtr);
11791163state FlowLock* innerLock = innerCheckpoint->getDocumentFinishedLock ();
11801164state Future<Void> timeout = delay (1.0 );
11811165state bool done = false ;
11821166
11831167try {
11841168loop choose {
1185- when (state Reference<ScanReturnedContext> doc =
1186- waitNext (docs)) { // throws end_of_stream when totally finished
1169+ when (state Reference<ScanReturnedContext> doc = waitNext (docs)) {
1170+ // throws end_of_stream when totally finished
11871171firstDoc = doc;
11881172innerLock->release ();
11891173done = true ;
@@ -1441,10 +1425,9 @@ ACTOR static Future<Void> doInsert(PlanCheckpoint* checkpoint,
14411425 Reference<MetadataManager> mm,
14421426 Namespace ns,
14431427 PromiseStream<Reference<ScanReturnedContext>> output) {
1444- // state int64_t& inserted = checkpoint->getIntState(0); <- This is broken for now.
14451428state Deque<Future<Reference<IReadWriteContext>>> f;
14461429state FlowLock* flowControlLock = checkpoint->getDocumentFinishedLock ();
1447- state int i = 0 ; // = inserted;
1430+ state int i = 0 ;
14481431
14491432try {
14501433state Reference<UnboundCollectionContext> ucx = wait (mm->getUnboundCollectionContext (tr, ns));
@@ -1459,15 +1442,13 @@ ACTOR static Future<Void> doInsert(PlanCheckpoint* checkpoint,
14591442when (Reference<IReadWriteContext> doc = wait (f.empty () ? Never () : f.front ())) {
14601443output.send (ref (new ScanReturnedContext (doc, -1 , Key ()))); // Are these the right scanId etc?
14611444f.pop_front ();
1462- // inserted++;
14631445}
14641446}
14651447}
14661448state int j = 0 ;
14671449for (; j < f.size (); j++) {
14681450Reference<IReadWriteContext> doc = wait (f[j]);
14691451output.send (ref (new ScanReturnedContext (doc, -1 , Key ()))); // Are these the right scanId etc?
1470- // inserted++;
14711452}
14721453throw end_of_stream ();
14731454} catch (Error& e) {
@@ -1503,11 +1484,9 @@ ACTOR static Future<Void> doSort(PlanCheckpoint* outerCheckpoint,
15031484loop {
15041485try {
15051486Reference<ScanReturnedContext> doc = waitNext (docs);
1506- returnProjections.push_back (
1507- doc->toDataValue ().get ().getPackedObject ().getOwned ()); // Note that this call to get() is safe here but
1508- // not in general, because we know that
1509- // doc is wrapping a BsonContext, which means
1510- // toDataValue() is synchronous.
1487+ // Note that this call to get() is safe here but not in general, because we know that doc is wrapping a
1488+ // BsonContext, which means toDataValue() is synchronous.
1489+ returnProjections.push_back (doc->toDataValue ().get ().getPackedObject ().getOwned ());
15111490innerLock->release ();
15121491} catch (Error& e) {
15131492if (e.code () == error_code_end_of_stream) {
0 commit comments