Skip to content
This repository was archived by the owner on Oct 1, 2020. It is now read-only.

Commit 519c39d

Browse files
committed
Convert event-replay back to batch API
1 parent e5513b0 commit 519c39d

File tree

3 files changed

+58
-35
lines changed

3 files changed

+58
-35
lines changed

pipelines/event-replay-pipeline/src/replay_messages.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,24 @@ def handler(event, context):
2020
LOG.info('No records in event')
2121
return
2222

23-
for record in event['Records']:
24-
request = _to_request(record)
25-
LOG.debug('Replaying event to SQS queue: queue URL=%s, request=%s', DESTINATION_SQS_QUEUE_URL, request)
26-
response = QUEUE.send_message(**request)
27-
LOG.debug('SQS response: %s', response)
23+
entries = [_to_request_record(str(i), r) for i, r in enumerate(event['Records'])]
24+
LOG.debug('Replaying events to SQS queue: queue URL=%s, entries=%s', DESTINATION_SQS_QUEUE_URL, entries)
25+
response = QUEUE.send_messages(Entries=entries)
26+
LOG.debug('SQS response: %s', response)
27+
if response.get('Failed'):
28+
raise RuntimeError('Failed to send messages to destination queue: queue URL={}, failures={}'.format(
29+
DESTINATION_SQS_QUEUE_URL, response['Failed']
30+
))
2831

2932

30-
def _to_request(record):
33+
def _to_request_record(id, record):
3134
# SQS event lowercases message attribute property names so have to capitalize them again before sending to SQS
3235
message_attributes = {attr_name: {_capitalize(attr_prop_name): attr_prop_value
3336
for (attr_prop_name, attr_prop_value) in attr_props.items()
3437
if attr_prop_value}
3538
for (attr_name, attr_props) in record['messageAttributes'].items()}
3639
return {
40+
'Id': id,
3741
'MessageBody': record['body'],
3842
'MessageAttributes': message_attributes
3943
}

pipelines/event-replay-pipeline/template.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ Resources:
9090
Effect: Allow
9191
Action:
9292
- sqs:GetQueueUrl
93-
- sqs:SendMessage
93+
- sqs:SendMessage*
9494
Resource:
9595
- !Sub arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:${DestinationSQSQueueName}
9696

pipelines/event-replay-pipeline/test/unit/test_replay_messages.py

Lines changed: 47 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,17 @@ def replay_messages(mocker):
1515
mocker.patch.object(boto3, 'resource')
1616
boto3.client.return_value = mock_sqs_client
1717
boto3.resource.return_value = mock_sqs_client
18-
mock_queue = MagicMock()
19-
mock_sqs_client.Topic.return_value = mock_queue
2018
import replay_messages
2119
return replay_messages
2220

2321

2422
def test_handler_no_records(mocker, replay_messages):
2523
replay_messages.handler({}, None)
26-
replay_messages.QUEUE.send_message.assert_not_called()
24+
replay_messages.QUEUE.send_messages.assert_not_called()
2725

2826

2927
def test_handler_success(mocker, replay_messages):
30-
replay_messages.QUEUE.send_message.return_value = {}
28+
replay_messages.QUEUE.send_messages.return_value = {}
3129

3230
event = {
3331
'Records': [
@@ -69,29 +67,50 @@ def test_handler_success(mocker, replay_messages):
6967
}
7068
replay_messages.handler(event, None)
7169

72-
replay_messages.QUEUE.send_message.assert_any_call(
73-
MessageBody='foo',
74-
MessageAttributes={
75-
'fooAttr1': {
76-
'DataType': 'String',
77-
'StringValue': 'fooAttr1Value'
78-
},
79-
'fooAttr2': {
80-
'DataType': 'String',
81-
'StringValue': 'fooAttr2Value'
70+
expected_entries = [
71+
{
72+
'Id': '0',
73+
'MessageBody': 'foo',
74+
'MessageAttributes': {
75+
'fooAttr1': {
76+
'DataType': 'String',
77+
'StringValue': 'fooAttr1Value'
78+
},
79+
'fooAttr2': {
80+
'DataType': 'String',
81+
'StringValue': 'fooAttr2Value'
82+
}
8283
}
83-
}
84-
)
85-
replay_messages.QUEUE.send_message.assert_any_call(
86-
MessageBody='bar',
87-
MessageAttributes={
88-
'barAttr1': {
89-
'DataType': 'String',
90-
'StringValue': 'barAttr1Value'
91-
},
92-
'barAttr2': {
93-
'DataType': 'String',
94-
'StringValue': 'barAttr2Value'
84+
},
85+
{
86+
'Id': '1',
87+
'MessageBody': 'bar',
88+
'MessageAttributes': {
89+
'barAttr1': {
90+
'DataType': 'String',
91+
'StringValue': 'barAttr1Value'
92+
},
93+
'barAttr2': {
94+
'DataType': 'String',
95+
'StringValue': 'barAttr2Value'
96+
}
9597
}
96-
}
97-
)
98+
},
99+
]
100+
replay_messages.QUEUE.send_messages.assert_called_with(Entries=expected_entries)
101+
102+
103+
def test_handler_failure(mocker, replay_messages):
104+
replay_messages.QUEUE.send_messages.return_value = {'Failed': 'True'}
105+
106+
event = {
107+
'Records': [
108+
{
109+
'body': 'foo',
110+
'messageAttributes': {}
111+
}
112+
]
113+
}
114+
115+
with pytest.raises(RuntimeError):
116+
replay_messages.handler(event, None)

0 commit comments

Comments
 (0)