Skip to content
10 changes: 10 additions & 0 deletions sentry_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from sentry_sdk.envelope import Envelope
from sentry_sdk.profiler import has_profiling_enabled, setup_profiler
from sentry_sdk.scrubber import EventScrubber
from sentry_sdk.monitor import Monitor

from sentry_sdk._types import TYPE_CHECKING

Expand Down Expand Up @@ -210,6 +211,13 @@ def _capture_envelope(envelope):
_client_init_debug.set(self.options["debug"])
self.transport = make_transport(self.options)

self.monitor = None
if self.transport:
if self.options["_experiments"].get(
"enable_backpressure_handling", False
):
self.monitor = Monitor(self.transport)

self.session_flusher = SessionFlusher(capture_func=_capture_envelope)

request_bodies = ("always", "never", "small", "medium")
Expand Down Expand Up @@ -571,6 +579,8 @@ def close(
if self.transport is not None:
self.flush(timeout=timeout, callback=callback)
self.session_flusher.kill()
if self.monitor:
self.monitor.kill()
self.transport.kill()
self.transport = None

Expand Down
1 change: 1 addition & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
# TODO: Remove these 2 profiling related experiments
"profiles_sample_rate": Optional[float],
"profiler_mode": Optional[ProfilerMode],
"enable_backpressure_handling": Optional[bool],
},
total=False,
)
Expand Down
105 changes: 105 additions & 0 deletions sentry_sdk/monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import os
import time
from threading import Thread, Lock

import sentry_sdk
from sentry_sdk.utils import logger
from sentry_sdk._types import TYPE_CHECKING

if TYPE_CHECKING:
from typing import Optional


class Monitor(object):
"""
Performs health checks in a separate thread once every interval seconds
and updates the internal state. Other parts of the SDK only read this state
and act accordingly.
"""

name = "sentry.monitor"

def __init__(self, transport, interval=10):
# type: (sentry_sdk.transport.Transport, float) -> None
self.transport = transport # type: sentry_sdk.transport.Transport
self.interval = interval # type: float

self._healthy = True
self._downsample_factor = 1 # type: int

self._thread = None # type: Optional[Thread]
self._thread_lock = Lock()
self._thread_for_pid = None # type: Optional[int]
self._running = True

def _ensure_running(self):
# type: () -> None
if self._thread_for_pid == os.getpid() and self._thread is not None:
return None

with self._thread_lock:
if self._thread_for_pid == os.getpid() and self._thread is not None:
return None

def _thread():
# type: (...) -> None
while self._running:
time.sleep(self.interval)
if self._running:
self.run()

thread = Thread(name=self.name, target=_thread)
thread.daemon = True
thread.start()
self._thread = thread
self._thread_for_pid = os.getpid()

return None

def run(self):
# type: () -> None
self.check_health()
self.set_downsample_factor()

def set_downsample_factor(self):
# type: () -> None
if self._healthy:
if self._downsample_factor > 1:
logger.debug(
"[Monitor] health check positive, reverting to normal sampling"
)
self._downsample_factor = 1
else:
self._downsample_factor *= 2
logger.debug(
"[Monitor] health check negative, downsampling with a factor of %d",
self._downsample_factor,
)

def check_health(self):
# type: () -> None
"""
Perform the actual health checks,
currently only checks if the transport is rate-limited.
TODO: augment in the future with more checks.
"""
self._healthy = self.transport.is_healthy()

def is_healthy(self):
# type: () -> bool
self._ensure_running()
return self._healthy

@property
def downsample_factor(self):
# type: () -> int
self._ensure_running()
return self._downsample_factor

def kill(self):
# type: () -> None
self._running = False

def __del__(self):
# type: () -> None
self.kill()
18 changes: 12 additions & 6 deletions sentry_sdk/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,9 +595,12 @@ def finish(self, hub=None, end_timestamp=None):
# exclusively based on sample rate but also traces sampler, but
# we handle this the same here.
if client.transport and has_tracing_enabled(client.options):
client.transport.record_lost_event(
"sample_rate", data_category="transaction"
)
if client.monitor and client.monitor.downsample_factor > 1:
reason = "backpressure"
else:
reason = "sample_rate"

client.transport.record_lost_event(reason, data_category="transaction")

return None

Expand Down Expand Up @@ -749,9 +752,12 @@ def _set_initial_sampling_decision(self, sampling_context):

self.sample_rate = float(sample_rate)

