New in MongoDB Atlas Stream Processing: External Function Support

Laura Zhukas

Today we're excited to introduce External Functions, a new capability in MongoDB Atlas Stream Processing that lets you invoke AWS Lambda functions directly from your streaming pipelines. The addition of External Functions to Atlas Stream Processing unlocks new ways to enrich, validate, and transform data in-flight, enabling smarter and more modular event-driven applications. This functionality is available through a new pipeline stage, $externalFunction.

What are external functions?

External functions allow you to integrate Atlas Stream Processing with external logic services such as AWS Lambda. This lets you reuse existing business logic, perform AI/ML inference, or enrich and validate data as it moves through your pipeline, all without needing to rebuild that logic directly in your pipeline definition.

AWS Lambda is a serverless compute service that runs your code in response to events, scales automatically, and supports multiple languages (JavaScript, Python, Go, etc.). Because there’s no infrastructure to manage, Lambda is ideal for event-driven systems. Now, by using external functions, you can seamlessly plug that logic into your streaming workloads.

Where $externalFunction fits in your pipeline

MongoDB Atlas Stream Processing can connect to a wide range of sources and output to various sinks. The diagram below shows a typical streaming architecture: Atlas Stream Processing ingests data, enriches it with stages like $https and $externalFunction, and routes the transformed results to various destinations.

Figure 1. A high-level visual of a stream processing pipeline.
This diagram is broken down into three portions from left to right. On the left is a box labeled Sources, with the description "connect to a variety of streaming sources or Atlas databases." This box points to the center box, labeled Atlas Stream Processing, which has one arrow pointing up to "HTTPS operator" and one arrow pointing down to "synchronous external function." On the right is a box labeled Sinks, with the description "land data in Atlas databases or streaming destinations."

The $externalFunction stage can be placed anywhere in your pipeline (except as the initial source stage), allowing you to inject external logic at any step. Atlas Stream Processing supports two modes for invoking external functions: synchronous and asynchronous.

Synchronous execution type

In synchronous mode, the pipeline calls the Lambda function and waits for a response. The result is stored in a user-defined field (using the “as” key) and passed into the following stages.

let syncEF = {
  $externalFunction: {
    connectionName: "myLambdaConnection",
    functionName: "arn:aws:lambda:region:account-id:function:function-name",
    execution: "sync",
    as: "response",
    onError: "fail",
    payload: [
      { $replaceRoot: { newRoot: "$fullDocument.payloadToSend" } },
      { $addFields: { sum: { $sum: "$randomArray" } } },
      { $project: { success: 1, sum: 1 } }
    ]
  }
};

Let’s walk through what each part of the $externalFunction stage does in this synchronous setup:

  • connectionName: the external function connection name specified in the Connection Registry.

  • functionName: the full AWS ARN or the name of the AWS Lambda function.

  • execution: indicates synchronous execution ("sync") as opposed to asynchronous ("async").

  • as: specifies the field ("response" here) in which the Lambda response will be stored.

  • onError: behavior when the operator encounters an error (in this case, "fail" stops the processor). The default is to add the event to the dead letter queue.

  • payload: an inner pipeline that lets you customize the request body sent to the function. Using it decreases the size of the data passed and ensures only relevant data is sent to the external function.

This type is useful when you want to enrich or transform a document using external logic before it proceeds through the rest of the pipeline.
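To make the data flow concrete, here is a hedged sketch in plain JavaScript of what the payload pipeline above sends and how the response lands on the event. The field values and the Lambda result are hypothetical, not taken from a real deployment:

```javascript
// Hypothetical input event entering the sync $externalFunction stage
const inputEvent = {
  fullDocument: {
    payloadToSend: { success: true, randomArray: [1, 2, 3] }
  }
};

// The payload pipeline ($replaceRoot, $addFields, $project) would
// reduce the request body sent to Lambda to roughly:
const requestBody = {
  success: inputEvent.fullDocument.payloadToSend.success,
  sum: inputEvent.fullDocument.payloadToSend.randomArray.reduce((a, b) => a + b, 0)
};

// With as: "response", the Lambda result (hypothetical here) is
// attached to the event before it flows to the next stage:
const lambdaResult = { validated: true };
const outputEvent = { ...inputEvent, response: lambdaResult };
```

The original document stays intact; only the named field is added, so later stages can reference both the raw readings and the function's answer.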

Asynchronous execution type

In async mode, the function is called, but the pipeline does not wait for a response. This is useful when you want to notify downstream systems, trigger external workflows, or pass data into AWS without halting the pipeline.

let asyncEF = {
  $externalFunction: {
    connectionName: "EF-Connection",
    functionName: "arn:aws:lambda:us-west-1:12112121212:function:EF-Test",
    execution: "async"
  }
};

Use the async execution type for propagating information outward, for example:

  • Triggering downstream AWS applications or analytics

  • Notifying external systems

  • Firing off alerts or billing logic
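As a sketch, an async stage can also carry a payload pipeline so that only alert-relevant fields leave the stream. The connection name, ARN, and projected fields below are illustrative, and this assumes the payload option behaves in async mode as it does in sync mode:

```javascript
// Hedged sketch of an async external function stage; the connection
// name, ARN, and projected fields are hypothetical.
let alertEF = {
  $externalFunction: {
    connectionName: "myLambdaConnection",
    functionName: "arn:aws:lambda:us-east-1:111111111111:function:sendAlert",
    execution: "async",
    // Trim the request body to the fields the alerting function needs
    payload: [
      { $project: { device_id: 1, status: 1, timestamp: 1 } }
    ]
  }
};
```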

Real-world use case: Solar device diagnostics

To illustrate the power of external functions, let’s walk through an example: a solar energy company wants to monitor real-time telemetry from thousands of solar devices. Each event includes sensor readings (e.g., temperature, power output) and metadata like device_id and timestamp. These events need to be processed, enriched, and then stored in a MongoDB Atlas collection for dashboards and alerts.

This can easily be accomplished with a synchronous external function. Each event is sent to a Lambda function that enriches the record with a status (e.g., ok, warning, critical) and diagnostic comments. The pipeline waits for the enriched event to be returned and then writes it to the desired MongoDB collection.

Step 1: Define the external function connection

First, create a new AWS Lambda connection in the Connection Registry within Atlas. You can authenticate using Atlas's Unified AWS Access, which securely connects Atlas and your AWS account.

Figure 2. Adding an AWS Lambda connection in the UI.
Screenshot of the Atlas UI for adding an AWS Lambda connection.

Step 2: Implement the Lambda function

Here’s a simple diagnostic function. It receives solar telemetry data, checks it against thresholds, and returns a structured result.

export const handler = async (event) => {
  const {
    device_id,
    group_id,
    watts,
    temp,
    max_watts,
    timestamp
  } = event;

  // Default thresholds
  const expectedTempRange = [20, 40]; // Celsius
  const wattsLowerBound = 0.6 * max_watts; // 60% of max output

  let status = "ok";
  let messages = [];

  // Wattage check
  if (watts < wattsLowerBound) {
    status = "warning";
    messages.push(`Observed watts (${watts}) below 60% of max_watts (${max_watts}).`);
  }

  // Temperature check
  if (temp < expectedTempRange[0] || temp > expectedTempRange[1]) {
    status = "warning";
    messages.push(`Temperature (${temp}°C) out of expected range [${expectedTempRange[0]}–${expectedTempRange[1]}].`);
  }

  // If multiple warnings, escalate to critical
  if (messages.length > 1) {
    status = "critical";
  }

  return {
    device_id,
    status,
    timestamp,
    watts_expected_range: [wattsLowerBound, max_watts],
    temp_expected_range: expectedTempRange,
    comment: messages.length ? messages.join(" ") : "All readings within expected ranges."
  };
};

Step 3: Create the streaming pipeline

Using VS Code, define a stream processor using the sample solar stream as input.

let s = {
  $source: {
    connectionName: 'sample_stream_solar'
  }
};

// Define the external function stage
let EFStage = {
  $externalFunction: {
    connectionName: "telemetryCheckExternalFunction",
    onError: "fail",
    functionName: "arn:aws:lambda:us-east-1:121212121212:function:checkDeviceTelemetry",
    as: "responseFromLambda"
  }
};

// Replace the original document with the Lambda response
let projectStage = {
  $replaceRoot: {
    newRoot: "$responseFromLambda"
  }
};

// Merge the results into a DeviceTelemetryResults collection
let sink = {
  $merge: {
    into: {
      connectionName: "IoTDevicesCluster",
      db: "SolarDevices",
      coll: "DeviceTelemetryResults"
    }
  }
};

sp.createStreamProcessor("monitorSolarDevices", [s, EFStage, projectStage, sink]);
sp.monitorSolarDevices.start();

Once running, the processor ingests live telemetry data, invokes the Lambda diagnostics logic, and returns enriched results to MongoDB Atlas, complete with status and diagnostic comments.
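For a device reporting within range, the documents written to DeviceTelemetryResults would look roughly like the following sketch, which mirrors the handler's return shape. The device ID, timestamp, and max_watts value of 300 are hypothetical:

```javascript
// Hypothetical enriched document produced by the handler above for a
// healthy device with max_watts of 300
const enrichedDoc = {
  device_id: "solar-panel-042",
  status: "ok",
  timestamp: "2024-06-14T09:30:00Z",
  watts_expected_range: [0.6 * 300, 300], // [wattsLowerBound, max_watts]
  temp_expected_range: [20, 40],          // Celsius
  comment: "All readings within expected ranges."
};
```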

Step 4: View enriched results in MongoDB Atlas

Explore the enriched data in MongoDB Atlas using the Data Explorer. For example, filter all documents where status = "ok" after a specific date.
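In plain JavaScript, an equivalent of that filter looks like the sketch below; the documents and the cutoff date are hypothetical:

```javascript
// Hypothetical enriched documents, as stored in DeviceTelemetryResults
const docs = [
  { device_id: "panel-a", status: "ok",      timestamp: new Date("2024-06-15") },
  { device_id: "panel-b", status: "warning", timestamp: new Date("2024-06-16") },
  { device_id: "panel-c", status: "ok",      timestamp: new Date("2024-06-10") }
];

// Equivalent of a Data Explorer filter along the lines of
// { status: "ok", timestamp: { $gte: ISODate("2024-06-14") } }
const cutoff = new Date("2024-06-14");
const matches = docs.filter(d => d.status === "ok" && d.timestamp >= cutoff);
```

Only panel-a matches: panel-b has a warning status, and panel-c reported before the cutoff.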

Figure 3. Data Explorer filtering for all documents with a status of “ok” from June 14 onwards.
Screenshot of the Data Explorer UI.

Smarter stream processing with external logic

MongoDB Atlas Stream Processing external functions allow you to enrich your data stream with logic that lives outside the pipeline, making your processing smarter and more adaptable. In this example, we used AWS Lambda to apply device diagnostics in real time and store the results in MongoDB. You could easily extend this to use cases in fraud detection, personalization, enrichment from third-party APIs, and more.

Log in today to get started, or check out our documentation to create your first external function. Have an idea for how you'd use external functions in your pipelines? Let us know in the MongoDB community forum!