
Batch Processing

The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.

```mermaid
stateDiagram-v2
    direction LR
    BatchSource: Amazon SQS <br/><br/> Amazon Kinesis Data Streams <br/><br/> Amazon DynamoDB Streams <br/><br/>
    LambdaInit: Lambda invocation
    BatchProcessor: Batch Processor
    RecordHandler: Record Handler function
    YourLogic: Your logic to process each batch item
    LambdaResponse: Lambda response

    BatchSource --> LambdaInit
    LambdaInit --> BatchProcessor
    BatchProcessor --> RecordHandler

    state BatchProcessor {
        [*] --> RecordHandler: Your function
        RecordHandler --> YourLogic
    }

    RecordHandler --> BatchProcessor: Collect results
    BatchProcessor --> LambdaResponse: Report items that failed processing
```

Key features

  • Reports batch item failures to reduce the number of retries for a record upon errors
  • Simple interface to process each batch record
  • Typed batch processing with automatic deserialization
  • Lambda context injection for typed handlers
  • AOT (Ahead-of-Time) compilation support
  • Bring your own batch processor
  • Parallel processing

Background

When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event source, your Lambda functions are triggered with a batch of messages.

If your function fails to process any message from the batch, the entire batch returns to your queue or stream. The same batch is then retried until one of the following conditions is met first: a) your Lambda function returns a successful response, b) the record reaches its maximum retry attempts, or c) the records expire.

```mermaid
journey
  section Conditions
    Successful response: 5: Success
    Maximum retries: 3: Failure
    Records expired: 1: Failure
```

This behavior changes when you enable the ReportBatchItemFailures feature in your Lambda function event source configuration:

  • SQS queues. Only messages reported as failures will return to the queue for a retry, while successful ones will be deleted.
  • Kinesis data streams and DynamoDB streams. A single reported failure will use its sequence number as the stream checkpoint. Multiple reported failures will use the lowest sequence number as the checkpoint.
Warning: This utility lowers the chance of processing records more than once; it does not guarantee it

We recommend implementing processing logic in an idempotent manner wherever possible.
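As a sketch of what idempotent processing can look like, the handler below skips records it has already seen before doing any work. IProcessedMessageStore is a hypothetical de-duplication store (for example, a DynamoDB table keyed by message ID), not a Powertools API; the ISqsRecordHandler interface is introduced in the SQS section below.

```csharp
// Hypothetical de-duplication store - not part of Powertools.
public interface IProcessedMessageStore
{
    Task<bool> ExistsAsync(string messageId, CancellationToken cancellationToken);
    Task SaveAsync(string messageId, CancellationToken cancellationToken);
}

public class IdempotentSqsRecordHandler : ISqsRecordHandler
{
    private readonly IProcessedMessageStore _store;

    public IdempotentSqsRecordHandler(IProcessedMessageStore store) => _store = store;

    public async Task<RecordHandlerResult> HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken)
    {
        // Skip records that were already handled, so a redelivered message is a no-op
        if (await _store.ExistsAsync(record.MessageId, cancellationToken))
        {
            return RecordHandlerResult.None;
        }

        // ... your business logic ...

        await _store.SaveAsync(record.MessageId, cancellationToken);
        return RecordHandlerResult.None;
    }
}
```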

You can find more details on how Lambda works with either SQS, Kinesis, or DynamoDB in the AWS Documentation.

Migrating to v3

If you're upgrading to v3, please review the Migration Guide v3 for important breaking changes including .NET 8 requirement and AWS SDK v4 migration.

Installation

Install the library with NuGet:

```shell
Install-Package AWS.Lambda.Powertools.BatchProcessing
```

Or via the .NET Core command line interface:

```shell
dotnet add package AWS.Lambda.Powertools.BatchProcessing
```

Getting started

For this feature to work, you need to (1) configure your Lambda function event source to use ReportBatchItemFailures, and (2) return a specific response to report which records failed to be processed.

Use your preferred deployment framework to set the correct configuration, while this utility handles returning the correct response.

Batch processing can be configured with the settings below:

| Setting | Description | Environment variable | Default |
| --- | --- | --- | --- |
| Error Handling Policy | The error handling policy to apply during batch processing. | POWERTOOLS_BATCH_ERROR_HANDLING_POLICY | DeriveFromEvent |
| Parallel Enabled | Controls if parallel processing of batch items is enabled. | POWERTOOLS_BATCH_PARALLEL_ENABLED | false |
| Max Degree of Parallelism | The maximum degree of parallelism to apply if parallel processing is enabled. | POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM | 1 |
| Throw on Full Batch Failure | Controls if a BatchProcessingException is thrown on full batch failure. | POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE | true |
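These settings can also be applied per function through properties of the BatchProcessor attribute shown later on this page (the maximum degree of parallelism is documented here via its environment variable). A combined sketch, reusing the CustomSqsRecordHandler defined in the SQS section below:

```csharp
// Per-function configuration via attribute properties; each property shown here
// appears individually in later sections of this page.
[BatchProcessor(
    RecordHandler = typeof(CustomSqsRecordHandler),
    ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.ContinueOnBatchItemFailure,
    BatchParallelProcessingEnabled = true,
    ThrowOnFullBatchFailure = false)]
public BatchItemFailuresResponse ConfiguredHandler(SQSEvent _)
{
    return SqsBatchProcessor.Result.BatchItemFailuresResponse;
}
```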

Required resources

The remaining sections of the documentation will rely on these samples. For completeness, they demonstrate the required IAM permissions and a dead-letter queue (DLQ) where batch records are sent after two retries have been attempted.

You do not need any additional IAM permissions to use this utility, except for what each event source requires.

template.yaml (SQS)
```yaml
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: Example project demoing SQS Queue processing using the Batch Processing Utility in Powertools for AWS Lambda (.NET)

Globals:
  Function:
    Timeout: 20
    Runtime: dotnet8
    MemorySize: 1024
    Environment:
      Variables:
        POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-sqs
        POWERTOOLS_LOG_LEVEL: Debug
        POWERTOOLS_LOGGER_CASE: PascalCase
        POWERTOOLS_BATCH_ERROR_HANDLING_POLICY: DeriveFromEvent
        POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM: 1
        POWERTOOLS_BATCH_PARALLEL_ENABLED: false
        POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE: true

Resources:
  # --------------
  # KMS key for encrypted messages / records
  CustomerKey:
    Type: AWS::KMS::Key
    Properties:
      Description: KMS key for encrypted queues
      Enabled: true
      KeyPolicy:
        Version: "2012-10-17"
        Statement:
          - Sid: Enable IAM User Permissions
            Effect: Allow
            Principal:
              AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root"
            Action: "kms:*"
            Resource: "*"
          - Sid: Allow AWS Lambda to use the key
            Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action:
              - kms:Decrypt
              - kms:GenerateDataKey
            Resource: "*"

  CustomerKeyAlias:
    Type: AWS::KMS::Alias
    Properties:
      AliasName: !Sub alias/${AWS::StackName}-kms-key
      TargetKeyId: !Ref CustomerKey

  # --------------
  # Batch Processing for SQS Queue
  SqsDeadLetterQueue:
    Type: AWS::SQS::Queue
    Properties:
      KmsMasterKeyId: !Ref CustomerKey

  SqsQueue:
    Type: AWS::SQS::Queue
    Properties:
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt SqsDeadLetterQueue.Arn
        maxReceiveCount: 2
      KmsMasterKeyId: !Ref CustomerKey

  SqsBatchProcessorFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./src/HelloWorld/
      Handler: HelloWorld::HelloWorld.Function::SqsHandlerUsingAttribute
      Policies:
        - Statement:
            - Sid: DlqPermissions
              Effect: Allow
              Action:
                - sqs:SendMessage
                - sqs:SendMessageBatch
              Resource: !GetAtt SqsDeadLetterQueue.Arn
            - Sid: KmsKeyPermissions
              Effect: Allow
              Action:
                - kms:Decrypt
                - kms:GenerateDataKey
              Resource: !GetAtt CustomerKey.Arn
      Events:
        SqsBatch:
          Type: SQS
          Properties:
            BatchSize: 5
            Enabled: true
            FunctionResponseTypes:
              - ReportBatchItemFailures
            Queue: !GetAtt SqsQueue.Arn

  SqsBatchProcessorFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub "/aws/lambda/${SqsBatchProcessorFunction}"
      RetentionInDays: 7

Outputs:
  SqsQueueUrl:
    Description: "SQS Queue URL"
    Value: !Ref SqsQueue
```
template.yaml (Kinesis Data Streams)
```yaml
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: Example project demoing Kinesis Data Streams processing using the Batch Processing Utility in Powertools for AWS Lambda (.NET)

Globals:
  Function:
    Timeout: 20
    Runtime: dotnet8
    MemorySize: 1024
    Environment:
      Variables:
        POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-kinesis
        POWERTOOLS_LOG_LEVEL: Debug
        POWERTOOLS_LOGGER_CASE: PascalCase
        POWERTOOLS_BATCH_ERROR_HANDLING_POLICY: DeriveFromEvent
        POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM: 1
        POWERTOOLS_BATCH_PARALLEL_ENABLED: false
        POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE: true

Resources:
  # --------------
  # KMS key for encrypted messages / records
  CustomerKey:
    Type: AWS::KMS::Key
    Properties:
      Description: KMS key for encrypted queues
      Enabled: true
      KeyPolicy:
        Version: "2012-10-17"
        Statement:
          - Sid: Enable IAM User Permissions
            Effect: Allow
            Principal:
              AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root"
            Action: "kms:*"
            Resource: "*"
          - Sid: Allow AWS Lambda to use the key
            Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action:
              - kms:Decrypt
              - kms:GenerateDataKey
            Resource: "*"

  CustomerKeyAlias:
    Type: AWS::KMS::Alias
    Properties:
      AliasName: !Sub alias/${AWS::StackName}-kms-key
      TargetKeyId: !Ref CustomerKey

  # --------------
  # Batch Processing for Kinesis Data Stream
  KinesisStreamDeadLetterQueue:
    Type: AWS::SQS::Queue
    Properties:
      KmsMasterKeyId: !Ref CustomerKey

  KinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
      StreamEncryption:
        EncryptionType: KMS
        KeyId: !Ref CustomerKey

  KinesisStreamConsumer:
    Type: AWS::Kinesis::StreamConsumer
    Properties:
      ConsumerName: powertools-dotnet-sample-batch-kds-consumer
      StreamARN: !GetAtt KinesisStream.Arn

  KinesisBatchProcessorFunction:
    Type: AWS::Serverless::Function
    Properties:
      Policies:
        - Statement:
            - Sid: KinesisStreamConsumerPermissions
              Effect: Allow
              Action:
                - kinesis:DescribeStreamConsumer
              Resource:
                - !GetAtt KinesisStreamConsumer.ConsumerARN
            - Sid: DlqPermissions
              Effect: Allow
              Action:
                - sqs:SendMessage
                - sqs:SendMessageBatch
              Resource: !GetAtt KinesisStreamDeadLetterQueue.Arn
            - Sid: KmsKeyPermissions
              Effect: Allow
              Action:
                - kms:Decrypt
                - kms:GenerateDataKey
              Resource: !GetAtt CustomerKey.Arn
      CodeUri: ./src/HelloWorld/
      Handler: HelloWorld::HelloWorld.Function::KinesisEventHandlerUsingAttribute
      Events:
        Kinesis:
          Type: Kinesis
          Properties:
            BatchSize: 5
            BisectBatchOnFunctionError: true
            DestinationConfig:
              OnFailure:
                Destination: !GetAtt KinesisStreamDeadLetterQueue.Arn
            Enabled: true
            FunctionResponseTypes:
              - ReportBatchItemFailures
            MaximumRetryAttempts: 2
            ParallelizationFactor: 1
            StartingPosition: LATEST
            Stream: !GetAtt KinesisStreamConsumer.ConsumerARN

  KinesisBatchProcessorFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub "/aws/lambda/${KinesisBatchProcessorFunction}"
      RetentionInDays: 7

Outputs:
  KinesisStreamArn:
    Description: "Kinesis Stream ARN"
    Value: !GetAtt KinesisStream.Arn
```
template.yaml (DynamoDB Streams)
```yaml
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: Example project demoing DynamoDB Streams processing using the Batch Processing Utility in Powertools for AWS Lambda (.NET)

Globals:
  Function:
    Timeout: 20
    Runtime: dotnet8
    MemorySize: 1024
    Environment:
      Variables:
        POWERTOOLS_SERVICE_NAME: powertools-dotnet-sample-batch-ddb
        POWERTOOLS_LOG_LEVEL: Debug
        POWERTOOLS_LOGGER_CASE: PascalCase
        POWERTOOLS_BATCH_ERROR_HANDLING_POLICY: DeriveFromEvent
        POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM: 1
        POWERTOOLS_BATCH_PARALLEL_ENABLED: false
        POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE: true

Resources:
  # --------------
  # KMS key for encrypted messages / records
  CustomerKey:
    Type: AWS::KMS::Key
    Properties:
      Description: KMS key for encrypted queues
      Enabled: true
      KeyPolicy:
        Version: "2012-10-17"
        Statement:
          - Sid: Enable IAM User Permissions
            Effect: Allow
            Principal:
              AWS: !Sub "arn:aws:iam::${AWS::AccountId}:root"
            Action: "kms:*"
            Resource: "*"
          - Sid: Allow AWS Lambda to use the key
            Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action:
              - kms:Decrypt
              - kms:GenerateDataKey
            Resource: "*"

  CustomerKeyAlias:
    Type: AWS::KMS::Alias
    Properties:
      AliasName: !Sub alias/${AWS::StackName}-kms-key
      TargetKeyId: !Ref CustomerKey

  # --------------
  # Batch Processing for DynamoDb (DDB) Stream
  DdbStreamDeadLetterQueue:
    Type: AWS::SQS::Queue
    Properties:
      KmsMasterKeyId: !Ref CustomerKey

  DdbTable:
    Type: AWS::DynamoDB::Table
    Properties:
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: id
          AttributeType: S
      KeySchema:
        - AttributeName: id
          KeyType: HASH
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES

  DdbStreamBatchProcessorFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ./src/HelloWorld/
      Handler: HelloWorld::HelloWorld.Function::DynamoDbStreamHandlerUsingAttribute
      Policies:
        - AWSLambdaDynamoDBExecutionRole
        - Statement:
            - Sid: DlqPermissions
              Effect: Allow
              Action:
                - sqs:SendMessage
                - sqs:SendMessageBatch
              Resource: !GetAtt DdbStreamDeadLetterQueue.Arn
            - Sid: KmsKeyPermissions
              Effect: Allow
              Action:
                - kms:GenerateDataKey
              Resource: !GetAtt CustomerKey.Arn
      Events:
        Stream:
          Type: DynamoDB
          Properties:
            BatchSize: 5
            BisectBatchOnFunctionError: true
            DestinationConfig:
              OnFailure:
                Destination: !GetAtt DdbStreamDeadLetterQueue.Arn
            Enabled: true
            FunctionResponseTypes:
              - ReportBatchItemFailures
            MaximumRetryAttempts: 2
            ParallelizationFactor: 1
            StartingPosition: LATEST
            Stream: !GetAtt DdbTable.StreamArn

  DdbStreamBatchProcessorFunctionLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub "/aws/lambda/${DdbStreamBatchProcessorFunction}"
      RetentionInDays: 7

Outputs:
  DdbTableName:
    Description: "DynamoDB Table Name"
    Value: !Ref DdbTable
```

Processing messages from SQS

Processing batches from SQS with the typed Lambda handler decorator and automatic deserialization works in four steps:

  1. Define your data model class
  2. Create a class that implements ITypedRecordHandler<T> interface and the HandleAsync method
  3. Decorate your handler with BatchProcessor attribute using TypedRecordHandler property
  4. Return BatchItemFailuresResponse from Lambda handler using TypedSqsBatchProcessor.Result.BatchItemFailuresResponse
```csharp
public class Product
{
    public int Id { get; set; }
    public string? Name { get; set; }
    public decimal Price { get; set; }
}

public class TypedSqsRecordHandler : ITypedRecordHandler<Product> // (1)!
{
    public async Task<RecordHandlerResult> HandleAsync(Product product, CancellationToken cancellationToken)
    {
        /*
         * Your business logic with automatic deserialization.
         * If an exception is thrown, the item will be marked as a partial batch item failure.
         */
        Logger.LogInformation($"Processing product {product.Id} - {product.Name} (${product.Price})");

        if (product.Id == 4) // (2)!
        {
            throw new ArgumentException("Error on id 4");
        }

        return await Task.FromResult(RecordHandlerResult.None); // (3)!
    }
}

[BatchProcessor(TypedRecordHandler = typeof(TypedSqsRecordHandler))]
public BatchItemFailuresResponse HandlerUsingTypedAttribute(SQSEvent _)
{
    return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
}
```
  1. Step 1. Create a class that implements the ITypedRecordHandler interface - Product is automatically deserialized from the SQS message body.
  2. Step 2. You can have custom logic inside the record handler and throw exceptions that will cause this message to fail.
  3. Step 3. RecordHandlerResult can return empty (None) or some data.
  4. Step 4. The Lambda function returns the partial batch response using TypedSqsBatchProcessor.
```json
{
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
            "body": "{\"id\": 1, \"name\": \"Laptop Computer\", \"price\": 999.99}",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "SENDER_ID",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {},
            "md5OfBody": "7b270e59b47ff90a553787216d55d91d",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-1"
        },
        {
            "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
            "body": "{\"id\": 4, \"name\": \"Invalid Product\", \"price\": -10.00}",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "SENDER_ID",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {},
            "md5OfBody": "7b270e59b47ff90a553787216d55d92e",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-1"
        }
    ]
}
```

The second record failed to be processed, so the processor added its message ID to the response.

```json
{
    "batchItemFailures": [
        {
            "itemIdentifier": "244fc6b4-87a3-44ab-83d2-361172410c3a"
        }
    ]
}
```

Using Handler decorator (Traditional)

Processing batches from SQS using the Lambda handler decorator works in four steps:

  1. Decorate your handler with BatchProcessor attribute
  2. Create a class that implements ISqsRecordHandler interface and the HandleAsync method.
  3. Pass the type of that class to RecordHandler property of the BatchProcessor attribute
  4. Return BatchItemFailuresResponse from Lambda handler using SqsBatchProcessor.Result.BatchItemFailuresResponse
```csharp
public class CustomSqsRecordHandler : ISqsRecordHandler // (1)!
{
    public async Task<RecordHandlerResult> HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken)
    {
        /*
         * Your business logic.
         * If an exception is thrown, the item will be marked as a partial batch item failure.
         */
        var product = JsonSerializer.Deserialize<Product>(record.Body);

        if (product.Id == 4) // (2)!
        {
            throw new ArgumentException("Error on id 4");
        }

        return await Task.FromResult(RecordHandlerResult.None); // (3)!
    }
}

[BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler))]
public BatchItemFailuresResponse HandlerUsingAttribute(SQSEvent _)
{
    return SqsBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
}
```
  1. Step 1. Create a class that implements the ISqsRecordHandler interface and the HandleAsync method.
  2. Step 2. You can have custom logic inside the record handler and throw exceptions that will cause this message to fail.
  3. Step 3. RecordHandlerResult can return empty (None) or some data.
  4. Step 4. The Lambda function returns the partial batch response.
```json
{
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
            "body": "{\"Id\":1,\"Name\":\"product-4\",\"Price\":14}",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "SENDER_ID",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-1"
        },
        {
            "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
            "body": "fail",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "SENDER_ID",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-1"
        },
        {
            "messageId": "213f4fd3-84a4-4667-a1b9-c277964197d9",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
            "body": "{\"Id\":4,\"Name\":\"product-4\",\"Price\":14}",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "SENDER_ID",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-1"
        }
    ]
}
```

The second and third records failed to be processed, so the processor added their message IDs to the response.

```json
{
    "batchItemFailures": [
        {
            "itemIdentifier": "244fc6b4-87a3-44ab-83d2-361172410c3a"
        },
        {
            "itemIdentifier": "213f4fd3-84a4-4667-a1b9-c277964197d9"
        }
    ]
}
```

FIFO queues

When using SQS FIFO queues, we will stop processing messages after the first failure, and return all failed and unprocessed messages in batchItemFailures. This helps preserve the ordering of messages in your queue. Powertools automatically detects a FIFO queue.
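No special configuration is needed. As a sketch, a handler wired to a FIFO queue looks exactly like the standard-queue example above; the processor derives the stop-on-first-failure behaviour from the event's EventSourceArn (FIFO queue ARNs end in .fifo):

```csharp
// Same handler and attribute as for a standard queue; FIFO behaviour is
// detected from the incoming event's EventSourceArn at runtime.
[BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler))]
public BatchItemFailuresResponse FifoHandlerUsingAttribute(SQSEvent _)
{
    return SqsBatchProcessor.Result.BatchItemFailuresResponse;
}
```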

Processing messages from Kinesis

Processing batches from Kinesis with the typed Lambda handler decorator and automatic deserialization works in four steps:

  1. Define your data model class
  2. Create a class that implements ITypedRecordHandler<T> interface and the HandleAsync method
  3. Decorate your handler with BatchProcessor attribute using TypedRecordHandler property
  4. Return BatchItemFailuresResponse from Lambda handler using TypedKinesisEventBatchProcessor.Result.BatchItemFailuresResponse
```csharp
public class Order
{
    public string? OrderId { get; set; }
    public DateTime OrderDate { get; set; }
    public List<Product> Items { get; set; } = new();
    public decimal TotalAmount { get; set; }
}

internal class TypedKinesisRecordHandler : ITypedRecordHandler<Order> // (1)!
{
    public async Task<RecordHandlerResult> HandleAsync(Order order, CancellationToken cancellationToken)
    {
        Logger.LogInformation($"Processing order {order.OrderId} with {order.Items.Count} items");

        if (order.TotalAmount <= 0) // (2)!
        {
            throw new ArgumentException("Invalid order total");
        }

        return await Task.FromResult(RecordHandlerResult.None); // (3)!
    }
}

[BatchProcessor(TypedRecordHandler = typeof(TypedKinesisRecordHandler))]
public BatchItemFailuresResponse HandlerUsingTypedAttribute(KinesisEvent _)
{
    return TypedKinesisEventBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
}
```
  1. Step 1. Create a class that implements the ITypedRecordHandler interface - Order is automatically deserialized from the Kinesis record data.
  2. Step 2. You can have custom logic inside the record handler and throw exceptions that will cause this message to fail.
  3. Step 3. RecordHandlerResult can return empty (None) or some data.
  4. Step 4. The Lambda function returns the partial batch response using TypedKinesisEventBatchProcessor.

Using Handler decorator (Traditional)

Processing batches from Kinesis using the Lambda handler decorator works in four steps:

  1. Decorate your handler with BatchProcessor attribute
  2. Create a class that implements IKinesisEventRecordHandler interface and the HandleAsync method.
  3. Pass the type of that class to RecordHandler property of the BatchProcessor attribute
  4. Return BatchItemFailuresResponse from Lambda handler using KinesisEventBatchProcessor.Result.BatchItemFailuresResponse
```csharp
internal class CustomKinesisEventRecordHandler : IKinesisEventRecordHandler // (1)!
{
    public async Task<RecordHandlerResult> HandleAsync(KinesisEvent.KinesisEventRecord record, CancellationToken cancellationToken)
    {
        var product = JsonSerializer.Deserialize<Product>(record.Kinesis.Data);

        if (product.Id == 4) // (2)!
        {
            throw new ArgumentException("Error on id 4");
        }

        return await Task.FromResult(RecordHandlerResult.None); // (3)!
    }
}

[BatchProcessor(RecordHandler = typeof(CustomKinesisEventRecordHandler))]
public BatchItemFailuresResponse HandlerUsingAttribute(KinesisEvent _)
{
    return KinesisEventBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
}
```
  1. Step 1. Create a class that implements the IKinesisEventRecordHandler interface and the HandleAsync method.
  2. Step 2. You can have custom logic inside the record handler and throw exceptions that will cause this message to fail.
  3. Step 3. RecordHandlerResult can return empty (None) or some data.
  4. Step 4. The Lambda function returns the partial batch response.
```json
{
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
            "body": "{\"Id\":1,\"Name\":\"product-4\",\"Price\":14}",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "SENDER_ID",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-1"
        },
        {
            "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
            "body": "fail",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "SENDER_ID",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-1"
        },
        {
            "messageId": "213f4fd3-84a4-4667-a1b9-c277964197d9",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
            "body": "{\"Id\":4,\"Name\":\"product-4\",\"Price\":14}",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "SENDER_ID",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-1"
        }
    ]
}
```

The second and third records failed to be processed, so the processor added their message IDs to the response.

```json
{
    "batchItemFailures": [
        {
            "itemIdentifier": "244fc6b4-87a3-44ab-83d2-361172410c3a"
        },
        {
            "itemIdentifier": "213f4fd3-84a4-4667-a1b9-c277964197d9"
        }
    ]
}
```

Processing messages from DynamoDB

Processing batches from DynamoDB Streams with the typed Lambda handler decorator and automatic deserialization works in four steps:

  1. Define your data model class
  2. Create a class that implements ITypedRecordHandler<T> interface and the HandleAsync method
  3. Decorate your handler with BatchProcessor attribute using TypedRecordHandler property
  4. Return BatchItemFailuresResponse from Lambda handler using TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse
```csharp
public class Customer
{
    public string? CustomerId { get; set; }
    public string? Name { get; set; }
    public string? Email { get; set; }
    public DateTime CreatedAt { get; set; }
}

internal class TypedDynamoDbRecordHandler : ITypedRecordHandler<Customer> // (1)!
{
    public async Task<RecordHandlerResult> HandleAsync(Customer customer, CancellationToken cancellationToken)
    {
        Logger.LogInformation($"Processing customer {customer.CustomerId} - {customer.Name}");

        if (string.IsNullOrEmpty(customer.Email)) // (2)!
        {
            throw new ArgumentException("Customer email is required");
        }

        return await Task.FromResult(RecordHandlerResult.None); // (3)!
    }
}

[BatchProcessor(TypedRecordHandler = typeof(TypedDynamoDbRecordHandler))]
public BatchItemFailuresResponse HandlerUsingTypedAttribute(DynamoDBEvent _)
{
    return TypedDynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
}
```
  1. Step 1. Create a class that implements the ITypedRecordHandler interface - Customer is automatically deserialized from the DynamoDB stream record.
  2. Step 2. You can have custom logic inside the record handler and throw exceptions that will cause this message to fail.
  3. Step 3. RecordHandlerResult can return empty (None) or some data.
  4. Step 4. The Lambda function returns the partial batch response using TypedDynamoDbStreamBatchProcessor.

Using Handler decorator (Traditional)

Processing batches from DynamoDB Streams using the Lambda handler decorator works in four steps:

  1. Decorate your handler with BatchProcessor attribute
  2. Create a class that implements IDynamoDbStreamRecordHandler and the HandleAsync method.
  3. Pass the type of that class to RecordHandler property of the BatchProcessor attribute
  4. Return BatchItemFailuresResponse from Lambda handler using DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse
```csharp
internal class CustomDynamoDbStreamRecordHandler : IDynamoDbStreamRecordHandler // (1)!
{
    public async Task<RecordHandlerResult> HandleAsync(DynamoDBEvent.DynamodbStreamRecord record, CancellationToken cancellationToken)
    {
        var product = JsonSerializer.Deserialize<Product>(record.Dynamodb.NewImage["Product"].S);

        if (product.Id == 4) // (2)!
        {
            throw new ArgumentException("Error on id 4");
        }

        return await Task.FromResult(RecordHandlerResult.None); // (3)!
    }
}

[BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler))]
public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _)
{
    return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse; // (4)!
}
```
  1. Step 1. Create a class that implements the IDynamoDbStreamRecordHandler interface and the HandleAsync method.
  2. Step 2. You can have custom logic inside the record handler and throw exceptions that will cause this message to fail.
  3. Step 3. RecordHandlerResult can return empty (None) or some data.
  4. Step 4. The Lambda function returns the partial batch response.
```json
{
    "Records": [
        {
            "eventID": "1",
            "eventVersion": "1.0",
            "dynamodb": {
                "Keys": {
                    "Id": {
                        "N": "101"
                    }
                },
                "NewImage": {
                    "Product": {
                        "S": "{\"Id\":1,\"Name\":\"product-name\",\"Price\":14}"
                    }
                },
                "StreamViewType": "NEW_AND_OLD_IMAGES",
                "SequenceNumber": "3275880929",
                "SizeBytes": 26
            },
            "awsRegion": "us-west-2",
            "eventName": "INSERT",
            "eventSourceARN": "eventsource_arn",
            "eventSource": "aws:dynamodb"
        },
        {
            "eventID": "1",
            "eventVersion": "1.0",
            "dynamodb": {
                "Keys": {
                    "Id": {
                        "N": "101"
                    }
                },
                "NewImage": {
                    "Product": {
                        "S": "fail"
                    }
                },
                "StreamViewType": "NEW_AND_OLD_IMAGES",
                "SequenceNumber": "8640712661",
                "SizeBytes": 26
            },
            "awsRegion": "us-west-2",
            "eventName": "INSERT",
            "eventSourceARN": "eventsource_arn",
            "eventSource": "aws:dynamodb"
        }
    ]
}
```

The second record failed to be processed, so the processor added its sequence number to the response.

```json
{
    "batchItemFailures": [
        {
            "itemIdentifier": "8640712661"
        }
    ]
}
```

Error handling

By default, we catch any exception raised by your custom record handler HandleAsync method (ISqsRecordHandler, IKinesisEventRecordHandler, IDynamoDbStreamRecordHandler). This allows us to (1) continue processing the batch, (2) collect each batch item that failed processing, and (3) return the appropriate response correctly without failing your Lambda function execution.

```csharp
public class CustomSqsRecordHandler : ISqsRecordHandler // (1)!
{
    public async Task<RecordHandlerResult> HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken)
    {
        /*
         * Your business logic.
         * If an exception is thrown, the item will be marked as a partial batch item failure.
         */
        var product = JsonSerializer.Deserialize<Product>(record.Body);

        if (product.Id == 4) // (2)!
        {
            throw new ArgumentException("Error on id 4");
        }

        return await Task.FromResult(RecordHandlerResult.None); // (3)!
    }
}
```
```json
{
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
            "body": "{\"Id\":1,\"Name\":\"product-4\",\"Price\":14}",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "SENDER_ID",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-1"
        },
        {
            "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
            "body": "fail",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "SENDER_ID",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-1"
        },
        {
            "messageId": "213f4fd3-84a4-4667-a1b9-c277964197d9",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
            "body": "{\"Id\":4,\"Name\":\"product-4\",\"Price\":14}",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "SENDER_ID",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-1"
        }
    ]
}
```

The second and third records failed to be processed, so the processor added their message IDs to the response.

```json
{
    "batchItemFailures": [
        {
            "itemIdentifier": "244fc6b4-87a3-44ab-83d2-361172410c3a"
        },
        {
            "itemIdentifier": "213f4fd3-84a4-4667-a1b9-c277964197d9"
        }
    ]
}
```

Error Handling Policy

You can specify the error handling policy applied during batch processing.

ErrorHandlingPolicy controls how errors are handled during batch item processing. With the default value of DeriveFromEvent, the specific BatchProcessor determines the policy based on the incoming event.

For example, the SqsBatchProcessor looks at the EventSourceArn to determine if the ErrorHandlingPolicy should be StopOnFirstBatchItemFailure (for FIFO queues) or ContinueOnBatchItemFailure (for standard queues). For StopOnFirstBatchItemFailure the batch processor stops processing and marks any remaining records as batch item failures. For ContinueOnBatchItemFailure the batch processor continues processing batch items regardless of item failures.
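As a minimal sketch (not the library's actual code), the derivation for SQS can be pictured like this, since FIFO queue ARNs end in .fifo:

```csharp
// Illustrative only: how DeriveFromEvent can resolve to a concrete policy for SQS.
static BatchProcessorErrorHandlingPolicy DerivePolicy(string eventSourceArn)
{
    return eventSourceArn.EndsWith(".fifo", StringComparison.OrdinalIgnoreCase)
        ? BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure
        : BatchProcessorErrorHandlingPolicy.ContinueOnBatchItemFailure;
}
```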

| Policy | Description |
| --- | --- |
| DeriveFromEvent | Auto-derive the policy based on the event. |
| ContinueOnBatchItemFailure | Continue processing regardless of whether other batch items fail during processing. |
| StopOnFirstBatchItemFailure | Stop processing other batch items after the first batch item has failed processing. This is useful to preserve ordered processing of events. |

Note

When using StopOnFirstBatchItemFailure with parallel processing enabled, all batch items already scheduled to be processed will be allowed to complete before batch processing stops.

Therefore, if order is important, it is recommended to use sequential (non-parallel) processing together with this policy.

To change the default error handling policy, you can set the POWERTOOLS_BATCH_ERROR_HANDLING_POLICY Environment Variable.

Another approach is to decorate the handler and set one of the policies in the ErrorHandlingPolicy property of the BatchProcessor attribute:

```csharp
[BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler),
    ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)]
public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _)
{
    return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse;
}
```

Partial failure mechanics

All records in the batch are passed to your record handler for processing, even if exceptions are thrown. Here's the behaviour after completing the batch:

  • All records successfully processed. We will return an empty list of item failures {'batchItemFailures': []}.
  • Partial success with some exceptions. We will return a list of all item IDs/sequence numbers that failed processing.
  • All records failed to be processed. By default, we will throw a BatchProcessingException with a list of all exceptions raised during processing to reflect the failure in your operational metrics. However, in some scenarios, this might not be desired. See Working with full batch failures for more information.

The following sequence diagrams explain how each Batch processor behaves under different scenarios.

SQS Standard

Read more about the Batch Failure Reporting feature in AWS Lambda.

Sequence diagram to explain how BatchProcessor works with SQS Standard queues.

```mermaid
sequenceDiagram
    autonumber
    participant SQS queue
    participant Lambda service
    participant Lambda function
    Lambda service->>SQS queue: Poll
    Lambda service->>Lambda function: Invoke (batch event)
    Lambda function->>Lambda service: Report some failed messages
    activate SQS queue
    Lambda service->>SQS queue: Delete successful messages
    SQS queue-->>SQS queue: Failed messages return
    Note over SQS queue,Lambda service: Process repeat
    deactivate SQS queue
```
SQS mechanism with Batch Item Failures

SQS FIFO

Read more about the Batch Failure Reporting feature in AWS Lambda.

Sequence diagram to explain how SqsFifoPartialProcessor works with SQS FIFO queues.

```mermaid
sequenceDiagram
    autonumber
    participant SQS queue
    participant Lambda service
    participant Lambda function
    Lambda service->>SQS queue: Poll
    Lambda service->>Lambda function: Invoke (batch event)
    activate Lambda function
    Lambda function-->Lambda function: Process 2 out of 10 batch items
    Lambda function--xLambda function: Fail on 3rd batch item
    Lambda function->>Lambda service: Report 3rd batch item and unprocessed messages as failure
    deactivate Lambda function
    activate SQS queue
    Lambda service->>SQS queue: Delete successful messages (1-2)
    SQS queue-->>SQS queue: Failed messages return (3-10)
    deactivate SQS queue
```
SQS FIFO mechanism with Batch Item Failures

Kinesis and DynamoDB Streams

Read more about the Batch Failure Reporting feature.

Sequence diagram to explain how BatchProcessor works with both Kinesis Data Streams and DynamoDB Streams.

For brevity, we will use Streams to refer to either service. For theory on stream checkpoints, see this blog post.

```mermaid
sequenceDiagram
    autonumber
    participant Streams
    participant Lambda service
    participant Lambda function
    Lambda service->>Streams: Poll latest records
    Lambda service->>Lambda function: Invoke (batch event)
    activate Lambda function
    Lambda function-->Lambda function: Process 2 out of 10 batch items
    Lambda function--xLambda function: Fail on 3rd batch item
    Lambda function-->Lambda function: Continue processing batch items (4-10)
    Lambda function->>Lambda service: Report batch item as failure (3)
    deactivate Lambda function
    activate Streams
    Lambda service->>Streams: Checkpoints to sequence number from 3rd batch item
    Lambda service->>Streams: Poll records starting from updated checkpoint
    deactivate Streams
```
Kinesis and DynamoDB streams mechanism with single batch item failure

The behavior changes slightly when there are multiple item failures: the stream checkpoint is updated to the lowest sequence number reported.

Note that the batch item sequence number could differ from the batch item number shown in the illustration.

```mermaid
sequenceDiagram
    autonumber
    participant Streams
    participant Lambda service
    participant Lambda function
    Lambda service->>Streams: Poll latest records
    Lambda service->>Lambda function: Invoke (batch event)
    activate Lambda function
    Lambda function-->Lambda function: Process 2 out of 10 batch items
    Lambda function--xLambda function: Fail on 3-5 batch items
    Lambda function-->Lambda function: Continue processing batch items (6-10)
    Lambda function->>Lambda service: Report batch items as failure (3-5)
    deactivate Lambda function
    activate Streams
    Lambda service->>Streams: Checkpoints to lowest sequence number
    Lambda service->>Streams: Poll records starting from updated checkpoint
    deactivate Streams
```
Kinesis and DynamoDB streams mechanism with multiple batch item failures

Typed Batch Processing Advanced Features

AOT (Ahead-of-Time) Compilation Support

For Native AOT scenarios, you can configure JsonSerializerContext:

```csharp
[JsonSerializable(typeof(Product))]
[JsonSerializable(typeof(Order))]
[JsonSerializable(typeof(Customer))]
[JsonSerializable(typeof(List<Product>))]
[JsonSourceGenerationOptions(
    PropertyNamingPolicy = JsonKnownNamingPolicy.CamelCase,
    WriteIndented = false,
    DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull)]
public partial class MyJsonSerializerContext : JsonSerializerContext
{
}
```
```csharp
[BatchProcessor(
    TypedRecordHandler = typeof(TypedSqsRecordHandler),
    JsonSerializerContext = typeof(MyJsonSerializerContext))]
public BatchItemFailuresResponse ProcessWithAot(SQSEvent sqsEvent)
{
    return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse;
}
```

Lambda Context Injection

For typed handlers that need access to Lambda context, use ITypedRecordHandlerWithContext<T>:

```csharp
public class ProductHandlerWithContext : ITypedRecordHandlerWithContext<Product>
{
    public async Task<RecordHandlerResult> HandleAsync(Product product, ILambdaContext context, CancellationToken cancellationToken)
    {
        Logger.LogInformation($"Processing product {product.Id} in request {context.AwsRequestId}");
        Logger.LogInformation($"Remaining time: {context.RemainingTime.TotalSeconds}s");

        // Use context for timeout handling
        if (context.RemainingTime.TotalSeconds < 5)
        {
            Logger.LogWarning("Low remaining time, processing quickly");
        }

        return RecordHandlerResult.None;
    }
}
```
```csharp
[BatchProcessor(TypedRecordHandler = typeof(ProductHandlerWithContext))]
public BatchItemFailuresResponse ProcessWithContext(SQSEvent sqsEvent, ILambdaContext context)
{
    return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse;
}
```

Migration from Traditional to Typed Handlers

You can gradually migrate from traditional to typed handlers:

Before - traditional handler with manual deserialization:

```csharp
public class TraditionalSqsHandler : ISqsRecordHandler
{
    public async Task<RecordHandlerResult> HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken)
    {
        // Manual deserialization
        var product = JsonSerializer.Deserialize<Product>(record.Body);

        Logger.LogInformation($"Processing product {product.Id}");

        if (product.Price < 0)
            throw new ArgumentException("Invalid price");

        return RecordHandlerResult.None;
    }
}

[BatchProcessor(RecordHandler = typeof(TraditionalSqsHandler))]
public BatchItemFailuresResponse ProcessSqs(SQSEvent sqsEvent)
{
    return SqsBatchProcessor.Result.BatchItemFailuresResponse;
}
```
After - typed handler with automatic deserialization:

```csharp
public class TypedSqsHandler : ITypedRecordHandler<Product>
{
    public async Task<RecordHandlerResult> HandleAsync(Product product, CancellationToken cancellationToken)
    {
        // Automatic deserialization - product is already deserialized!
        Logger.LogInformation($"Processing product {product.Id}");

        // Same business logic
        if (product.Price < 0)
            throw new ArgumentException("Invalid price");

        return RecordHandlerResult.None;
    }
}

[BatchProcessor(TypedRecordHandler = typeof(TypedSqsHandler))]
public BatchItemFailuresResponse ProcessSqs(SQSEvent sqsEvent)
{
    return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse;
}
```

Error Handling with Typed Processors

Typed processors support the same error handling policies as traditional processors:

```csharp
[BatchProcessor(
    TypedRecordHandler = typeof(TypedSqsHandler),
    ErrorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)]
public BatchItemFailuresResponse ProcessWithErrorPolicy(SQSEvent sqsEvent)
{
    return TypedSqsBatchProcessor.Result.BatchItemFailuresResponse;
}
```

Advanced

Using utility outside handler and IoC

You can use batch processing without the decorator by calling the ProcessAsync method on the Instance property of the static batch processor (SqsBatchProcessor, DynamoDbStreamBatchProcessor, KinesisEventBatchProcessor):

```csharp
public async Task<BatchItemFailuresResponse> HandlerUsingUtility(DynamoDBEvent dynamoDbEvent)
{
    var result = await DynamoDbStreamBatchProcessor.Instance.ProcessAsync(dynamoDbEvent, RecordHandler<DynamoDBEvent.DynamodbStreamRecord>.From(record =>
    {
        var product = JsonSerializer.Deserialize<JsonElement>(record.Dynamodb.NewImage["Product"].S);

        if (product.GetProperty("Id").GetInt16() == 4)
        {
            throw new ArgumentException("Error on 4");
        }
    }));
    return result.BatchItemFailuresResponse;
}
```

To make the handler testable, you can use dependency injection to resolve the batch processor (SqsBatchProcessor, DynamoDbStreamBatchProcessor, KinesisEventBatchProcessor) instance and then call the ProcessAsync method.

```csharp
public async Task<BatchItemFailuresResponse> HandlerUsingUtilityFromIoc(DynamoDBEvent dynamoDbEvent)
{
    var batchProcessor = Services.Provider.GetRequiredService<IDynamoDbStreamBatchProcessor>();
    var recordHandler = Services.Provider.GetRequiredService<IDynamoDbStreamRecordHandler>();
    var result = await batchProcessor.ProcessAsync(dynamoDbEvent, recordHandler);
    return result.BatchItemFailuresResponse;
}
```
```csharp
public async Task<BatchItemFailuresResponse> HandlerUsingUtilityFromIoc(DynamoDBEvent dynamoDbEvent,
    IDynamoDbStreamBatchProcessor batchProcessor, IDynamoDbStreamRecordHandler recordHandler)
{
    var result = await batchProcessor.ProcessAsync(dynamoDbEvent, recordHandler);
    return result.BatchItemFailuresResponse;
}
```
```csharp
internal class Services
{
    private static readonly Lazy<IServiceProvider> LazyInstance = new(Build);
    private static ServiceCollection _services;

    public static IServiceProvider Provider => LazyInstance.Value;

    public static IServiceProvider Init()
    {
        return LazyInstance.Value;
    }

    private static IServiceProvider Build()
    {
        _services = new ServiceCollection();
        _services.AddScoped<IDynamoDbStreamBatchProcessor, CustomDynamoDbStreamBatchProcessor>();
        _services.AddScoped<IDynamoDbStreamRecordHandler, CustomDynamoDbStreamRecordHandler>();
        return _services.BuildServiceProvider();
    }
}
```

Processing messages in parallel

You can set the POWERTOOLS_BATCH_PARALLEL_ENABLED Environment Variable to true or set the property BatchParallelProcessingEnabled on the Lambda decorator to process messages concurrently.

You can also set the POWERTOOLS_BATCH_MAX_DEGREE_OF_PARALLELISM Environment Variable to the degree of parallelism you want.

Note

MaxDegreeOfParallelism is used to control the parallelism of the batch item processing.

With a value of 1, the processing is done sequentially (default). Sequential processing is recommended when preserving order is important - i.e. with SQS FIFO queues.

With a value > 1, the processing is done in parallel. Parallel processing can enable processing to complete faster, e.g., when processing includes downstream service calls.

With a value of -1, the parallelism is automatically configured to be the vCPU count of the Lambda function. Internally, the Batch Processing Utility utilizes Parallel.ForEachAsync Method and the ParallelOptions.MaxDegreeOfParallelism Property to enable this functionality.
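A minimal sketch of that mechanism, assuming records is the materialized batch and HandleRecordAsync is a stand-in for your record handler (illustrative only, not the utility's actual code):

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

internal static class ParallelismSketch
{
    // Stand-in for a record handler; not a Powertools type.
    private static ValueTask HandleRecordAsync(string record, CancellationToken ct)
    {
        Console.WriteLine($"Processing {record}");
        return ValueTask.CompletedTask;
    }

    public static async Task ProcessAsync(IEnumerable<string> records, int maxDegreeOfParallelism)
    {
        var options = new ParallelOptions
        {
            // -1 maps to the vCPU count, mirroring the documented behaviour
            MaxDegreeOfParallelism = maxDegreeOfParallelism == -1
                ? Environment.ProcessorCount
                : maxDegreeOfParallelism
        };

        await Parallel.ForEachAsync(records, options, HandleRecordAsync);
    }
}
```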

When is this useful?

Your use case might be able to process multiple records at the same time without conflicting with one another.

For example, imagine you need to process multiple loyalty points and save them incrementally in a database. While you await the database to confirm your records are saved, you could start processing another request concurrently.

The reason this is not the default behaviour is that not all use cases can handle concurrency safely (e.g., loyalty points must be updated in order).

```csharp
[BatchProcessor(RecordHandler = typeof(CustomDynamoDbStreamRecordHandler), BatchParallelProcessingEnabled = true)]
public BatchItemFailuresResponse HandlerUsingAttribute(DynamoDBEvent _)
{
    return DynamoDbStreamBatchProcessor.Result.BatchItemFailuresResponse;
}
```

Working with full batch failures

By default, the BatchProcessor will throw a BatchProcessingException if all records in the batch fail to process. We do this to reflect the failure in your operational metrics.

When working with functions that handle batches with a small number of records, or when you use errors as a flow control mechanism, this behavior might not be desirable as your function might generate an unnaturally high number of errors. When this happens, the Lambda service will scale down the concurrency of your function, potentially impacting performance.

For these scenarios, you can set POWERTOOLS_BATCH_THROW_ON_FULL_BATCH_FAILURE = false, or the equivalent on either the BatchProcessor decorator or on the ProcessingOptions object. See examples below.

```csharp
[BatchProcessor(
    RecordHandler = typeof(CustomSqsRecordHandler),
    ThrowOnFullBatchFailure = false)]
public BatchItemFailuresResponse HandlerUsingAttribute(SQSEvent _)
{
    return SqsBatchProcessor.Result.BatchItemFailuresResponse;
}
```
```csharp
public async Task<BatchItemFailuresResponse> HandlerUsingUtility(SQSEvent sqsEvent)
{
    var result = await SqsBatchProcessor.Instance.ProcessAsync(sqsEvent, RecordHandler<SQSEvent.SQSMessage>.From(x =>
    {
        // Inline handling of SQS message...
    }), new ProcessingOptions
    {
        ThrowOnFullBatchFailure = false
    });
    return result.BatchItemFailuresResponse;
}
```

Extending BatchProcessor

You might want to bring custom logic to the existing BatchProcessor to slightly override how we handle successes and failures.

For these scenarios, you can create a class that inherits from BatchProcessor (SqsBatchProcessor, DynamoDbStreamBatchProcessor, KinesisEventBatchProcessor) and quickly override ProcessAsync and HandleRecordFailureAsync methods:

  • ProcessAsync() – Keeps track of successful batch records
  • HandleRecordFailureAsync() – Keeps track of failed batch records
Example

Let's suppose you'd like to add a metric named BatchRecordFailures for each batch record that failed processing, and also override the default error handling policy to stop on the first item failure.

```csharp
public class CustomDynamoDbStreamBatchProcessor : DynamoDbStreamBatchProcessor
{
    public override async Task<ProcessingResult<DynamoDBEvent.DynamodbStreamRecord>> ProcessAsync(DynamoDBEvent @event,
        IRecordHandler<DynamoDBEvent.DynamodbStreamRecord> recordHandler, ProcessingOptions processingOptions)
    {
        ProcessingResult = new ProcessingResult<DynamoDBEvent.DynamodbStreamRecord>();

        // Prepare batch records (order is preserved)
        var batchRecords = GetRecordsFromEvent(@event).Select(x => new KeyValuePair<string, DynamoDBEvent.DynamodbStreamRecord>(GetRecordId(x), x))
            .ToArray();

        // We assume all records fail by default to avoid loss of data
        var failureBatchRecords = batchRecords.Select(x => new KeyValuePair<string, RecordFailure<DynamoDBEvent.DynamodbStreamRecord>>(x.Key,
            new RecordFailure<DynamoDBEvent.DynamodbStreamRecord>
            {
                Exception = new UnprocessedRecordException($"Record: '{x.Key}' has not been processed."),
                Record = x.Value
            }));

        // Override to fail on first failure
        var errorHandlingPolicy = BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure;

        var successRecords = new Dictionary<string, RecordSuccess<DynamoDBEvent.DynamodbStreamRecord>>();
        var failureRecords = new Dictionary<string, RecordFailure<DynamoDBEvent.DynamodbStreamRecord>>(failureBatchRecords);

        try
        {
            foreach (var pair in batchRecords)
            {
                var (recordId, record) = pair;

                try
                {
                    var result = await HandleRecordAsync(record, recordHandler, CancellationToken.None);
                    failureRecords.Remove(recordId, out _);
                    successRecords.TryAdd(recordId, new RecordSuccess<DynamoDBEvent.DynamodbStreamRecord>
                    {
                        Record = record,
                        RecordId = recordId,
                        HandlerResult = result
                    });
                }
                catch (Exception ex)
                {
                    // Capture exception
                    failureRecords[recordId] = new RecordFailure<DynamoDBEvent.DynamodbStreamRecord>
                    {
                        Exception = new RecordProcessingException(
                            $"Failed processing record: '{recordId}'. See inner exception for details.", ex),
                        Record = record,
                        RecordId = recordId
                    };

                    Metrics.AddMetric("BatchRecordFailures", 1, MetricUnit.Count);

                    try
                    {
                        // Invoke hook
                        await HandleRecordFailureAsync(record, ex);
                    }
                    catch
                    {
                        // NOOP
                    }

                    // Check if we should stop record processing on first error
                    // ReSharper disable once ConditionIsAlwaysTrueOrFalse
                    if (errorHandlingPolicy == BatchProcessorErrorHandlingPolicy.StopOnFirstBatchItemFailure)
                    {
                        // This causes the loop's (inner) cancellation token to be cancelled for all operations already scheduled internally
                        throw new CircuitBreakerException(
                            "Error handling policy is configured to stop processing on first batch item failure. See inner exception for details.",
                            ex);
                    }
                }
            }
        }
        catch (Exception ex) when (ex is CircuitBreakerException or OperationCanceledException)
        {
            // NOOP
        }

        ProcessingResult.BatchRecords.AddRange(batchRecords.Select(x => x.Value));
        ProcessingResult.BatchItemFailuresResponse.BatchItemFailures.AddRange(failureRecords.Select(x =>
            new BatchItemFailuresResponse.BatchItemFailure
            {
                ItemIdentifier = x.Key
            }));
        ProcessingResult.FailureRecords.AddRange(failureRecords.Values);
        ProcessingResult.SuccessRecords.AddRange(successRecords.Values);

        return ProcessingResult;
    }

    // ReSharper disable once RedundantOverriddenMember
    protected override async Task HandleRecordFailureAsync(DynamoDBEvent.DynamodbStreamRecord record, Exception exception)
    {
        await base.HandleRecordFailureAsync(record, exception);
    }
}
```

Testing your code

Testing Typed Handlers

Testing typed batch processors is straightforward since you work directly with your data models:

```csharp
[Fact]
public async Task TypedHandler_ValidProduct_ProcessesSuccessfully()
{
    // Arrange
    var product = new Product { Id = 1, Name = "Test Product", Price = 10.99m };
    var handler = new TypedSqsRecordHandler();
    var cancellationToken = CancellationToken.None;

    // Act
    var result = await handler.HandleAsync(product, cancellationToken);

    // Assert
    Assert.Equal(RecordHandlerResult.None, result);
}

[Fact]
public async Task TypedHandler_InvalidProduct_ThrowsException()
{
    // Arrange
    var product = new Product { Id = 4, Name = "Invalid", Price = -10 };
    var handler = new TypedSqsRecordHandler();

    // Act & Assert
    await Assert.ThrowsAsync<ArgumentException>(() =>
        handler.HandleAsync(product, CancellationToken.None));
}
```
```csharp
[Fact]
public async Task ProcessSqsEvent_WithTypedHandler_ProcessesAllRecords()
{
    // Arrange
    var sqsEvent = new SQSEvent
    {
        Records = new List<SQSEvent.SQSMessage>
        {
            new()
            {
                MessageId = "1",
                Body = JsonSerializer.Serialize(new Product { Id = 1, Name = "Product 1", Price = 10 }),
                EventSourceArn = "arn:aws:sqs:us-east-1:123456789012:my-queue"
            },
            new()
            {
                MessageId = "2",
                Body = JsonSerializer.Serialize(new Product { Id = 2, Name = "Product 2", Price = 20 }),
                EventSourceArn = "arn:aws:sqs:us-east-1:123456789012:my-queue"
            }
        }
    };

    var function = new TypedFunction();

    // Act
    var result = function.HandlerUsingTypedAttribute(sqsEvent);

    // Assert
    Assert.Empty(result.BatchItemFailures);
}
```

Testing Traditional Handlers

As there are no external calls, you can unit test your code with BatchProcessor quite easily.

```csharp
[Fact]
public Task Sqs_Handler_Using_Attribute()
{
    var request = new SQSEvent
    {
        Records = TestHelper.SqsMessages
    };

    var function = new HandlerFunction();

    var response = function.HandlerUsingAttribute(request);

    Assert.Equal(2, response.BatchItemFailures.Count);
    Assert.Equal("2", response.BatchItemFailures[0].ItemIdentifier);
    Assert.Equal("4", response.BatchItemFailures[1].ItemIdentifier);

    return Task.CompletedTask;
}
```
```csharp
[BatchProcessor(RecordHandler = typeof(CustomSqsRecordHandler))]
public BatchItemFailuresResponse HandlerUsingAttribute(SQSEvent _)
{
    return SqsBatchProcessor.Result.BatchItemFailuresResponse;
}
```
```csharp
public class CustomSqsRecordHandler : ISqsRecordHandler
{
    public async Task<RecordHandlerResult> HandleAsync(SQSEvent.SQSMessage record, CancellationToken cancellationToken)
    {
        var product = JsonSerializer.Deserialize<JsonElement>(record.Body);

        if (product.GetProperty("Id").GetInt16() == 4)
        {
            throw new ArgumentException("Error on 4");
        }

        return await Task.FromResult(RecordHandlerResult.None);
    }
}
```
```csharp
internal static List<SQSEvent.SQSMessage> SqsMessages => new()
{
    new SQSEvent.SQSMessage
    {
        MessageId = "1",
        Body = "{\"Id\":1,\"Name\":\"product-4\",\"Price\":14}",
        EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue"
    },
    new SQSEvent.SQSMessage
    {
        MessageId = "2",
        Body = "fail",
        EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue"
    },
    new SQSEvent.SQSMessage
    {
        MessageId = "3",
        Body = "{\"Id\":3,\"Name\":\"product-4\",\"Price\":14}",
        EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue"
    },
    new SQSEvent.SQSMessage
    {
        MessageId = "4",
        Body = "{\"Id\":4,\"Name\":\"product-4\",\"Price\":14}",
        EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue"
    },
    new SQSEvent.SQSMessage
    {
        MessageId = "5",
        Body = "{\"Id\":5,\"Name\":\"product-4\",\"Price\":14}",
        EventSourceArn = "arn:aws:sqs:us-east-2:123456789012:my-queue"
    },
};
```

Complete Examples and Documentation

The BatchProcessing example contains complete working examples:

  • TypedFunction.cs - Complete examples using all typed batch processing patterns
  • TypedHandlers/ - Example implementations for SQS, Kinesis, and DynamoDB
  • Sample Events - Test events for all event types with typed data