Skip to content

Commit 2e03f1d

Browse files
authored
add specific support for SQS delete/delete_batch (#1567)
* add specific support for SQS delete/delete_batch closes #1432 * update aiobotocore * update changelog
1 parent c638501 commit 2e03f1d

File tree

4 files changed

+75
-19
lines changed

4 files changed

+75
-19
lines changed

CHANGELOG.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ endif::[]
3939
* Add instrumentation for https://github.com/aio-libs/aiobotocore[`aiobotocore`] {pull}1520[#1520]
4040
* Add instrumentation for https://kafka-python.readthedocs.io/en/master/[`kafka-python`] {pull}1555[#1555]
4141
* Add API for span links, and implement span link support for OpenTelemetry bridge {pull}1562[#1562]
42+
* Add specific instrumentation for SQS delete/batch-delete {pull}1567[#1567]
4243
4344
[float]
4445
===== Bug fixes

elasticapm/instrumentation/packages/botocore.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -164,26 +164,33 @@ def handle_sns(operation_name, service, instance, args, kwargs, context):
164164
return HandlerInfo(signature, span_type, span_subtype, span_action, context)
165165

166166

167+
SQS_OPERATIONS = {
168+
"SendMessage": {"span_action": "send", "signature": "SEND to"},
169+
"SendMessageBatch": {"span_action": "send_batch", "signature": "SEND_BATCH to"},
170+
"ReceiveMessage": {"span_action": "receive", "signature": "RECEIVE from"},
171+
"DeleteMessage": {"span_action": "delete", "signature": "DELETE from"},
172+
"DeleteMessageBatch": {"span_action": "delete_batch", "signature": "DELETE_BATCH from"},
173+
}
174+
175+
167176
def handle_sqs(operation_name, service, instance, args, kwargs, context):
168-
if operation_name not in ("SendMessage", "SendMessageBatch", "ReceiveMessage"):
177+
op = SQS_OPERATIONS.get(operation_name, None)
178+
if not op:
169179
# only "publish" is handled specifically, other endpoints get the default treatment
170180
return False
171181
span_type = "messaging"
172182
span_subtype = "sqs"
173-
span_action = "send" if operation_name in ("SendMessage", "SendMessageBatch") else "receive"
174183
topic_name = ""
175-
batch = "_BATCH" if operation_name == "SendMessageBatch" else ""
176-
signature_type = "RECEIVE from" if span_action == "receive" else f"SEND{batch} to"
177184

178185
if len(args) > 1:
179186
topic_name = args[1]["QueueUrl"].rsplit("/", maxsplit=1)[-1]
180-
signature = f"SQS {signature_type} {topic_name}".rstrip() if topic_name else f"SQS {signature_type}"
187+
signature = f"SQS {op['signature']} {topic_name}".rstrip() if topic_name else f"SQS {op['signature']}"
181188
context["destination"]["service"] = {
182189
"name": span_subtype,
183190
"resource": f"{span_subtype}/{topic_name}" if topic_name else span_subtype,
184191
"type": span_type,
185192
}
186-
return HandlerInfo(signature, span_type, span_subtype, span_action, context)
193+
return HandlerInfo(signature, span_type, span_subtype, op["span_action"], context)
187194

188195

189196
def modify_span_sqs(span, args, kwargs):
@@ -200,7 +207,9 @@ def modify_span_sqs(span, args, kwargs):
200207
attributes_count = len(attributes)
201208
if "MessageAttributes" in args[1]:
202209
messages = [args[1]]
203-
elif "Entries" in args[1]:
210+
# both send_batch and delete_batch use the same "Entries" list. We only want to add the
211+
# traceparent to send_batch. We use the existence of ReceiptHandle to differentiate between the two
212+
elif "Entries" in args[1] and args[1]["Entries"] and "ReceiptHandle" not in args[1]["Entries"][0]:
204213
messages = args[1]["Entries"]
205214
else:
206215
messages = []

tests/instrumentation/asyncio_tests/aiobotocore_tests.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ async def test_sqs_send_batch(instrument, elasticapm_client, sqs_client_and_queu
259259
assert span["name"] == "SQS SEND_BATCH to myqueue"
260260
assert span["type"] == "messaging"
261261
assert span["subtype"] == "sqs"
262-
assert span["action"] == "send"
262+
assert span["action"] == "send_batch"
263263
assert span["context"]["destination"]["cloud"]["region"] == "us-east-1"
264264
assert span["context"]["destination"]["service"]["name"] == "sqs"
265265
assert span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"

tests/instrumentation/botocore_tests.py

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ def test_sqs_send_batch(instrument, elasticapm_client, sqs_client_and_queue):
255255
assert span["name"] == "SQS SEND_BATCH to myqueue"
256256
assert span["type"] == "messaging"
257257
assert span["subtype"] == "sqs"
258-
assert span["action"] == "send"
258+
assert span["action"] == "send_batch"
259259
assert span["context"]["destination"]["cloud"]["region"] == "us-east-1"
260260
assert span["context"]["destination"]["service"]["name"] == "sqs"
261261
assert span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
@@ -324,7 +324,7 @@ def test_sqs_send_disttracing_dropped_span(instrument, elasticapm_client, sqs_cl
324324
assert transaction.id in traceparent # due to DroppedSpan, transaction.id is used instead of span.id
325325

326326

327-
def test_sqs_receive(instrument, elasticapm_client, sqs_client_and_queue):
327+
def test_sqs_receive_and_delete(instrument, elasticapm_client, sqs_client_and_queue):
328328
sqs, queue_url = sqs_client_and_queue
329329
sqs.send_message(
330330
QueueUrl=queue_url,
@@ -341,13 +341,59 @@ def test_sqs_receive(instrument, elasticapm_client, sqs_client_and_queue):
341341
"All",
342342
],
343343
)
344+
sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=response["Messages"][0]["ReceiptHandle"])
344345
elasticapm_client.end_transaction("test", "test")
345-
span = elasticapm_client.events[constants.SPAN][0]
346-
assert span["name"] == "SQS RECEIVE from myqueue"
347-
assert span["type"] == "messaging"
348-
assert span["subtype"] == "sqs"
349-
assert span["action"] == "receive"
350-
assert span["context"]["destination"]["cloud"]["region"] == "us-east-1"
351-
assert span["context"]["destination"]["service"]["name"] == "sqs"
352-
assert span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
353-
assert span["context"]["destination"]["service"]["type"] == "messaging"
346+
347+
receive_span = elasticapm_client.events[constants.SPAN][0]
348+
assert receive_span["name"] == "SQS RECEIVE from myqueue"
349+
assert receive_span["type"] == "messaging"
350+
assert receive_span["subtype"] == "sqs"
351+
assert receive_span["action"] == "receive"
352+
assert receive_span["context"]["destination"]["cloud"]["region"] == "us-east-1"
353+
assert receive_span["context"]["destination"]["service"]["name"] == "sqs"
354+
assert receive_span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
355+
assert receive_span["context"]["destination"]["service"]["type"] == "messaging"
356+
357+
delete_span = elasticapm_client.events[constants.SPAN][1]
358+
assert delete_span["name"] == "SQS DELETE from myqueue"
359+
assert delete_span["type"] == "messaging"
360+
assert delete_span["subtype"] == "sqs"
361+
assert delete_span["action"] == "delete"
362+
assert delete_span["context"]["destination"]["cloud"]["region"] == "us-east-1"
363+
assert delete_span["context"]["destination"]["service"]["name"] == "sqs"
364+
assert delete_span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
365+
assert delete_span["context"]["destination"]["service"]["type"] == "messaging"
366+
367+
368+
def test_sqs_delete_batch(instrument, elasticapm_client, sqs_client_and_queue):
369+
sqs, queue_url = sqs_client_and_queue
370+
sqs.send_message(
371+
QueueUrl=queue_url,
372+
MessageAttributes={
373+
"Title": {"DataType": "String", "StringValue": "foo"},
374+
},
375+
MessageBody=("bar"),
376+
)
377+
response = sqs.receive_message(
378+
QueueUrl=queue_url,
379+
AttributeNames=["All"],
380+
MessageAttributeNames=[
381+
"All",
382+
],
383+
)
384+
elasticapm_client.begin_transaction("test")
385+
sqs.delete_message_batch(
386+
QueueUrl=queue_url,
387+
Entries=[{"Id": "foo", "ReceiptHandle": response["Messages"][0]["ReceiptHandle"]}],
388+
)
389+
elasticapm_client.end_transaction("test", "test")
390+
391+
delete_span = elasticapm_client.events[constants.SPAN][0]
392+
assert delete_span["name"] == "SQS DELETE_BATCH from myqueue"
393+
assert delete_span["type"] == "messaging"
394+
assert delete_span["subtype"] == "sqs"
395+
assert delete_span["action"] == "delete_batch"
396+
assert delete_span["context"]["destination"]["cloud"]["region"] == "us-east-1"
397+
assert delete_span["context"]["destination"]["service"]["name"] == "sqs"
398+
assert delete_span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
399+
assert delete_span["context"]["destination"]["service"]["type"] == "messaging"

0 commit comments

Comments
 (0)