DEV Community

Cover image for Moving Big Number Of Messages Between SQS Queues
πŸš€ Vu Dao πŸš€
πŸš€ Vu Dao πŸš€

Posted on

Moving Big Number Of Messages Between SQS Queues

- Dead-letter queue is used to send undeliverable messages to a dead-letter queue. Sometimes, for example, if there’s a bug in the worker code, you can configure SQS to send such problematic messages to a dead-letter queue (DLQ), where you can inspect them in isolation and work out what went wrong.

- Once we’ve found the problem in the worker, fixed the bug and deployed a new version, we want to send all the messages from the DLQ back to the original input queue, so they can be processed by the updated worker. There’s no way to do this in SQS directly, so we’ve written a script to do it for us.

- The automation script moves SQS messages between queues with proper way to avoid impact to other services which using the same queue. This post also provides CDK code to create SQS queues to test and the python script to generate a bunch of messages.

Alt Text


What’s In This Document


πŸš€ Create SQS queues using CDK

1. Init CDK project

⚑ $ cdk init -l python Applying project template app for python # Welcome to your CDK Python project! This is a blank project for Python development with CDK. The `cdk.json` file tells the CDK Toolkit how to execute your app. βœ… All done! 
Enter fullscreen mode Exit fullscreen mode

2. Create CDK stacks

from aws_cdk import ( core, aws_sqs as sqs ) class SqsStackStack(core.Stack): def __init__(self, scope: core.Construct, construct_id: str, env, **kwargs) -> None: super().__init__(scope, construct_id, env=env, **kwargs) # Create my-queue my_dl_queue = sqs.Queue(self, id="SQSTestDLQueueMovingMsg", queue_name="my_queue_dl_test") my_queue = sqs.Queue(self, id="SQSTestMovingMsg", queue_name="my_queue_test", dead_letter_queue=sqs.DeadLetterQueue(max_receive_count=1, queue=my_dl_queue) ) 
Enter fullscreen mode Exit fullscreen mode
  • Deploy stacks
⚑ $ cdk ls sqs-stack ⚑ $ cdk deploy sqs-stack: deploying... βœ… sqs-stack Stack ARN: arn:aws:cloudformation:ap-northeast-2:111111111111:stack/sqs-stack/ff3d8c20-6ee2-11eb-9f14-02d3758fdfbc 
Enter fullscreen mode Exit fullscreen mode

Alt Text
Alt Text

πŸš€ Generate 100 Messages to Dead-letter Queue For Testing

import boto3 import uuid def gen_msg(): msg = list() for i in range(0, 100): body = f"Hello {i}" msg.append({"Body": body}) if len(msg) > 9: yield msg msg = list() sqs_client = boto3.client("sqs", region_name='ap-northeast-2') sqs_url = 'https://sqs.ap-northeast-2.amazonaws.com/111111111111/my_queue_dl_test' while True: for message_batch in gen_msg(): sqs_client.send_message_batch( QueueUrl=sqs_url, Entries=[{"Id": str(uuid.uuid4()), "MessageBody": message["Body"]} for message in message_batch] ) else: break 
Enter fullscreen mode Exit fullscreen mode
  • Run
⚑ $ python sendSQSMsg.py 
Enter fullscreen mode Exit fullscreen mode

Alt Text

πŸš€ Move All Messages At Once

