Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/libmongoc/src/mongoc/mongoc-change-stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,15 @@ mongoc_change_stream_next(mongoc_change_stream_t *stream, const bson_t **bson)
goto end;
}

/* the cursor is closed. */
if (stream->cursor->cursor_id == 0) {
_mongoc_set_error(&stream->err,
MONGOC_ERROR_CURSOR,
MONGOC_ERROR_CURSOR_INVALID_CURSOR,
"Cannot advance a closed change stream.");
goto end;
}

resumable = _is_resumable_error(stream, err_doc);
while (resumable) {
/* recreate the cursor. */
Expand Down
78 changes: 78 additions & 0 deletions src/libmongoc/tests/test-mongoc-change-stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -1930,6 +1930,78 @@ prose_test_18(void)
mock_server_destroy(server);
}

// Test that a resume does not occur after an "invalidate" event.
static void
iterate_after_invalidate(void *test_ctx)
{
mongoc_client_t *client = test_framework_new_default_client();
mongoc_collection_t *coll = mongoc_client_get_collection(client, "db", "coll");
bson_error_t error;
int64_t start_time = bson_get_monotonic_time();

BSON_UNUSED(test_ctx);

// Insert a document into the collection to ensure the collection is created.
bool ok = mongoc_collection_insert_one(coll, tmp_bson("{'foo': 'bar'}"), NULL /* opts */, NULL /* reply */, &error);
ASSERT_OR_PRINT(ok, error);

mongoc_change_stream_t *cs = mongoc_collection_watch(coll, tmp_bson("{}"), NULL /* opts */);

ASSERT_OR_PRINT(mongoc_collection_drop(coll, &error), error);

// Iterate until the next event. Expect "drop" event.
{
const bson_t *event;
bool found_event = false;
while (!found_event) {
found_event = mongoc_change_stream_next(cs, &event);
if (!found_event) {
ASSERT_OR_PRINT(!mongoc_change_stream_error_document(cs, &error, NULL /* document */), error);
}

int64_t delta = bson_get_monotonic_time() - start_time;
if (delta > 10 * 1000 * 1000) {
test_error("test exceeded 10 seconds");
}
}
ASSERT_MATCH(event, "{'operationType': 'drop'}");
}

// Iterate until the next event. Expect "invalidate" event.
{
const bson_t *event;
bool found_event = false;
while (!found_event) {
found_event = mongoc_change_stream_next(cs, &event);
if (!found_event) {
ASSERT_OR_PRINT(!mongoc_change_stream_error_document(cs, &error, NULL /* document */), error);
}

int64_t delta = bson_get_monotonic_time() - start_time;
if (delta > 10 * 1000 * 1000) {
test_error("test exceeded 10 seconds");
}
}
ASSERT_MATCH(event, "{'operationType': 'invalidate'}");
}

// Iterate. Expect error suggesting failure to iterate a closed cursor.
{
const bson_t *event;
bool found_error = false;
while (!found_error) {
ASSERT_WITH_MSG(!mongoc_change_stream_next(cs, &event), "expected no event, got: %s", tmp_json(event));
found_error = mongoc_change_stream_error_document(cs, &error, NULL /* document */);
}
ASSERT_ERROR_CONTAINS(
error, MONGOC_ERROR_CURSOR, MONGOC_ERROR_CURSOR_INVALID_CURSOR, "Cannot advance a closed change stream");
}

mongoc_change_stream_destroy(cs);
mongoc_collection_destroy(coll);
mongoc_client_destroy(client);
}

typedef struct {
bson_t *commands[6];
size_t commands_len;
Expand Down Expand Up @@ -2176,6 +2248,12 @@ test_change_stream_install(TestSuite *suite)
test_framework_skip_if_not_replset);
TestSuite_AddMockServerTest(suite, "/change_streams/prose_test_17", prose_test_17);
TestSuite_AddMockServerTest(suite, "/change_streams/prose_test_18", prose_test_18);
TestSuite_AddFull(suite,
"/change_streams/iterate_after_invalidate",
iterate_after_invalidate,
NULL,
NULL,
test_framework_skip_if_not_replset);
TestSuite_AddFull(suite,
"/change_stream/batchSize0",
test_change_stream_batchSize0,
Expand Down