Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ pymemcache~=1.3
# Required by conf
django>=2.2

# Require by opentelemetry-util-genai
fsspec>=2025.9.0

# Required by instrumentation and exporter packages
aio_pika~=7.2.0
aiohttp~=3.0
Expand Down
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
None,
),
"redis": ("https://redis.readthedocs.io/en/latest/", None),
"fsspec": ("https://filesystem-spec.readthedocs.io/en/latest/", None),
}

# http://www.sphinx-doc.org/en/master/config.html#confval-nitpicky
Expand Down
4 changes: 4 additions & 0 deletions docs/instrumentation-genai/util.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ OpenTelemetry Python - GenAI Util
:members:
:undoc-members:
:show-inheritance:

.. automodule:: opentelemetry.util.genai._fsspec_upload
:members:
:show-inheritance:
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -1060,7 +1060,7 @@ deps =
{[testenv]test_deps}
{toxinidir}/opentelemetry-instrumentation
{toxinidir}/util/opentelemetry-util-http
{toxinidir}/util/opentelemetry-util-genai
{toxinidir}/util/opentelemetry-util-genai[fsspec]
{toxinidir}/instrumentation-genai/opentelemetry-instrumentation-vertexai[instruments]
{toxinidir}/instrumentation-genai/opentelemetry-instrumentation-google-genai[instruments]
{toxinidir}/instrumentation/opentelemetry-instrumentation-aiokafka[instruments]
Expand Down
8 changes: 5 additions & 3 deletions util/opentelemetry-util-genai/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ dependencies = [
"opentelemetry-api>=1.31.0",
]

[project.entry-points.opentelemetry_genai_upload_hook]
fsspec = "opentelemetry.util.genai._fsspec_upload:fsspec_upload_hook"

[project.optional-dependencies]
test = [
"pytest>=7.0.0",
]
test = ["pytest>=7.0.0"]
fsspec = ["fsspec>=2025.9.0"]

[project.urls]
Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/util/opentelemetry-util-genai"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from os import environ

from opentelemetry.util.genai.environment_variables import (
OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH,
)
from opentelemetry.util.genai.upload_hook import UploadHook, _NoOpUploadHook


def fsspec_upload_hook() -> UploadHook:
# If fsspec is not installed the hook will be a no-op.
try:
# pylint: disable=import-outside-toplevel
from opentelemetry.util.genai._fsspec_upload.fsspec_hook import (
FsspecUploadHook,
)
except ImportError:
return _NoOpUploadHook()

base_path = environ.get(OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH)
if not base_path:
return _NoOpUploadHook()

return FsspecUploadHook(base_path=base_path)
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from __future__ import annotations

import json
import logging
import posixpath
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import asdict, dataclass
from functools import partial
from typing import Any, Callable, Literal, TextIO, cast
from uuid import uuid4

import fsspec

from opentelemetry._logs import LogRecord
from opentelemetry.trace import Span
from opentelemetry.util.genai import types
from opentelemetry.util.genai.upload_hook import UploadHook

_logger = logging.getLogger(__name__)


@dataclass
class Completion:
inputs: list[types.InputMessage]
outputs: list[types.OutputMessage]
system_instruction: list[types.MessagePart]


@dataclass
class CompletionRefs:
inputs_ref: str
outputs_ref: str
system_instruction_ref: str


JsonEncodeable = list[dict[str, Any]]

# mapping of upload path to function computing upload data dict
UploadData = dict[str, Callable[[], JsonEncodeable]]


def fsspec_open(urlpath: str, mode: Literal["w"]) -> TextIO:
"""typed wrapper around `fsspec.open`"""
return cast(TextIO, fsspec.open(urlpath, mode)) # pyright: ignore[reportUnknownMemberType]


class FsspecUploadHook(UploadHook):
"""An upload hook using ``fsspec`` to upload to external storage

This function can be used as the
:func:`~opentelemetry.util.genai.upload_hook.load_upload_hook` implementation by
setting :envvar:`OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK` to ``fsspec``.
:envvar:`OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH` must be configured to specify the
base path for uploads.

Both the ``fsspec`` and ``opentelemetry-sdk`` packages should be installed, or a no-op
implementation will be used instead. You can use ``opentelemetry-util-genai[fsspec]``
as a requirement to achieve this.
"""

