Skip to content

Commit f91b38f

Browse files
authored
fix: improve run_sync's python 3.14 compatibility (openai#2006)
1 parent be94419 commit f91b38f

File tree

2 files changed

+196
-5
lines changed

2 files changed

+196
-5
lines changed

src/agents/run.py

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import contextlib
45
import inspect
56
import os
7+
import warnings
68
from dataclasses import dataclass, field
79
from typing import Any, Callable, Generic, cast, get_args
810

@@ -720,19 +722,40 @@ def run_sync(
720722
conversation_id = kwargs.get("conversation_id")
721723
session = kwargs.get("session")
722724

723-
# Python 3.14 no longer creates a default loop implicitly, so we inspect the running loop.
725+
# Python 3.14 stopped implicitly wiring up a default event loop
726+
# when synchronous code touches asyncio APIs for the first time.
727+
# Several of our synchronous entry points (for example the Redis/SQLAlchemy session helpers)
728+
# construct asyncio primitives like asyncio.Lock during __init__,
729+
# which binds them to whatever loop happens to be the thread's default at that moment.
730+
# To keep those locks usable we must ensure that run_sync reuses that same default loop
731+
# instead of hopping over to a brand-new asyncio.run() loop.
724732
try:
725-
loop = asyncio.get_running_loop()
733+
already_running_loop = asyncio.get_running_loop()
726734
except RuntimeError:
727-
loop = None
735+
already_running_loop = None
728736

729-
if loop is not None:
737+
if already_running_loop is not None:
730738
# This method is only expected to run when no loop is already active.
739+
# (Each thread has its own default loop; concurrent sync runs should happen on
740+
# different threads. In a single thread use the async API to interleave work.)
731741
raise RuntimeError(
732742
"AgentRunner.run_sync() cannot be called when an event loop is already running."
733743
)
734744

735-
return asyncio.run(
745+
policy = asyncio.get_event_loop_policy()
746+
with warnings.catch_warnings():
747+
warnings.simplefilter("ignore", DeprecationWarning)
748+
try:
749+
default_loop = policy.get_event_loop()
750+
except RuntimeError:
751+
default_loop = policy.new_event_loop()
752+
policy.set_event_loop(default_loop)
753+
754+
# We intentionally leave the default loop open even if we had to create one above. Session
755+
# instances and other helpers stash loop-bound primitives between calls and expect to find
756+
# the same default loop every time run_sync is invoked on this thread.
757+
# Schedule the async run on the default loop so that we can manage cancellation explicitly.
758+
task = default_loop.create_task(
736759
self.run(
737760
starting_agent,
738761
input,
@@ -746,6 +769,24 @@ def run_sync(
746769
)
747770
)
748771

772+
try:
773+
# Drive the coroutine to completion, harvesting the final RunResult.
774+
return default_loop.run_until_complete(task)
775+
except BaseException:
776+
# If the sync caller aborts (KeyboardInterrupt, etc.), make sure the scheduled task
777+
# does not linger on the shared loop by cancelling it and waiting for completion.
778+
if not task.done():
779+
task.cancel()
780+
with contextlib.suppress(asyncio.CancelledError):
781+
default_loop.run_until_complete(task)
782+
raise
783+
finally:
784+
if not default_loop.is_closed():
785+
# The loop stays open for subsequent runs, but we still need to flush any pending
786+
# async generators so their cleanup code executes promptly.
787+
with contextlib.suppress(RuntimeError):
788+
default_loop.run_until_complete(default_loop.shutdown_asyncgens())
789+
749790
def run_streamed(
750791
self,
751792
starting_agent: Agent[TContext],

tests/test_agent_runner_sync.py

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
import asyncio
2+
from collections.abc import Generator
3+
from typing import Any
4+
5+
import pytest
6+
7+
from agents.agent import Agent
8+
from agents.run import AgentRunner
9+
10+
11+
@pytest.fixture
12+
def fresh_event_loop_policy() -> Generator[asyncio.AbstractEventLoopPolicy, None, None]:
13+
policy_before = asyncio.get_event_loop_policy()
14+
new_policy = asyncio.DefaultEventLoopPolicy()
15+
asyncio.set_event_loop_policy(new_policy)
16+
try:
17+
yield new_policy
18+
finally:
19+
asyncio.set_event_loop_policy(policy_before)
20+
21+
22+
def test_run_sync_reuses_existing_default_loop(monkeypatch, fresh_event_loop_policy):
23+
runner = AgentRunner()
24+
observed_loops: list[asyncio.AbstractEventLoop] = []
25+
26+
async def fake_run(self, *_args, **_kwargs):
27+
observed_loops.append(asyncio.get_running_loop())
28+
return object()
29+
30+
monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False)
31+
32+
test_loop = asyncio.new_event_loop()
33+
fresh_event_loop_policy.set_event_loop(test_loop)
34+
35+
try:
36+
runner.run_sync(Agent(name="test-agent"), "input")
37+
assert observed_loops and observed_loops[0] is test_loop
38+
finally:
39+
fresh_event_loop_policy.set_event_loop(None)
40+
test_loop.close()
41+
42+
43+
def test_run_sync_creates_default_loop_when_missing(monkeypatch, fresh_event_loop_policy):
44+
runner = AgentRunner()
45+
observed_loops: list[asyncio.AbstractEventLoop] = []
46+
47+
async def fake_run(self, *_args, **_kwargs):
48+
observed_loops.append(asyncio.get_running_loop())
49+
return object()
50+
51+
monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False)
52+
53+
fresh_event_loop_policy.set_event_loop(None)
54+
55+
runner.run_sync(Agent(name="test-agent"), "input")
56+
created_loop = observed_loops[0]
57+
assert created_loop is fresh_event_loop_policy.get_event_loop()
58+
59+
fresh_event_loop_policy.set_event_loop(None)
60+
created_loop.close()
61+
62+
63+
def test_run_sync_errors_when_loop_already_running(monkeypatch, fresh_event_loop_policy):
64+
runner = AgentRunner()
65+
66+
async def fake_run(self, *_args, **_kwargs):
67+
return object()
68+
69+
monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False)
70+
71+
async def invoke():
72+
with pytest.raises(RuntimeError):
73+
runner.run_sync(Agent(name="test-agent"), "input")
74+
75+
asyncio.run(invoke())
76+
77+
78+
def test_run_sync_cancels_task_when_interrupted(monkeypatch, fresh_event_loop_policy):
79+
runner = AgentRunner()
80+
81+
async def fake_run(self, *_args, **_kwargs):
82+
await asyncio.sleep(3600)
83+
84+
monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False)
85+
86+
test_loop = asyncio.new_event_loop()
87+
fresh_event_loop_policy.set_event_loop(test_loop)
88+
89+
created_tasks: list[asyncio.Task[Any]] = []
90+
original_create_task = test_loop.create_task
91+
92+
def capturing_create_task(coro):
93+
task = original_create_task(coro)
94+
created_tasks.append(task)
95+
return task
96+
97+
original_run_until_complete = test_loop.run_until_complete
98+
call_count = {"value": 0}
99+
100+
def interrupt_once(future):
101+
call_count["value"] += 1
102+
if call_count["value"] == 1:
103+
raise KeyboardInterrupt()
104+
return original_run_until_complete(future)
105+
106+
monkeypatch.setattr(test_loop, "create_task", capturing_create_task)
107+
monkeypatch.setattr(test_loop, "run_until_complete", interrupt_once)
108+
109+
try:
110+
with pytest.raises(KeyboardInterrupt):
111+
runner.run_sync(Agent(name="test-agent"), "input")
112+
113+
assert created_tasks, "Expected run_sync to schedule a task."
114+
assert created_tasks[0].done()
115+
assert created_tasks[0].cancelled()
116+
assert call_count["value"] >= 2
117+
finally:
118+
monkeypatch.undo()
119+
fresh_event_loop_policy.set_event_loop(None)
120+
test_loop.close()
121+
122+
123+
def test_run_sync_finalizes_async_generators(monkeypatch, fresh_event_loop_policy):
124+
runner = AgentRunner()
125+
cleanup_markers: list[str] = []
126+
127+
async def fake_run(self, *_args, **_kwargs):
128+
async def agen():
129+
try:
130+
yield None
131+
finally:
132+
cleanup_markers.append("done")
133+
134+
gen = agen()
135+
await gen.__anext__()
136+
return "ok"
137+
138+
monkeypatch.setattr(AgentRunner, "run", fake_run, raising=False)
139+
140+
test_loop = asyncio.new_event_loop()
141+
fresh_event_loop_policy.set_event_loop(test_loop)
142+
143+
try:
144+
runner.run_sync(Agent(name="test-agent"), "input")
145+
assert cleanup_markers == ["done"], (
146+
"Async generators must be finalized after run_sync returns."
147+
)
148+
finally:
149+
fresh_event_loop_policy.set_event_loop(None)
150+
test_loop.close()

0 commit comments

Comments
 (0)