Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions elasticapm/instrumentation/packages/botocore.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,26 +164,33 @@ def handle_sns(operation_name, service, instance, args, kwargs, context):
return HandlerInfo(signature, span_type, span_subtype, span_action, context)


SQS_OPERATIONS = {
"SendMessage": {"span_action": "send", "signature": "SEND to"},
"SendMessageBatch": {"span_action": "send_batch", "signature": "SEND_BATCH to"},
"ReceiveMessage": {"span_action": "receive", "signature": "RECEIVE from"},
"DeleteMessage": {"span_action": "delete", "signature": "DELETE from"},
"DeleteMessageBatch": {"span_action": "delete_batch", "signature": "DELETE_BATCH from"},
}


def handle_sqs(operation_name, service, instance, args, kwargs, context):
if operation_name not in ("SendMessage", "SendMessageBatch", "ReceiveMessage"):
op = SQS_OPERATIONS.get(operation_name, None)
if not op:
# only "publish" is handled specifically, other endpoints get the default treatment
return False
span_type = "messaging"
span_subtype = "sqs"
span_action = "send" if operation_name in ("SendMessage", "SendMessageBatch") else "receive"
topic_name = ""
batch = "_BATCH" if operation_name == "SendMessageBatch" else ""
signature_type = "RECEIVE from" if span_action == "receive" else f"SEND{batch} to"

if len(args) > 1:
topic_name = args[1]["QueueUrl"].rsplit("/", maxsplit=1)[-1]
signature = f"SQS {signature_type} {topic_name}".rstrip() if topic_name else f"SQS {signature_type}"
signature = f"SQS {op['signature']} {topic_name}".rstrip() if topic_name else f"SQS {op['signature']}"
context["destination"]["service"] = {
"name": span_subtype,
"resource": f"{span_subtype}/{topic_name}" if topic_name else span_subtype,
"type": span_type,
}
return HandlerInfo(signature, span_type, span_subtype, span_action, context)
return HandlerInfo(signature, span_type, span_subtype, op["span_action"], context)


def modify_span_sqs(span, args, kwargs):
Expand All @@ -200,7 +207,9 @@ def modify_span_sqs(span, args, kwargs):
attributes_count = len(attributes)
if "MessageAttributes" in args[1]:
messages = [args[1]]
elif "Entries" in args[1]:
# both send_batch and delete_batch use the same "Entries" list. We only want to add the
# traceparent to send_batch. We use the existence of ReceiptHandle to differentiate between the two
elif "Entries" in args[1] and args[1]["Entries"] and "ReceiptHandle" not in args[1]["Entries"][0]:
messages = args[1]["Entries"]
else:
messages = []
Expand Down
2 changes: 1 addition & 1 deletion tests/instrumentation/asyncio_tests/aiobotocore_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ async def test_sqs_send_batch(instrument, elasticapm_client, sqs_client_and_queu
assert span["name"] == "SQS SEND_BATCH to myqueue"
assert span["type"] == "messaging"
assert span["subtype"] == "sqs"
assert span["action"] == "send"
assert span["action"] == "send_batch"
assert span["context"]["destination"]["cloud"]["region"] == "us-east-1"
assert span["context"]["destination"]["service"]["name"] == "sqs"
assert span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
Expand Down
68 changes: 57 additions & 11 deletions tests/instrumentation/botocore_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ def test_sqs_send_batch(instrument, elasticapm_client, sqs_client_and_queue):
assert span["name"] == "SQS SEND_BATCH to myqueue"
assert span["type"] == "messaging"
assert span["subtype"] == "sqs"
assert span["action"] == "send"
assert span["action"] == "send_batch"
assert span["context"]["destination"]["cloud"]["region"] == "us-east-1"
assert span["context"]["destination"]["service"]["name"] == "sqs"
assert span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
Expand Down Expand Up @@ -324,7 +324,7 @@ def test_sqs_send_disttracing_dropped_span(instrument, elasticapm_client, sqs_cl
assert transaction.id in traceparent # due to DroppedSpan, transaction.id is used instead of span.id


def test_sqs_receive(instrument, elasticapm_client, sqs_client_and_queue):
def test_sqs_receive_and_delete(instrument, elasticapm_client, sqs_client_and_queue):
sqs, queue_url = sqs_client_and_queue
sqs.send_message(
QueueUrl=queue_url,
Expand All @@ -341,13 +341,59 @@ def test_sqs_receive(instrument, elasticapm_client, sqs_client_and_queue):
"All",
],
)
sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=response["Messages"][0]["ReceiptHandle"])
elasticapm_client.end_transaction("test", "test")
span = elasticapm_client.events[constants.SPAN][0]
assert span["name"] == "SQS RECEIVE from myqueue"
assert span["type"] == "messaging"
assert span["subtype"] == "sqs"
assert span["action"] == "receive"
assert span["context"]["destination"]["cloud"]["region"] == "us-east-1"
assert span["context"]["destination"]["service"]["name"] == "sqs"
assert span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
assert span["context"]["destination"]["service"]["type"] == "messaging"

receive_span = elasticapm_client.events[constants.SPAN][0]
assert receive_span["name"] == "SQS RECEIVE from myqueue"
assert receive_span["type"] == "messaging"
assert receive_span["subtype"] == "sqs"
assert receive_span["action"] == "receive"
assert receive_span["context"]["destination"]["cloud"]["region"] == "us-east-1"
assert receive_span["context"]["destination"]["service"]["name"] == "sqs"
assert receive_span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
assert receive_span["context"]["destination"]["service"]["type"] == "messaging"

delete_span = elasticapm_client.events[constants.SPAN][1]
assert delete_span["name"] == "SQS DELETE from myqueue"
assert delete_span["type"] == "messaging"
assert delete_span["subtype"] == "sqs"
assert delete_span["action"] == "delete"
assert delete_span["context"]["destination"]["cloud"]["region"] == "us-east-1"
assert delete_span["context"]["destination"]["service"]["name"] == "sqs"
assert delete_span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
assert delete_span["context"]["destination"]["service"]["type"] == "messaging"


def test_sqs_delete_batch(instrument, elasticapm_client, sqs_client_and_queue):
sqs, queue_url = sqs_client_and_queue
sqs.send_message(
QueueUrl=queue_url,
MessageAttributes={
"Title": {"DataType": "String", "StringValue": "foo"},
},
MessageBody=("bar"),
)
response = sqs.receive_message(
QueueUrl=queue_url,
AttributeNames=["All"],
MessageAttributeNames=[
"All",
],
)
elasticapm_client.begin_transaction("test")
sqs.delete_message_batch(
QueueUrl=queue_url,
Entries=[{"Id": "foo", "ReceiptHandle": response["Messages"][0]["ReceiptHandle"]}],
)
elasticapm_client.end_transaction("test", "test")

delete_span = elasticapm_client.events[constants.SPAN][0]
assert delete_span["name"] == "SQS DELETE_BATCH from myqueue"
assert delete_span["type"] == "messaging"
assert delete_span["subtype"] == "sqs"
assert delete_span["action"] == "delete_batch"
assert delete_span["context"]["destination"]["cloud"]["region"] == "us-east-1"
assert delete_span["context"]["destination"]["service"]["name"] == "sqs"
assert delete_span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
assert delete_span["context"]["destination"]["service"]["type"] == "messaging"