Overview
In this guide, you can learn how to use a change stream to monitor real-time changes to your database. A change stream is a MongoDB Server feature that allows your application to subscribe to data changes on a collection, database, or deployment.
Sample Data
The examples in this guide use the sample_restaurants.restaurants collection from the Atlas sample datasets. To learn how to create a free MongoDB Atlas cluster and load the sample datasets, see Get Started with PyMongo.
Open a Change Stream
To open a change stream, call the watch() method. The instance on which you call watch() determines the scope of events that the change stream listens for. You can call the watch() method on the following classes:
MongoClient: To monitor all changes in the MongoDB deployment
Database: To monitor changes in all collections in the database
Collection: To monitor changes in the collection
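As a minimal sketch of how the scope follows from the object that watch() is called on, the helper below picks one of the three levels. The function name open_change_stream and the scope labels are hypothetical and exist only for illustration; the client argument is assumed to behave like a connected MongoClient.

```python
def open_change_stream(client, scope="collection"):
    """Open a change stream at the requested scope.

    `client` is assumed to behave like a PyMongo MongoClient. The function
    name and the `scope` labels are hypothetical, chosen for illustration.
    """
    database = client["sample_restaurants"]
    collection = database["restaurants"]

    if scope == "deployment":
        return client.watch()      # events from every database
    if scope == "database":
        return database.watch()    # events from every collection in the database
    if scope == "collection":
        return collection.watch()  # events from the restaurants collection only
    raise ValueError(f"unknown scope: {scope!r}")
```

The returned object is the change stream itself, so the caller can use it in a with statement in the same way as the examples in this guide.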
The following example opens a change stream on the restaurants collection and outputs changes as they occur. The synchronous version appears first, followed by the asynchronous version:

# Synchronous
# Assumes client is a connected MongoClient instance
database = client["sample_restaurants"]
collection = database["restaurants"]

with collection.watch() as stream:
    for change in stream:
        print(change)

# Asynchronous
# Assumes client is a connected AsyncMongoClient instance and that
# this code runs inside a coroutine
database = client["sample_restaurants"]
collection = database["restaurants"]

async with await collection.watch() as stream:
    async for change in stream:
        print(change)
To begin watching for changes, run the application. Then, in a separate application or shell, modify the restaurants collection. The following example updates a document with a name field value of Blarney Castle. The synchronous version appears first, followed by the asynchronous version:

# Synchronous
database = client["sample_restaurants"]
collection = database["restaurants"]

query_filter = {"name": "Blarney Castle"}
update_operation = {"$set": {"cuisine": "Irish"}}

result = collection.update_one(query_filter, update_operation)

# Asynchronous
database = client["sample_restaurants"]
collection = database["restaurants"]

query_filter = {"name": "Blarney Castle"}
update_operation = {"$set": {"cuisine": "Irish"}}

result = await collection.update_one(query_filter, update_operation)
When you update the collection, the change stream application prints the change as it occurs. The printed change event resembles the following:
{'_id': {'_data': '...'}, 'operationType': 'update', 'clusterTime': Timestamp(...), 'wallTime': datetime.datetime(...), 'ns': {'db': 'sample_restaurants', 'coll': 'restaurants'}, 'documentKey': {'_id': ObjectId('...')}, 'updateDescription': {'updatedFields': {'cuisine': 'Irish'}, 'removedFields': [], 'truncatedArrays': []}}
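Printing the whole event is useful for exploration, but an application usually needs only a few of its fields. The following sketch reduces an event shaped like the preceding output to a compact summary. The helper name summarize_change and the keys of the returned dictionary are hypothetical, and the sample event uses placeholder values rather than real identifiers.

```python
def summarize_change(change):
    """Return a compact summary of a change event dictionary."""
    ns = change.get("ns", {})
    update = change.get("updateDescription", {})
    return {
        "operation": change["operationType"],
        # Join database and collection names, skipping any that are absent
        "namespace": ".".join(
            part for part in (ns.get("db"), ns.get("coll")) if part
        ),
        "document_id": change.get("documentKey", {}).get("_id"),
        "updated_fields": update.get("updatedFields", {}),
        "removed_fields": update.get("removedFields", []),
    }


# Placeholder event that mirrors the structure of the printed output
event = {
    "_id": {"_data": "placeholder-token"},
    "operationType": "update",
    "ns": {"db": "sample_restaurants", "coll": "restaurants"},
    "documentKey": {"_id": "placeholder-id"},
    "updateDescription": {
        "updatedFields": {"cuisine": "Irish"},
        "removedFields": [],
        "truncatedArrays": [],
    },
}
print(summarize_change(event))
```

Because the helper reads optional fields with get(), it also tolerates events that carry no updateDescription, such as inserts.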
Modify the Change Stream Output
You can pass the pipeline parameter to the watch() method to modify the change stream output. This parameter allows you to watch for only specified change events. Format the parameter as a list of objects that each represent an aggregation stage.
You can specify the following stages in the pipeline parameter:
$addFields
$match
$project
$replaceRoot
$replaceWith
$redact
$set
$unset
The following example uses the pipeline parameter to open a change stream that records only update operations. The synchronous version appears first, followed by the asynchronous version:

# Synchronous
# The pipeline is a list that contains one $match stage
change_pipeline = [{"$match": {"operationType": "update"}}]

with collection.watch(pipeline=change_pipeline) as stream:
    for change in stream:
        print(change)

# Asynchronous
# The pipeline is a list that contains one $match stage
change_pipeline = [{"$match": {"operationType": "update"}}]

async with await collection.watch(pipeline=change_pipeline) as stream:
    async for change in stream:
        print(change)
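Pipelines can combine several of the supported stages. The sketch below builds a pipeline that matches more than one operation type and optionally limits the fields in each event. The helper name build_pipeline and its parameters are hypothetical; $in is the standard MongoDB query operator for matching any value in a list.

```python
def build_pipeline(operation_types, fields=None):
    """Build a change stream pipeline as a list of aggregation stages.

    operation_types: operation names to keep, such as ["insert", "update"].
    fields: optional names of event fields to keep in the output.
    """
    pipeline = [{"$match": {"operationType": {"$in": list(operation_types)}}}]
    if fields:
        # An inclusion projection keeps _id by default. The _id field holds
        # the resume token, so this sketch never excludes it.
        pipeline.append({"$project": {name: 1 for name in fields}})
    return pipeline


change_pipeline = build_pipeline(
    ["insert", "update"],
    fields=["operationType", "documentKey"],
)
print(change_pipeline)
```

You can pass the resulting list as the pipeline argument of watch().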
To learn more about modifying your change stream output, see the Modify Change Stream Output section in the MongoDB Server manual.
Modify watch() Behavior
The watch() method accepts optional parameters, which represent options you can use to configure the operation. If you don't specify any options, the driver does not customize the operation.
The following table describes the options you can set to customize the behavior of watch():
Property | Description |
---|---|
pipeline | A list of aggregation pipeline stages that modify the output of the change stream. |
full_document | Specifies whether to show the full document after the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images. |
full_document_before_change | Specifies whether to show the full document as it was before the change, rather than showing only the changes made to the document. To learn more about this option, see Include Pre-Images and Post-Images. |
resume_after | Directs watch() to resume returning changes after the operation specified in the resume token. Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after. resume_after is mutually exclusive with start_after and start_at_operation_time. |
start_after | Directs watch() to start a new change stream after the operation specified in the resume token. Allows notifications to resume after an invalidate event. Each change stream event document includes a resume token as the _id field. Pass the entire _id field of the change event document that represents the operation you want to resume after. start_after is mutually exclusive with resume_after and start_at_operation_time. |
start_at_operation_time | Directs watch() to return only events that occur after the specified timestamp. start_at_operation_time is mutually exclusive with resume_after and start_after. |
max_await_time_ms | The maximum amount of time, in milliseconds, the server waits for new data changes to report to the change stream cursor before returning an empty batch. Defaults to 1000 milliseconds. |
show_expanded_events | Starting in MongoDB Server v6.0, change streams support change notifications for Data Definition Language (DDL) events, such as the createIndexes and dropIndexes events. To include expanded events in a change stream, create the change stream cursor and set this parameter to True. |
batch_size | The maximum number of change events to return in each batch of the response from the MongoDB cluster. |
collation | The collation to use for the change stream cursor. |
session | An instance of ClientSession. |
comment | A comment to attach to the operation. |
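The resume options in the table can be combined into a simple restart pattern: remember the _id of the last event handled, then pass it as resume_after the next time the stream opens. The following is a minimal sketch under that assumption. The names build_watch_options and watch_and_track, along with the handle and max_events parameters, are hypothetical, and the collection argument is assumed to behave like a PyMongo Collection.

```python
EXCLUSIVE_START_OPTIONS = ("resume_after", "start_after", "start_at_operation_time")


def build_watch_options(**options):
    """Drop unset options and reject conflicting starting points."""
    chosen = {name: value for name, value in options.items() if value is not None}
    conflicts = [name for name in EXCLUSIVE_START_OPTIONS if name in chosen]
    if len(conflicts) > 1:
        raise ValueError("mutually exclusive options: " + ", ".join(conflicts))
    return chosen


def watch_and_track(collection, handle, resume_token=None, max_events=None):
    """Handle events and return the resume token of the last one handled."""
    options = build_watch_options(resume_after=resume_token)
    with collection.watch(**options) as stream:
        for count, change in enumerate(stream, start=1):
            handle(change)
            # Each event's _id field is its resume token
            resume_token = change["_id"]
            if max_events is not None and count >= max_events:
                break
    return resume_token
```

An application might store the returned token durably, for example in a file or another collection, so that a restarted process can continue from the last handled event.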
Include Pre-Images and Post-Images
Important
You can enable pre-images and post-images on collections only if your deployment uses MongoDB v6.0 or later.
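As a sketch of how a collection might be configured, the helper below sends the collMod database command with the changeStreamPreAndPostImages option, which is the mechanism the MongoDB Server manual describes for existing collections. The helper name enable_pre_and_post_images is hypothetical, and the database argument is assumed to behave like a PyMongo Database.

```python
def enable_pre_and_post_images(database, collection_name):
    """Turn on pre-image and post-image recording for one collection.

    Sends: {collMod: <collection_name>,
            changeStreamPreAndPostImages: {enabled: True}}
    """
    return database.command(
        "collMod",
        collection_name,
        changeStreamPreAndPostImages={"enabled": True},
    )
```

For the sample data in this guide, the call would take the sample_restaurants database and the collection name "restaurants".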
By default, when you perform an operation on a collection, the corresponding change event includes only the delta of the fields modified by that operation. To see the full document before or after a change, specify the full_document_before_change or the full_document parameter in the watch() method.
The pre-image is the full version of a document before a change. To include the pre-image in the change stream event, set the full_document_before_change parameter to one of the following values:
whenAvailable: The change event includes a pre-image of the modified document only if the pre-image is available.
required: The change event includes a pre-image of the modified document. If the pre-image is not available, the driver raises an error.
The post-image is the full version of a document after a change. To include the post-image in the change stream event, set the full_document parameter to one of the following values:
updateLookup: The change event includes a copy of the entire changed document from some time after the change.
whenAvailable: The change event includes a post-image of the modified document only if the post-image is available.
required: The change event includes a post-image of the modified document. If the post-image is not available, the driver raises an error.
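The accepted values differ between the two parameters, since updateLookup applies only to full_document. The following sketch checks the values before they reach watch(). The helper name image_options and its pre_image and post_image parameters are hypothetical.

```python
PRE_IMAGE_VALUES = {"whenAvailable", "required"}
POST_IMAGE_VALUES = {"updateLookup", "whenAvailable", "required"}


def image_options(pre_image=None, post_image=None):
    """Translate image settings into keyword arguments for watch()."""
    options = {}
    if pre_image is not None:
        if pre_image not in PRE_IMAGE_VALUES:
            raise ValueError(f"unsupported pre-image value: {pre_image!r}")
        options["full_document_before_change"] = pre_image
    if post_image is not None:
        if post_image not in POST_IMAGE_VALUES:
            raise ValueError(f"unsupported post-image value: {post_image!r}")
        options["full_document"] = post_image
    return options


# Example: request both images when the server has them
options = image_options(pre_image="whenAvailable", post_image="whenAvailable")
print(options)
```

You can unpack the returned dictionary into the call, as in collection.watch(**options).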
The following example calls the watch() method on a collection and includes the post-image of updated documents by specifying the full_document parameter. The synchronous version appears first, followed by the asynchronous version:

# Synchronous
database = client["sample_restaurants"]
collection = database["restaurants"]

with collection.watch(full_document="updateLookup") as stream:
    for change in stream:
        print(change)

# Asynchronous
database = client["sample_restaurants"]
collection = database["restaurants"]

async with await collection.watch(full_document="updateLookup") as stream:
    async for change in stream:
        print(change)
With the change stream application running, updating a document in the restaurants collection by using the preceding update example prints a change event resembling the following:
{'_id': {'_data': '...'}, 'operationType': 'update', 'clusterTime': Timestamp(...), 'wallTime': datetime.datetime(...), 'fullDocument': {'_id': ObjectId('...'), 'address': {...}, 'borough': 'Queens', 'cuisine': 'Irish', 'grades': [...], 'name': 'Blarney Castle', 'restaurant_id': '40366356'}, 'ns': {'db': 'sample_restaurants', 'coll': 'restaurants'}, 'documentKey': {'_id': ObjectId('...')}, 'updateDescription': {'updatedFields': {'cuisine': 'Irish'}, 'removedFields': [], 'truncatedArrays': []}}
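When an event carries both images, comparing them shows the old and new value of each changed field. The sketch below assumes the pre-image arrives in the fullDocumentBeforeChange field, the server's name for it, and the post-image in fullDocument, as in the preceding output. The helper name changed_values is hypothetical, and it compares top-level fields only.

```python
def changed_values(change):
    """Map each differing top-level field to its (before, after) values.

    Returns an empty dictionary when either image is missing from the event.
    """
    before = change.get("fullDocumentBeforeChange")
    after = change.get("fullDocument")
    if not before or not after:
        return {}
    return {
        field: (before.get(field), after.get(field))
        for field in sorted(set(before) | set(after))
        if before.get(field) != after.get(field)
    }
```

A field that exists in only one image appears with None on the other side of the pair.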
To learn more about pre-images and post-images, see Change Streams with Document Pre- and Post-Images in the MongoDB Server manual.
Additional Information
To learn more about change streams, see Change Streams in the MongoDB Server manual.
API Documentation
To learn more about any of the methods or types discussed in this guide, see the following API documentation: