Description
What problem do you want to solve?
Adding SQLAlchemy engines one at a time with this pattern, i.e. calling SQLAlchemyInstrumentor().instrument(engine=engine_1, ...) multiple times with different engines, does not seem to add the second engine to the instrumentation.
instrumentation/opentelemetry-instrumentation-sqlalchemy/tests/test_sqlalchemy.py
```python
def test_instrument_two_engines(self):
    engine_1 = create_engine("sqlite:///:memory:")
    engine_2 = create_engine("sqlite:///:memory:")

    SQLAlchemyInstrumentor().instrument(
        engine=engine_1,
        tracer_provider=self.tracer_provider,
    )
    cnx_1 = engine_1.connect()
    cnx_1.execute("SELECT 1 + 1;").fetchall()

    SQLAlchemyInstrumentor().instrument(
        engine=engine_2,
        tracer_provider=self.tracer_provider,
    )
    cnx_2 = engine_2.connect()
    cnx_2.execute("SELECT 1 + 1;").fetchall()

    spans = self.memory_exporter.get_finished_spans()
    self.assertEqual(len(spans), 2)
```
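If I understand the code correctly, the second call is skipped because SQLAlchemyInstrumentor() always returns the same singleton instance and instrument() is guarded by an already-instrumented flag, so only the first engine gets traced. A minimal sketch of that behaviour (my reading of the BaseInstrumentor pattern, not the actual library source):

```python
# Rough sketch of what I think is happening; an assumption for illustration,
# not the real opentelemetry BaseInstrumentor implementation.
class BaseInstrumentorSketch:
    _instance = None

    def __new__(cls):
        # Every SQLAlchemyInstrumentor() call returns the same instance...
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._is_instrumented_by_opentelemetry = False
        return cls._instance

    def instrument(self, **kwargs):
        # ...so the second instrument(engine=engine_2, ...) call hits this
        # guard and returns before the second engine is ever traced.
        if self._is_instrumented_by_opentelemetry:
            return None
        self._is_instrumented_by_opentelemetry = True
        return self._instrument(**kwargs)

    def _instrument(self, **kwargs):
        print("instrumenting", kwargs.get("engine"))


BaseInstrumentorSketch().instrument(engine="engine_1")  # instruments
BaseInstrumentorSketch().instrument(engine="engine_2")  # silently skipped
```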
Describe the solution you'd like
I'm lazily initializing multiple SQLAlchemy engines in my app, one per database. I'd like to be able to call SQLAlchemyInstrumentor().instrument(...) directly, multiple times with different engines, and have each engine instrumented.
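Concretely, something like the following is the pattern I'm after (a sketch of the desired behaviour only; the get_engine helper, engine names, and URLs are placeholders I made up for illustration):

```python
from sqlalchemy import Engine, create_engine
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

_engines: dict[str, Engine] = {}


def get_engine(name: str, url: str) -> Engine:
    """Lazily create one engine per database and instrument each of them."""
    if name not in _engines:
        engine = create_engine(url)
        # Desired: each call instruments the engine it is given,
        # even though instrument() has been called before.
        SQLAlchemyInstrumentor().instrument(engine=engine)
        _engines[name] = engine
    return _engines[name]


# Placeholder URLs:
orders = get_engine("orders", "sqlite:///:memory:")
billing = get_engine("billing", "sqlite:///:memory:")
```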
Describe alternatives you've considered
As a stopgap I have a placeholder wrapper (naive, not code-reviewed, generated with Claude Code) that seems to allow multiple engines:
"""SQLAlchemy OpenTelemetry instrumentation for multiple async engines.""" import logging import weakref from typing import Any, ClassVar import structlog from opentelemetry import trace from opentelemetry.instrumentation.sqlalchemy import ( # type: ignore[import-untyped] SQLAlchemyInstrumentor, ) from opentelemetry.instrumentation.sqlalchemy.engine import ( # type: ignore[import-untyped] EngineTracer, ) from opentelemetry.semconv._incubating.attributes import ( # type: ignore[import-untyped] db_attributes, ) from sqlalchemy import Engine from sqlalchemy.ext.asyncio import AsyncEngine from plu.pcs.apps.services.web.backend.observability.logger import get_logger # Stdlib logger for library-level class (consistent with BaseInstrumentor) _LOG = logging.getLogger(__name__) # Attribute name for db.name in logs (matches OpenTelemetry semantic conventions) DB_NAME_LOG_ATTR = "db.name" class DatabaseNameEngineTracer(EngineTracer): # type: ignore[misc] """EngineTracer that adds database name to all spans. The base EngineTracer extracts db.name from URL.database, but ODBC connection strings (mssql+aioodbc:///?odbc_connect=...) don't expose the database in the URL object. This subclass stores the database name and adds it to all spans. Also handles connections_usage=None by overriding pool event methods to be no-ops. """ def __init__( self, tracer: Any, engine: Any, connections_usage: Any, database_name: str, enable_commenter: bool = False, commenter_options: dict[str, Any] | None = None, enable_attribute_commenter: bool = False, ) -> None: """Initialize tracer with database name. Args: tracer: OpenTelemetry tracer instance. engine: SQLAlchemy engine to trace. connections_usage: Connections usage metric (can be None to skip metrics). database_name: Database name to add to spans. enable_commenter: Enable SQL commenter. commenter_options: Commenter options. enable_attribute_commenter: Include commenter in span attributes. """ super().__init__( # pyright: ignore[reportUnknownMemberType] tracer, engine, connections_usage, enable_commenter=enable_commenter, commenter_options=commenter_options, enable_attribute_commenter=enable_attribute_commenter, ) self._database_name = database_name def _add_idle_to_connection_usage(self, value: int) -> None: """Override to handle None connections_usage.""" if self.connections_usage is not None: super()._add_idle_to_connection_usage(value) # pyright: ignore[reportUnknownMemberType] def _add_used_to_connection_usage(self, value: int) -> None: """Override to handle None connections_usage.""" if self.connections_usage is not None: super()._add_used_to_connection_usage(value) # pyright: ignore[reportUnknownMemberType] def _before_cur_exec( self, conn: Any, cursor: Any, statement: Any, params: Any, context: Any, executemany: Any, ) -> Any: """Hook before cursor execution - creates span with db.name attribute. Calls base implementation then adds db.name to the span if set. 
""" result: tuple[Any, Any] = super()._before_cur_exec( # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] conn, cursor, statement, params, context, executemany ) # Add database name to span if we have one and span exists if self._database_name and hasattr(context, "_otel_span"): span = context._otel_span if span and span.is_recording(): span.set_attribute(db_attributes.DB_NAME, self._database_name) # Bind database name to structlog context for logs during query execution if self._database_name: structlog.contextvars.bind_contextvars(**{DB_NAME_LOG_ATTR: self._database_name}) return result def _after_cur_exec( self, conn: Any, cursor: Any, statement: Any, params: Any, context: Any, executemany: Any, ) -> None: """Hook after cursor execution - cleans up context. Calls base implementation then unbinds db.name from structlog context. """ super()._after_cur_exec( # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue] conn, cursor, statement, params, context, executemany ) # Unbind database name from structlog context if self._database_name: structlog.contextvars.unbind_contextvars(DB_NAME_LOG_ATTR) class PluSQLAlchemyInstrumentor(SQLAlchemyInstrumentor): """SQLAlchemy instrumentor with PLU-specific enhancements. Enhancements over base SQLAlchemyInstrumentor: - Supports instrumenting multiple engines (base uses singleton pattern) - Adds db.name to all spans/logs (ODBC connection strings don't expose it) """ _instrumented_engines: ClassVar[weakref.WeakSet[Engine]] = weakref.WeakSet() @classmethod def reset_instrumented_engines(cls) -> None: """Clear tracked engines. Useful for testing.""" cls._instrumented_engines = weakref.WeakSet() def instrument( self, *, database_name: str = "", engine: Engine | None = None, **kwargs: Any, ) -> None: """Instrument an engine, allowing multiple engines. Tracks engines via WeakSet to warn on duplicates. When engines are garbage collected, they are automatically removed from tracking. Args: database_name: Name of database for spans/logging. engine: SQLAlchemy engine to instrument. **kwargs: Passed to base instrument(). """ if engine is not None: if engine in self._instrumented_engines: _LOG.warning( "Engine for %s is already instrumented, skipping", database_name or engine, ) return self._instrumented_engines.add(engine) # Reset flag so base.instrument() doesn't hit the guard self._is_instrumented_by_opentelemetry = False # If we have a database_name, use our custom tracer directly if database_name and engine is not None: self._instrument_with_database_name(engine, database_name, **kwargs) else: # Fallback to base for full logic super().instrument(engine=engine, **kwargs) def _instrument_with_database_name( self, engine: Engine, database_name: str, **kwargs: Any, ) -> None: """Instrument engine with custom tracer that includes database name. Args: engine: SQLAlchemy engine to instrument. database_name: Database name to add to all spans. **kwargs: Additional options (tracer_provider, commenter settings). 
""" tracer_provider = kwargs.get("tracer_provider") tracer = trace.get_tracer( __name__, tracer_provider=tracer_provider, ) # Create our custom tracer with database name DatabaseNameEngineTracer( tracer, engine, connections_usage=None, # Skip metrics for now database_name=database_name, enable_commenter=kwargs.get("enable_commenter", False), commenter_options=kwargs.get("commenter_options"), enable_attribute_commenter=kwargs.get("enable_attribute_commenter", False), ) def instrument_async_engine(engine: AsyncEngine, *, database_name: str = "") -> None: """Instrument an async SQLAlchemy engine for OpenTelemetry tracing. Args: engine: Async SQLAlchemy engine to instrument. database_name: Optional name for logging. """ logger = get_logger(__name__) logger.info( "db.engine.instrumenting", database=database_name, message=f"Instrumenting SQLAlchemy engine for {database_name or 'unknown'}", ) # Use sync_engine for async engines (required for event listener registration) PluSQLAlchemyInstrumentor().instrument( engine=engine.sync_engine, database_name=database_name, ) logger.info( "db.engine.instrumented", database=database_name, message=f"SQLAlchemy engine instrumented for {database_name or 'unknown'}", ) Additional Context
Additional Context
I'm probably using the SQLAlchemy instrumentation wrong and missing something obvious; feel free to yell at me that I should have read the docs/source code more closely. I have a lot of dev experience, but not much with Python/OTel.
Would you like to implement a fix?
Yes