Skip to content

Commit 742d77f

Browse files
committed
feat: add support for run_stream
1 parent 8c26119 commit 742d77f

File tree

3 files changed

+188
-8
lines changed

3 files changed

+188
-8
lines changed

sentry_sdk/integrations/pydantic_ai/patches/agent_run.py

Lines changed: 124 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,59 @@
1717
raise DidNotEnable("pydantic-ai not installed")
1818

1919

20-
def _create_run_wrapper(original_func):
21-
# type: (Callable[..., Any]) -> Callable[..., Any]
20+
class _StreamingContextManagerWrapper:
21+
"""Wrapper for streaming methods that return async context managers."""
22+
23+
def __init__(self, agent, original_ctx_manager, is_streaming=True):
24+
# type: (Any, Any, bool) -> None
25+
self.agent = agent
26+
self.original_ctx_manager = original_ctx_manager
27+
self.is_streaming = is_streaming
28+
self._isolation_scope = None
29+
self._workflow_span = None
30+
31+
async def __aenter__(self):
32+
# type: () -> Any
33+
# Set up isolation scope and workflow span
34+
self._isolation_scope = sentry_sdk.isolation_scope()
35+
self._isolation_scope.__enter__()
36+
37+
# Store agent reference and streaming flag
38+
sentry_sdk.get_current_scope().set_context(
39+
"pydantic_ai_agent", {"_agent": self.agent, "_streaming": self.is_streaming}
40+
)
41+
42+
# Create workflow span
43+
self._workflow_span = agent_workflow_span(self.agent)
44+
self._workflow_span.__enter__()
45+
46+
# Enter the original context manager
47+
result = await self.original_ctx_manager.__aenter__()
48+
return result
49+
50+
async def __aexit__(self, exc_type, exc_val, exc_tb):
51+
# type: (Any, Any, Any) -> None
52+
try:
53+
# Exit the original context manager first
54+
await self.original_ctx_manager.__aexit__(exc_type, exc_val, exc_tb)
55+
finally:
56+
# Clean up workflow span
57+
if self._workflow_span:
58+
self._workflow_span.__exit__(exc_type, exc_val, exc_tb)
59+
60+
# Clean up isolation scope
61+
if self._isolation_scope:
62+
self._isolation_scope.__exit__(exc_type, exc_val, exc_tb)
63+
64+
65+
def _create_run_wrapper(original_func, is_streaming=False):
66+
# type: (Callable[..., Any], bool) -> Callable[..., Any]
2267
"""
2368
Wraps the Agent.run method to create a root span for the agent workflow.
69+
70+
Args:
71+
original_func: The original run method
72+
is_streaming: Whether this is a streaming method (for future use)
2473
"""
2574

2675
@wraps(original_func)
@@ -29,10 +78,10 @@ async def wrapper(self, *args, **kwargs):
2978
# Isolate each workflow so that when agents are run in asyncio tasks they
3079
# don't touch each other's scopes
3180
with sentry_sdk.isolation_scope():
32-
# Store agent reference in Sentry scope for access in nested spans
81+
# Store agent reference and streaming flag in Sentry scope for access in nested spans
3382
# We store the full agent to allow access to tools and system prompts
3483
sentry_sdk.get_current_scope().set_context(
35-
"pydantic_ai_agent", {"_agent": self}
84+
"pydantic_ai_agent", {"_agent": self, "_streaming": is_streaming}
3685
)
3786

3887
with agent_workflow_span(self):
@@ -57,6 +106,7 @@ def _create_run_sync_wrapper(original_func):
57106
# type: (Callable[..., Any]) -> Callable[..., Any]
58107
"""
59108
Wraps the Agent.run_sync method to create a root span for the agent workflow.
109+
Note: run_sync is always non-streaming.
60110
"""
61111

62112
@wraps(original_func)
@@ -65,10 +115,10 @@ def wrapper(self, *args, **kwargs):
65115
# Isolate each workflow so that when agents are run they
66116
# don't touch each other's scopes
67117
with sentry_sdk.isolation_scope():
68-
# Store agent reference in Sentry scope for access in nested spans
118+
# Store agent reference and streaming flag in Sentry scope for access in nested spans
69119
# We store the full agent to allow access to tools and system prompts
70120
sentry_sdk.get_current_scope().set_context(
71-
"pydantic_ai_agent", {"_agent": self}
121+
"pydantic_ai_agent", {"_agent": self, "_streaming": False}
72122
)
73123

74124
with agent_workflow_span(self):
@@ -89,18 +139,84 @@ def wrapper(self, *args, **kwargs):
89139
return wrapper
90140

91141

