
$merge (Stream Processing)

The $merge stage specifies a connection in the Connection Registry to write messages to. The connection must be an Atlas connection.

A $merge pipeline stage has the following prototype form:

{
  "$merge": {
    "into": {
      "connectionName": "<registered-atlas-connection>",
      "db": "<registered-database-name>" | <expression>,
      "coll": "<atlas-collection-name>" | <expression>
    },
    "on": "<identifier field>" | [ "<identifier field1>", ...],
    "let": {
      <var_1>: <expression>,
      <var_2>: <expression>,
      ...,
      <var_n>: <expression>
    },
    "whenMatched": "replace | keepExisting | merge | delete | <pipeline> | <expression>",
    "whenNotMatched": "insert | discard | expression",
    "parallelism": <integer>
  }
}
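
As a concrete illustration of the prototype, the following sketch fills in the most common fields with literal values. The connection name myAtlasConnection and the sales.orders namespace are assumptions made for this example only; substitute the names registered in your own Connection Registry.

```javascript
// A hypothetical $merge stage with literal values.
// myAtlasConnection, sales, and orders are illustrative names.
const mergeStage = {
  $merge: {
    into: {
      connectionName: "myAtlasConnection",
      db: "sales",
      coll: "orders"
    },
    on: "_id",                // the default match field when "on" is omitted
    whenMatched: "merge",     // one of the documented whenMatched strings
    whenNotMatched: "insert", // one of the documented whenNotMatched strings
    parallelism: 4            // integer from 1 to 16
  }
};

console.log(JSON.stringify(mergeStage, null, 2));
```

Because into.db and into.coll are literal strings here, a parallelism value greater than 1 is allowed.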

The Atlas Stream Processing version of $merge uses most of the same fields as the Atlas Data Federation version. Atlas Stream Processing also uses the following fields that are either unique to its implementation of $merge, or modified to suit it. To learn more about the fields shared with Atlas Data Federation $merge, see $merge syntax.

Field
Necessity
Description

into

Required

Simplified to reflect Atlas Stream Processing supporting $merge only into Atlas connections.

To learn more, see this description of the Atlas Data Federation $merge fields.

whenMatched

Optional

Extends functionality compared to the Atlas Data Federation $merge stage with support for "delete" and dynamic expressions.

When set to "delete", Atlas deletes all messages matching the condition from the target collection.

If you use a dynamic expression value, it must resolve to one of the following strings:

  • "merge"

  • "replace"

  • "keepExisting"

  • "delete"

whenNotMatched

Optional

Extends functionality compared to the Atlas Data Federation $merge stage with support for dynamic expressions.

If you use a dynamic expression value, it must resolve to one of the following strings:

  • "insert"

  • "discard"

  • "expression"

parallelism

Conditional

Number of threads to which to distribute write operations. Must be an integer value between 1 and 16. Higher parallelism values increase throughput. However, higher values also require the stream processor and the cluster to which it writes to use more computational resources.

If you use a dynamic expression value for into.coll or into.db, you can't set this value greater than 1.

$merge must be the last stage of any pipeline it appears in. You can use only one $merge stage per pipeline.

The on field has special requirements for $merge against sharded collections. To learn more, see $merge syntax.

If you use a dynamic expression value for into.coll or into.db, you can't set a parallelism value greater than 1.

$merge can't write to time series collections. To write documents to time series collections, use the $emit stage.
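
You can check some of the constraints above before you start a stream processor. The following is a minimal sketch of such a check. The checkMergeStage helper is hypothetical and is not part of any MongoDB API. It also assumes that any string starting with $ or any object value counts as a dynamic expression, which is a simplification.

```javascript
// Hypothetical helpers for illustration only; not part of any MongoDB API.
function isDynamic(value) {
  // Assumption: field paths ("$field") and expression objects are dynamic.
  return (typeof value === "string" && value.startsWith("$")) ||
         (typeof value === "object" && value !== null);
}

// Returns a list of constraint violations for a pipeline (array of stages).
function checkMergeStage(pipeline) {
  const errors = [];
  const mergeIndexes = pipeline
    .map((stage, i) => ("$merge" in stage ? i : -1))
    .filter((i) => i !== -1);

  if (mergeIndexes.length > 1) {
    errors.push("only one $merge stage is allowed per pipeline");
  }
  if (mergeIndexes.some((i) => i !== pipeline.length - 1)) {
    errors.push("$merge must be the last stage");
  }
  for (const i of mergeIndexes) {
    const { into = {}, parallelism } = pipeline[i].$merge;
    if (parallelism === undefined) continue;
    if (!Number.isInteger(parallelism) || parallelism < 1 || parallelism > 16) {
      errors.push("parallelism must be an integer between 1 and 16");
    } else if (parallelism > 1 && (isDynamic(into.db) || isDynamic(into.coll))) {
      errors.push("parallelism must be 1 when into.db or into.coll is dynamic");
    }
  }
  return errors;
}

// Example: a dynamic into.db combined with parallelism 4 is rejected.
const candidate = [
  { $source: { connectionName: "kafkaProd", topic: "events" } },
  { $merge: { into: { connectionName: "atlas1", db: "$customerStatus", coll: "events" }, parallelism: 4 } }
];
console.log(checkMergeStage(candidate));
// [ 'parallelism must be 1 when into.db or into.coll is dynamic' ]
```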

You can use a dynamic expression as the value of the following fields:

  • into.db

  • into.coll

This enables your stream processor to write messages to different target Atlas collections on a message-by-message basis.

Example

You have a stream of transaction events that generates messages of the following form:

{
  "customer": "Very Important Industries",
  "customerStatus": "VIP",
  "tenantId": 1,
  "transactionType": "subscription"
}
{
  "customer": "N. E. Buddy",
  "customerStatus": "employee",
  "tenantId": 5,
  "transactionType": "requisition"
}
{
  "customer": "Khan Traktor",
  "customerStatus": "contractor",
  "tenantId": 11,
  "transactionType": "billableHours"
}

To sort each of these into a distinct Atlas database and collection, you can write the following $merge stage:

$merge: {
  into: {
    connectionName: "db1",
    db: "$customerStatus",
    coll: "$transactionType"
  }
}

This $merge stage:

  • Writes the Very Important Industries message to an Atlas collection named VIP.subscription.

  • Writes the N. E. Buddy message to an Atlas collection named employee.requisition.

  • Writes the Khan Traktor message to an Atlas collection named contractor.billableHours.
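
To make the routing concrete, the sketch below resolves the db and coll field paths against each sample message locally. It only imitates how a "$field" path picks a value out of a message. The resolveField and resolveNamespace helpers are hypothetical and do not describe how Atlas Stream Processing is implemented.

```javascript
const messages = [
  { customer: "Very Important Industries", customerStatus: "VIP", tenantId: 1, transactionType: "subscription" },
  { customer: "N. E. Buddy", customerStatus: "employee", tenantId: 5, transactionType: "requisition" },
  { customer: "Khan Traktor", customerStatus: "contractor", tenantId: 11, transactionType: "billableHours" }
];

const into = { connectionName: "db1", db: "$customerStatus", coll: "$transactionType" };

// Hypothetical helper: resolves a top-level "$field" path,
// or returns a literal name unchanged.
function resolveField(spec, message) {
  return spec.startsWith("$") ? message[spec.slice(1)] : spec;
}

// Hypothetical helper: builds the "<db>.<coll>" namespace for one message.
function resolveNamespace(target, message) {
  return `${resolveField(target.db, message)}.${resolveField(target.coll, message)}`;
}

const namespaces = messages.map((m) => resolveNamespace(into, m));
console.log(namespaces);
// [ 'VIP.subscription', 'employee.requisition', 'contractor.billableHours' ]
```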

You can only use dynamic expressions that evaluate to strings. For more information on dynamic expressions, see expression operators.

If you specify a database or collection with a dynamic expression, but Atlas Stream Processing cannot evaluate the expression for a given message, Atlas Stream Processing sends that message to the dead letter queue, if one is configured, and continues processing subsequent messages. If no dead letter queue is configured, Atlas Stream Processing skips the message and continues processing subsequent messages.
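
The following local simulation illustrates this behavior. It is a sketch under stated assumptions rather than the service's actual logic: routeMessages is a hypothetical helper, the dead letter queue is modeled as a plain array, and into.db and into.coll are assumed to be top-level "$field" paths.

```javascript
// Local simulation only. Messages whose target cannot be resolved to strings
// go to the dead letter queue (DLQ) array if one is supplied; otherwise they
// are skipped. Processing continues with the next message in both cases.
function routeMessages(messages, into, dlq) {
  const written = [];
  for (const message of messages) {
    const db = message[into.db.slice(1)];
    const coll = message[into.coll.slice(1)];
    // Dynamic targets must evaluate to strings.
    if (typeof db !== "string" || typeof coll !== "string") {
      if (dlq) dlq.push(message); // DLQ configured: keep the failed message
      continue;                   // move on to the next message
    }
    written.push({ namespace: `${db}.${coll}`, message });
  }
  return written;
}

const target = { connectionName: "db1", db: "$customerStatus", coll: "$transactionType" };
const sample = [
  { customer: "Very Important Industries", customerStatus: "VIP", transactionType: "subscription" },
  { customer: "N. E. Buddy", customerStatus: "employee" } // no transactionType
];

const dlq = [];
const withDlq = routeMessages(sample, target, dlq);
const withoutDlq = routeMessages(sample, target, null);
console.log(withDlq.length, dlq.length, withoutDlq.length); // 1 1 1
```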

To save streaming data from multiple Apache Kafka topics into collections in your Atlas cluster, use the $merge stage with the $source stage. The $source stage specifies the topics from which to read data. The $merge stage writes the data to the target collection.

Use the following syntax:

{
  "$source": {
    "connectionName": "<registered-kafka-connection>",
    "topic": [ "<topic-name-1>", "<topic-name-2>", ... ]
  }
},
{
  "$merge": {
    "into": {
      "connectionName": "<registered-atlas-connection>",
      "db": "<registered-database-name>" | <expression>,
      "coll": "<atlas-collection-name>" | <expression>
    }
  },
  ...
}
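
As an illustration of this syntax, the sketch below reads from two topics and writes every message to a single collection. All names in it (kafkaProd, orders, refunds, atlasProd, retail, transactions) are hypothetical placeholders.

```javascript
// Hypothetical names throughout; replace them with your registered
// connections, topics, database, and collection.
const pipeline = [
  {
    $source: {
      connectionName: "kafkaProd",
      topic: ["orders", "refunds"] // read from both topics
    }
  },
  {
    $merge: {
      into: {
        connectionName: "atlasProd",
        db: "retail",
        coll: "transactions"
      }
    }
  }
];

console.log(JSON.stringify(pipeline, null, 2));
```

In mongosh, a pipeline array like this is typically passed to sp.createStreamProcessor() together with a processor name. Check the method reference for the exact options before relying on this.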

A streaming data source generates detailed weather reports from various locations, conformant to the schema of the Sample Weather Dataset. The following aggregation has three stages:

  1. The $source stage establishes a connection with the Apache Kafka broker collecting these reports in a topic named my_weatherdata, exposing each record as it is ingested to the subsequent aggregation stages. This stage also overrides the name of the timestamp field it projects, setting it to ingestionTime.

  2. The $match stage excludes documents that have a dewPoint.value of less than or equal to 5.0 and passes along the documents with dewPoint.value greater than 5.0 to the next stage.

  3. The $merge stage writes the output to an Atlas collection named stream in the sample_weatherstream database. If no such database or collection exists, Atlas creates it.

{
  '$source': {
    connectionName: 'sample_weatherdata',
    topic: 'my_weatherdata',
    tsFieldName: 'ingestionTime'
  }
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
  '$merge': {
    into: {
      connectionName: 'weatherStreamOutput',
      db: 'sample_weatherstream',
      coll: 'stream'
    }
  }
}

To view the documents in the resulting sample_weatherstream.stream collection, connect to your Atlas cluster and run the following command:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

Note

The preceding is a representative example. Streaming data are not static, and each user sees distinct documents.

You can use the $merge.whenMatched and $merge.whenNotMatched parameters to replicate the effects of Change Stream events according to their operation type.

The following aggregation has four stages:

  1. The $source stage establishes a connection to the db1.coll1 collection on an Atlas cluster over the atlas1 connection.

  2. The $addFields stage enriches the ingested documents with a fullDocument._isDelete field set to the value of an equality check between the "$operationType" value of each document and "delete". This equality check evaluates to a boolean.

  3. The $replaceRoot stage replaces the document with the value of the enriched $fullDocument field.

  4. The $merge stage writes to db1.coll1 over the atlas2 connection, performing two checks on each document:

    • First, the whenMatched field checks if the document matches an existing document in the db1.coll1 collection by _id, the default match field since on is not explicitly set. If it does and fullDocument._isDelete is set to true, then Atlas deletes the matching document. If it does match and fullDocument._isDelete is set to false, then Atlas replaces the matching document with the new one from the streaming data source.

    • Second, if Atlas Stream Processing finds no such matching document and fullDocument._isDelete is true, Atlas discards the document instead of writing it to the collection. If there is no such matching document and fullDocument._isDelete is false, Atlas inserts the document from the streaming data source into the collection.

{
  $source: {
    connectionName: "atlas1",
    db: "db1",
    coll: "coll1",
    fullDocument: "required"
  }
},
{
  $addFields: {
    "fullDocument._isDelete": {
      $eq: [
        "$operationType",
        "delete"
      ]
    }
  }
},
{
  $replaceRoot: {
    newRoot: "$fullDocument"
  }
},
{
  $merge: {
    into: {
      connectionName: "atlas2",
      db: "db1",
      coll: "coll1"
    },
    whenMatched: {
      $cond: {
        if: "$_isDelete",
        then: "delete",
        else: "replace"
      }
    },
    whenNotMatched: {
      $cond: {
        if: "$_isDelete",
        then: "discard",
        else: "insert"
      }
    }
  }
}
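
The four outcomes described for the $merge stage can be traced with a small local simulation. This is a sketch for illustration only: resolveAction and mergeAction are hypothetical helpers that handle just the $cond shape used in this pipeline, and they do not reflect how Atlas Stream Processing evaluates expressions internally.

```javascript
// Hypothetical helper: resolves either a literal action string or a
// { $cond: { if: "$field", then, else } } expression against a document.
function resolveAction(spec, doc) {
  if (typeof spec === "string") return spec;
  const { if: field, then, else: otherwise } = spec.$cond;
  return doc[field.slice(1)] ? then : otherwise;
}

const whenMatched = { $cond: { if: "$_isDelete", then: "delete", else: "replace" } };
const whenNotMatched = { $cond: { if: "$_isDelete", then: "discard", else: "insert" } };

// Picks the action for a document, given whether a document with the same
// _id already exists in the target collection.
function mergeAction(doc, existsInTarget) {
  return resolveAction(existsInTarget ? whenMatched : whenNotMatched, doc);
}

console.log(mergeAction({ _id: 1, _isDelete: true }, true));   // delete
console.log(mergeAction({ _id: 1, _isDelete: false }, true));  // replace
console.log(mergeAction({ _id: 2, _isDelete: true }, false));  // discard
console.log(mergeAction({ _id: 2, _isDelete: false }, false)); // insert
```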