def __init__(
self,
*,
base_path: str,
max_size: int = 20,
) -> None:
self._base_path = base_path
self._max_size = max_size

# Use a ThreadPoolExecutor for its queueing and thread management. The semaphore
# limits the number of queued tasks. If the queue is full, data will be dropped.
self._executor = ThreadPoolExecutor(max_workers=max_size)
self._semaphore = threading.BoundedSemaphore(max_size)

def _submit_all(self, upload_data: UploadData) -> None:
def done(future: Future[None]) -> None:
self._semaphore.release()

try:
future.result()
except Exception: # pylint: disable=broad-except
_logger.exception("fsspec uploader failed")

for path, json_encodeable in upload_data.items():
# could not acquire, drop data
if not self._semaphore.acquire(blocking=False): # pylint: disable=consider-using-with
_logger.warning(
"fsspec upload queue is full, dropping upload %s",
path,
)
continue

try:
fut = self._executor.submit(
self._do_upload, path, json_encodeable
)
fut.add_done_callback(done)
except RuntimeError:
_logger.info(
"attempting to upload file after FsspecUploadHook.shutdown() was already called"
)
break

def _calculate_ref_path(self) -> CompletionRefs:
# TODO: experimental with using the trace_id and span_id, or fetching
# gen_ai.response.id from the active span.

uuid_str = str(uuid4())
return CompletionRefs(
inputs_ref=posixpath.join(
self._base_path, f"{uuid_str}_inputs.json"
),
outputs_ref=posixpath.join(
self._base_path, f"{uuid_str}_outputs.json"
),
system_instruction_ref=posixpath.join(
self._base_path, f"{uuid_str}_system_instruction.json"
),
)

@staticmethod
def _do_upload(
path: str, json_encodeable: Callable[[], JsonEncodeable]
) -> None:
with fsspec_open(path, "w") as file:
json.dump(json_encodeable(), file, separators=(",", ":"))

def upload(
self,
*,
inputs: list[types.InputMessage],
outputs: list[types.OutputMessage],
system_instruction: list[types.MessagePart],
span: Span | None = None,
log_record: LogRecord | None = None,
**kwargs: Any,
) -> None:
completion = Completion(
inputs=inputs,
outputs=outputs,
system_instruction=system_instruction,
)
# generate the paths to upload to
ref_names = self._calculate_ref_path()

def to_dict(
dataclass_list: list[types.InputMessage]
| list[types.OutputMessage]
| list[types.MessagePart],
) -> JsonEncodeable:
return [asdict(dc) for dc in dataclass_list]

self._submit_all(
{
# Use partial to defer as much as possible to the background threads
ref_names.inputs_ref: partial(to_dict, completion.inputs),
ref_names.outputs_ref: partial(to_dict, completion.outputs),
ref_names.system_instruction_ref: partial(
to_dict, completion.system_instruction
),
},
)

# TODO: stamp the refs on telemetry

def shutdown(self) -> None:
# TODO: support timeout
self._executor.shutdown()
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,24 @@
"""
.. envvar:: OTEL_INSTRUMENTATION_GENAI_UPLOAD_HOOK
"""

OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH = (
"OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH"
)
"""
.. envvar:: OTEL_INSTRUMENTATION_GENAI_UPLOAD_BASE_PATH

An :func:`fsspec.open` compatible URI/path for uploading prompts and responses. Can be a local
path like ``/path/to/prompts`` or a cloud storage URI such as ``gs://my_bucket``. For more
information, see

* `Instantiate a file-system
<https://filesystem-spec.readthedocs.io/en/latest/usage.html#instantiate-a-file-system>`_ for supported values and how to
install support for additional backend implementations.
* `Configuration
<https://filesystem-spec.readthedocs.io/en/latest/features.html#configuration>`_ for
configuring a backend with environment variables.
* `URL Chaining
<https://filesystem-spec.readthedocs.io/en/latest/features.html#url-chaining>`_ for advanced
use cases.
"""
3 changes: 2 additions & 1 deletion util/opentelemetry-util-genai/test-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pytest==7.4.4
-e opentelemetry-instrumentation
fsspec==2025.9.0
-e opentelemetry-instrumentation
Loading