142+
def _create_streaming_wrapper(original_func):
143+
# type: (Callable[..., Any]) -> Callable[..., Any]
144+
"""
145+
Wraps run_stream method that returns an async context manager.
146+
"""
147+
148+
@wraps(original_func)
149+
def wrapper(self, *args, **kwargs):
150+
# type: (Any, *Any, **Any) -> Any
151+
# Call original function to get the context manager
152+
original_ctx_manager = original_func(self, *args, **kwargs)
153+
154+
# Wrap it with our instrumentation
155+
return _StreamingContextManagerWrapper(
156+
agent=self, original_ctx_manager=original_ctx_manager, is_streaming=True
157+
)
158+
159+
return wrapper
160+
161+
162+
def _create_streaming_events_wrapper(original_func):
163+
# type: (Callable[..., Any]) -> Callable[..., Any]
164+
"""
165+
Wraps run_stream_events method that returns an async generator/iterator.
166+
"""
167+
168+
@wraps(original_func)
169+
async def wrapper(self, *args, **kwargs):
170+
# type: (Any, *Any, **Any) -> Any
171+
# Isolate each workflow so that when agents are run in asyncio tasks they
172+
# don't touch each other's scopes
173+
with sentry_sdk.isolation_scope():
174+
# Store agent reference and streaming flag in Sentry scope for access in nested spans
175+
sentry_sdk.get_current_scope().set_context(
176+
"pydantic_ai_agent", {"_agent": self, "_streaming": True}
177+
)
178+
179+
with agent_workflow_span(self):
180+
try:
181+
# Call the original generator and yield all events
182+
async for event in original_func(self, *args, **kwargs):
183+
yield event
184+
except Exception as exc:
185+
_capture_exception(exc)
186+
187+
# It could be that there is an "invoke agent" span still open
188+
current_span = sentry_sdk.get_current_span()
189+
if current_span is not None and current_span.timestamp is None:
190+
current_span.__exit__(None, None, None)
191+
192+
raise exc from None
193+
194+
return wrapper
195+
196+
92197
def _patch_agent_run():
93198
# type: () -> None
94199
"""
95200
Patches the Agent run methods to create spans for agent execution.
201+
202+
This patches both non-streaming (run, run_sync) and streaming
203+
(run_stream, run_stream_events) methods.
96204
"""
97205
# Import here to avoid circular imports
98206
from pydantic_ai.agent import Agent
99207

100208
# Store original methods
101209
original_run = Agent.run
102210
original_run_sync = Agent.run_sync
211+
original_run_stream = Agent.run_stream
212+
original_run_stream_events = Agent.run_stream_events
103213

104-
# Wrap and apply patches
105-
Agent.run = _create_run_wrapper(original_run) # type: ignore
214+
# Wrap and apply patches for non-streaming methods
215+
Agent.run = _create_run_wrapper(original_run, is_streaming=False) # type: ignore
106216
Agent.run_sync = _create_run_sync_wrapper(original_run_sync) # type: ignore
217+
218+
# Wrap and apply patches for streaming methods
219+
Agent.run_stream = _create_streaming_wrapper(original_run_stream) # type: ignore
220+
Agent.run_stream_events = _create_streaming_events_wrapper(
221+
original_run_stream_events
222+
) # type: ignore

sentry_sdk/integrations/pydantic_ai/patches/graph_nodes.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,65 @@ async def wrapped_model_request_run(self, ctx):
103103

104104
ModelRequestNode.run = wrapped_model_request_run # type: ignore
105105

106+
# Patch ModelRequestNode.stream for streaming requests
107+
original_model_request_stream = ModelRequestNode.stream
108+
109+
def create_wrapped_stream(original_stream_method):
110+
# type: (Callable[..., Any]) -> Callable[..., Any]
111+
"""Create a wrapper for ModelRequestNode.stream that creates chat spans."""
112+
from contextlib import asynccontextmanager
113+
114+
@asynccontextmanager
115+
@wraps(original_stream_method)
116+
async def wrapped_model_request_stream(self, ctx):
117+
# type: (Any, Any) -> Any
118+
# Extract data from context
119+
model = None
120+
model_settings = None
121+
if hasattr(ctx, "deps"):
122+
model = getattr(ctx.deps, "model", None)
123+
model_settings = getattr(ctx.deps, "model_settings", None)
124+
125+
# Build full message list: history + current request
126+
messages = []
127+
128+
# Add message history
129+
if hasattr(ctx, "state") and hasattr(ctx.state, "message_history"):
130+
messages.extend(ctx.state.message_history)
131+
132+
# Add current request
133+
current_request = getattr(self, "request", None)
134+
if current_request:
135+
messages.append(current_request)
136+
137+
# Create chat span for streaming request
138+
import sentry_sdk
139+
140+
span = ai_client_span(messages, None, model, model_settings)
141+
span.__enter__()
142+
143+
try:
144+
# Call the original stream method
145+
async with original_stream_method(self, ctx) as stream:
146+
yield stream
147+
148+
# After streaming completes, update span with response data
149+
# The ModelRequestNode stores the final response in _result
150+
model_response = None
151+
if hasattr(self, "_result") and self._result is not None:
152+
# _result is a NextNode containing the model_response
153+
if hasattr(self._result, "model_response"):
154+
model_response = self._result.model_response
155+
156+
update_ai_client_span(span, model_response)
157+
finally:
158+
# Close the span after streaming completes
159+
span.__exit__(None, None, None)
160+
161+
return wrapped_model_request_stream
162+
163+
ModelRequestNode.stream = create_wrapped_stream(original_model_request_stream) # type: ignore
164+
106165
# Patch CallToolsNode to close invoke_agent span when done
107166
original_call_tools_run = CallToolsNode.run
108167

sentry_sdk/integrations/pydantic_ai/spans/ai_client.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ def ai_client_span(messages, agent, model, model_settings):
4545
_set_agent_data(span, agent)
4646
_set_model_data(span, model, model_settings)
4747

48+
# Set streaming flag
49+
agent_data = sentry_sdk.get_current_scope()._contexts.get("pydantic_ai_agent") or {}
50+
is_streaming = agent_data.get("_streaming", False)
51+
span.set_data(SPANDATA.GEN_AI_RESPONSE_STREAMING, is_streaming)
52+
4853
# Add available tools if agent is available
4954
agent_obj = agent
5055
if not agent_obj:

0 commit comments

Comments
 (0)