An Atlas Stream Processing stream processor applies the logic of a uniquely named stream aggregation pipeline to your streaming data. Atlas Stream Processing saves each stream processor definition to persistent storage so that it can be reused. You can only use a given stream processor in the stream processing instance in which its definition is stored. Atlas Stream Processing supports up to 4 stream processors per worker. To accommodate additional processors beyond this limit, Atlas Stream Processing allocates a new resource.
Prerequisites
To create and manage a stream processor, you must have:
- `mongosh` version 2.0 or higher
- A database user with the `atlasAdmin` role to create and run stream processors
- An Atlas cluster
Considerations
Many stream processor commands require you to specify the name of the relevant stream processor in the method invocation. The syntax described in the following sections assumes strictly alphanumeric names. If your stream processor's name includes non-alphanumeric characters such as hyphens (`-`) or full stops (`.`), you must enclose the name in square brackets (`[]`) and double quotes (`""`) in the method invocation, as in `sp["special-name-stream"].stats()`.
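This requirement comes from standard JavaScript property-access rules, which mongosh inherits: a name containing a hyphen is not a valid identifier, so dot notation cannot reach it. A minimal plain-JavaScript sketch (the `sp` object below is a stand-in for illustration, not a real stream processing connection):

```javascript
// Stand-in for the `sp` global: one processor with a hyphenated name.
const sp = {
  "special-name-stream": {
    stats: () => ({ status: "running" })
  }
};

// sp.special-name-stream.stats() would be a syntax error:
// JavaScript parses the hyphen as subtraction.
// Bracket notation with a quoted string works:
const stats = sp["special-name-stream"].stats();
console.log(stats.status); // running
```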
Create a Stream Processor Interactively
You can create a stream processor interactively with the `sp.process()` method. Stream processors that you create interactively exhibit the following behavior:
Write output and dead letter queue documents to the shell
Begin running immediately upon creation
Run for either 10 minutes or until the user stops them
Don't persist after stopping
Stream processors that you create interactively are intended for prototyping. To create a persistent stream processor, see Create a Stream Processor.
`sp.process()` has the following syntax:
sp.process(<pipeline>)
Field | Type | Necessity | Description |
---|---|---|---|
`pipeline` | array | Required | Stream aggregation pipeline you want to apply to your streaming data. |
Connect to your stream processing instance.
Use the connection string associated with your stream processing instance to connect using `mongosh`.
Example
The following command connects to a stream processing instance as a user named `streamOwner` using X.509 authentication:
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \
  --tls --authenticationDatabase admin --username streamOwner
Provide your user password when prompted.
Define a pipeline.
In the `mongosh` prompt, assign an array containing the aggregation stages you want to apply to a variable named `pipeline`.
The following example uses the `stuff` topic in the `myKafka` connection in the connection registry as the `$source`, matches records where the `temperature` field has a value of `46`, and emits the processed messages to the `output` topic of the `mySink` connection in the connection registry:
pipeline = [
  { $source: { connectionName: "myKafka", topic: "stuff" } },
  { $match: { temperature: 46 } },
  { $emit: { connectionName: "mySink", topic: "output" } }
]
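Before attaching a pipeline to a live topic, you can sanity-check the `$match` condition against plain JavaScript objects; `Array.prototype.filter` approximates what the stage does to each record. The sample records below are invented for illustration:

```javascript
// Hypothetical records as they might arrive from the "stuff" topic.
const records = [
  { deviceId: "a1", temperature: 46 },
  { deviceId: "b2", temperature: 41 },
  { deviceId: "c3", temperature: 46 }
];

// Equivalent of {$match: {temperature: 46}}: keep only exact matches.
const matched = records.filter(r => r.temperature === 46);

console.log(matched.length); // 2
```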
Create a Stream Processor
To create a stream processor:
The Atlas Administration API provides an endpoint for creating a stream processor.
To create a new stream processor with `mongosh`, use the `sp.createStreamProcessor()` method. It has the following syntax:
sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument | Type | Necessity | Description |
---|---|---|---|
`name` | string | Required | Logical name for the stream processor. This must be unique within the stream processing instance. This name should contain only alphanumeric characters. |
`pipeline` | array | Required | Stream aggregation pipeline you want to apply to your streaming data. |
`options` | object | Optional | Object defining various optional settings for your stream processor. |
`options.dlq` | object | Conditional | Object assigning a dead letter queue for your stream processing instance. This field is necessary if you define the `options` field. |
`options.dlq.connectionName` | string | Conditional | Human-readable label that identifies a connection in your connection registry. This connection must reference an Atlas cluster. This field is necessary if you define the `options.dlq` field. |
`options.dlq.db` | string | Conditional | Name of an Atlas database on the cluster specified in `options.dlq.connectionName`. This field is necessary if you define the `options.dlq` field. |
`options.dlq.coll` | string | Conditional | Name of a collection in the database specified in `options.dlq.db`. This field is necessary if you define the `options.dlq` field. |
Connect to your stream processing instance.
Use the connection string associated with your stream processing instance to connect using `mongosh`.
Example
The following command connects to a stream processing instance as a user named `streamOwner` using X.509 authentication:
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \
  --tls --authenticationDatabase admin --username streamOwner
Provide your user password when prompted.
Define a pipeline.
In the `mongosh` prompt, assign an array containing the aggregation stages you want to apply to a variable named `pipeline`.
The following example uses the `stuff` topic in the `myKafka` connection in the connection registry as the `$source`, matches records where the `temperature` field has a value of `46`, and emits the processed messages to the `output` topic of the `mySink` connection in the connection registry:
pipeline = [
  { $source: { connectionName: "myKafka", topic: "stuff" } },
  { $match: { temperature: 46 } },
  { $emit: { connectionName: "mySink", topic: "output" } }
]
(Optional) Define a DLQ.
In the `mongosh` prompt, assign an object containing the following properties of your DLQ:
Connection name
Database name
Collection name
The following example defines a DLQ over the `cluster01` connection, writing to the `dlq` collection of the `metadata` database.
deadLetter = {
  dlq: {
    connectionName: "cluster01",
    db: "metadata",
    coll: "dlq"
  }
}
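Outside of mongosh, you can sanity-check the shapes of these arguments in plain JavaScript before making the call. The sketch below only inspects the objects; `sp` itself exists only in mongosh, and the processor name `proc01` is illustrative:

```javascript
// DLQ options object, as defined in the step above.
const deadLetter = {
  dlq: { connectionName: "cluster01", db: "metadata", coll: "dlq" }
};

// Pipeline array, as defined in the earlier step.
const pipeline = [
  { $source: { connectionName: "myKafka", topic: "stuff" } },
  { $match: { temperature: 46 } },
  { $emit: { connectionName: "mySink", topic: "output" } }
];

// In mongosh the call would be:
//   sp.createStreamProcessor("proc01", pipeline, deadLetter)
// Here we only check that the pieces have the expected shapes.
console.log(Array.isArray(pipeline), typeof deadLetter.dlq); // true object
```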
Start a Stream Processor
Note
Atlas Stream Processing discards the internal state of stream processors that have been stopped for 45 days or more. When you start such a processor, it operates and reports statistics identically to its initial run.
To start a stream processor:
The Atlas Administration API provides an endpoint for starting a stream processor.
To start an existing stream processor with `mongosh`, use the `sp.<streamprocessor>.start()` method. `<streamprocessor>` must be the name of a stream processor defined for the current stream processing instance.
For example, to start a stream processor named `proc01`, run the following command:
sp.proc01.start()
This method returns:
`true` if the stream processor exists and isn't currently running.
`false` if you try to start a stream processor that doesn't exist, or one that exists and is currently running.
Stop a Stream Processor
Note
Atlas Stream Processing discards the internal state of stream processors that have been stopped for 45 days or more. When you start such a processor, it operates and reports statistics identically to its initial run.
To stop a stream processor:
The Atlas Administration API provides an endpoint for stopping a stream processor.
To stop an existing stream processor with `mongosh`, use the `sp.<streamprocessor>.stop()` method. `<streamprocessor>` must be the name of a currently running stream processor defined for the current stream processing instance.
For example, to stop a stream processor named `proc01`, run the following command:
sp.proc01.stop()
This method returns:
`true` if the stream processor exists and is currently running.
`false` if the stream processor doesn't exist, or if the stream processor isn't currently running.
Modify a Stream Processor
You can modify the name, pipeline, and dead letter queue of an existing stream processor.
To modify a stream processor, do the following:
Apply your update to the stream processor.
By default, modified processors restore from the last checkpoint. Alternatively, you can set `resumeFromCheckpoint=false`, in which case the processor retains only summary statistics. When you modify a processor with open windows, the windows are entirely recomputed on the updated pipeline.
Note
If you change the name of a stream processor for which you had configured the Stream Processor State is failed alert by using an Operator (which contains matcher expressions like `is`, `contains`, and more), Atlas won't trigger alerts for the renamed stream processor if the matcher expression doesn't match the new name. To monitor the renamed stream processor, reconfigure the alert.
Limitations
When the default setting `resumeFromCheckpoint=true` is enabled, the following limitations apply:
- You can't modify the `$source` stage.
- You can't modify the interval of your window.
- You can't remove a window.
- You can only modify a pipeline with a window if that window has either a `$group` or `$sort` stage in its inner pipeline.
- You can't change an existing window type. For example, you can't change from a `$tumblingWindow` to a `$hoppingWindow` or vice versa.
- Processors with windows may reprocess some data as a product of recalculating the windows.
To modify a stream processor:
Requires `mongosh` v2.3.4+.
Use the `sp.<streamprocessor>.modify()` command to modify an existing stream processor. `<streamprocessor>` must be the name of a stopped stream processor defined for the current stream processing instance.
For example, to modify a stream processor named `proc01`, run the following command:
sp.proc01.modify(<pipeline>, {
  resumeFromCheckpoint: bool,  // optional
  name: string,                // optional
  dlq: object                  // optional
})
Add a Stage to an Existing Pipeline
sp.createStreamProcessor("foo", [
  { $source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" } },
  { $merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout" } } }
])
sp.foo.start();
sp.foo.stop();
sp.foo.modify([
  { $source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" } },
  { $match: { operationType: "insert" } },
  { $merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout2" } } }
]);
sp.foo.start();
Modify the Input Source of a Stream Processor
sp.foo.modify([
  { $source: {
      connectionName: "StreamsAtlasConnection",
      db: "test",
      coll: "test",
      config: { startAtOperationTime: new Date(Date.now() - 5 * 60 * 1000) }
  } },
  { $match: { operationType: "insert" } },
  { $merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout2" } } }
], { resumeFromCheckpoint: false });
Remove a Dead Letter Queue from a Stream Processor
sp.foo.stop();
sp.foo.modify({ dlq: {} });
sp.foo.start();
Modify a Stream Processor with a Window
sp.foo.stop();
sp.foo.modify([
  { $source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" } },
  { $replaceRoot: { newRoot: "$fullDocument" } },
  { $match: { cost: { $gt: 500 } } },
  { $tumblingWindow: {
      interval: { unit: "day", size: 1 },
      pipeline: [
        { $group: { _id: "$customerId", sum: { $sum: "$cost" }, avg: { $avg: "$cost" } } }
      ]
  } },
  { $merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout" } } }
], { resumeFromCheckpoint: false });
sp.foo.start();
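The inner `$group` of a tumbling window like the one above can be prototyped against static data to confirm the per-customer aggregates it produces. A plain-JavaScript sketch, with invented documents that would fall inside a single one-day window:

```javascript
// Hypothetical documents inside one 1-day tumbling window
// (all already pass the $match on cost > 500).
const docs = [
  { customerId: "c1", cost: 600 },
  { customerId: "c1", cost: 900 },
  { customerId: "c2", cost: 750 }
];

// Equivalent of:
//   {$group: {_id: "$customerId", sum: {$sum: "$cost"}, avg: {$avg: "$cost"}}}
const groups = {};
for (const d of docs) {
  const g = groups[d.customerId] || (groups[d.customerId] = { sum: 0, count: 0 });
  g.sum += d.cost;
  g.count += 1;
}
const results = Object.entries(groups).map(([_id, g]) => ({
  _id, sum: g.sum, avg: g.sum / g.count
}));

console.log(results);
// [ { _id: 'c1', sum: 1500, avg: 750 }, { _id: 'c2', sum: 750, avg: 750 } ]
```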
The Atlas Administration API provides an endpoint for modifying a stream processor.
Drop a Stream Processor
To drop a stream processor:
The Atlas Administration API provides an endpoint for deleting a stream processor.
To delete an existing stream processor with `mongosh`, use the `sp.<streamprocessor>.drop()` method. `<streamprocessor>` must be the name of a stream processor defined for the current stream processing instance.
For example, to drop a stream processor named `proc01`, run the following command:
sp.proc01.drop()
This method returns:
`true` if the stream processor exists.
`false` if the stream processor doesn't exist.
When you drop a stream processor, all resources that Atlas Stream Processing provisioned for it are destroyed, along with all saved state.
List Available Stream Processors
To list all available stream processors:
The Atlas Administration API provides an endpoint for listing all available stream processors.
To list all available stream processors on the current stream processing instance with `mongosh`, use the `sp.listStreamProcessors()` method. It returns a list of documents containing the name, start time, current state, and pipeline associated with each stream processor. It has the following syntax:
sp.listStreamProcessors(<filter>)
`<filter>` is a document specifying which field(s) to filter the list by.
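The filter works as a document match against the returned fields. As a plain-JavaScript approximation (the two processors come from the example output in this section, reduced to minimal documents; the case-insensitive comparison is an assumption for illustration):

```javascript
// The two processors from the example output, reduced to the filtered fields.
const processors = [
  { name: "proc01", state: "RUNNING" },
  { name: "proc02", state: "STOPPED" }
];

// Approximate {"state": "running"} with a case-insensitive match on state.
const running = processors.filter(
  p => p.state.toLowerCase() === "running"
);

console.log(running.map(p => p.name)); // [ 'proc01' ]
```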
Example
The following example shows a return value for an unfiltered request:
sp.listStreamProcessors()
{
  id: '0135',
  name: "proc01",
  last_modified: ISODate("2023-03-20T20:15:54.601Z"),
  state: "RUNNING",
  error_msg: '',
  pipeline: [
    {
      $source: {
        connectionName: "myKafka",
        topic: "stuff"
      }
    },
    {
      $match: {
        temperature: 46
      }
    },
    {
      $emit: {
        connectionName: "mySink",
        topic: "output"
      }
    }
  ],
  lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
},
{
  id: '0218',
  name: "proc02",
  last_modified: ISODate("2023-03-21T20:17:33.601Z"),
  state: "STOPPED",
  error_msg: '',
  pipeline: [
    {
      $source: {
        connectionName: "myKafka",
        topic: "things"
      }
    },
    {
      $match: {
        temperature: 41
      }
    },
    {
      $emit: {
        connectionName: "mySink",
        topic: "results"
      }
    }
  ],
  lastStateChange: ISODate("2023-03-21T20:18:26.139Z")
}
If you run the command again on the same stream processing instance, filtering for a `"state"` of `"running"`, you see the following output:
sp.listStreamProcessors({"state": "running"})
{
  id: '0135',
  name: "proc01",
  last_modified: ISODate("2023-03-20T20:15:54.601Z"),
  state: "RUNNING",
  error_msg: '',
  pipeline: [
    {
      $source: {
        connectionName: "myKafka",
        topic: "stuff"
      }
    },
    {
      $match: {
        temperature: 46
      }
    },
    {
      $emit: {
        connectionName: "mySink",
        topic: "output"
      }
    }
  ],
  lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
}
Sample from a Stream Processor
To return an array of sampled results from an existing stream processor to `STDOUT` with `mongosh`, use the `sp.<streamprocessor>.sample()` method. `<streamprocessor>` must be the name of a currently running stream processor defined for the current stream processing instance. For example, the following command samples from a stream processor named `proc01`.
sp.proc01.sample()
This command runs continuously until you cancel it by using `CTRL-C`, or until the returned samples cumulatively reach 40 MB in size. The stream processor reports invalid documents in the sample in a `_dlqMessage` document of the following form:
{
  _dlqMessage: {
    _stream_meta: { source: { type: "<type>" } },
    errInfo: { reason: "<reasonForError>" },
    doc: { _id: ObjectId('<group-id>'), ... },
    processorName: '<procName>',
    instanceName: '<instanceName>',
    dlqTime: ISODate('2024-09-19T20:04:34.263+00:00')
  }
}
You can use these messages to diagnose data hygiene issues without defining a dead letter queue collection.
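For example, after collecting a sample you can tally the `_dlqMessage` documents by `errInfo.reason` to see which failure dominates. A plain-JavaScript sketch with invented reasons:

```javascript
// Hypothetical _dlqMessage documents collected from a sample run.
const dlqMessages = [
  { _dlqMessage: { errInfo: { reason: "Failed to parse date" } } },
  { _dlqMessage: { errInfo: { reason: "Missing required field" } } },
  { _dlqMessage: { errInfo: { reason: "Failed to parse date" } } }
];

// Count occurrences of each failure reason.
const counts = {};
for (const m of dlqMessages) {
  const reason = m._dlqMessage.errInfo.reason;
  counts[reason] = (counts[reason] || 0) + 1;
}

console.log(counts);
// { 'Failed to parse date': 2, 'Missing required field': 1 }
```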
View Statistics of a Stream Processor
Note
Atlas Stream Processing discards the internal state of stream processors that have been stopped for 45 days or more. When you start such a processor, it operates and reports statistics identically to its initial run.
To view statistics of a stream processor:
The Atlas Administration API provides an endpoint for viewing the statistics of a stream processor.
To return a document summarizing the current status of an existing stream processor with `mongosh`, use the `sp.<streamprocessor>.stats()` method. `<streamprocessor>` must be the name of a currently running stream processor defined for the current stream processing instance. It has the following syntax:
sp.<streamprocessor>.stats({options: {<options>}})
Where `options` is an optional document with the following fields:
Field | Type | Description |
---|---|---|
`scale` | integer | Unit to use for the size of items in the output. By default, Atlas Stream Processing displays item size in bytes. To display in KB, specify a `scale` of `1024`. |
`verbose` | boolean | Flag that specifies the verbosity level of the output document. If set to `true`, the output includes statistics for each individual operator in your pipeline. Defaults to `false`. |
The output document has the following fields:
Field | Type | Description |
---|---|---|
`ns` | string | The namespace the stream processor is defined in. |
`stats` | object | A document describing the operational state of the stream processor. |
`stats.name` | string | The name of the stream processor. |
`stats.status` | string | The status of the stream processor. |
`stats.scaleFactor` | integer | The scale in which the size fields display. If set to `1`, sizes display in bytes. If set to `1024`, sizes display in kilobytes. |
`stats.inputMessageCount` | integer | The number of documents published to the stream. A document is considered 'published' to the stream once it passes through the `$source` stage. |
`stats.inputMessageSize` | integer | The number of bytes or kilobytes published to the stream. Bytes are considered 'published' to the stream once they pass through the `$source` stage. |
`stats.outputMessageCount` | integer | The number of documents processed by the stream. A document is considered 'processed' by the stream once it passes through the entire pipeline. |
`stats.outputMessageSize` | integer | The number of bytes or kilobytes processed by the stream. Bytes are considered 'processed' by the stream once they pass through the entire pipeline. |
`stats.dlqMessageCount` | integer | The number of documents sent to the Dead Letter Queue. |
`stats.dlqMessageSize` | integer | The number of bytes or kilobytes sent to the Dead Letter Queue. |
`stats.changeStreamTimeDifferenceSecs` | integer | The difference, in seconds, between the event time represented by the most recent change stream resume token and the latest event in the oplog. |
`stats.changeStreamState` | token | The most recent change stream resume token. Only applies to stream processors with a change stream source. |
`stats.stateSize` | integer | The number of bytes used by windows to store processor state. |
`stats.watermark` | integer | The timestamp of the current watermark. |
`stats.operatorStats` | array | The statistics for each operator in the processor pipeline. Atlas Stream Processing returns this field only if you pass in the `verbose: true` option. |
`stats.operatorStats.maxMemoryUsage` | integer | The maximum memory usage of the operator in bytes or kilobytes. |
`stats.operatorStats.executionTime` | integer | The total execution time of the operator in seconds. |
`stats.minOpenWindowStartTime` | date | The start time of the minimum open window. This value is optional. |
`stats.maxOpenWindowStartTime` | date | The start time of the maximum open window. This value is optional. |
`stats.kafkaPartitions` | array | Offset information for an Apache Kafka broker's partitions. |
`stats.kafkaPartitions.partition` | integer | The Apache Kafka topic partition number. |
`stats.kafkaPartitions.currentOffset` | integer | The offset that the stream processor is on for the specified partition. This value equals the previous offset that the stream processor processed plus `1`. |
`stats.kafkaPartitions.checkpointOffset` | integer | The offset that the stream processor last committed to the Apache Kafka broker and the checkpoint for the specified partition. All messages through this offset are recorded in the last checkpoint. |
`stats.kafkaPartitions.isIdle` | boolean | The flag that indicates whether the partition is idle. This value defaults to `false`. |
For example, the following shows the status of a stream processor named `proc01` on a stream processing instance named `inst01` with item sizes displayed in KB:
sp.proc01.stats(1024)
{
  ok: 1,
  ns: 'inst01',
  stats: {
    name: 'proc01',
    status: 'running',
    scaleFactor: Long("1"),
    inputMessageCount: Long("706028"),
    inputMessageSize: 958685236,
    outputMessageCount: Long("46322"),
    outputMessageSize: 85666332,
    dlqMessageCount: Long("0"),
    dlqMessageSize: Long("0"),
    stateSize: Long("2747968"),
    watermark: ISODate("2023-12-14T14:35:32.417Z"),
    ok: 1
  },
}
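The scale option only changes the unit of the size fields; the underlying arithmetic is a division by the scale. A small plain-JavaScript sketch of how a byte figure from the example output would read in kilobytes (the conversion shown is an illustration, not output from `stats()`):

```javascript
// inputMessageSize from the example output, reported in bytes (scale 1).
const inputMessageSizeBytes = 958685236;

// With a scale of 1024, the same quantity is expressed in kilobytes.
const scale = 1024;
const inputMessageSizeKB = inputMessageSizeBytes / scale;

console.log(Math.round(inputMessageSizeKB)); // 936216
```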