Skip to content

Commit d9c72f1

Browse files
committed
Merge remote-tracking branch 'upstream/main' into span_compression
2 parents db94fa0 + cb13641 commit d9c72f1

File tree

16 files changed

+135
-103
lines changed

16 files changed

+135
-103
lines changed

CHANGELOG.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ endif::[]
4747
* Lambda fixes to align with the cross-agent spec {pull}1489[#1489]
4848
* Lambda fix for custom `service_name` {pull}1493[#1493]
4949
* Change default for `stack_trace_limit` from 500 to 50 {pull}1492[#1492]
50+
* Switch all duration handling to use `datetime.timedelta` objects {pull}1488[#1488]
5051
5152
5253
[[release-notes-6.x]]

elasticapm/base.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import urllib.parse
4444
import warnings
4545
from copy import deepcopy
46+
from datetime import timedelta
4647
from typing import Optional, Tuple
4748

4849
import elasticapm
@@ -307,6 +308,8 @@ def end_transaction(self, name=None, result="", duration=None):
307308
:param duration: override duration, mostly useful for testing
308309
:return: the ended transaction object
309310
"""
311+
if duration is not None and not isinstance(duration, timedelta):
312+
duration = timedelta(seconds=duration)
310313
transaction = self.tracer.end_transaction(result, name, duration=duration)
311314
return transaction
312315

elasticapm/conf/__init__.py

Lines changed: 47 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import re
3737
import socket
3838
import threading
39+
from datetime import timedelta
3940

4041
from elasticapm.conf.constants import BASE_SANITIZE_FIELD_NAMES
4142
from elasticapm.utils import compat, starmatch_to_regex
@@ -232,6 +233,39 @@ def __set__(self, instance, value):
232233
instance._values[self.dict_key] = bool(value)
233234

234235

236+
class _DurationConfigValue(_ConfigValue):
237+
units = (
238+
("us", 0.000001),
239+
("ms", 0.001),
240+
("s", 1),
241+
("m", 60),
242+
)
243+
244+
def __init__(self, dict_key, allow_microseconds=False, unitless_factor=None, **kwargs):
245+
self.type = None # no type coercion
246+
used_units = self.units if allow_microseconds else self.units[1:]
247+
pattern = "|".join(unit[0] for unit in used_units)
248+
unit_multipliers = dict(used_units)
249+
unit_required = ""
250+
if unitless_factor:
251+
unit_multipliers[None] = unitless_factor
252+
unit_required = "?"
253+
regex = f"((?:-)?\\d+)({pattern}){unit_required}$"
254+
verbose_pattern = f"\\d+({pattern}){unit_required}$"
255+
duration_validator = UnitValidator(
256+
regex=regex, verbose_pattern=verbose_pattern, unit_multipliers=unit_multipliers
257+
)
258+
validators = kwargs.pop("validators", [])
259+
validators.insert(0, duration_validator)
260+
super().__init__(dict_key, validators=validators, **kwargs)
261+
262+
def __set__(self, config_instance, value):
263+
value = self._validate(config_instance, value)
264+
value = timedelta(seconds=float(value))
265+
self._callback_if_changed(config_instance, value)
266+
config_instance._values[self.dict_key] = value
267+
268+
235269
class RegexValidator(object):
236270
def __init__(self, regex, verbose_pattern=None):
237271
self.regex = regex
@@ -283,16 +317,13 @@ def __call__(self, value, field_name):
283317
value = float(value)
284318
except ValueError:
285319
raise ConfigurationError("{} is not a float".format(value), field_name)
286-
multiplier = 10 ** self.precision
320+
multiplier = 10**self.precision
287321
rounded = math.floor(value * multiplier + 0.5) / multiplier
288322
if rounded == 0 and self.minimum and value != 0:
289323
rounded = self.minimum
290324
return rounded
291325

292326

293-
duration_validator = UnitValidator(
294-
r"^((?:-)?\d+)(us|ms|s|m)$", r"\d+(us|ms|s|m)", {"us": 0.001, "ms": 1, "s": 1000, "m": 60000}
295-
)
296327
size_validator = UnitValidator(
297328
r"^(\d+)(b|kb|mb|gb)$", r"\d+(b|KB|MB|GB)", {"b": 1, "kb": 1024, "mb": 1024 * 1024, "gb": 1024 * 1024 * 1024}
298329
)
@@ -557,50 +588,38 @@ class Config(_ConfigBase):
557588
"elasticapm.metrics.sets.cpu.CPUMetricSet",
558589
],
559590
)
560-
metrics_interval = _ConfigValue(
591+
metrics_interval = _DurationConfigValue(
561592
"METRICS_INTERVAL",
562-
type=int,
563-
validators=[duration_validator, ExcludeRangeValidator(1, 999, "{range_start} - {range_end} ms")],
564-
default=30000,
593+
validators=[ExcludeRangeValidator(0.001, 0.999, "{range_start} - {range_end} s")],
594+
default=timedelta(seconds=30),
565595
)
566596
breakdown_metrics = _BoolConfigValue("BREAKDOWN_METRICS", default=True)
567597
prometheus_metrics = _BoolConfigValue("PROMETHEUS_METRICS", default=False)
568598
prometheus_metrics_prefix = _ConfigValue("PROMETHEUS_METRICS_PREFIX", default="prometheus.metrics.")
569599
disable_metrics = _ListConfigValue("DISABLE_METRICS", type=starmatch_to_regex, default=[])
570600
central_config = _BoolConfigValue("CENTRAL_CONFIG", default=True)
571601
api_request_size = _ConfigValue("API_REQUEST_SIZE", type=int, validators=[size_validator], default=768 * 1024)
572-
api_request_time = _ConfigValue("API_REQUEST_TIME", type=int, validators=[duration_validator], default=10 * 1000)
602+
api_request_time = _DurationConfigValue("API_REQUEST_TIME", default=timedelta(seconds=10))
573603
transaction_sample_rate = _ConfigValue(
574604
"TRANSACTION_SAMPLE_RATE", type=float, validators=[PrecisionValidator(4, 0.0001)], default=1.0
575605
)
576606
transaction_max_spans = _ConfigValue("TRANSACTION_MAX_SPANS", type=int, default=500)
577607
stack_trace_limit = _ConfigValue("STACK_TRACE_LIMIT", type=int, default=50)
578-
span_frames_min_duration = _ConfigValue(
579-
"SPAN_FRAMES_MIN_DURATION",
580-
default=5,
581-
validators=[
582-
UnitValidator(r"^((?:-)?\d+)(ms|s|m)?$", r"\d+(ms|s|m)", {"ms": 1, "s": 1000, "m": 60000, None: 1})
583-
],
584-
type=int,
608+
span_frames_min_duration = _DurationConfigValue(
609+
"SPAN_FRAMES_MIN_DURATION", default=timedelta(seconds=0.005), unitless_factor=0.001
585610
)
586611
span_compression_enabled = _BoolConfigValue("SPAN_COMPRESSION_ENABLED", default=True)
587-
span_compression_exact_match_max_duration = _ConfigValue(
612+
span_compression_exact_match_max_duration = _DurationConfigValue(
588613
"SPAN_COMPRESSION_EXACT_MATCH_MAX_DURATION",
589-
default=50,
590-
validators=[duration_validator],
591-
type=int,
614+
default=timedelta(seconds=0.05),
592615
)
593-
span_compression_same_kind_max_duration = _ConfigValue(
616+
span_compression_same_kind_max_duration = _DurationConfigValue(
594617
"SPAN_COMPRESSION_SAME_KIND_MAX_DURATION",
595-
default=0,
596-
validators=[duration_validator],
597-
type=int,
618+
default=timedelta(seconds=0),
598619
)
599-
exit_span_min_duration = _ConfigValue(
620+
exit_span_min_duration = _DurationConfigValue(
600621
"EXIT_SPAN_MIN_DURATION",
601-
default=0,
602-
validators=[duration_validator],
603-
type=float,
622+
default=timedelta(seconds=0),
604623
)
605624
collect_local_variables = _ConfigValue("COLLECT_LOCAL_VARIABLES", default="errors")
606625
source_lines_error_app_frames = _ConfigValue("SOURCE_LINES_ERROR_APP_FRAMES", type=int, default=5)

elasticapm/metrics/base_metrics.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ def stop_thread(self):
107107

108108
@property
109109
def collect_interval(self):
110-
return self.client.config.metrics_interval / 1000.0
110+
return self.client.config.metrics_interval.total_seconds()
111111

112112
@property
113113
def ignore_patterns(self):
@@ -391,8 +391,8 @@ class Timer(BaseMetric):
391391
__slots__ = BaseMetric.__slots__ + ("_val", "_count", "_lock", "_unit")
392392

393393
def __init__(self, name=None, reset_on_collect=False, unit=None):
394-
self._val = 0
395-
self._count = 0
394+
self._val: float = 0
395+
self._count: int = 0
396396
self._unit = unit
397397
self._lock = threading.Lock()
398398
super(Timer, self).__init__(name, reset_on_collect=reset_on_collect)

elasticapm/traces.py

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import time
3636
import timeit
3737
from collections import defaultdict
38+
from datetime import timedelta
3839
from types import TracebackType
3940
from typing import Any, Callable, Dict, Optional, Tuple, Type, Union
4041

@@ -68,7 +69,7 @@ def __init__(self, obj: "BaseSpan"):
6869
self.obj = obj
6970
self._nesting_level: int = 0
7071
self._start: float = 0
71-
self._duration: float = 0
72+
self._duration: timedelta = timedelta(seconds=0)
7273
self._lock = threading.Lock()
7374

7475
def start(self, timestamp: float):
@@ -81,10 +82,10 @@ def stop(self, timestamp: float):
8182
with self._lock:
8283
self._nesting_level -= 1
8384
if self._nesting_level == 0:
84-
self._duration += timestamp - self._start
85+
self._duration += timedelta(seconds=timestamp - self._start)
8586

8687
@property
87-
def duration(self) -> float:
88+
def duration(self) -> timedelta:
8889
return self._duration
8990

9091

@@ -97,7 +98,7 @@ def __init__(self, labels=None, start=None):
9798
self.compression_buffer_lock = threading.Lock()
9899
self.start_time: float = time_to_perf_counter(start) if start is not None else _time_func()
99100
self.ended_time: Optional[float] = None
100-
self.duration: Optional[float] = None
101+
self.duration: Optional[timedelta] = None
101102
if labels:
102103
self.label(**labels)
103104

@@ -117,9 +118,9 @@ def child_ended(self, child: SpanType):
117118
self.compression_buffer.report()
118119
self.compression_buffer = child
119120

120-
def end(self, skip_frames: int = 0, duration: Optional[float] = None):
121+
def end(self, skip_frames: int = 0, duration: Optional[timedelta] = None):
121122
self.ended_time = _time_func()
122-
self.duration = duration if duration is not None else (self.ended_time - self.start_time)
123+
self.duration = duration if duration is not None else timedelta(seconds=self.ended_time - self.start_time)
123124
if self.compression_buffer:
124125
self.compression_buffer.report()
125126
self.compression_buffer = None
@@ -226,7 +227,7 @@ def __init__(
226227
self._breakdown = None
227228
super(Transaction, self).__init__(start=start)
228229

229-
def end(self, skip_frames: int = 0, duration: Optional[float] = None):
230+
def end(self, skip_frames: int = 0, duration: Optional[timedelta] = None):
230231
super().end(skip_frames, duration)
231232
if self._breakdown:
232233
for (span_type, span_subtype), timer in self._span_timers.items():
@@ -239,15 +240,15 @@ def end(self, skip_frames: int = 0, duration: Optional[float] = None):
239240
labels["span.subtype"] = span_subtype
240241
val = timer.val
241242
self._breakdown.timer("span.self_time", reset_on_collect=True, unit="us", **labels).update(
242-
int(val[0] * 1000000), val[1]
243+
val[0], val[1]
243244
)
244245
if self.is_sampled:
245246
self._breakdown.timer(
246247
"span.self_time",
247248
reset_on_collect=True,
248249
unit="us",
249250
**{"span.type": "app", "transaction.name": self.name, "transaction.type": self.transaction_type},
250-
).update(int((self.duration - self._child_durations.duration) * 1000000))
251+
).update((self.duration - self._child_durations.duration).total_seconds() * 1_000_000)
251252

252253
def _begin_span(
253254
self,
@@ -369,9 +370,9 @@ def to_dict(self) -> dict:
369370
"trace_id": self.trace_parent.trace_id,
370371
"name": encoding.keyword_field(self.name or ""),
371372
"type": encoding.keyword_field(self.transaction_type),
372-
"duration": self.duration * 1000, # milliseconds
373+
"duration": self.duration.total_seconds() * 1000,
373374
"result": encoding.keyword_field(str(self.result)),
374-
"timestamp": int(self.timestamp * 1000000), # microseconds
375+
"timestamp": int(self.timestamp * 1_000_000), # microseconds
375376
"outcome": self.outcome,
376377
"sampled": self.is_sampled,
377378
"span_count": {"started": self._span_counter, "dropped": self.dropped_spans},
@@ -381,7 +382,7 @@ def to_dict(self) -> dict:
381382
{
382383
"destination_service_resource": resource,
383384
"outcome": outcome,
384-
"duration": {"count": v["count"], "sum": {"us": int(v["duration.sum.us"] * 1000000)}},
385+
"duration": {"count": v["count"], "sum": {"us": int(v["duration.sum.us"])}},
385386
}
386387
for (resource, outcome), v in self._dropped_span_statistics.items()
387388
]
@@ -419,7 +420,7 @@ def track_span_duration(self, span_type, span_subtype, self_duration):
419420
# TODO: once asynchronous spans are supported, we should check if the transaction is already finished
420421
# TODO: and, if it has, exit without tracking.
421422
with self._span_timers_lock:
422-
self._span_timers[(span_type, span_subtype)].update(self_duration)
423+
self._span_timers[(span_type, span_subtype)].update(self_duration.total_seconds() * 1_000_000)
423424

424425
@property
425426
def is_sampled(self) -> bool:
@@ -448,7 +449,7 @@ def track_dropped_span(self, span: SpanType):
448449
resource = span.context["destination"]["service"]["resource"]
449450
stats = self._dropped_span_statistics[(resource, span.outcome)]
450451
stats["count"] += 1
451-
stats["duration.sum.us"] += span.duration
452+
stats["duration.sum.us"] += int(span.duration.total_seconds() * 1_000_000)
452453
except KeyError:
453454
pass
454455

@@ -551,7 +552,7 @@ def to_dict(self) -> dict:
551552
"subtype": encoding.keyword_field(self.subtype),
552553
"action": encoding.keyword_field(self.action),
553554
"timestamp": int(self.timestamp * 1000000), # microseconds
554-
"duration": self.duration * 1000, # milliseconds
555+
"duration": self.duration.total_seconds() * 1000,
555556
"outcome": self.outcome,
556557
}
557558
if self.transaction.sample_rate is not None:
@@ -586,7 +587,7 @@ def to_dict(self) -> dict:
586587
if self.composite:
587588
result["composite"] = {
588589
"compression_strategy": self.composite["compression_strategy"],
589-
"sum": self.composite["sum"] * 1000,
590+
"sum": self.composite["sum"].total_seconds() * 1000,
590591
"count": self.composite["count"],
591592
}
592593
return result
@@ -650,14 +651,14 @@ def end(self, skip_frames: int = 0, duration: Optional[float] = None):
650651

651652
p = self.parent if self.parent else self.transaction
652653
if self.transaction._breakdown:
653-
p._child_durations.stop(self.start_time + self.duration)
654+
p._child_durations.stop(self.start_time + self.duration.total_seconds())
654655
self.transaction.track_span_duration(
655656
self.type, self.subtype, self.duration - self._child_durations.duration
656657
)
657658
p.child_ended(self)
658659

659660
def report(self) -> None:
660-
if self.discardable and (self.duration * 1000) < self.transaction.config_exit_span_min_duration:
661+
if self.discardable and self.duration < self.transaction.config_exit_span_min_duration:
661662
self.transaction.track_dropped_span(self)
662663
self.transaction.dropped_spans += 1
663664
else:
@@ -674,7 +675,7 @@ def try_to_compress(self, sibling: SpanType) -> bool:
674675
self.composite = {"compression_strategy": compression_strategy, "count": 1, "sum": self.duration}
675676
self.composite["count"] += 1
676677
self.composite["sum"] += sibling.duration
677-
self.duration = sibling.ended_time - self.start_time
678+
self.duration = timedelta(seconds=sibling.ended_time - self.start_time)
678679
self.transaction._span_counter -= 1
679680
return True
680681

@@ -820,11 +821,11 @@ def __init__(self, frames_collector_func, frames_processing_func, queue_func, co
820821
self._ignore_patterns = [re.compile(p) for p in config.transactions_ignore_patterns or []]
821822

822823
@property
823-
def span_frames_min_duration(self):
824-
if self.config.span_frames_min_duration in (-1, None):
824+
def span_frames_min_duration(self) -> Optional[timedelta]:
825+
if self.config.span_frames_min_duration in (timedelta(seconds=-1), None):
825826
return None
826827
else:
827-
return self.config.span_frames_min_duration / 1000.0
828+
return self.config.span_frames_min_duration
828829

829830
def begin_transaction(self, transaction_type, trace_parent=None, start=None, auto_activate=True):
830831
"""
@@ -918,7 +919,7 @@ def __init__(
918919
span_subtype: Optional[str] = None,
919920
span_action: Optional[str] = None,
920921
start: Optional[int] = None,
921-
duration: Optional[float] = None,
922+
duration: Optional[Union[float, timedelta]] = None,
922923
sync: Optional[bool] = None,
923924
):
924925
self.name = name
@@ -937,6 +938,8 @@ def __init__(
937938
self.leaf = leaf
938939
self.labels = labels
939940
self.start = start
941+
if duration and not isinstance(duration, timedelta):
942+
duration = timedelta(seconds=duration)
940943
self.duration = duration
941944
self.sync = sync
942945

0 commit comments

Comments
 (0)