@@ -209,75 +209,68 @@ describe('Change Streams', function () {
209209 }
210210 } ) ;
211211
212- it ( 'should support creating multiple simultaneous ChangeStreams' , {
213- metadata : { requires : { topology : 'replicaset' } } ,
214-
215- test : function ( done ) {
212+ it (
213+ 'should support creating multiple simultaneous ChangeStreams' ,
214+ { requires : { topology : 'replicaset' } } ,
215+ async function ( ) {
216216 const configuration = this . configuration ;
217217 const client = configuration . newClient ( ) ;
218218
219- client . connect ( ( err , client ) => {
220- expect ( err ) . to . not . exist ;
221- this . defer ( ( ) => client . close ( ) ) ;
219+ await client . connect ( ) ;
222220
223- const database = client . db ( 'integration_tests' ) ;
224- const collection1 = database . collection ( 'simultaneous1' ) ;
225- const collection2 = database . collection ( 'simultaneous2' ) ;
221+ this . defer ( ( ) => client . close ( ) ) ;
226222
227- const changeStream1 = collection1 . watch ( [ { $addFields : { changeStreamNumber : 1 } } ] ) ;
228- this . defer ( ( ) => changeStream1 . close ( ) ) ;
229- const changeStream2 = collection2 . watch ( [ { $addFields : { changeStreamNumber : 2 } } ] ) ;
230- this . defer ( ( ) => changeStream2 . close ( ) ) ;
231- const changeStream3 = collection2 . watch ( [ { $addFields : { changeStreamNumber : 3 } } ] ) ;
232- this . defer ( ( ) => changeStream3 . close ( ) ) ;
223+ const database = client . db ( 'integration_tests' ) ;
224+ const collection1 = database . collection ( 'simultaneous1' ) ;
225+ const collection2 = database . collection ( 'simultaneous2' ) ;
233226
234- setTimeout ( ( ) => {
235- this . defer (
236- collection1 . insertMany ( [ { a : 1 } ] ) . then ( ( ) => collection2 . insertMany ( [ { a : 1 } ] ) )
237- ) ;
238- } , 50 ) ;
239-
240- Promise . resolve ( )
241- . then ( ( ) =>
242- Promise . all ( [ changeStream1 . hasNext ( ) , changeStream2 . hasNext ( ) , changeStream3 . hasNext ( ) ] )
243- )
244- . then ( function ( hasNexts ) {
245- // Check all the Change Streams have a next item
246- assert . ok ( hasNexts [ 0 ] ) ;
247- assert . ok ( hasNexts [ 1 ] ) ;
248- assert . ok ( hasNexts [ 2 ] ) ;
249-
250- return Promise . all ( [ changeStream1 . next ( ) , changeStream2 . next ( ) , changeStream3 . next ( ) ] ) ;
251- } )
252- . then ( function ( changes ) {
253- // Check the values of the change documents are correct
254- assert . equal ( changes [ 0 ] . operationType , 'insert' ) ;
255- assert . equal ( changes [ 1 ] . operationType , 'insert' ) ;
256- assert . equal ( changes [ 2 ] . operationType , 'insert' ) ;
257-
258- expect ( changes [ 0 ] ) . to . have . nested . property ( 'fullDocument.a' , 1 ) ;
259- expect ( changes [ 1 ] ) . to . have . nested . property ( 'fullDocument.a' , 1 ) ;
260- expect ( changes [ 2 ] ) . to . have . nested . property ( 'fullDocument.a' , 1 ) ;
261-
262- expect ( changes [ 0 ] ) . to . have . nested . property ( 'ns.db' , 'integration_tests' ) ;
263- expect ( changes [ 1 ] ) . to . have . nested . property ( 'ns.db ' , 'integration_tests' ) ;
264- expect ( changes [ 2 ] ) . to . have . nested . property ( 'ns.db ' , 'integration_tests' ) ;
265-
266- expect ( changes [ 0 ] ) . to . have . nested . property ( 'ns.coll' , 'simultaneous1' ) ;
267- expect ( changes [ 1 ] ) . to . have . nested . property ( 'ns.coll ' , 'simultaneous2 ' ) ;
268- expect ( changes [ 2 ] ) . to . have . nested . property ( 'ns.coll ' , 'simultaneous2 ' ) ;
269-
270- expect ( changes [ 0 ] ) . to . have . nested . property ( 'changeStreamNumber' , 1 ) ;
271- expect ( changes [ 1 ] ) . to . have . nested . property ( 'changeStreamNumber ' , 2 ) ;
272- expect ( changes [ 2 ] ) . to . have . nested . property ( 'changeStreamNumber ' , 3 ) ;
273- } )
274- . then (
275- ( ) => done ( ) ,
276- err => done ( err )
277- ) ;
278- } ) ;
227+ const changeStream1 = collection1 . watch ( [ { $addFields : { changeStreamNumber : 1 } } ] ) ;
228+ this . defer ( ( ) => changeStream1 . close ( ) ) ;
229+ const changeStream2 = collection2 . watch ( [ { $addFields : { changeStreamNumber : 2 } } ] ) ;
230+ this . defer ( ( ) => changeStream2 . close ( ) ) ;
231+ const changeStream3 = collection2 . watch ( [ { $addFields : { changeStreamNumber : 3 } } ] ) ;
232+ this . defer ( ( ) => changeStream3 . close ( ) ) ;
233+
234+ setTimeout ( ( ) => {
235+ collection1 . insertMany ( [ { a : 1 } ] ) . then ( ( ) => collection2 . insertMany ( [ { a : 1 } ] ) ) ;
236+ } , 50 ) ;
237+
238+ await Promise . resolve ( )
239+ . then ( ( ) =>
240+ Promise . all ( [ changeStream1 . hasNext ( ) , changeStream2 . hasNext ( ) , changeStream3 . hasNext ( ) ] )
241+ )
242+ . then ( function ( hasNexts ) {
243+ // Check all the Change Streams have a next item
244+ assert . ok ( hasNexts [ 0 ] ) ;
245+ assert . ok ( hasNexts [ 1 ] ) ;
246+ assert . ok ( hasNexts [ 2 ] ) ;
247+
248+ return Promise . all ( [ changeStream1 . next ( ) , changeStream2 . next ( ) , changeStream3 . next ( ) ] ) ;
249+ } )
250+ . then ( function ( changes ) {
251+ // Check the values of the change documents are correct
252+ assert . equal ( changes [ 0 ] . operationType , 'insert' ) ;
253+ assert . equal ( changes [ 1 ] . operationType , 'insert' ) ;
254+ assert . equal ( changes [ 2 ] . operationType , 'insert' ) ;
255+
256+ expect ( changes [ 0 ] ) . to . have . nested . property ( 'fullDocument.a ' , 1 ) ;
257+ expect ( changes [ 1 ] ) . to . have . nested . property ( 'fullDocument.a ' , 1 ) ;
258+ expect ( changes [ 2 ] ) . to . have . nested . property ( 'fullDocument.a' , 1 ) ;
259+
260+ expect ( changes [ 0 ] ) . to . have . nested . property ( 'ns.db ' , 'integration_tests ' ) ;
261+ expect ( changes [ 1 ] ) . to . have . nested . property ( 'ns.db ' , 'integration_tests ' ) ;
262+ expect ( changes [ 2 ] ) . to . have . nested . property ( 'ns.db' , 'integration_tests' ) ;
263+
264+ expect ( changes [ 0 ] ) . to . have . nested . property ( 'ns.coll ' , 'simultaneous1' ) ;
265+ expect ( changes [ 1 ] ) . to . have . nested . property ( 'ns.coll ' , 'simultaneous2' ) ;
266+ expect ( changes [ 2 ] ) . to . have . nested . property ( 'ns.coll' , 'simultaneous2' ) ;
267+
268+ expect ( changes [ 0 ] ) . to . have . nested . property ( 'changeStreamNumber' , 1 ) ;
269+ expect ( changes [ 1 ] ) . to . have . nested . property ( 'changeStreamNumber' , 2 ) ;
270+ expect ( changes [ 2 ] ) . to . have . nested . property ( 'changeStreamNumber' , 3 ) ;
271+ } ) ;
279272 }
280- } ) ;
273+ ) ;
281274
282275 it ( 'should properly close ChangeStream cursor' , {
283276 metadata : { requires : { topology : 'replicaset' } } ,
@@ -807,22 +800,27 @@ describe('Change Streams', function () {
807800
808801 it ( 'when invoked with promises' , {
809802 metadata : { requires : { topology : 'replicaset' } } ,
810- test : function ( ) {
811- const read = ( ) => {
812- return Promise . resolve ( )
813- . then ( ( ) => changeStream . next ( ) )
814- . then ( ( ) => changeStream . next ( ) )
815- . then ( ( ) => {
816- this . defer ( lastWrite ( ) ) ;
817- const nextP = changeStream . next ( ) ;
818- return changeStream . close ( ) . then ( ( ) => nextP ) ;
819- } ) ;
803+ test : async function ( ) {
804+ const read = async ( ) => {
805+ await changeStream . next ( ) ;
806+ await changeStream . next ( ) ;
807+
808+ const write = lastWrite ( ) ;
809+
810+ const nextP = changeStream . next ( ) ;
811+
812+ await changeStream . close ( ) ;
813+
814+ await write ;
815+ await nextP ;
820816 } ;
821817
822- return Promise . all ( [ read ( ) , write ( ) ] ) . then (
823- ( ) => Promise . reject ( new Error ( 'Expected operation to fail with error' ) ) ,
824- err => expect ( err . message ) . to . equal ( 'ChangeStream is closed' )
818+ const error = await Promise . all ( [ read ( ) , write ( ) ] ) . then (
819+ ( ) => null ,
820+ error => error
825821 ) ;
822+
823+ expect ( error . message ) . to . equal ( 'ChangeStream is closed' ) ;
826824 }
827825 } ) ;
828826
0 commit comments