#!/usr/bin/env python """ Move all the messages from one SQS queue to another. Usage: redrive_sqs_queue.py --src-url=<SRC_QUEUE_URL> --dst-url=<DST_QUEUE_URL> --max-msg=<MAX_MSG_PROCESS> redrive_sqs_queue.py -h | --help """ import argparse import itertools import sys import uuid import boto3 from pprint import pprint def parse_args(): """Parse command-line arguments.""" parser = argparse.ArgumentParser( description="Move all the messages from one SQS queue to another." ) parser.add_argument("-s", "--src-url", required=True, help="Queue to read messages from") parser.add_argument("-d", "--dst-url", required=True, help="Queue to move messages to") parser.add_argument("-m", "--max-msg", required=False, type=int, help="Max number of messages to process, no limit if not specify." "Use this to limit the message processed to avoid pending message from others") parser.add_argument("-r", "--region", required=True, help="Region of the SQS queue, assume src and dest SQS" "is in the same region") return parser.parse_args() def get_messages_from_queue(sqs_client, queue_url, max_nr_msg=0): """Generates messages from an SQS queue. Note: this continues to generate messages until the queue is empty. Every message on the queue will be deleted. :param queue_url: URL of the SQS queue to read. :param sqs_client: boto3 sqs client to connect to AWS SQS """ msg_processed = 0 while True: resp = sqs_client.receive_message( QueueUrl=queue_url, AttributeNames=["All"], MaxNumberOfMessages=10 ) entries = [] try: msg_to_copy = resp["Messages"] if max_nr_msg > 0: if msg_processed >= max_nr_msg or msg_to_copy == 0: pprint("Done") break elif (len(msg_to_copy) + msg_processed) > max_nr_msg: msg_to_copy = resp["Messages"][0: (max_nr_msg - msg_processed)] msg_processed += len(msg_to_copy) except KeyError as err: """ Don't return, need to delete messages """ print(f'found no messages') msg_to_copy = list() for msg in msg_to_copy: try: entries.append({"Id": msg["MessageId"], "ReceiptHandle": msg["ReceiptHandle"]}) yield msg except KeyError: print('failed parsing key') continue if entries: resp = sqs_client.delete_message_batch(QueueUrl=queue_url, Entries=entries) if len(resp["Successful"]) != len(entries): raise RuntimeError(f"Failed to delete messages: entries={entries!r} resp={resp!r}") def chunked_iterable(iterable, *, size): """ Read ``iterable`` in chunks of size ``size``. """ it = iter(iterable) while True: chunk = tuple(itertools.islice(it, size)) if not chunk: break yield chunk if __name__ == "__main__": args = parse_args() src_queue_url = args.src_url dst_queue_url = args.dst_url region = args.region max_msg = args.max_msg if not max_msg: max_msg = 0 if src_queue_url == dst_queue_url: sys.exit("Source and destination queues cannot be the same.") sqs_client = boto3.client("sqs", region_name=region) messages = get_messages_from_queue(sqs_client, queue_url=src_queue_url, max_nr_msg=max_msg) # The SendMessageBatch API supports sending up to ten messages at once. for message_batch in chunked_iterable(messages, size=10): print(f"Writing {len(message_batch):2d} messages to {dst_queue_url}") sqs_client.send_message_batch( QueueUrl=dst_queue_url, Entries=[{"Id": str(uuid.uuid4()), "MessageBody": message["Body"]} for message in message_batch] ) 
Enter fullscreen mode Exit fullscreen mode
  • Run
⚑ $ python redrive_sqs_queue.py -s https://sqs.ap-northeast-2.amazonaws.com/111111111111/my_queue_dl_test -d https://sqs.ap-northeast-2.amazonaws.com/111111111111/my_queue_test -r ap-northeast-2 Writing 10 messages to https://sqs.ap-northeast-2.amazonaws.com/111111111111/my_queue_test Writing 10 messages to https://sqs.ap-northeast-2.amazonaws.com/111111111111/my_queue_test Writing 10 messages to https://sqs.ap-northeast-2.amazonaws.com/111111111111/my_queue_test Writing 10 messages to https://sqs.ap-northeast-2.amazonaws.com/111111111111/my_queue_test Writing 10 messages to https://sqs.ap-northeast-2.amazonaws.com/111111111111/my_queue_test Writing 10 messages to https://sqs.ap-northeast-2.amazonaws.com/111111111111/my_queue_test Writing 10 messages to https://sqs.ap-northeast-2.amazonaws.com/111111111111/my_queue_test Writing 10 messages to https://sqs.ap-northeast-2.amazonaws.com/111111111111/my_queue_test Writing 10 messages to https://sqs.ap-northeast-2.amazonaws.com/111111111111/my_queue_test Writing 10 messages to https://sqs.ap-northeast-2.amazonaws.com/111111111111/my_queue_test found no messages, Done! 
Enter fullscreen mode Exit fullscreen mode

πŸš€ Move Half Messages To Avoid Workload

⚑ $ python redrive_sqs_queue.py -s https://sqs.ap-northeast-2.amazonaws.com/111111111111/my_queue_dl_test -d https://sqs.ap-northeast-2.amazonaws.com/111111111111/my_queue_test -r ap-northeast-2 -m 50 Writing 10 messages to https://sqs.ap-northeast-2.amazonaws.com/111111111111/my_queue_test Writing 10 messages to https://sqs.ap-northeast-2.amazonaws.com/111111111111/my_queue_test Writing 10 messages to https://sqs.ap-northeast-2.amazonaws.com/111111111111/my_queue_test Writing 10 messages to https://sqs.ap-northeast-2.amazonaws.com/111111111111/my_queue_test Writing 10 messages to https://sqs.ap-northeast-2.amazonaws.com/111111111111/my_queue_test 'Done' 
Enter fullscreen mode Exit fullscreen mode

Alt Text

πŸš€ Destroy Test Queues

⚑ $ cdk destroy Are you sure you want to delete: sqs-stack (y/n)? y sqs-stack: destroying... 11:31:35 PM | DELETE_IN_PROGRESS | AWS::CloudFormation::Stack | sqs-stack 11:31:35 PM | DELETE_IN_PROGRESS | AWS::CloudFormation::Stack | sqs-stack 11:32:38 PM | DELETE_IN_PROGRESS | AWS::SQS::Queue | SQSTestDLQueueMovingMsg βœ… sqs-stack: destroyed 
Enter fullscreen mode Exit fullscreen mode

🌠 Blog · Github · Web · Linkedin · Group · Page · Twitter 🌠

Top comments (0)