Skip to content

Commit 4943f76

Browse files
authored
Ensure that metrics are flushed before shutting down (#1139)
* Ensure that metrics are flushed before shutting down To ensure that the flushed metrics are actually sent, we need to make sure that the transport thread is shut down last. * use a variable name that doesn't clash with the thread priority concept
1 parent 5e2ebd3 commit 4943f76

File tree

5 files changed

+30
-5
lines changed

5 files changed

+30
-5
lines changed

elasticapm/base.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -213,11 +213,13 @@ def __init__(self, config=None, **inline):
213213
set_client(self)
214214

215215
def start_threads(self):
216-
with self._thread_starter_lock:
217-
current_pid = os.getpid()
218-
if self._pid != current_pid:
216+
current_pid = os.getpid()
217+
if self._pid != current_pid:
218+
with self._thread_starter_lock:
219219
self.logger.debug("Detected PID change from %r to %r, starting threads", self._pid, current_pid)
220-
for manager_type, manager in self._thread_managers.items():
220+
for manager_type, manager in sorted(
221+
self._thread_managers.items(), key=lambda item: item[1].start_stop_order
222+
):
221223
self.logger.debug("Starting %s thread", manager_type)
222224
manager.start_thread(pid=current_pid)
223225
self._pid = current_pid
@@ -303,7 +305,7 @@ def end_transaction(self, name=None, result="", duration=None):
303305
def close(self):
304306
if self.config.enabled:
305307
with self._thread_starter_lock:
306-
for _, manager in self._thread_managers.items():
308+
for _, manager in sorted(self._thread_managers.items(), key=lambda item: item[1].start_stop_order):
307309
manager.stop_thread()
308310
global CLIENT_SINGLETON
309311
CLIENT_SINGLETON = None

elasticapm/metrics/base_metrics.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ def stop_thread(self):
103103
logger.debug("Cancelling collect timer")
104104
self._collect_timer.cancel()
105105
self._collect_timer = None
106+
# collect one last time
107+
self.collect()
106108

107109
@property
108110
def collect_interval(self):

elasticapm/transport/base.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import gzip
3434
import os
3535
import random
36+
import sys
3637
import threading
3738
import time
3839
import timeit
@@ -86,6 +87,7 @@ def __init__(
8687
self._closed = False
8788
self._processors = processors if processors is not None else []
8889
super(Transport, self).__init__()
90+
self.start_stop_order = sys.maxsize # ensure that the transport thread is always started/stopped last
8991

9092
@property
9193
def _max_flush_time(self):

elasticapm/utils/threading.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ def cancel(self):
9191
class ThreadManager(object):
9292
def __init__(self):
9393
self.pid = None
94+
self.start_stop_order = 100
9495

9596
def start_thread(self, pid=None):
9697
if not pid:

tests/metrics/base_tests.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,24 @@ def target():
125125
assert metricset.counter("x").val == expected
126126

127127

128+
@pytest.mark.parametrize("sending_elasticapm_client", [{"metrics_interval": "30s"}], indirect=True)
129+
def test_metrics_flushed_on_shutdown(sending_elasticapm_client):
130+
# this is ugly, we need an API for this at some point...
131+
metricset = MetricsSet(sending_elasticapm_client._metrics)
132+
sending_elasticapm_client._metrics._metricsets["foo"] = metricset
133+
metricset.counter("x").inc()
134+
sending_elasticapm_client.close()
135+
assert sending_elasticapm_client.httpserver.payloads
136+
for item in sending_elasticapm_client.httpserver.payloads[0]:
137+
try:
138+
assert item["metricset"]["samples"]["x"]["value"] == 1
139+
break
140+
except KeyError:
141+
pass
142+
else:
143+
assert False, "no item found with matching dict path metricset.samples.x.value"
144+
145+
128146
@mock.patch("elasticapm.metrics.base_metrics.DISTINCT_LABEL_LIMIT", 3)
129147
def test_metric_limit(caplog, elasticapm_client):
130148
m = MetricsSet(MetricsRegistry(elasticapm_client))

0 commit comments

Comments
 (0)