Recently, the number one use case for streaming responses has probably been interaction with LLMs: sending tokens as they are generated makes for a much better user experience in chat interfaces.
It is now possible to stream a response from a Lambda function through the REST API Gateway. This is big news, because response streaming from AWS Lambda used to work only in a limited way: it was available exclusively through the Function URL feature, which didn't fit all use cases.
Goal
In this blog post, I will create a simple AWS Lambda function that calls an Amazon Bedrock model and streams the response back to the caller. I will use Rust to keep my Lambda small, fast, and cheap.
The infrastructure is defined with the AWS CDK.
Architecture
The flow is simple: the client sends a prompt to a REST API Gateway endpoint, which invokes the Lambda function; the function calls an Amazon Bedrock model and streams the generated text back to the client through the gateway.
Lambda Function
I am using rig.rs as my AI framework. Let's start by defining the main function.
```rust
mod http_handler;

use http_handler::function_handler;
use lambda_runtime::{run, service_fn, tracing, Error};
use rig::client::completion::CompletionClientDyn;
use rig_bedrock::{client::Client, completion::AMAZON_NOVA_MICRO};

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing::init_default_subscriber();

    // Build a Bedrock runtime client and wrap it in a rig agent.
    let aws_config = aws_config::from_env().region("us-east-1").load().await;
    let aws_client = aws_sdk_bedrockruntime::Client::new(&aws_config);
    let agent = Client::from(aws_client)
        .agent(AMAZON_NOVA_MICRO)
        .preamble("be concise")
        .temperature(0.5)
        .build();

    // A clone of the agent is passed to every invocation of the handler.
    run(service_fn(|ev| function_handler(agent.clone(), ev))).await
}
```

Now the handler. It starts almost identically to a regular one. The main difference is that the Response and Body types are imported from the lambda_runtime::streaming module.
```rust
use aws_lambda_events::apigw::ApiGatewayProxyRequest;
use lambda_runtime::streaming::{channel, Body, Response};
use lambda_runtime::{Error, LambdaEvent};
use serde::{Deserialize, Serialize};
// Note: the module paths of the rig types used below (Agent,
// CompletionModelHandle, MultiTurnStreamItem, StreamedAssistantContent)
// differ between rig versions, so check the docs of the release you use.

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PromptRequest {
    prompt: String,
}

pub(crate) async fn function_handler(
    agent: Agent<CompletionModelHandle<'static>>,
    event: LambdaEvent<ApiGatewayProxyRequest>,
) -> Result<Response<Body>, Error> {
    // Transmitter/receiver pair backing the streamed response body.
    let (mut tx, rx) = channel();
    println!("prompt: {:?}", &event.payload);

    // Extract and parse the prompt from the request body.
    let body = event
        .payload
        .body
        .ok_or(Error::from("failed to read body"))?;
    let prompt_request: PromptRequest =
        serde_json::from_str(&body).map_err(|_| Error::from("failed to parse body"))?;

    // Ask the model for a streamed completion.
    let mut bedrock_stream = agent.stream_prompt(prompt_request.prompt).await;
    // ...
```

The crucial thing is that we create a transmitter tx and a receiver rx for handling the stream operations; the sketch below shows the mechanics in isolation.
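To make those mechanics concrete, here is a minimal, Bedrock-free sketch based on the same lambda_runtime streaming API (the function name and chunk values are made up): whatever is sent to tx is flushed to the caller as it arrives, and rx is returned as the response body.

```rust
use lambda_runtime::streaming::{channel, Body, Response};
use lambda_runtime::Error;

async fn streaming_sketch() -> Result<Response<Body>, Error> {
    let (mut tx, rx) = channel();

    // Produce chunks in a background task so `rx` can be returned immediately.
    tokio::spawn(async move {
        for chunk in ["first ", "second ", "third"] {
            // send_data fails once the client disconnects.
            if tx.send_data(chunk.into()).await.is_err() {
                break;
            }
        }
    });

    // The receiver becomes the streamed response body.
    Ok(Response::from(rx))
}
```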
In the rig library, agent.stream_prompt returns a stream of MultiTurnStreamItem. This is the enum:
```rust
// rig crate
pub enum MultiTurnStreamItem<R> {
    /// A streamed assistant content item.
    StreamAssistantItem(StreamedAssistantContent<R>),
    /// A streamed user content item (mostly for tool results).
    StreamUserItem(StreamedUserContent),
    /// The final result from the stream.
    FinalResponse(FinalResponse),
}
```

The idea is that, with strongly typed streamed items, I can easily pattern match on whatever arrives from the stream and fully control the logic. In my case, I will stream the text from the StreamAssistantItem variant.
The tricky part is how to "translate" the streamed response from rig into the Lambda format. To make it possible, we spawn a new Tokio task, so the handler can return the receiver immediately while the task keeps forwarding chunks; otherwise the function would finish before the Bedrock model's response arrived.
Inside the task, I read from bedrock_stream using the next() method from the futures crate (the StreamExt trait).
```rust
tokio::spawn(async move {
    while let Some(chunk_result) = bedrock_stream.next().await {
        match chunk_result {
            Ok(chunk) => match chunk {
                MultiTurnStreamItem::StreamAssistantItem(assistant_item) => {
                    match assistant_item {
                        // Plain text tokens: forward them to the client as-is.
                        StreamedAssistantContent::Text(text) => {
                            if tx
                                .send_data(Bytes::from(text.text().as_bytes().to_vec()))
                                .await
                                .is_err()
                            {
                                error!("error client disconnected");
                                break;
                            }
                        }
                        // Reasoning traces: send a marker instead of the content.
                        StreamedAssistantContent::Reasoning(_reasoning) => {
                            if tx
                                .send_data(Bytes::from("--- thinking ---".as_bytes()))
                                .await
                                .is_err()
                            {
                                error!("error client disconnected");
                                break;
                            }
                        }
                        _ => {
                            info!("processing tools")
                        }
                    }
                }
                MultiTurnStreamItem::FinalResponse(_agent_response) => {
                    info!("final response");
                }
                _ => {
                    info!("-- processing")
                }
            },
            Err(_) => {
                error!("stream error");
                break;
            }
        }
    }
});
```

At the end of the execution, I return rx:
```rust
    Ok(Response::from(rx))
}
```

Infrastructure
The setup is basically identical to the buffered version of the Lambda integration for the REST API Gateway. The only difference is the new responseTransferMode property, set to STREAM. Make sure you are using the newest version of aws-cdk-lib.
```typescript
import * as cdk from "aws-cdk-lib";
import * as apigw from "aws-cdk-lib/aws-apigateway";
import * as iam from "aws-cdk-lib/aws-iam";
import { Construct } from "constructs";
import { RustFunction } from "cargo-lambda-cdk";

export class StreamingInfraStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const handler = new RustFunction(this, "Streaming Function", {
      manifestPath: __dirname + "/../../streaming_function/Cargo.toml",
    });

    // Allow the function to call Bedrock foundation models,
    // including the streaming variant of the invoke API.
    handler.addToRolePolicy(
      new iam.PolicyStatement({
        effect: iam.Effect.ALLOW,
        actions: [
          "bedrock:InvokeModel",
          "bedrock:InvokeModelWithResponseStream",
        ],
        resources: [`arn:aws:bedrock:${this.region}::foundation-model/*`],
      }),
    );

    const api = new apigw.RestApi(this, "Streaming API", {
      restApiName: "Streaming API",
      description: "API for streaming data",
      deployOptions: {
        stageName: "prod",
      },
    });

    // POST /prompt with the streaming transfer mode enabled.
    const promptResource = api.root.addResource("prompt");
    const promptIntegration = new apigw.LambdaIntegration(handler, {
      responseTransferMode: apigw.ResponseTransferMode.STREAM,
    });
    promptResource.addMethod("POST", promptIntegration);

    new cdk.CfnOutput(this, "ApiEndpointUrl", {
      value: api.url,
      description: "Base URL of the REST API Gateway",
      exportName: "MyRestApiEndpoint",
    });
  }
}
```

Testing
To test the solution, I can use curl and watch the response being streamed back.
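The request looks something like this (the host is a placeholder; use the ApiEndpointUrl output from the stack, and any prompt you like). The -N flag disables curl's output buffering, so chunks are printed as they arrive:

```bash
curl -N -X POST \
  -H "Content-Type: application/json" \
  -d '{"prompt": "Explain Lambda response streaming in two sentences"}' \
  "https://<api-id>.execute-api.us-east-1.amazonaws.com/prod/prompt"
```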
The response is almost instant, but we can still see that the text is rendered gradually.
Summary
Today, the REST API Gateway can stream responses from Lambda functions. This makes it much easier to use serverless infrastructure to deliver LLM output to users.
Using Rust helps keep latency low, with an init duration of around 100 ms on a cold start. As a strongly typed language, it also makes it easier to control the flow of the streaming.

