Commit a022a6b

Work in progress
1 parent ffc23bc commit a022a6b

File tree

5 files changed: +701 −1 lines changed

docs/examples/metrics/reader/README.rst

Lines changed: 1 addition & 0 deletions
Lines changed: 1 addition & 0 deletions

@@ -6,6 +6,7 @@ These examples show how to customize the metrics that are output by the SDK usin
 * preferred_aggregation.py: Shows how to configure the preferred aggregation for metric instrument types.
 * preferred_temporality.py: Shows how to configure the preferred temporality for metric instrument types.
 * preferred_exemplarfilter.py: Shows how to configure the exemplar filter.
+* sychronous_reader.py: Shows how to configure the reader to collect and export metrics synchronously.

 The source files of these examples are available :scm_web:`here <docs/examples/metrics/reader/>`.

Lines changed: 103 additions & 0 deletions
@@ -0,0 +1,103 @@
#!/usr/bin/env python3
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import random
import time

from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import (
    ConsoleMetricExporter,
    SynchronousExportingMetricReader,
)
from opentelemetry.sdk.resources import Resource

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create a resource that identifies our service
resource = Resource.create({"service.name": "batch-metrics-auto-export-demo"})

# Create a console exporter for visible output
exporter = ConsoleMetricExporter()

# Create a synchronous exporting reader with a small batch size for demonstration
reader = SynchronousExportingMetricReader(
    exporter,
    max_export_batch_size=5,  # Export once 5 collected batches are queued
    max_queue_size=1000,  # Queue up to 1000 metric batches
)

# Create a meter provider with our reader
meter_provider = MeterProvider(resource=resource, metric_readers=[reader])

# Set the global meter provider
metrics.set_meter_provider(meter_provider)

# Get a meter
meter = metrics.get_meter("batch-demo-auto-export")

# Create instruments
request_counter = meter.create_counter(
    name="request_count",
    description="Number of requests processed",
    unit="1",
)
active_requests_gauge = meter.create_gauge(
    name="active_requests",
    description="Number of active requests",
    unit="1",
)
processing_time_histogram = meter.create_histogram(
    name="request_processing_time",
    description="Time taken to process requests",
    unit="ms",
)


def simulate_request():
    """Simulate a request and record metrics."""
    # Simulate active requests fluctuating
    active_requests = random.randint(1, 20)
    active_requests_gauge.set(active_requests)
    # Simulate processing time
    processing_time = random.uniform(10, 500)  # Between 10-500ms
    processing_time_histogram.record(processing_time)
    # Increment the request counter
    request_counter.add(1)
    logger.info("Processed a request")
    reader.collect()
    # Each collect() call queues one batch of metrics; with a batch size of 5,
    # an export should happen after every 5 requests.


def main():
    """Main application function to demonstrate reading and exporting metrics synchronously."""
    logger.info("Starting to process requests...")
    logger.info(
        "With batch size of 5, exports should happen automatically after every 5 requests"
    )
    logger.info("Watch for the metric exports in the console...")
    # Process 24 requests with pauses to observe the collection and export mechanism
    for _ in range(24):
        simulate_request()
        # Add a pause between requests to make it easier to observe the exports
        time.sleep(0.5)
    logger.info("\nAll requests completed.")
    # We still need to shut down cleanly
    logger.info("Shutting down...")
    meter_provider.shutdown()
    logger.info(
        "Done. Any metrics left in the queue were exported during shutdown."
    )


if __name__ == "__main__":
    main()
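
The example above relies on the batch-size threshold and the final meter_provider.shutdown() to trigger exports. The new reader also exposes force_flush(), so a caller can push out whatever has been queued without waiting for a full batch. A minimal sketch of that pattern, reusing the reader and logger objects defined in the example (the flush_now helper is hypothetical, not part of this commit):

def flush_now() -> None:
    # Collect whatever has been recorded so far and export it immediately,
    # even if fewer than max_export_batch_size batches are queued.
    # force_flush() first triggers a collect() through the base MetricReader,
    # then drains the queue to the exporter.
    if not reader.force_flush(timeout_millis=5_000):
        logger.warning("force_flush did not complete successfully")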

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py

Lines changed: 197 additions & 1 deletion
@@ -13,6 +13,7 @@
 # limitations under the License.
 from __future__ import annotations

+import collections
 import math
 import os
 import weakref
@@ -21,7 +22,7 @@
 from logging import getLogger
 from os import environ, linesep
 from sys import stdout
-from threading import Event, Lock, RLock, Thread
+from threading import Condition, Event, Lock, RLock, Thread
 from time import time_ns
 from typing import IO, Callable, Iterable, Optional

@@ -574,3 +575,198 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool:
        super().force_flush(timeout_millis=timeout_millis)
        self._exporter.force_flush(timeout_millis=timeout_millis)
        return True


class SynchronousExportingMetricReader(MetricReader):
    """Implementation of `MetricReader` that exports metrics in batches.

    Metrics are collected when `collect()` is called, then added to a queue until
    the configured batch size is reached or until `force_flush` is called. The
    metrics are then exported in batches to the configured exporter.

    Unlike `PeriodicExportingMetricReader`, this reader does not automatically
    collect metrics on a schedule - collection must be triggered by calling
    `collect()` or by systems built on top of this reader.

    `SynchronousExportingMetricReader` is configurable with the following parameters:

    - `max_queue_size`: Maximum number of metric batches to store in memory
    - `max_export_batch_size`: Maximum number of metric batches to export at once
    - `export_timeout_millis`: Timeout for export operations in milliseconds
    """

    def __init__(
        self,
        exporter: MetricExporter,
        max_export_batch_size: int = 512,
        export_timeout_millis: float = 30000,
        max_queue_size: int = 2048,
    ) -> None:
        # SynchronousExportingMetricReader defers to the exporter for
        # temporality and aggregation configuration
        super().__init__(
            preferred_temporality=exporter._preferred_temporality,
            preferred_aggregation=exporter._preferred_aggregation,
        )

        self._validate_arguments(max_queue_size, max_export_batch_size)

        self._exporter = exporter
        self._max_queue_size = max_queue_size
        self._max_export_batch_size = max_export_batch_size
        self._export_timeout_millis = export_timeout_millis

        # This lock is held whenever calling self._exporter.export() to prevent
        # concurrent execution of MetricExporter.export()
        self._export_lock = Lock()

        # Queue to store metrics
        self._queue = collections.deque([], max_queue_size)

        # Thread handling and synchronization
        self._condition = Condition(Lock())
        self._shutdown = False
        self._flush_event = Event()
        self._shutdown_once = Once()

        # Process fork handling
        if hasattr(os, "register_at_fork"):
            weak_reinit = weakref.WeakMethod(self._at_fork_reinit)
            os.register_at_fork(
                after_in_child=lambda: weak_reinit()()  # pylint: disable=unnecessary-lambda
            )
        self._pid = os.getpid()

    def _at_fork_reinit(self):
        """Reinitialize the reader after fork."""
        self._condition = Condition(Lock())
        self._queue.clear()
        self._pid = os.getpid()

    def _export_batch(self) -> int:
        """Exports at most max_export_batch_size metrics and returns the number of exported metrics."""
        idx = 0
        pending_metrics = []

        with self._condition:
            while idx < self._max_export_batch_size and self._queue:
                metrics_data = self._queue.pop()
                pending_metrics.append(metrics_data)
                idx += 1

        if pending_metrics:
            token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
            try:
                with self._export_lock:
                    for metrics_data in pending_metrics:
                        self._exporter.export(
                            metrics_data,
                            timeout_millis=self._export_timeout_millis,
                        )
            except Exception:  # pylint: disable=broad-exception-caught
                _logger.exception("Exception while exporting metrics.")
            finally:
                detach(token)

        return idx

    def _drain_queue(self):
        """Export all elements until queue is empty."""
        while self._queue:
            self._export_batch()

    def _receive_metrics(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> None:
        """Add metrics to the queue for batched export."""
        if self._shutdown:
            return

        # Handle fork
        if self._pid != os.getpid():
            self._at_fork_reinit()

        # Add metrics to queue
        with self._condition:
            self._queue.appendleft(metrics_data)
            if len(self._queue) >= self._max_export_batch_size:
                self._condition.notify()
                self._flush_event.set()

        # If queue has reached batch size, export immediately
        if len(self._queue) >= self._max_export_batch_size:
            self._export_batch()

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        """Forces flush of metrics to the exporter.

        Args:
            timeout_millis: The maximum amount of time to wait for the flush
                to complete, in milliseconds.

        Returns:
            True if the flush was successful, False otherwise.
        """
        if timeout_millis is None:
            timeout_millis = self._export_timeout_millis

        if self._shutdown:
            return True

        # Collect any pending metrics first (this will trigger _receive_metrics)
        super().force_flush(timeout_millis=timeout_millis)

        # Export all batches in queue
        try:
            with self._condition:
                if not self._queue:
                    return True

            self._drain_queue()
            return self._exporter.force_flush(timeout_millis=timeout_millis)
        except Exception:  # pylint: disable=broad-exception-caught
            _logger.exception("Exception during force_flush")
            return False

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        """Shuts down the metric reader and exporter.

        Args:
            timeout_millis: The maximum amount of time to wait for the exporter
                to shutdown, in milliseconds.
        """
        deadline_ns = time_ns() + timeout_millis * 10**6

        def _shutdown():
            self._shutdown = True

        did_set = self._shutdown_once.do_once(_shutdown)
        if not did_set:
            _logger.warning("Can't shutdown multiple times")
            return

        # Export any metrics still in the queue
        self._drain_queue()

        # Shutdown the exporter
        self._exporter.shutdown(
            timeout=(deadline_ns - time_ns()) / 10**6, **kwargs
        )

    @staticmethod
    def _validate_arguments(max_queue_size, max_export_batch_size):
        """Validate constructor arguments."""
        if max_queue_size <= 0:
            raise ValueError("max_queue_size must be a positive integer.")

        if max_export_batch_size <= 0:
            raise ValueError(
                "max_export_batch_size must be a positive integer."
            )

        if max_export_batch_size > max_queue_size:
            raise ValueError(
                "max_export_batch_size must be less than or equal to max_queue_size."
            )

opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -21,6 +21,7 @@
     MetricExportResult,
     MetricReader,
     PeriodicExportingMetricReader,
+    SynchronousExportingMetricReader,
 )

 # The point module is not in the export directory to avoid a circular import.
@@ -63,4 +64,5 @@
     "ResourceMetrics",
     "ScopeMetrics",
     "Sum",
+    "SynchronousExportingMetricReader",
 ]
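
With this re-export in place, user code (such as the example earlier in this commit) can import the reader from the public export module rather than from the _internal package:

from opentelemetry.sdk.metrics.export import SynchronousExportingMetricReader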
