Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ endif::[]

* Add instrumentation for https://github.com/aio-libs/aiobotocore[`aiobotocore`] {pull}1520[#1520]
* Add instrumentation for https://kafka-python.readthedocs.io/en/master/[`kafka-python`] {pull}1555[#1555]
* Add API for span links, and implement span link support for OpenTelemetry bridge {pull}1562[#1562]

[float]
===== Bug fixes
Expand Down
31 changes: 17 additions & 14 deletions docs/api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ client.begin_transaction('processors')
----

* `transaction_type`: (*required*) A string describing the type of the transaction, e.g. `'request'` or `'celery'`.
* `trace_parent`: (*optional*) A TraceParent object. See <<traceparent-api, TraceParent generation>>.
* `trace_parent`: (*optional*) A `TraceParent` object. See <<traceparent-api, TraceParent generation>>.
* `links`: (*optional*) A list of `TraceParent` objects to which this transaction is causally linked.

[float]
[[client-api-end-transaction]]
Expand All @@ -164,10 +165,10 @@ they have to be set beforehand by calling <<api-set-transaction-name, `elasticap

[float]
[[traceparent-api]]
==== TraceParent
==== `TraceParent`

Transactions can be started with a TraceParent object. This creates a
transaction that is a child of the TraceParent, which is essential for
Transactions can be started with a `TraceParent` object. This creates a
transaction that is a child of the `TraceParent`, which is essential for
distributed tracing.

[float]
Expand All @@ -176,7 +177,7 @@ distributed tracing.

[small]#Added in v5.6.0.#

Create a TraceParent object from the string representation generated by
Create a `TraceParent` object from the string representation generated by
`TraceParent.to_string()`:

[source,python]
Expand All @@ -185,7 +186,7 @@ parent = elasticapm.trace_parent_from_string('00-03d67dcdd62b7c0f7a675424347eee3
client.begin_transaction('processors', trace_parent=parent)
----

* `traceparent_string`: (*required*) A string representation of a TraceParent object.
* `traceparent_string`: (*required*) A string representation of a `TraceParent` object.


[float]
Expand All @@ -194,7 +195,7 @@ client.begin_transaction('processors', trace_parent=parent)

[small]#Added in v5.6.0.#

Create a TraceParent object from HTTP headers (usually generated by another
Create a `TraceParent` object from HTTP headers (usually generated by another
Elastic APM agent):

[source,python]
Expand All @@ -211,7 +212,7 @@ client.begin_transaction('processors', trace_parent=parent)

[small]#Added in v5.10.0.#

Return the string representation of the current transaction TraceParent object:
Return the string representation of the current transaction `TraceParent` object:

[source,python]
----
Expand Down Expand Up @@ -469,12 +470,14 @@ def coffee_maker(strength):
----

* `name`: The name of the span
* `span_type`: The type of the span, usually in a dot-separated hierarchy of `type`, `subtype`, and `action`, e.g. `db.mysql.query`. Alternatively, type, subtype and action can be provided as three separate arguments, see `span_subtype` and `span_action`.
* `skip_frames`: The number of stack frames to skip when collecting stack traces. Defaults to `0`.
* `leaf`: if `True`, all spans nested bellow this span will be ignored. Defaults to `False`.
* `labels`: a dictionary of labels. Keys must be strings, values can be strings, booleans, or numerical (`int`, `float`, `decimal.Decimal`). Defaults to `None`.
* `span_subtype`: subtype of the span, e.g. name of the database. Defaults to `None`.
* `span_action`: action of the span, e.g. `query`. Defaults to `None`
* `span_type`: (*optional*) The type of the span, usually in a dot-separated hierarchy of `type`, `subtype`, and `action`, e.g. `db.mysql.query`. Alternatively, type, subtype and action can be provided as three separate arguments, see `span_subtype` and `span_action`.
* `skip_frames`: (*optional*) The number of stack frames to skip when collecting stack traces. Defaults to `0`.
* `leaf`: (*optional*) if `True`, all spans nested bellow this span will be ignored. Defaults to `False`.
* `labels`: (*optional*) a dictionary of labels. Keys must be strings, values can be strings, booleans, or numerical (`int`, `float`, `decimal.Decimal`). Defaults to `None`.
* `span_subtype`: (*optional*) subtype of the span, e.g. name of the database. Defaults to `None`.
* `span_action`: (*optional*) action of the span, e.g. `query`. Defaults to `None`
* `links`: (*optional*) A list of `TraceParent` objects to which this span is causally linked.


[float]
[[api-async-capture-span]]
Expand Down
14 changes: 11 additions & 3 deletions elasticapm/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,15 @@
import warnings
from copy import deepcopy
from datetime import timedelta
from typing import Optional, Tuple
from typing import Optional, Sequence, Tuple

import elasticapm
from elasticapm.conf import Config, VersionedConfig, constants
from elasticapm.conf.constants import ERROR
from elasticapm.metrics.base_metrics import MetricsRegistry
from elasticapm.traces import Tracer, execution_context
from elasticapm.utils import cgroup, cloud, compat, is_master_process, stacks, varmap
from elasticapm.utils.disttracing import TraceParent
from elasticapm.utils.encoding import enforce_label_format, keyword_field, shorten, transform
from elasticapm.utils.logging import get_logger
from elasticapm.utils.module_import import import_string
Expand Down Expand Up @@ -288,7 +289,14 @@ def queue(self, event_type, data, flush=False):
flush = False
self._transport.queue(event_type, data, flush)

def begin_transaction(self, transaction_type, trace_parent=None, start=None, auto_activate=True):
def begin_transaction(
self,
transaction_type: str,
trace_parent: Optional[TraceParent] = None,
start: Optional[float] = None,
auto_activate: bool = True,
links: Optional[Sequence[TraceParent]] = None,
):
"""
Register the start of a transaction on the client

Expand All @@ -300,7 +308,7 @@ def begin_transaction(self, transaction_type, trace_parent=None, start=None, aut
"""
if self.config.is_recording:
return self.tracer.begin_transaction(
transaction_type, trace_parent=trace_parent, start=start, auto_activate=auto_activate
transaction_type, trace_parent=trace_parent, start=start, auto_activate=auto_activate, links=links
)

def end_transaction(self, name=None, result="", duration=None):
Expand Down
17 changes: 9 additions & 8 deletions elasticapm/contrib/opentelemetry/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

from opentelemetry import trace as trace_api
from opentelemetry.sdk import trace as oteltrace
from opentelemetry.trace import Context, SpanKind
from opentelemetry.trace import Context, Link, SpanKind
from opentelemetry.trace.propagation import _SPAN_KEY
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.util import types
Expand Down Expand Up @@ -76,7 +76,7 @@ def start_span(
context: Optional[Context] = None,
kind: SpanKind = SpanKind.INTERNAL,
attributes: types.Attributes = None,
links: Optional[Sequence[Any]] = None,
links: Optional[Sequence[Link]] = None,
start_time: Optional[int] = None,
record_exception: bool = True,
set_status_on_exception: bool = True,
Expand Down Expand Up @@ -116,8 +116,6 @@ def start_span(
Returns:
The newly-created span.
"""
if links:
logger.warning("The opentelemetry bridge does not support links at this time.")
if not record_exception:
logger.warning("record_exception was set to False, but exceptions will still be recorded for this span.")

Expand All @@ -130,14 +128,15 @@ def start_span(
current_transaction = execution_context.get_transaction()
client = self.client

elastic_links = tuple(get_traceparent(link.context) for link in links) if links else None
if traceparent and current_transaction:
logger.warning(
"Remote context included when a transaction was already active. "
"Ignoring remote context and creating a Span instead."
)
elif traceparent:
elastic_span = client.begin_transaction(
"otel", traceparent=traceparent, start=start_time, auto_activate=False
"otel", trace_parent=traceparent, start=start_time, auto_activate=False, links=elastic_links
)
span = Span(
name=name,
Expand All @@ -147,7 +146,7 @@ def start_span(
)
span.set_attributes(attributes)
elif not current_transaction:
elastic_span = client.begin_transaction("otel", start=start_time, auto_activate=False)
elastic_span = client.begin_transaction("otel", start=start_time, auto_activate=False, links=elastic_links)
span = Span(
name=name,
elastic_span=elastic_span,
Expand All @@ -156,7 +155,9 @@ def start_span(
)
span.set_attributes(attributes)
else:
elastic_span = current_transaction.begin_span(name, "otel", start=start_time, auto_activate=False)
elastic_span = current_transaction.begin_span(
name, "otel", start=start_time, auto_activate=False, links=elastic_links
)
span = Span(
name=name,
elastic_span=elastic_span,
Expand All @@ -176,7 +177,7 @@ def start_as_current_span(
context: Optional[Context] = None,
kind: SpanKind = SpanKind.INTERNAL,
attributes: types.Attributes = None,
links: Optional[Sequence[Any]] = None,
links: Optional[Sequence[Link]] = None,
start_time: Optional[int] = None,
record_exception: bool = True,
set_status_on_exception: bool = True,
Expand Down
42 changes: 36 additions & 6 deletions elasticapm/traces.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from collections import defaultdict
from datetime import timedelta
from types import TracebackType
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Type, Union

import elasticapm
from elasticapm.conf import constants
Expand Down Expand Up @@ -91,7 +91,7 @@ def duration(self) -> timedelta:


class BaseSpan(object):
def __init__(self, labels=None, start=None):
def __init__(self, labels=None, start=None, links: Optional[Sequence[TraceParent]] = None):
self._child_durations = ChildDuration(self)
self.labels = {}
self.outcome: Optional[str] = None
Expand All @@ -100,7 +100,10 @@ def __init__(self, labels=None, start=None):
self.start_time: float = time_to_perf_counter(start) if start is not None else _time_func()
self.ended_time: Optional[float] = None
self.duration: Optional[timedelta] = None
self.links: List[Dict[str, str]] = []
self.links: Optional[List[Dict[str, str]]] = None
if links:
for trace_parent in links:
self.add_link(trace_parent)
if labels:
self.label(**labels)

Expand Down Expand Up @@ -149,6 +152,8 @@ def add_link(self, trace_parent: TraceParent) -> None:
"""
Causally link this span/transaction to another span/transaction
"""
if self.links is None:
self.links = []
self.links.append({"trace_id": trace_parent.trace_id, "span_id": trace_parent.span_id})

def set_success(self):
Expand All @@ -175,6 +180,7 @@ def __init__(
is_sampled: bool = True,
start: Optional[float] = None,
sample_rate: Optional[float] = None,
links: Optional[Sequence[TraceParent]] = None,
):
"""
tracer
Expand All @@ -193,6 +199,8 @@ def __init__(
Sample rate which was used to decide whether to sample this transaction.
This is reported to the APM server so that unsampled transactions can
be extrapolated.
links:
A list of traceparents to link this transaction causally
"""
self.id = self.get_dist_tracing_id()
if not trace_parent:
Expand Down Expand Up @@ -233,7 +241,10 @@ def __init__(
)
except (LookupError, AttributeError):
self._breakdown = None
super(Transaction, self).__init__(start=start)
super().__init__(start=start)
if links:
for trace_parent in links:
self.add_link(trace_parent)

def end(self, skip_frames: int = 0, duration: Optional[timedelta] = None):
super().end(skip_frames, duration)
Expand Down Expand Up @@ -271,6 +282,7 @@ def _begin_span(
sync=None,
start=None,
auto_activate=True,
links: Optional[Sequence[TraceParent]] = None,
):
parent_span = execution_context.get_span()
tracer = self.tracer
Expand All @@ -293,6 +305,7 @@ def _begin_span(
span_action=span_action,
sync=sync,
start=start,
links=links,
)
span.frames = tracer.frames_collector_func()
self._span_counter += 1
Expand All @@ -312,6 +325,7 @@ def begin_span(
sync=None,
start=None,
auto_activate=True,
links: Optional[Sequence[TraceParent]] = None,
):
"""
Begin a new span
Expand All @@ -325,6 +339,7 @@ def begin_span(
:param sync: indicate if the span is synchronous or not. In most cases, `None` should be used
:param start: timestamp, mostly useful for testing
:param auto_activate: whether to set this span in execution_context
:param links: an optional list of traceparents to link this span with
:return: the Span object
"""
return self._begin_span(
Expand All @@ -339,6 +354,7 @@ def begin_span(
sync=sync,
start=start,
auto_activate=auto_activate,
links=links,
)

def end_span(self, skip_frames: int = 0, duration: Optional[float] = None, outcome: str = "unknown"):
Expand Down Expand Up @@ -503,6 +519,7 @@ def __init__(
span_action: Optional[str] = None,
sync: Optional[bool] = None,
start: Optional[int] = None,
links: Optional[Sequence[TraceParent]] = None,
):
"""
Create a new Span
Expand Down Expand Up @@ -538,7 +555,7 @@ def __init__(
self.dist_tracing_propagated = False
self.composite: Dict[str, Any] = {}
self._cancelled: bool = False
super(Span, self).__init__(labels=labels, start=start)
super().__init__(labels=labels, start=start, links=links)
self.timestamp = transaction.timestamp + (self.start_time - transaction.start_time)
if self.transaction._breakdown:
p = self.parent if self.parent else self.transaction
Expand Down Expand Up @@ -870,14 +887,22 @@ def span_stack_trace_min_duration(self) -> timedelta:
else:
return self.config.span_frames_min_duration

def begin_transaction(self, transaction_type, trace_parent=None, start=None, auto_activate=True):
def begin_transaction(
self,
transaction_type: str,
trace_parent: Optional[TraceParent] = None,
start: Optional[float] = None,
auto_activate: bool = True,
links: Optional[Sequence[TraceParent]] = None,
):
"""
Start a new transactions and bind it in a thread-local variable

:param transaction_type: type of the transaction, e.g. "request"
:param trace_parent: an optional TraceParent object
:param start: override the start timestamp, mostly useful for testing
:param auto_activate: whether to set this transaction in execution_context
:param list of traceparents to causally link this transaction to

:returns the Transaction object
"""
Expand All @@ -900,6 +925,7 @@ def begin_transaction(self, transaction_type, trace_parent=None, start=None, aut
is_sampled=is_sampled,
start=start,
sample_rate=sample_rate,
links=links,
)
if trace_parent is None:
transaction.trace_parent.add_tracestate(constants.TRACESTATE.SAMPLE_RATE, sample_rate)
Expand Down Expand Up @@ -949,6 +975,7 @@ class capture_span(object):
"duration",
"start",
"sync",
"links",
)

def __init__(
Expand All @@ -964,6 +991,7 @@ def __init__(
start: Optional[int] = None,
duration: Optional[Union[float, timedelta]] = None,
sync: Optional[bool] = None,
links: Optional[Sequence[TraceParent]] = None,
):
self.name = name
if span_subtype is None and "." in span_type:
Expand All @@ -985,6 +1013,7 @@ def __init__(
duration = timedelta(seconds=duration)
self.duration = duration
self.sync = sync
self.links = links

def __call__(self, func: Callable) -> Callable:
self.name = self.name or get_name_from_func(func)
Expand Down Expand Up @@ -1017,6 +1046,7 @@ def handle_enter(self, sync: bool) -> Optional[SpanType]:
span_action=self.action,
start=self.start,
sync=sync,
links=self.links,
)
return None

Expand Down
Loading