Skip to content
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ secrets-export.fish
mo-expansion.sh
mo-expansion.yml
expansions.sh
uri.txt

.drivers-tools/

Expand Down
1 change: 1 addition & 0 deletions .mocharc.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module.exports = {
require: [
'source-map-support/register',
'ts-node/register',
'test/tools/runner/throw_rejections.cjs',
'test/tools/runner/chai_addons.ts',
'test/tools/runner/ee_checker.ts'
],
Expand Down
89 changes: 49 additions & 40 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1146,50 +1146,59 @@ class ReadableCursorStream extends Readable {
return;
}

this._cursor.next().then(
result => {
if (result == null) {
this.push(null);
} else if (this.destroyed) {
this._cursor.close().then(undefined, squashError);
} else {
if (this.push(result)) {
return this._readNext();
this._cursor
.next()
.then(
// result from next()
result => {
if (result == null) {
this.push(null);
} else if (this.destroyed) {
this._cursor.close().then(undefined, squashError);
} else {
if (this.push(result)) {
return this._readNext();
}

this._readInProgress = false;
}
},
// error from next()
err => {
// NOTE: This is questionable, but we have a test backing the behavior. It seems the
// desired behavior is that a stream ends cleanly when a user explicitly closes
// a client during iteration. Alternatively, we could do the "right" thing and
// propagate the error message by removing this special case.
if (err.message.match(/server is closed/)) {
this._cursor.close().then(undefined, squashError);
return this.push(null);
}

this._readInProgress = false;
}
},
err => {
// NOTE: This is questionable, but we have a test backing the behavior. It seems the
// desired behavior is that a stream ends cleanly when a user explicitly closes
// a client during iteration. Alternatively, we could do the "right" thing and
// propagate the error message by removing this special case.
if (err.message.match(/server is closed/)) {
this._cursor.close().then(undefined, squashError);
return this.push(null);
}
// NOTE: This is also perhaps questionable. The rationale here is that these errors tend
// to be "operation was interrupted", where a cursor has been closed but there is an
// active getMore in-flight. This used to check if the cursor was killed but once
// that changed to happen in cleanup legitimate errors would not destroy the
// stream. There are change streams test specifically test these cases.
if (err.message.match(/operation was interrupted/)) {
return this.push(null);
}

// NOTE: This is also perhaps questionable. The rationale here is that these errors tend
// to be "operation was interrupted", where a cursor has been closed but there is an
// active getMore in-flight. This used to check if the cursor was killed but once
// that changed to happen in cleanup legitimate errors would not destroy the
// stream. There are change streams test specifically test these cases.
if (err.message.match(/operation was interrupted/)) {
return this.push(null);
// NOTE: The two above checks on the message of the error will cause a null to be pushed
// to the stream, thus closing the stream before the destroy call happens. This means
// that either of those error messages on a change stream will not get a proper
// 'error' event to be emitted (the error passed to destroy). Change stream resumability
// relies on that error event to be emitted to create its new cursor and thus was not
// working on 4.4 servers because the error emitted on failover was "interrupted at
// shutdown" while on 5.0+ it is "The server is in quiesce mode and will shut down".
// See NODE-4475.
return this.destroy(err);
}

// NOTE: The two above checks on the message of the error will cause a null to be pushed
// to the stream, thus closing the stream before the destroy call happens. This means
// that either of those error messages on a change stream will not get a proper
// 'error' event to be emitted (the error passed to destroy). Change stream resumability
// relies on that error event to be emitted to create its new cursor and thus was not
// working on 4.4 servers because the error emitted on failover was "interrupted at
// shutdown" while on 5.0+ it is "The server is in quiesce mode and will shut down".
// See NODE-4475.
return this.destroy(err);
}
);
)
// if either of the above handlers throw
.catch(error => {
this._readInProgress = false;
this.destroy(error);
});
}
}

Expand Down
11 changes: 7 additions & 4 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import {
maxWireVersion,
type MongoDBNamespace,
noop,
squashError,
supportsRetryableWrites
} from '../utils';
import { throwIfWriteConcernError } from '../write_concern';
Expand Down Expand Up @@ -345,9 +346,10 @@ export class Server extends TypedEventEmitter<ServerEvents> {
operationError instanceof MongoError &&
operationError.code === MONGODB_ERROR_CODES.Reauthenticate
) {
reauthPromise = this.pool.reauthenticate(conn).catch(error => {
reauthPromise = this.pool.reauthenticate(conn);
reauthPromise.then(undefined, error => {
reauthPromise = null;
throw error;
squashError(error);
});

await abortable(reauthPromise, options);
Expand All @@ -368,9 +370,10 @@ export class Server extends TypedEventEmitter<ServerEvents> {
if (session?.pinnedConnection !== conn) {
if (reauthPromise != null) {
// The reauth promise only exists if it hasn't thrown.
void reauthPromise.finally(() => {
const checkBackIn = () => {
this.pool.checkIn(conn);
});
};
void reauthPromise.then(checkBackIn, checkBackIn);
} else {
this.pool.checkIn(conn);
}
Expand Down
10 changes: 8 additions & 2 deletions src/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { clearTimeout, setTimeout } from 'timers';
import { type Document } from './bson';
import { MongoInvalidArgumentError, MongoOperationTimeoutError, MongoRuntimeError } from './error';
import { type ClientSession } from './sessions';
import { csotMin, noop } from './utils';
import { csotMin, noop, squashError } from './utils';

/** @internal */
export class TimeoutError extends Error {
Expand Down Expand Up @@ -102,7 +102,13 @@ export class Timeout extends Promise<never> {
}

throwIfExpired(): void {
if (this.timedOut) throw new TimeoutError('Timed out', { duration: this.duration });
if (this.timedOut) {
// This method is invoked when someone wants to throw immediately instead of await the result of this promise
// Since they won't be handling the rejection from the promise (because we're about to throw here)
// attach handling to prevent this from bubbling up to Node.js
this.then(undefined, squashError);
throw new TimeoutError('Timed out', { duration: this.duration });
}
}

public static expires(duration: number, unref?: true): Timeout {
Expand Down
1 change: 1 addition & 0 deletions test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,7 @@ describe('Change Streams', function () {
const write = lastWrite();

const nextP = changeStream.next();
nextP.catch(() => null);

await changeStream.close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,8 @@ describe('CSOT driver tests', metadata, () => {

beforeEach(async function () {
cs = client.db('db').collection('coll').watch([], { timeoutMS: 120 });
const _changePromise = once(cs, 'change');
cs.once('change', () => null);

await once(cs.cursor, 'init');

await internalClient.db().admin().command(failpoint);
Expand Down
5 changes: 3 additions & 2 deletions test/integration/node-specific/abort_signal.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ describe('AbortSignal support', () => {
if (args[1].find != null) {
commandStub.restore();
controller.abort();
throw new ReAuthenticationError({});
throw new ReAuthenticationError({ message: 'This is a fake reauthentication error' });
}
return commandStub.wrappedMethod.apply(this, args);
});
Expand Down Expand Up @@ -792,8 +792,9 @@ describe('AbortSignal support', () => {
describe('if reauth throws', () => {
beforeEach(() => {
sinon.stub(ConnectionPool.prototype, 'reauthenticate').callsFake(async function () {
const error = new Error('Rejecting reauthenticate for testing');
await sleep(1000);
throw new Error();
throw error;
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ maybeDescribe('examples(change-stream):', function () {
it('Open A Change Stream', {
metadata: { requires: { topology: ['replicaset'], mongodb: '>=3.6.0' } },
test: async function () {
const looper = new Looper(() => db.collection('inventory').insertOne({ a: 1 }));
const looper = new Looper(async () => {
await db.collection('inventory').insertOne({ a: 1 });
});
looper.run();

// Start Changestream Example 1
Expand Down
37 changes: 15 additions & 22 deletions test/integration/shared.js
Original file line number Diff line number Diff line change
Expand Up @@ -90,32 +90,25 @@ function ignoreNsNotFound(err) {
if (!err.message.match(/ns not found/)) throw err;
}

function setupDatabase(configuration, dbsToClean) {
async function setupDatabase(configuration, dbsToClean) {
dbsToClean = Array.isArray(dbsToClean) ? dbsToClean : [];
var configDbName = configuration.db;
var client = configuration.newClient(configuration.writeConcernMax(), {
maxPoolSize: 1
});
const configDbName = configuration.db;

dbsToClean.push(configDbName);

return client
.connect()
.then(() =>
dbsToClean.reduce(
(result, dbName) =>
result
.then(() =>
client.db(dbName).command({ dropAllUsersFromDatabase: 1, writeConcern: { w: 1 } })
)
.then(() => client.db(dbName).dropDatabase({ writeConcern: { w: 1 } })),
Promise.resolve()
)
)
.then(
() => client.close(),
err => client.close(() => Promise.reject(err))
);
const client = configuration.newClient();
try {
for (const dbName of dbsToClean) {
const db = await client.db(dbName);
for await (const { name } of db.listCollections({}, { nameOnly: true })) {
const collection = db.collection(name);
await collection.deleteMany({}).catch(() => null);
await collection.drop().catch(() => null);
}
}
} finally {
await client.close();
}
}

/**
Expand Down
6 changes: 5 additions & 1 deletion test/manual/mocharc.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ const [major] = process.versions.node.split('.');

/** @type {import("mocha").MochaOptions} */
module.exports = {
require: ['ts-node/register', 'test/tools/runner/chai_addons.ts'],
require: [
'ts-node/register',
'test/tools/runner/throw_rejections.cjs',
'test/tools/runner/chai_addons.ts'
],
reporter: 'test/tools/reporter/mongodb_reporter.js',
failZero: true,
color: true,
Expand Down
5 changes: 4 additions & 1 deletion test/mocha_lambda.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ const [major] = process.versions.node.split('.');

/** @type {import("mocha").MochaOptions} */
module.exports = {
require: ['test/integration/node-specific/examples/setup.js'],
require: [
'test/tools/runner/throw_rejections.cjs',
'test/integration/node-specific/examples/setup.js'
],
extension: ['js'],
ui: 'test/tools/runner/metadata_ui.js',
recursive: true,
Expand Down
1 change: 1 addition & 0 deletions test/mocha_mongodb.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module.exports = {
require: [
'source-map-support/register',
'ts-node/register',
'test/tools/runner/throw_rejections.cjs',
'test/tools/runner/chai_addons.ts',
'test/tools/runner/ee_checker.ts',
'test/tools/runner/hooks/configuration.ts',
Expand Down
6 changes: 6 additions & 0 deletions test/tools/runner/throw_rejections.cjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// eslint-disable-next-line @typescript-eslint/no-require-imports
const process = require('process');

process.on('unhandledRejection', error => {
throw error;
});
69 changes: 0 additions & 69 deletions test/unit/assorted/optional_require.test.js

This file was deleted.

Loading