if client.monitor:
self.sample_rate /= client.monitor.downsample_factor

# if the function returned 0 (or false), or if `traces_sample_rate` is
# 0, it's a sign the transaction should be dropped
if not sample_rate:
if not self.sample_rate:
logger.debug(
"[Tracing] Discarding {transaction_description} because {reason}".format(
transaction_description=transaction_description,
Expand All @@ -768,7 +774,7 @@ def _set_initial_sampling_decision(self, sampling_context):
# Now we roll the dice. random.random is inclusive of 0, but not of 1,
# so strict < is safe here. In case sample_rate is a boolean, cast it
# to a float (True becomes 1.0 and False becomes 0.0)
self.sampled = random.random() < float(sample_rate)
self.sampled = random.random() < self.sample_rate

if self.sampled:
logger.debug(
Expand All @@ -780,7 +786,7 @@ def _set_initial_sampling_decision(self, sampling_context):
logger.debug(
"[Tracing] Discarding {transaction_description} because it's not included in the random sample (sampling rate = {sample_rate})".format(
transaction_description=transaction_description,
sample_rate=float(sample_rate),
sample_rate=self.sample_rate,
)
)

Expand Down
16 changes: 16 additions & 0 deletions sentry_sdk/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ def record_lost_event(
"""
return None

def is_healthy(self):
# type: () -> bool
return True

def __del__(self):
# type: () -> None
try:
Expand Down Expand Up @@ -311,6 +315,18 @@ def _disabled(bucket):

return _disabled(category) or _disabled(None)

def _is_rate_limited(self):
# type: () -> bool
return any(ts > datetime.utcnow() for ts in self._disabled_until.values())

def _is_worker_full(self):
# type: () -> bool
return self._worker.full()

def is_healthy(self):
# type: () -> bool
return not (self._is_worker_full() or self._is_rate_limited())

def _send_event(
self, event # type: Event
):
Expand Down
4 changes: 4 additions & 0 deletions sentry_sdk/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ def flush(self, timeout, callback=None):
self._wait_flush(timeout, callback)
logger.debug("background worker flushed")

def full(self):
# type: () -> bool
return self._queue.full()

def _wait_flush(self, timeout, callback):
# type: (float, Optional[Any]) -> None
initial_timeout = min(0.1, timeout)
Expand Down
87 changes: 87 additions & 0 deletions tests/test_monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import random

from sentry_sdk import Hub, start_transaction
from sentry_sdk.transport import Transport


class HealthyTestTransport(Transport):
def _send_event(self, event):
pass

def _send_envelope(self, envelope):
pass

def is_healthy(self):
return True


class UnhealthyTestTransport(HealthyTestTransport):
def is_healthy(self):
return False


def test_no_monitor_if_disabled(sentry_init):
sentry_init(transport=HealthyTestTransport())
assert Hub.current.client.monitor is None


def test_monitor_if_enabled(sentry_init):
sentry_init(
transport=HealthyTestTransport(),
_experiments={"enable_backpressure_handling": True},
)

monitor = Hub.current.client.monitor
assert monitor is not None
assert monitor._thread is None

assert monitor.is_healthy() is True
assert monitor.downsample_factor == 1
assert monitor._thread is not None
assert monitor._thread.name == "sentry.monitor"


def test_monitor_unhealthy(sentry_init):
sentry_init(
transport=UnhealthyTestTransport(),
_experiments={"enable_backpressure_handling": True},
)

monitor = Hub.current.client.monitor
monitor.interval = 0.1

assert monitor.is_healthy() is True
monitor.run()
assert monitor.is_healthy() is False
assert monitor.downsample_factor == 2
monitor.run()
assert monitor.downsample_factor == 4


def test_transaction_uses_downsampled_rate(
sentry_init, capture_client_reports, monkeypatch
):
sentry_init(
traces_sample_rate=1.0,
transport=UnhealthyTestTransport(),
_experiments={"enable_backpressure_handling": True},
)

reports = capture_client_reports()

monitor = Hub.current.client.monitor
monitor.interval = 0.1

# make sure rng doesn't sample
monkeypatch.setattr(random, "random", lambda: 0.9)

assert monitor.is_healthy() is True
monitor.run()
assert monitor.is_healthy() is False
assert monitor.downsample_factor == 2

with start_transaction(name="foobar") as transaction:
assert transaction.sampled is False
assert transaction.sample_rate == 0.5

assert reports == [("backpressure", "transaction")]