
$emit Stage (Stream Processing)

The $emit stage specifies a connection in the Connection Registry to emit messages to. The connection must be an Apache Kafka broker, an Atlas time series collection, or an AWS S3 bucket.

To write processed data to an Apache Kafka broker, use the $emit pipeline stage with the following prototype form:

{
  "$emit": {
    "connectionName": "<registered-connection>",
    "topic": "<target-topic>" | <expression>,
    "config": {
      "acks": <number-of-acknowledgements>,
      "compression_type": "<compression-type>",
      "dateFormat": "default" | "ISO8601",
      "headers": "<expression>",
      "key": "<key-string>" | { key-document },
      "keyFormat": "<deserialization-type>",
      "outputFormat": "<json-format>"
    }
  }
}

The $emit stage takes a document with the following fields:

Field
Type
Necessity
Description

connectionName

string

Required

Name, as it appears in the Connection Registry, of the connection to write data to.

topic

string | expression

Required

Name of the Apache Kafka topic to emit messages to.

config

document

Optional

Document containing fields that override various default values.

config.acks

int

Optional

Number of acknowledgements required from the Apache Kafka cluster for a successful $emit operation.

The default value is all. Atlas Stream Processing supports the following values:

  • -1

  • 0

  • 1

  • all

config.compression_type

string

Optional

Compression type for all data generated by the producer. The default is none (that is, no compression). Valid values are:

  • none

  • gzip

  • snappy

  • lz4

  • zstd

Compression is used for full batches of data, so the efficacy of batching impacts the compression ratio; more batching results in better compression.

config.dateFormat

string

Optional

Date format for the date value. Valid values are:

  • default - to use the default of the outputFormat.

  • ISO8601 - to convert dates to strings in the ISO8601 format, which includes millisecond precision (YYYY-MM-DDTHH:mm:ss.sssZ).

For example, consider the following input:

{ "flightTime" : ISODate('2025-01-10T20:17:38.387Z') }

If $emit.config.dateFormat is set to default, output looks similar to the following:

{ "flightTime" : {$date :"2025-01-10T20:17:38.387Z"}}

If $emit.config.dateFormat is set to ISO8601, output looks similar to the following:

{ "flightTime" : "2025-01-10T20:17:38.387Z" }

config.headers

expression

Optional

Headers to add to the output message. The expression must evaluate to either an object or an array.

If the expression evaluates to an object, Atlas Stream Processing constructs a header from each key-value pair in that object, where the key is the header name, and the value is the header value.

If the expression evaluates to an array, it must take the form of an array of key-value pair objects. For example:

[
  {k: "name1", v: ...},
  {k: "name2", v: ...},
  {k: "name3", v: ...}
]

Atlas Stream Processing constructs a header from each object in the array, where the key is the header name, and the value is the header value. Atlas Stream Processing supports header values of the following types:

  • binData

  • string

  • object

  • int

  • long

  • double

  • null

config.key

object | string

Optional

Expression that evaluates to an Apache Kafka message key.

If you specify config.key, you must specify config.keyFormat.

config.keyFormat

string

Conditional

Data type used to deserialize Apache Kafka key data. Must be one of the following values:

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

Defaults to binData. If you specify config.key, you must specify config.keyFormat. If the config.key of a document does not deserialize successfully to the specified data type, Atlas Stream Processing sends it to your dead letter queue.

config.outputFormat

string

Optional

JSON format to use when emitting messages to Apache Kafka. Must be one of the following values:

  • "relaxedJson"

  • "canonicalJson"

Defaults to "relaxedJson".
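
For illustration, the following sketch shows a complete $emit stage that writes to an Apache Kafka broker, assuming a registered connection named kafka1, a topic named sales.events, and a customerId field in the processed documents; none of these names come from this page.

{
  "$emit": {
    "connectionName": "kafka1",              // hypothetical connection in the Connection Registry
    "topic": "sales.events",                 // hypothetical target topic; an expression is also allowed
    "config": {
      "acks": 1,                             // acknowledgement from the partition leader only
      "compression_type": "gzip",            // compress full producer batches
      "key": "$customerId",                  // assumed field; expression that evaluates to the message key
      "keyFormat": "string",                 // required because config.key is specified
      "headers": { "pipeline": "sales-v1" }, // object form: one header per key-value pair (assumed values)
      "outputFormat": "canonicalJson"
    }
  }
}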

To write processed data to an Atlas time series collection, use the $emit pipeline stage with the following prototype form:

{
  "$emit": {
    "connectionName": "<registered-connection>",
    "db": "<target-db>",
    "coll": "<target-coll>",
    "timeseries": {
      <options>
    }
  }
}

The $emit stage takes a document with the following fields:

Field
Type
Necessity
Description

connectionName

string

Required

Name, as it appears in the Connection Registry, of the connection to write data to.

db

string

Required

Name of the Atlas database that contains the target time series collection.

coll

string

Required

Name of the Atlas time series collection to write to.

timeseries

document

Required

Document defining the time series fields for the collection.

Note

The maximum size for documents within a time series collection is 4 MB. To learn more, see Time Series Collection Limitations.
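
For illustration, the following sketch writes to a time series collection, assuming a registered Atlas connection named atlasCluster, a sensors database, and a readings collection; the timeseries document uses the standard time series collection options, and all names are assumptions.

{
  "$emit": {
    "connectionName": "atlasCluster",  // hypothetical Atlas connection in the Connection Registry
    "db": "sensors",                   // assumed existing database
    "coll": "readings",                // created with these time series fields if it doesn't exist
    "timeseries": {
      "timeField": "timestamp",        // assumed field holding each measurement's time
      "metaField": "metadata",         // assumed metadata field
      "granularity": "seconds"
    }
  }
}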

To write processed data to an AWS S3 bucket sink connection, use the $emit pipeline stage with the following prototype form:

{
  "$emit": {
    "connectionName": "<registered-connection>",
    "bucket": "<target-bucket>",
    "region": "<target-region>",
    "path": "<key-prefix>" | <expression>,
    "config": {
      "writeOptions": {
        "count": <doc-count>,
        "bytes": <threshold>,
        "interval": {
          "size": <unit-count>,
          "unit": "<time-denomination>"
        }
      },
      "delimiter": "<delimiter>",
      "outputFormat": "basicJson" | "canonicalJson" | "relaxedJson",
      "dateFormat": "default" | "ISO8601",
      "compression": "gzip" | "snappy",
      "compressionLevel": <level>
    }
  }
}

The $emit stage takes a document with the following fields:

Field
Type
Necessity
Description

connectionName

string

Required

Name, as it appears in the Connection Registry, of the connection to write data to.

bucket

string

Required

Name of the S3 bucket to which to write data.

region

string

Optional

Name of the AWS region in which the target bucket resides. If you host your stream processing instance in an AWS region, this parameter defaults to that region. Otherwise, it defaults to the AWS region nearest your stream processing instance host region.

path

string | expression

Required

Prefix for the keys of the objects written to the S3 bucket. Must be either a literal prefix string or an expression that evaluates to a string.

config

document

Optional

Document containing additional parameters that override various default values.

config.writeOptions

document

Optional

Document containing additional parameters governing write behavior. Whichever of the configured thresholds is reached first triggers a write.

For example, if the ingested documents reach the config.writeOptions.count threshold without reaching the config.writeOptions.interval threshold, the stream processor still emits these documents to S3 according to the config.writeOptions.count threshold.

config.writeOptions.count

integer

Optional

Number of documents to group into each file written to S3.

config.writeOptions.bytes

integer

Optional

Specifies the minimum number of bytes that must accumulate before a file is written to S3. The byte count is determined by the size of the BSON documents ingested by the pipeline, not the size of the final output file.

config.writeOptions.interval

document

Optional

Specifies a timer for bulk writing documents as a combination of size and units.

Defaults to 1 minute. You can't set the size to 0 for any unit. The maximum interval is 7 days.

config.writeOptions.interval.size

integer

Conditional

The number of units specified by writeOptions.interval.units after which the stream processor bulk writes documents to S3.

Defaults to 1. You can't set a size of 0. If you define writeOptions.interval, you must also define this parameter.

config.writeOptions.interval.units

string

Conditional

The denomination of time in which to count the bulk write timer. This parameter supports the following values:

  • ms

  • second

  • minute

  • hour

  • day

Defaults to minute. If you define writeOptions.interval, you must also define this parameter.

config.delimiter

string

Optional

Delimiter between each entry in the emitted file.

Defaults to \n.

config.outputFormat

string

Optional

Specifies the output format of the JSON written to S3. Must be one of the following values:

  • "basicJson"

  • "canonicalJson"

  • "relaxedJson"

Defaults to "relaxedJson".

To learn more, see Basic JSON.

config.dateFormat

string

Optional

Date format for the date value. Valid values are:

  • default - to use the default of the outputFormat.

  • ISO8601 - to convert dates to strings in the ISO8601 format, which includes millisecond precision (YYYY-MM-DDTHH:mm:ss.sssZ).

For example, if you add the following record to the pipeline:

{ "flightTime" : ISODate('2025-01-10T20:17:38.387Z') }

then if $emit.config.dateFormat is set to default, output looks similar to the following:

{ "flightTime" : {$date :"2025-01-10T20:17:38.387Z"}}

If $emit.config.dateFormat is set to ISO8601, output looks similar to the following:

{ "flightTime" : "2025-01-10T20:17:38.387Z" }

config.compression

string

Optional

Name of the compression algorithm to use. Must be one of the following values:

  • "gzip"

  • "snappy"

config.compressionLevel

int

Conditional

Level of compression to apply to the emitted message. Supports values 1-9 inclusive; higher values mean more compression.

Defaults to 6.

This parameter is required for and limited to gzip. If you set config.compression to snappy, setting this parameter has no effect.
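
For illustration, the following sketch emits to an AWS S3 bucket, assuming a registered connection named s3Sink and a bucket named weather-archive; all names and thresholds are assumptions, chosen to show the write-threshold behavior described above.

{
  "$emit": {
    "connectionName": "s3Sink",        // hypothetical S3 connection in the Connection Registry
    "bucket": "weather-archive",       // assumed bucket name
    "region": "us-east-1",
    "path": "reports/",                // literal key prefix; an expression is also allowed
    "config": {
      "writeOptions": {
        "count": 1000,                                // write a file after 1000 documents ...
        "interval": { "size": 5, "unit": "minute" }   // ... or after 5 minutes, whichever comes first
      },
      "outputFormat": "basicJson",
      "compression": "gzip",
      "compressionLevel": 6            // applies to gzip only
    }
  }
}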

To ease ingestion of messages, Atlas Stream Processing supports the Basic JSON format, which simplifies the Relaxed JSON format. The following table provides examples of these simplifications for all affected fields.

Field Type
relaxedJson
basicJson

Binary

{ "binary": { "$binary": { "base64": "gf1UcxdHTJ2HQ/EGQrO7mQ==", "subType": "00" }}}

{ "binary": "gf1UcxdHTJ2HQ/EGQrO7mQ=="}

Date

{ "date": { "$date": "2024-10-24T18:07:29.636Z"}}

{ "date": 1729793249636 }

Decimal

{ "decimal": { "$numberDecimal": "9.9" }}

{ "decimal": "9.9" }

Timestamp

{ "timestamp": { "$timestamp": { "t": 1729793249, "i": 1 }}}

{ "timestamp": 1729793249000}

ObjectId

{ "_id": { "$oid": "671a8ce1497407eff0e17cba" }}

{ "_id": "671a8ce1497407eff0e17cba" }

Negative Infinity

{ "negInf": { "$numberDouble": "-Infinity" }}

{ "negInf": "-Infinity" }

Positive Infinity

{ "posInf": { "$numberDouble": "Infinity" }}

{ "posInf": "Infinity" }

Regular Expressions

{ "regex": { "$regularExpression": { "pattern": "ab+c", "options": "i" }}}

{ "regex": { "pattern": "ab+c", "options": "i" }}

UUID

{ "uuid": { "$binary": { "base64": "Kat+fHk6RkuAmotUmsU7gA==", "subType": "04" }}}

{ "uuid": "29ab7e7c-793a-464b-809a-8b549ac53b80" }

$emit must be the last stage of any pipeline it appears in. You can use only one $emit stage per pipeline.

You can only write to a single Atlas time series collection per stream processor. If you specify a collection that doesn't exist, Atlas creates the collection with the time series fields you specified. You must specify an existing database.

You can use a dynamic expression as the value of the topic field to enable your stream processor to write to different target Apache Kafka topics on a message-by-message basis. The expression must evaluate to a string.

Example

You have a stream of transaction events that generates messages of the following form:

{
  "customer": "Very Important Industries",
  "customerStatus": "VIP",
  "tenantId": 1,
  "transactionType": "subscription"
}
{
  "customer": "N. E. Buddy",
  "customerStatus": "employee",
  "tenantId": 5,
  "transactionType": "requisition"
}
{
  "customer": "Khan Traktor",
  "customerStatus": "contractor",
  "tenantId": 11,
  "transactionType": "billableHours"
}

To sort each of these into a distinct Apache Kafka topic, you can write the following $emit stage:

{
  "$emit": {
    "connectionName": "kafka1",
    "topic": "$customerStatus"
  }
}

This $emit stage:

  • Writes the Very Important Industries message to a topic named VIP.

  • Writes the N. E. Buddy message to a topic named employee.

  • Writes the Khan Traktor message to a topic named contractor.

For more information on dynamic expressions, see expression operators.

If you specify a topic that doesn't already exist, Apache Kafka automatically creates the topic when it receives the first message that targets it.

If you specify a topic with a dynamic expression, but Atlas Stream Processing cannot evaluate the expression for a given message, Atlas Stream Processing sends that message to the dead letter queue if configured and processes subsequent messages. If there is no dead letter queue configured, then Atlas Stream Processing skips the message completely and processes subsequent messages.

A streaming data source generates detailed weather reports from various locations, conformant to the schema of the Sample Weather Dataset. The following aggregation has four stages:

  1. The $source stage establishes a connection with the Apache Kafka broker collecting these reports in a topic named my_weatherdata, exposing each record as it is ingested to the subsequent aggregation stages. This stage also overrides the name of the timestamp field it projects, setting it to ingestionTime.

  2. The $match stage excludes documents that have an airTemperature.value of greater than or equal to 30.0 and passes along the documents with an airTemperature.value less than 30.0 to the next stage.

  3. The $addFields stage enriches the stream with metadata.

  4. The $emit stage writes the output to a topic named stream over the weatherStreamOutput Kafka broker connection.

{
  "$source": {
    "connectionName": "sample_weatherdata",
    "topic": "my_weatherdata",
    "tsFieldName": "ingestionTime"
  }
},
{
  "$match": {
    "airTemperature.value": {
      "$lt": 30
    }
  }
},
{
  "$addFields": {
    "processorMetadata": {
      "$meta": "stream"
    }
  }
},
{
  "$emit": {
    "connectionName": "weatherStreamOutput",
    "topic": "stream"
  }
}
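
As a usage sketch, you could register and start this pipeline from mongosh after connecting to your stream processing instance; the processor name weatherEmitter is arbitrary, and the sample_weatherdata and weatherStreamOutput connections are assumed to exist in the Connection Registry.

// Assemble the stages into a pipeline array, then register and start the processor.
let pipeline = [
  { "$source": { "connectionName": "sample_weatherdata", "topic": "my_weatherdata", "tsFieldName": "ingestionTime" } },
  { "$match": { "airTemperature.value": { "$lt": 30 } } },
  { "$addFields": { "processorMetadata": { "$meta": "stream" } } },
  { "$emit": { "connectionName": "weatherStreamOutput", "topic": "stream" } }
];
sp.createStreamProcessor("weatherEmitter", pipeline);  // hypothetical processor name
sp.weatherEmitter.start();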

Documents in the stream topic take the following form:

{
  "st": "x+34700+119500",
  "position": {
    "type": "Point",
    "coordinates": [122.8, 116.1]
  },
  "elevation": 9999,
  "callLetters": "6ZCM",
  "qualityControlProcess": "V020",
  "dataSource": "4",
  "type": "SAO",
  "airTemperature": {
    "value": 6.7,
    "quality": "9"
  },
  "dewPoint": {
    "value": 14.1,
    "quality": "1"
  },
  "pressure": {
    "value": 1022.2,
    "quality": "1"
  },
  "wind": {
    "direction": {
      "angle": 200,
      "quality": "9"
    },
    "type": "C",
    "speed": {
      "rate": 35,
      "quality": "1"
    }
  },
  "visibility": {
    "distance": {
      "value": 700,
      "quality": "1"
    },
    "variability": {
      "value": "N",
      "quality": "1"
    }
  },
  "skyCondition": {
    "ceilingHeight": {
      "value": 1800,
      "quality": "9",
      "determination": "9"
    },
    "cavok": "N"
  },
  "sections": ["AA1", "AG1", "UG1", "SA1", "MW1"],
  "precipitationEstimatedObservation": {
    "discrepancy": "0",
    "estimatedWaterDepth": 999
  },
  "atmosphericPressureChange": {
    "tendency": {
      "code": "4",
      "quality": "1"
    },
    "quantity3Hours": {
      "value": 3.8,
      "quality": "1"
    },
    "quantity24Hours": {
      "value": 99.9,
      "quality": "9"
    }
  },
  "seaSurfaceTemperature": {
    "value": 9.7,
    "quality": "9"
  },
  "waveMeasurement": {
    "method": "M",
    "waves": {
      "period": 8,
      "height": 3,
      "quality": "9"
    },
    "seaState": {
      "code": "00",
      "quality": "9"
    }
  },
  "pastWeatherObservationManual": {
    "atmosphericCondition": {
      "value": "6",
      "quality": "1"
    },
    "period": {
      "value": 3,
      "quality": "1"
    }
  },
  "skyConditionObservation": {
    "totalCoverage": {
      "value": "02",
      "opaque": "99",
      "quality": "9"
    },
    "lowestCloudCoverage": {
      "value": "00",
      "quality": "9"
    },
    "lowCloudGenus": {
      "value": "00",
      "quality": "1"
    },
    "lowestCloudBaseHeight": {
      "value": 1750,
      "quality": "1"
    },
    "midCloudGenus": {
      "value": "99",
      "quality": "1"
    },
    "highCloudGenus": {
      "value": "00",
      "quality": "1"
    }
  },
  "presentWeatherObservationManual": {
    "condition": "52",
    "quality": "1"
  },
  "atmosphericPressureObservation": {
    "altimeterSetting": {
      "value": 1015.9,
      "quality": "9"
    },
    "stationPressure": {
      "value": 1026,
      "quality": "1"
    }
  },
  "skyCoverLayer": {
    "coverage": {
      "value": "08",
      "quality": "1"
    },
    "baseHeight": {
      "value": 2700,
      "quality": "9"
    },
    "cloudType": {
      "value": "99",
      "quality": "9"
    }
  },
  "liquidPrecipitation": {
    "period": 12,
    "depth": 20,
    "condition": "9",
    "quality": "9"
  },
  "extremeAirTemperature": {
    "period": 99.9,
    "code": "N",
    "value": -30.4,
    "quantity": "1"
  },
  "ingestionTime": {
    "$date": "2024-09-26T17:34:41.843Z"
  },
  "_stream_meta": {
    "source": {
      "type": "kafka",
      "topic": "my_weatherdata",
      "partition": 0,
      "offset": 4285
    }
  }
}

Note

The preceding is a representative example. Streaming data are not static, and each user sees distinct documents.
