Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ endif::[]
* Add instrumentation for https://github.com/aio-libs/aiobotocore[`aiobotocore`] {pull}1520[#1520]
* Add instrumentation for https://kafka-python.readthedocs.io/en/master/[`kafka-python`] {pull}1555[#1555]
* Add API for span links, and implement span link support for OpenTelemetry bridge {pull}1562[#1562]
* Add span links to SQS ReceiveMessage call {pull}1575[#1575]
* Add specific instrumentation for SQS delete/batch-delete {pull}1567[#1567]
* Add `trace_continuation_strategy` setting {pull}1564[#1564]

Expand Down
15 changes: 11 additions & 4 deletions elasticapm/instrumentation/packages/asyncio/aiobotocore.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from elasticapm.contrib.asyncio.traces import async_capture_span
from elasticapm.instrumentation.packages.botocore import BotocoreInstrumentation, span_modifiers
from elasticapm.instrumentation.packages.botocore import (
BotocoreInstrumentation,
post_span_modifiers,
pre_span_modifiers,
)


class AioBotocoreInstrumentation(BotocoreInstrumentation):
Expand All @@ -44,6 +48,9 @@ async def call(self, module, method, wrapped, instance, args, kwargs):

ctx = self._call(service, instance, args, kwargs)
async with ctx as span:
if service in span_modifiers:
span_modifiers[service](span, args, kwargs)
return await wrapped(*args, **kwargs)
if service in pre_span_modifiers:
pre_span_modifiers[service](span, args, kwargs)
result = await wrapped(*args, **kwargs)
if service in post_span_modifiers:
post_span_modifiers[service](span, args, kwargs, result)
return result
68 changes: 43 additions & 25 deletions elasticapm/instrumentation/packages/botocore.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@

from elasticapm.conf import constants
from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule
from elasticapm.traces import capture_span, execution_context
from elasticapm.traces import SpanType, capture_span, execution_context
from elasticapm.utils.disttracing import TraceParent
from elasticapm.utils.logging import get_logger

logger = get_logger("elasticapm.instrument")
Expand All @@ -59,10 +60,7 @@ def _call(self, service, instance, args, kwargs):
This is split out from `call()` so that it can be re-used by the
aiobotocore instrumentation without duplicating all of this code.
"""
if "operation_name" in kwargs:
operation_name = kwargs["operation_name"]
else:
operation_name = args[0]
operation_name = kwargs.get("operation_name", args[0])

parsed_url = urllib.parse.urlparse(instance.meta.endpoint_url)
context = {
Expand Down Expand Up @@ -103,9 +101,12 @@ def call(self, module, method, wrapped, instance, args, kwargs):

ctx = self._call(service, instance, args, kwargs)
with ctx as span:
if service in span_modifiers:
span_modifiers[service](span, args, kwargs)
return wrapped(*args, **kwargs)
if service in pre_span_modifiers:
pre_span_modifiers[service](span, args, kwargs)
result = wrapped(*args, **kwargs)
if service in post_span_modifiers:
post_span_modifiers[service](span, args, kwargs, result)
return result


def handle_s3(operation_name, service, instance, args, kwargs, context):
Expand Down Expand Up @@ -193,7 +194,8 @@ def handle_sqs(operation_name, service, instance, args, kwargs, context):
return HandlerInfo(signature, span_type, span_subtype, op["span_action"], context)


def modify_span_sqs(span, args, kwargs):
def modify_span_sqs_pre(span, args, kwargs):
operation_name = kwargs.get("operation_name", args[0])
if span.id:
trace_parent = span.transaction.trace_parent.copy_from(span_id=span.id)
else:
Expand All @@ -204,21 +206,33 @@ def modify_span_sqs(span, args, kwargs):
if trace_parent.tracestate:
attributes[constants.TRACESTATE_HEADER_NAME] = {"DataType": "String", "StringValue": trace_parent.tracestate}
if len(args) > 1:
attributes_count = len(attributes)
if "MessageAttributes" in args[1]:
messages = [args[1]]
# both send_batch and delete_batch use the same "Entries" list. We only want to add the
# traceparent to send_batch. We use the existence of ReceiptHandle to differentiate between the two
elif "Entries" in args[1] and args[1]["Entries"] and "ReceiptHandle" not in args[1]["Entries"][0]:
messages = args[1]["Entries"]
else:
messages = []
for message in messages:
message["MessageAttributes"] = message.get("MessageAttributes") or {}
if len(message["MessageAttributes"]) + attributes_count <= SQS_MAX_ATTRIBUTES:
message["MessageAttributes"].update(attributes)
if operation_name in ("SendMessage", "SendMessageBatch"):
attributes_count = len(attributes)
if operation_name == "SendMessage":
messages = [args[1]]
else:
logger.info("Not adding disttracing headers to message due to attribute limit reached")
messages = args[1]["Entries"]
for message in messages:
message["MessageAttributes"] = message.get("MessageAttributes") or {}
if len(message["MessageAttributes"]) + attributes_count <= SQS_MAX_ATTRIBUTES:
message["MessageAttributes"].update(attributes)
else:
logger.info("Not adding disttracing headers to message due to attribute limit reached")
elif operation_name == "ReceiveMessage":
message_attributes = args[1].setdefault("MessageAttributeNames", [])
if "All" not in message_attributes:
message_attributes.extend([constants.TRACEPARENT_HEADER_NAME, constants.TRACESTATE_HEADER_NAME])


def modify_span_sqs_post(span: SpanType, args, kwargs, result):
operation_name = kwargs.get("operation_name", args[0])
if operation_name == "ReceiveMessage" and "Messages" in result:
for message in result["Messages"][:1000]: # only up to 1000 span links are recorded
if "MessageAttributes" in message and constants.TRACEPARENT_HEADER_NAME in message["MessageAttributes"]:
tp = TraceParent.from_string(
message["MessageAttributes"][constants.TRACEPARENT_HEADER_NAME]["StringValue"]
)
span.add_link(tp)


def handle_default(operation_name, service, instance, args, kwargs, destination):
Expand All @@ -240,6 +254,10 @@ def handle_default(operation_name, service, instance, args, kwargs, destination)
"default": handle_default,
}

span_modifiers = {
"SQS": modify_span_sqs,
pre_span_modifiers = {
"SQS": modify_span_sqs_pre,
}

post_span_modifiers = {
"SQS": modify_span_sqs_post,
}
21 changes: 21 additions & 0 deletions tests/instrumentation/botocore_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,3 +425,24 @@ def test_sqs_delete_batch(instrument, elasticapm_client, sqs_client_and_queue):
assert delete_span["context"]["destination"]["service"]["name"] == "sqs"
assert delete_span["context"]["destination"]["service"]["resource"] == "sqs/myqueue"
assert delete_span["context"]["destination"]["service"]["type"] == "messaging"


def test_sqs_receive_message_span_links(instrument, elasticapm_client, sqs_client_and_queue):
sqs, queue_url = sqs_client_and_queue
send_transaction = elasticapm_client.begin_transaction("test")
sqs.send_message(
QueueUrl=queue_url,
MessageBody=("bar"),
)
elasticapm_client.end_transaction("test")
receive_transaction = elasticapm_client.begin_transaction("test")
response = sqs.receive_message(
QueueUrl=queue_url,
AttributeNames=["All"],
)
assert len(response["Messages"]) == 1
elasticapm_client.end_transaction("test")
send_span = elasticapm_client.events[constants.SPAN][0]
receive_span = elasticapm_client.events[constants.SPAN][1]
assert receive_span["links"][0]["trace_id"] == send_transaction.trace_parent.trace_id
assert receive_span["links"][0]["span_id"] == send_span["id"]