The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.
    stateDiagram-v2
        direction LR
        BatchSource: Amazon SQS <br/><br/> Amazon Kinesis Data Streams <br/><br/> Amazon DynamoDB Streams <br/><br/>
        LambdaInit: Lambda invocation
        BatchProcessor: Batch Processor
        RecordHandler: Record Handler function
        YourLogic: Your logic to process each batch item
        LambdaResponse: Lambda response

        BatchSource --> LambdaInit

        LambdaInit --> BatchProcessor
        BatchProcessor --> RecordHandler

        state BatchProcessor {
            [*] --> RecordHandler: Your function
            RecordHandler --> YourLogic
        }

        RecordHandler --> BatchProcessor: Collect results
        BatchProcessor --> LambdaResponse: Report items that failed processing
When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event source, your Lambda functions are triggered with a batch of messages.
If your function fails to process any message from the batch, the entire batch returns to your queue or stream. The same batch is then retried until one of the following happens first: (a) your Lambda function returns a successful response, (b) a record reaches its maximum retry attempts, or (c) the records expire.
    journey
        section Conditions
            Successful response: 5: Success
            Maximum retries: 3: Failure
            Records expired: 1: Failure
This behavior changes when you enable the Report Batch Item Failures feature in your Lambda function event source configuration:
SQS queues. Only messages reported as failures return to the queue for a retry, while successful ones are deleted.
Kinesis data streams and DynamoDB streams. A single reported failure uses its sequence number as the stream checkpoint. Multiple reported failures use the lowest sequence number as the checkpoint.
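The checkpoint rule for streams can be illustrated with a minimal sketch. It assumes .NET 6 or later (for `MinBy`), the sequence numbers are made up, and `LowestSequenceNumber` is a hypothetical helper, not part of the utility or of Lambda:

```csharp
using System;
using System.Linq;
using System.Numerics;

// Sequence numbers are long decimal strings, so compare them numerically
// (BigInteger) rather than as text. The values below are made up.
string[] failedSequenceNumbers =
{
    "49590338271490256608559692540925702759324208523137515618",
    "49590338271490256608559692538361571095921575989136588898",
    "49590338271490256608559692540925702759324208523137515700"
};

// Hypothetical helper: the checkpoint is the lowest failed sequence number.
static string LowestSequenceNumber(string[] sequenceNumbers) =>
    sequenceNumbers.MinBy(s => BigInteger.Parse(s))
    ?? throw new ArgumentException("No failures reported.");

var checkpoint = LowestSequenceNumber(failedSequenceNumbers);
Console.WriteLine($"Stream checkpoint: {checkpoint}");
```

Because the stream is retried from the checkpoint onward, records after the lowest failed sequence number may be delivered again even if they succeeded the first time.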
Warning: This utility lowers the chance of processing records more than once; it does not guarantee that each record is processed only once.
We recommend implementing processing logic in an idempotent manner wherever possible.
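As a rough illustration of idempotent processing, the sketch below skips a message whose ID has already been handled. The in-memory `HashSet` stands in for a durable store, and `HandlePayment` and the message IDs are hypothetical names chosen for this example:

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for a durable idempotency store (an assumption; a real
// function would need storage that survives across invocations).
var processedIds = new HashSet<string>();
var balance = 0;

// Hypothetical handler: applies a payment at most once per message ID.
void HandlePayment(string messageId, int amount)
{
    // HashSet.Add returns false when the ID is already present,
    // so a redelivered message is skipped instead of applied twice.
    if (!processedIds.Add(messageId))
    {
        return;
    }

    balance += amount;
}

HandlePayment("msg-1", 10);
HandlePayment("msg-2", 5);
HandlePayment("msg-1", 10); // redelivery of msg-1 after a partial batch failure

Console.WriteLine($"Balance: {balance}"); // prints "Balance: 15"
```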
You can find more details on how Lambda works with SQS, Kinesis, or DynamoDB in the AWS documentation.
Migrating to v3
If you're upgrading to v3, please review the Migration Guide v3 for important breaking changes, including the .NET 8 requirement and the AWS SDK v4 migration.
For this feature to work, you need to (1) configure your Lambda function event source to use ReportBatchItemFailures, and (2) return a specific response to report which records failed to be processed.
You use your preferred deployment framework to set the correct configuration while this utility handles the correct response to be returned.
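For orientation, the response that Lambda expects is a JSON document listing the identifiers of the failed records. The utility builds it for you; the sketch below only reproduces the shape with `System.Text.Json`, using placeholder IDs:

```csharp
using System;
using System.Linq;
using System.Text.Json;

// IDs of records that failed (message IDs for SQS, sequence numbers for streams).
// These values are placeholders.
string[] failedIds = { "msg-2", "msg-5" };

// Anonymous types mirror the response contract:
// { "batchItemFailures": [ { "itemIdentifier": "<id>" } ] }
var response = new
{
    batchItemFailures = failedIds.Select(id => new { itemIdentifier = id }).ToArray()
};

var json = JsonSerializer.Serialize(response);
Console.WriteLine(json);
// prints {"batchItemFailures":[{"itemIdentifier":"msg-2"},{"itemIdentifier":"msg-5"}]}
```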
Batch processing can be configured with the settings below:
| Setting | Description | Environment variable | Default |
| --- | --- | --- | --- |
| Error Handling Policy | The error handling policy to apply during batch processing. | POWERTOOLS_BATCH_ERROR_HANDLING_POLICY | DeriveFromEvent |
| Parallel Enabled | Controls if parallel processing of batch items is enabled. | POWERTOOLS_BATCH_PARALLEL_ENABLED | false |
| Max Degree of Parallelism | The maximum degree of parallelism to apply if parallel processing is enabled. | POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM | 1 |
| Throw on Full Batch Failure | Controls if a BatchProcessingException is thrown on full batch failure. | POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE | true |
The remaining sections of the documentation rely on these samples. For completeness, they demonstrate the IAM permissions and a dead-letter queue (DLQ) to which batch records are sent after 2 retry attempts.
You do not need any additional IAM permissions to use this utility, except for what each event source requires.
    AWSTemplateFormatVersion: "2010-09-09"
    Transform: AWS::Serverless-2016-10-31
    Description: Example project demoing SQS Queue processing using the Batch Processing Utility in Powertools for AWS Lambda (.NET)

    Globals:
      Function:
        Timeout: 20
        Runtime: dotnet8
        MemorySize: 1024
        Environment:
          Variables:
            POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-sqs
            POWERTOOLS_LOG_LEVEL: Debug
            POWERTOOLS_LOGGER_CASE: PascalCase
            POWERTOOLS_BATCH_ERROR_HANDLING_POLICY: DeriveFromEvent
            POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM: 1
            POWERTOOLS_BATCH_PARALLEL_ENABLED: false
            POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE: true

    Resources:

      # --------------
      # KMS key for encrypted messages / records
      CustomerKey:
        Type: AWS::KMS::Key
        Properties:
          Description: KMS key for encrypted queues
          Enabled: true
          KeyPolicy:
            Version: "2012-10-17"
            Statement:
              - Sid: Enable IAM User Permissions
                Effect: Allow
                Principal:
                  AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root"
                Action: "kms:*"
                Resource: "*"
              - Sid: Allow AWS Lambda to use the key
                Effect: Allow
                Principal:
                  Service: lambda.amazonaws.com
                Action:
                  - kms:Decrypt
                  - kms:GenerateDataKey
                Resource: "*"

      CustomerKeyAlias:
        Type: AWS::KMS::Alias
        Properties:
          AliasName: !Sub alias/${AWS::StackName}-kms-key
          TargetKeyId: !Ref CustomerKey

      # --------------
      # Batch Processing for SQS Queue
      SqsDeadLetterQueue:
        Type: AWS::SQS::Queue
        Properties:
          KmsMasterKeyId: !Ref CustomerKey

      SqsQueue:
        Type: AWS::SQS::Queue
        Properties:
          RedrivePolicy:
            deadLetterTargetArn: !GetAtt SqsDeadLetterQueue.Arn
            maxReceiveCount: 2
          KmsMasterKeyId: !Ref CustomerKey

      SqsBatchProcessorFunction:
        Type: AWS::Serverless::Function
        Properties:
          CodeUri: ./src/HelloWorld/
          Handler: HelloWorld::HelloWorld.Function::SqsHandlerUsingAttribute
          Policies:
            - Statement:
                - Sid: DlqPermissions
                  Effect: Allow
                  Action:
                    - sqs:SendMessage
                    - sqs:SendMessageBatch
                  Resource: !GetAtt SqsDeadLetterQueue.Arn
                - Sid: KmsKeyPermissions
                  Effect: Allow
                  Action:
                    - kms:Decrypt
                    - kms:GenerateDataKey
                  Resource: !GetAtt CustomerKey.Arn
          Events:
            SqsBatch:
              Type: SQS
              Properties:
                BatchSize: 5
                Enabled: true
                FunctionResponseTypes:
                  - ReportBatchItemFailures
                Queue: !GetAtt SqsQueue.Arn

      SqsBatchProcessorFunctionLogGroup:
        Type: AWS::Logs::LogGroup
        Properties:
          LogGroupName: !Sub "/aws/lambda/${SqsBatchProcessorFunction}"
          RetentionInDays: 7

    Outputs:
      SqsQueueUrl:
        Description: "SQSQueueURL"
        Value: !Ref SqsQueue
    AWSTemplateFormatVersion: "2010-09-09"
    Transform: AWS::Serverless-2016-10-31
    Description: Example project demoing Kinesis Data Streams processing using the Batch Processing Utility in Powertools for AWS Lambda (.NET)

    Globals:
      Function:
        Timeout: 20
        Runtime: dotnet8
        MemorySize: 1024
        Environment:
          Variables:
            POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-kinesis
            POWERTOOLS_LOG_LEVEL: Debug
            POWERTOOLS_LOGGER_CASE: PascalCase
            POWERTOOLS_BATCH_ERROR_HANDLING_POLICY: DeriveFromEvent
            POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM: 1
            POWERTOOLS_BATCH_PARALLEL_ENABLED: false
            POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE: true

    Resources:

      # --------------
      # KMS key for encrypted messages / records
      CustomerKey:
        Type: AWS::KMS::Key
        Properties:
          Description: KMS key for encrypted queues
          Enabled: true
          KeyPolicy:
            Version: "2012-10-17"
            Statement:
              - Sid: Enable IAM User Permissions
                Effect: Allow
                Principal:
                  AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root"
                Action: "kms:*"
                Resource: "*"
              - Sid: Allow AWS Lambda to use the key
                Effect: Allow
                Principal:
                  Service: lambda.amazonaws.com
                Action:
                  - kms:Decrypt
                  - kms:GenerateDataKey
                Resource: "*"

      CustomerKeyAlias:
        Type: AWS::KMS::Alias
        Properties:
          AliasName: !Sub alias/${AWS::StackName}-kms-key
          TargetKeyId: !Ref CustomerKey

      # --------------
      # Batch Processing for Kinesis Data Stream
      KinesisStreamDeadLetterQueue:
        Type: AWS::SQS::Queue
        Properties:
          KmsMasterKeyId: !Ref CustomerKey

      KinesisStream:
        Type: AWS::Kinesis::Stream
        Properties:
          ShardCount: 1
          StreamEncryption:
            EncryptionType: KMS
            KeyId: !Ref CustomerKey

      KinesisStreamConsumer:
        Type: AWS::Kinesis::StreamConsumer
        Properties:
          ConsumerName: powertools-dotnet-sample-batch-kds-consumer
          StreamARN: !GetAtt KinesisStream.Arn

      KinesisBatchProcessorFunction:
        Type: AWS::Serverless::Function
        Properties:
          Policies:
            - Statement:
                - Sid: KinesisStreamConsumerPermissions
                  Effect: Allow
                  Action:
                    - kinesis:DescribeStreamConsumer
                  Resource:
                    - !GetAtt KinesisStreamConsumer.ConsumerARN
                - Sid: DlqPermissions
                  Effect: Allow
                  Action:
                    - sqs:SendMessage
                    - sqs:SendMessageBatch
                  Resource: !GetAtt KinesisStreamDeadLetterQueue.Arn
                - Sid: KmsKeyPermissions
                  Effect: Allow
                  Action:
                    - kms:Decrypt
                    - kms:GenerateDataKey
                  Resource: !GetAtt CustomerKey.Arn
          CodeUri: ./src/HelloWorld/
          Handler: HelloWorld::HelloWorld.Function::KinesisEventHandlerUsingAttribute
          Events:
            Kinesis:
              Type: Kinesis
              Properties:
                BatchSize: 5
                BisectBatchOnFunctionError: true
                DestinationConfig:
                  OnFailure:
                    Destination: !GetAtt KinesisStreamDeadLetterQueue.Arn
                Enabled: true
                FunctionResponseTypes:
                  - ReportBatchItemFailures
                MaximumRetryAttempts: 2
                ParallelizationFactor: 1
                StartingPosition: LATEST
                Stream: !GetAtt KinesisStreamConsumer.ConsumerARN

      KinesisBatchProcessorFunctionLogGroup:
        Type: AWS::Logs::LogGroup
        Properties:
          LogGroupName: !Sub "/aws/lambda/${KinesisBatchProcessorFunction}"
          RetentionInDays: 7

    Outputs:
      KinesisStreamArn:
        Description: "KinesisStreamARN"
        Value: !GetAtt KinesisStream.Arn
    AWSTemplateFormatVersion: "2010-09-09"
    Transform: AWS::Serverless-2016-10-31
    Description: Example project demoing DynamoDB Streams processing using the Batch Processing Utility in Powertools for AWS Lambda (.NET)

    Globals:
      Function:
        Timeout: 20
        Runtime: dotnet8
        MemorySize: 1024
        Environment:
          Variables:
            POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-ddb
            POWERTOOLS_LOG_LEVEL: Debug
            POWERTOOLS_LOGGER_CASE: PascalCase
            POWERTOOLS_BATCH_ERROR_HANDLING_POLICY: DeriveFromEvent
            POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM: 1
            POWERTOOLS_BATCH_PARALLEL_ENABLED: false
            POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE: true

    Resources:

      # --------------
      # KMS key for encrypted messages / records
      CustomerKey:
        Type: AWS::KMS::Key
        Properties:
          Description: KMS key for encrypted queues
          Enabled: true
          KeyPolicy:
            Version: "2012-10-17"
            Statement:
              - Sid: Enable IAM User Permissions
                Effect: Allow
                Principal:
                  AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root"
                Action: "kms:*"
                Resource: "*"
              - Sid: Allow AWS Lambda to use the key
                Effect: Allow
                Principal:
                  Service: lambda.amazonaws.com
                Action:
                  - kms:Decrypt
                  - kms:GenerateDataKey
                Resource: "*"

      CustomerKeyAlias:
        Type: AWS::KMS::Alias
        Properties:
          AliasName: !Sub alias/${AWS::StackName}-kms-key
          TargetKeyId: !Ref CustomerKey

      # --------------
      # Batch Processing for DynamoDb (DDB) Stream
      DdbStreamDeadLetterQueue:
        Type: AWS::SQS::Queue
        Properties:
          KmsMasterKeyId: !Ref CustomerKey

      DdbTable:
        Type: AWS::DynamoDB::Table
        Properties:
          BillingMode: PAY_PER_REQUEST
          AttributeDefinitions:
            - AttributeName: id
              AttributeType: S
          KeySchema:
            - AttributeName: id
              KeyType: HASH
          StreamSpecification:
            StreamViewType: NEW_AND_OLD_IMAGES

      DdbStreamBatchProcessorFunction:
        Type: AWS::Serverless::Function
        Properties:
          CodeUri: ./src/HelloWorld/
          Handler: HelloWorld::HelloWorld.Function::DynamoDbStreamHandlerUsingAttribute
          Policies:
            - AWSLambdaDynamoDBExecutionRole
            - Statement:
                - Sid: DlqPermissions
                  Effect: Allow
                  Action:
                    - sqs:SendMessage
                    - sqs:SendMessageBatch
                  Resource: !GetAtt DdbStreamDeadLetterQueue.Arn
                - Sid: KmsKeyPermissions
                  Effect: Allow
                  Action:
                    - kms:GenerateDataKey
                  Resource: !GetAtt CustomerKey.Arn
          Events:
            Stream:
              Type: DynamoDB
              Properties:
                BatchSize: 5
                BisectBatchOnFunctionError: true
                DestinationConfig:
                  OnFailure:
                    Destination: !GetAtt DdbStreamDeadLetterQueue.Arn
                Enabled: true
                FunctionResponseTypes:
                  - ReportBatchItemFailures
                MaximumRetryAttempts: 2
                ParallelizationFactor: 1
                StartingPosition: LATEST
                Stream: !GetAtt DdbTable.StreamArn

      DdbStreamBatchProcessorFunctionLogGroup:
        Type: AWS::Logs::LogGroup
        Properties:
          LogGroupName: !Sub "/aws/lambda/${DdbStreamBatchProcessorFunction}"
          RetentionInDays: 7

    Outputs:
      DdbTableName:
        Description: "DynamoDBTableName"
        Value: !Ref DdbTable
    public class Product
    {
        public int Id { get; set; }
        public string? Name { get; set; }
        public decimal Price { get; set; }
    }

    public class TypedSqsRecordHandler : ITypedRecordHandler<Product> // (1)
    {
        public async Task<RecordHandlerResult> HandleAsync(Product product, CancellationToken cancellationToken)
        {
            /*
             * Your business logic with automatic deserialization.
             * If an exception is thrown, the item will be marked as a partial batch item failure.
             */

            Logger.LogInformation($"Processing product {product.Id} - {product.Name} (${product.Price})");

            if (product.Id == 4) // (2)
            {
                throw new ArgumentException("Error on id 4");
            }

            return await Task.FromResult(RecordHandlerResult.None); // (3)
        }
    }

    [BatchProcessor(TypedRecordHandler = typeof(TypedSqsRecordHandler))]
    public BatchItemFailuresResponse HandlerUsingTypedAttribute(SQSEvent _)
    {
        return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse; // (4)
    }
Step 1. Creates a class that implements the ITypedRecordHandler interface. Product is automatically deserialized from the SQS message body.
Step 2. You can have custom logic inside the record handler and throw exceptions that cause this message to fail.
Step 3. The handler can return an empty RecordHandlerResult (None) or one carrying some data.
Step 4. The Lambda function returns the partial batch response using TypedSqsBatchProcessor.
    public class CustomSqsRecordHandler : ISqsRecordHandler // (1)
    {
        public async Task<RecordHandlerResult> HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken)
        {
            /*
             * Your business logic.
             * If an exception is thrown, the item will be marked as a partial batch item failure.
             */

            var product = JsonSerializer.Deserialize<Product>(record.Body);

            if (product.Id == 4) // (2)
            {
                throw new ArgumentException("Error on id 4");
            }

            return await Task.FromResult(RecordHandlerResult.None); // (3)
        }
    }

    [BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler))]
    public BatchItemFailuresResponse HandlerUsingAttribute(SQSEvent _)
    {
        return SqsBatchProcessor.Result.BatchItemFailuresResponse; // (4)
    }
Step 1. Creates a class that implements the ISqsRecordHandler interface and its HandleAsync method.
Step 2. You can have custom logic inside the record handler and throw exceptions that cause this message to fail.
Step 3. The handler can return an empty RecordHandlerResult (None) or one carrying some data.
Step 4. The Lambda function returns the partial batch response.
When using SQS FIFO queues, we will stop processing messages after the first failure, and return all failed and unprocessed messages in batchItemFailures. This helps preserve the ordering of messages in your queue. Powertools automatically detects a FIFO queue.
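The FIFO behavior can be pictured with a small stand-alone sketch. It does not use the utility; the message IDs and the `Handle` function are placeholders, and the loop only illustrates why both the failed message and every message after it end up in the reported failures:

```csharp
using System;
using System.Collections.Generic;

// Placeholder batch: message IDs in queue order; "m3" is made to fail.
string[] batch = { "m1", "m2", "m3", "m4", "m5" };

void Handle(string messageId)
{
    if (messageId == "m3") throw new InvalidOperationException("Error on m3");
}

var batchItemFailures = new List<string>();
var stopped = false;

foreach (var messageId in batch)
{
    if (stopped)
    {
        // Not attempted: report as failed so the queue redelivers it in order.
        batchItemFailures.Add(messageId);
        continue;
    }

    try
    {
        Handle(messageId);
    }
    catch (Exception)
    {
        batchItemFailures.Add(messageId);
        stopped = true; // stop processing after the first failure
    }
}

Console.WriteLine(string.Join(", ", batchItemFailures)); // prints "m3, m4, m5"
```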
    public class Order
    {
        public string? OrderId { get; set; }
        public DateTime OrderDate { get; set; }
        public List<Product> Items { get; set; } = new();
        public decimal TotalAmount { get; set; }
    }

    internal class TypedKinesisRecordHandler : ITypedRecordHandler<Order> // (1)
    {
        public async Task<RecordHandlerResult> HandleAsync(Order order, CancellationToken cancellationToken)
        {
            Logger.LogInformation($"Processing order {order.OrderId} with {order.Items.Count} items");

            if (order.TotalAmount <= 0) // (2)
            {
                throw new ArgumentException("Invalid order total");
            }

            return await Task.FromResult(RecordHandlerResult.None); // (3)
        }
    }

    [BatchProcessor(TypedRecordHandler = typeof(TypedKinesisRecordHandler))]
    public BatchItemFailuresResponse HandlerUsingTypedAttribute(KinesisEvent _)
    {
        return TypedKinesisEventBatchProcessor.Result.BatchItemFailuresResponse; // (4)
    }
Step 1. Creates a class that implements the ITypedRecordHandler interface. Order is automatically deserialized from the Kinesis record data.
Step 2. You can have custom logic inside the record handler and throw exceptions that cause this message to fail.
Step 3. The handler can return an empty RecordHandlerResult (None) or one carrying some data.
Step 4. The Lambda function returns the partial batch response using TypedKinesisEventBatchProcessor.
Processing batches from Kinesis using the Lambda handler decorator works in four stages:
Decorate your handler with the BatchProcessor attribute.
Create a class that implements the IKinesisEventRecordHandler interface and its HandleAsync method.
Pass the type of that class to the RecordHandler property of the BatchProcessor attribute.
Return BatchItemFailuresResponse from the Lambda handler using KinesisEventBatchProcessor.Result.BatchItemFailuresResponse.
    internal class CustomKinesisEventRecordHandler : IKinesisEventRecordHandler // (1)
    {
        public async Task<RecordHandlerResult> HandleAsync(KinesisEvent.KinesisEventRecord record, CancellationToken cancellationToken)
        {
            var product = JsonSerializer.Deserialize<Product>(record.Kinesis.Data);

            if (product.Id == 4) // (2)
            {
                throw new ArgumentException("Error on id 4");
            }

            return await Task.FromResult(RecordHandlerResult.None); // (3)
        }
    }

    [BatchProcessor(RecordHandler = typeof(CustomKinesisEventRecordHandler))]
    public BatchItemFailuresResponse HandlerUsingAttribute(KinesisEvent _)
    {
        return KinesisEventBatchProcessor.Result.BatchItemFailuresResponse; // (4)
    }
Step 1. Creates a class that implements the IKinesisEventRecordHandler interface and its HandleAsync method.
Step 2. You can have custom logic inside the record handler and throw exceptions that cause this message to fail.
Step 3. The handler can return an empty RecordHandlerResult (None) or one carrying some data.
Step 4. The Lambda function returns the partial batch response.
Processing batches from DynamoDB Streams using the Lambda handler decorator works in four stages:
Decorate your handler with the BatchProcessor attribute.
Create a class that implements the IDynamoDbStreamRecordHandler interface and its HandleAsync method.
Pass the type of that class to the RecordHandler property of the BatchProcessor attribute.
Return BatchItemFailuresResponse from the Lambda handler using DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse.
    internal class CustomDynamoDbStreamRecordHandler : IDynamoDbStreamRecordHandler // (1)
    {
        public async Task<RecordHandlerResult> HandleAsync(DynamoDBEvent.DynamodbStreamRecord record, CancellationToken cancellationToken)
        {
            var product = JsonSerializer.Deserialize<Product>(record.Dynamodb.NewImage["Product"].S);

            if (product.Id == 4) // (2)
            {
                throw new ArgumentException("Error on id 4");
            }

            return await Task.FromResult(RecordHandlerResult.None); // (3)
        }
    }

    [BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler))]
    public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _)
    {
        return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; // (4)
    }
Step 1. Creates a class that implements the IDynamoDbStreamRecordHandler interface and its HandleAsync method.
Step 2. You can have custom logic inside the record handler and throw exceptions that cause this message to fail.
Step 3. The handler can return an empty RecordHandlerResult (None) or one carrying some data.
Step 4. The Lambda function returns the partial batch response.
By default, we catch any exception raised by your custom record handler HandleAsync method (ISqsRecordHandler, IKinesisEventRecordHandler, IDynamoDbStreamRecordHandler). This allows us to (1) continue processing the batch, (2) collect each batch item that failed processing, and (3) return the appropriate response correctly without failing your Lambda function execution.
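The three points above can be sketched without the utility. This is a simplified illustration, not the library's implementation; `HandleAsync` and the record IDs are placeholders:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Placeholder record handler: fails for the record with ID "4".
Task HandleAsync(string recordId) =>
    recordId == "4"
        ? Task.FromException(new ArgumentException("Error on id 4"))
        : Task.CompletedTask;

string[] recordIds = { "1", "2", "3", "4", "5" };
var failures = new Dictionary<string, Exception>();

foreach (var recordId in recordIds)
{
    try
    {
        await HandleAsync(recordId);
    }
    catch (Exception ex)
    {
        // (1) keep going, and (2) remember which item failed and why.
        failures[recordId] = ex;
    }
}

// (3) The function itself completes normally and reports only the failed items.
Console.WriteLine($"Failed items: {string.Join(", ", failures.Keys)}"); // prints "Failed items: 4"
```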
    public class CustomSqsRecordHandler : ISqsRecordHandler // (1)
    {
        public async Task<RecordHandlerResult> HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken)
        {
            /*
             * Your business logic.
             * If an exception is thrown, the item will be marked as a partial batch item failure.
             */

            var product = JsonSerializer.Deserialize<Product>(record.Body);

            if (product.Id == 4) // (2)
            {
                throw new ArgumentException("Error on id 4");
            }

            return await Task.FromResult(RecordHandlerResult.None); // (3)
        }
    }
You can specify the error handling policy applied during batch processing.
ErrorHandlingPolicy controls how errors are handled while batch items are processed. With a value of DeriveFromEvent (default), the specific BatchProcessor determines the policy based on the incoming event.
For example, the SqsBatchProcessor looks at the EventSourceArn to determine if the ErrorHandlingPolicy should be StopOnFirstBatchItemFailure (for FIFO queues) or ContinueOnBatchItemFailure (for standard queues). For StopOnFirstBatchItemFailure, the batch processor stops processing and marks any remaining records as batch item failures. For ContinueOnBatchItemFailure, the batch processor continues processing batch items regardless of item failures.
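One way to picture the derivation for SQS is a check on the queue ARN, since FIFO queue names always end in `.fifo`. The helper below is hypothetical and only assumes that naming rule; how SqsBatchProcessor performs the check internally is not shown here:

```csharp
using System;

// Hypothetical helper: FIFO queue names (and therefore their ARNs) end in ".fifo".
static string DerivePolicy(string eventSourceArn) =>
    eventSourceArn.EndsWith(".fifo", StringComparison.Ordinal)
        ? "StopOnFirstBatchItemFailure"
        : "ContinueOnBatchItemFailure";

// Placeholder ARNs for a FIFO queue and a standard queue.
Console.WriteLine(DerivePolicy("arn:aws:sqs:us-east-1:123456789012:orders.fifo"));
// prints StopOnFirstBatchItemFailure
Console.WriteLine(DerivePolicy("arn:aws:sqs:us-east-1:123456789012:orders"));
// prints ContinueOnBatchItemFailure
```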
| Policy | Description |
| --- | --- |
| DeriveFromEvent | Auto-derive the policy based on the event. |
| ContinueOnBatchItemFailure | Continue processing regardless of whether other batch items fail during processing. |
| StopOnFirstBatchItemFailure | Stop processing other batch items after the first batch item has failed processing. This is useful to preserve ordered processing of events. |
Note
When using StopOnFirstBatchItemFailure with parallel processing enabled, all batch items already scheduled for processing are allowed to complete before batch processing stops.
Therefore, if order is important, we recommend using sequential (non-parallel) processing together with this value.
To change the default error handling policy, you can set the POWERTOOLS_BATCH_ERROR_HANDLING_POLICY Environment Variable.
Another approach is to decorate the handler and set the ErrorHandlingPolicy property of the BatchProcessor attribute to one of the policies in the enum.
All records in the batch will be passed to this handler for processing, even if exceptions are thrown. Here's the behaviour after completing the batch:
All records successfully processed. We will return an empty list of item failures {'batchItemFailures': []}.
Partial success with some exceptions. We will return a list of all item IDs/sequence numbers that failed processing.
All records failed to be processed. By default, we will throw a BatchProcessingException with a list of all exceptions raised during processing to reflect the failure in your operational metrics. However, in some scenarios, this might not be desired. See Working with full batch failures for more information.
The following sequence diagrams explain how each Batch processor behaves under different scenarios.
For typed handlers that need access to Lambda context, use ITypedRecordHandlerWithContext<T>:
    public class ProductHandlerWithContext : ITypedRecordHandlerWithContext<Product>
    {
        public async Task<RecordHandlerResult> HandleAsync(Product product, ILambdaContext context, CancellationToken cancellationToken)
        {
            Logger.LogInformation($"Processing product {product.Id} in request {context.AwsRequestId}");
            Logger.LogInformation($"Remaining time: {context.RemainingTime.TotalSeconds}s");

            // Use context for timeout handling
            if (context.RemainingTime.TotalSeconds < 5)
            {
                Logger.LogWarning("Low remaining time, processing quickly");
            }

            return RecordHandlerResult.None;
        }
    }
You can use batch processing without the decorator.
To do so, call the ProcessAsync method on the static Instance of the batch processor (SqsBatchProcessor, DynamoDbStreamBatchProcessor, KinesisEventBatchProcessor).
    public async Task<BatchItemFailuresResponse> HandlerUsingUtility(DynamoDBEvent dynamoDbEvent)
    {
        var result = await DynamoDbStreamBatchProcessor.Instance.ProcessAsync(dynamoDbEvent, RecordHandler<DynamoDBEvent.DynamodbStreamRecord>.From(record =>
        {
            var product = JsonSerializer.Deserialize<JsonElement>(record.Dynamodb.NewImage["Product"].S);

            if (product.GetProperty("Id").GetInt16() == 4)
            {
                throw new ArgumentException("Error on 4");
            }
        }));
        return result.BatchItemFailuresResponse;
    }
To make the handler testable you can use Dependency Injection to resolve the BatchProcessor (SqsBatchProcessor, DynamoDbStreamBatchProcessor, KinesisEventBatchProcessor) instance and then call the ProcessAsync method.
You can set the POWERTOOLS_BATCH_PARALLEL_ENABLED Environment Variable to true or set the property BatchParallelProcessingEnabled on the Lambda decorator to process messages concurrently.
You can also set the POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM Environment Variable to the degree of parallelism you want.
Note
MaxDegreeOfParallelism is used to control the parallelism of the batch item processing.
With a value of 1, the processing is done sequentially (default). Sequential processing is recommended when preserving order is important, for example with SQS FIFO queues.
With a value > 1, the processing is done in parallel. Parallel processing can complete faster, for example when each item makes downstream service calls.
With a value of -1, the parallelism is automatically configured to be the vCPU count of the Lambda function. Internally, the Batch Processing Utility utilizes Parallel.ForEachAsync Method and the ParallelOptions.MaxDegreeOfParallelism Property to enable this functionality.
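To show how these two .NET APIs fit together, here is a minimal stand-alone sketch (requires .NET 6 or later). The `configured` value, the item list and the simulated delay are assumptions made for the example; it is not the utility's own code:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical setting value: 1 = sequential, > 1 = parallel, -1 = use the CPU count.
var configured = -1;
var maxDegreeOfParallelism = configured == -1 ? Environment.ProcessorCount : configured;

var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };
var processed = new ConcurrentBag<int>(); // thread-safe, since items complete concurrently

await Parallel.ForEachAsync(Enumerable.Range(1, 10), options, async (item, cancellationToken) =>
{
    // Simulated downstream call; while it is awaited, other items can make progress.
    await Task.Delay(50, cancellationToken);
    processed.Add(item);
});

Console.WriteLine($"Processed {processed.Count} items with up to {maxDegreeOfParallelism} in parallel.");
```

Note that completion order is not guaranteed once the degree of parallelism exceeds 1, which is why sequential processing remains the safer choice for ordered sources.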
When is this useful?
Your use case might be able to process multiple records at the same time without conflicting with one another.
For example, imagine you need to process multiple loyalty points and incrementally save in a database. While you await the database to confirm your records are saved, you could start processing another request concurrently.
The reason this is not the default behaviour is that not all use cases can handle concurrency safely (e.g., loyalty points must be updated in order).
By default, the BatchProcessor will throw a BatchProcessingException if all records in the batch fail to process. We do this to reflect the failure in your operational metrics.
When working with functions that handle batches with a small number of records, or when you use errors as a flow control mechanism, this behavior might not be desirable as your function might generate an unnaturally high number of errors. When this happens, the Lambda service will scale down the concurrency of your function, potentially impacting performance.
For these scenarios, you can set POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE = false, or the equivalent on either the BatchProcessor decorator or on the ProcessingOptions object. See examples below.
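The effect of the setting can be sketched as follows. `Complete` is a hypothetical helper and `InvalidOperationException` stands in for BatchProcessingException; the sketch only illustrates the decision, not the utility's implementation:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: decides what happens once every record has been attempted.
static IReadOnlyList<string> Complete(int batchSize, IReadOnlyList<string> failedIds, bool throwOnFullBatchFailure)
{
    var fullFailure = batchSize > 0 && failedIds.Count == batchSize;
    if (fullFailure && throwOnFullBatchFailure)
    {
        // Surfaces the failure as a function error, visible in operational metrics.
        throw new InvalidOperationException($"All {batchSize} records failed processing.");
    }

    // Otherwise the failed IDs are reported and the invocation itself succeeds.
    return failedIds;
}

var failed = new[] { "m1", "m2" };

// Setting disabled: the full failure is reported as item failures instead of thrown.
var reported = Complete(2, failed, throwOnFullBatchFailure: false);
Console.WriteLine($"Reported {reported.Count} failures without throwing.");

try
{
    // Setting enabled (the default behavior described above).
    Complete(2, failed, throwOnFullBatchFailure: true);
}
catch (InvalidOperationException ex)
{
    Console.WriteLine($"Thrown: {ex.Message}");
}
```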
You might want to bring custom logic to the existing BatchProcessor to slightly override how we handle successes and failures.
For these scenarios, you can create a class that inherits from BatchProcessor (SqsBatchProcessor, DynamoDbStreamBatchProcessor, KinesisEventBatchProcessor) and quickly override ProcessAsync and HandleRecordFailureAsync methods:
ProcessAsync() – Keeps track of successful batch records
HandleRecordFailureAsync() – Keeps track of failed batch records
Example
Let's suppose you'd like to add a metric named BatchRecordFailures for each batch record that failed processing, and also override the default error handling policy to stop on the first item failure.
    public class CustomDynamoDbStreamBatchProcessor : DynamoDbStreamBatchProcessor
    {
        public override async Task<ProcessingResult<DynamoDBEvent.DynamodbStreamRecord>> ProcessAsync(DynamoDBEvent @event,
            IRecordHandler<DynamoDBEvent.DynamodbStreamRecord> recordHandler, ProcessingOptions processingOptions)
        {
            ProcessingResult = new ProcessingResult<DynamoDBEvent.DynamodbStreamRecord>();

            // Prepare batch records (order is preserved)
            var batchRecords = GetRecordsFromEvent(@event)
                .Select(x => new KeyValuePair<string, DynamoDBEvent.DynamodbStreamRecord>(GetRecordId(x), x))
                .ToArray();

            // We assume all records fail by default to avoid loss of data
            var failureBatchRecords = batchRecords.Select(x =>
                new KeyValuePair<string, RecordFailure<DynamoDBEvent.DynamodbStreamRecord>>(x.Key,
                    new RecordFailure<DynamoDBEvent.DynamodbStreamRecord>
                    {
                        Exception = new UnprocessedRecordException($"Record: '{x.Key}' has not been processed."),
                        Record = x.Value
                    }));

            // Override to fail on first failure
            var errorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure;

            var successRecords = new Dictionary<string, RecordSuccess<DynamoDBEvent.DynamodbStreamRecord>>();
            var failureRecords = new Dictionary<string, RecordFailure<DynamoDBEvent.DynamodbStreamRecord>>(failureBatchRecords);

            try
            {
                foreach (var pair in batchRecords)
                {
                    var (recordId, record) = pair;

                    try
                    {
                        var result = await HandleRecordAsync(record, recordHandler, CancellationToken.None);
                        failureRecords.Remove(recordId, out _);
                        successRecords.TryAdd(recordId, new RecordSuccess<DynamoDBEvent.DynamodbStreamRecord>
                        {
                            Record = record,
                            RecordId = recordId,
                            HandlerResult = result
                        });
                    }
                    catch (Exception ex)
                    {
                        // Capture exception
                        failureRecords[recordId] = new RecordFailure<DynamoDBEvent.DynamodbStreamRecord>
                        {
                            Exception = new RecordProcessingException(
                                $"Failed processing record: '{recordId}'. See inner exception for details.", ex),
                            Record = record,
                            RecordId = recordId
                        };

                        Metrics.AddMetric("BatchRecordFailures", 1, MetricUnit.Count);

                        try
                        {
                            // Invoke hook
                            await HandleRecordFailureAsync(record, ex);
                        }
                        catch
                        {
                            // NOOP
                        }

                        // Check if we should stop record processing on first error
                        // ReSharper disable once ConditionIsAlwaysTrueOrFalse
                        if (errorHandlingPolicy == BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)
                        {
                            // This causes the loop's (inner) cancellation token to be cancelled for all operations already scheduled internally
                            throw new CircuitBreakerException(
                                "Error handling policy is configured to stop processing on first batch item failure. See inner exception for details.",
                                ex);
                        }
                    }
                }
            }
            catch (Exception ex) when (ex is CircuitBreakerException or OperationCanceledException)
            {
                // NOOP
            }

            ProcessingResult.BatchRecords.AddRange(batchRecords.Select(x => x.Value));
            ProcessingResult.BatchItemFailuresResponse.BatchItemFailures.AddRange(failureRecords.Select(x =>
                new BatchItemFailuresResponse.BatchItemFailure
                {
                    ItemIdentifier = x.Key
                }));
            ProcessingResult.FailureRecords.AddRange(failureRecords.Values);

            ProcessingResult.SuccessRecords.AddRange(successRecords.Values);

            return ProcessingResult;
        }

        // ReSharper disable once RedundantOverriddenMember
        protected override async Task HandleRecordFailureAsync(DynamoDBEvent.DynamodbStreamRecord record, Exception exception)
        {
            await base.HandleRecordFailureAsync(record, exception);
        }
    }
    public class CustomSqsRecordHandler : ISqsRecordHandler
    {
        public async Task<RecordHandlerResult> HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken)
        {
            var product = JsonSerializer.Deserialize<JsonElement>(record.Body);

            if (product.GetProperty("Id").GetInt16() == 4)
            {
                throw new ArgumentException("Error on 4");
            }

            return await Task.FromResult(RecordHandlerResult.None);
        }
    }