Skip to content

Commit 58b4c81

Browse files
fix: Prevent client disconnect from stopping task execution (#440)
# Issue * Client disconnect triggered synchronous cleanup. * That awaited the producer task, effectively tying producer lifetime to the client connection. * Reconnecting with `tasks/resubscribe` would not receive further events because the producer had already been forced to finish. This behaviour no longer raises an `asyncio.exceptions.CancelledError` as claimed in #296, thanks to the fix in #383, but `tasks/resubscribe` still did not behave as expected. # How it's reproduced In any streaming agent: simply sending a (longer-running) `message/stream`, disconnecting, and then reconnecting to the task using `tasks/resubscribe` yields no further events, even though the task should still be running. # Fix ## Code The fix is a one-liner. Now: * Client disconnect schedules cleanup in the background and returns immediately. * Producer continues; resubscribe taps the existing queue and receives subsequent events. * Cleanup still runs once the producer completes. ## Tests **Existing tests:** * Changed existing tests that asserted on `AgentExecutor.execute` by adding an `asyncio.Event` latch to wait until the background producer hits `execute`. **New tests:** * `test_stream_disconnect_then_resubscribe_receives_future_events` -- starts streaming, disconnects, resubscribes, and confirms that future events are received. * `test_on_message_send_stream_client_disconnect_triggers_background_cleanup_and_producer_continues` -- validates that disconnecting is non-blocking, that the producer continues, and that cleanup completes afterward. Fixes #296
1 parent 6d0ef59 commit 58b4c81

File tree

3 files changed

+381
-3
lines changed

3 files changed

+381
-3
lines changed

src/a2a/server/request_handlers/default_request_handler.py

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ class DefaultRequestHandler(RequestHandler):
6767
"""
6868

6969
_running_agents: dict[str, asyncio.Task]
70+
_background_tasks: set[asyncio.Task]
7071

7172
def __init__( # noqa: PLR0913
7273
self,
@@ -102,6 +103,9 @@ def __init__( # noqa: PLR0913
102103
# TODO: Likely want an interface for managing this, like AgentExecutionManager.
103104
self._running_agents = {}
104105
self._running_agents_lock = asyncio.Lock()
106+
# Tracks background tasks (e.g., deferred cleanups) to avoid orphaning
107+
# asyncio tasks and to surface unexpected exceptions.
108+
self._background_tasks = set()
105109

106110
async def on_get_task(
107111
self,
@@ -355,10 +359,11 @@ async def push_notification_callback() -> None:
355359
raise
356360
finally:
357361
if interrupted_or_non_blocking:
358-
# TODO: Track this disconnected cleanup task.
359-
asyncio.create_task( # noqa: RUF006
362+
cleanup_task = asyncio.create_task(
360363
self._cleanup_producer(producer_task, task_id)
361364
)
365+
cleanup_task.set_name(f'cleanup_producer:{task_id}')
366+
self._track_background_task(cleanup_task)
362367
else:
363368
await self._cleanup_producer(producer_task, task_id)
364369

@@ -394,7 +399,11 @@ async def on_message_send_stream(
394399
)
395400
yield event
396401
finally:
397-
await self._cleanup_producer(producer_task, task_id)
402+
cleanup_task = asyncio.create_task(
403+
self._cleanup_producer(producer_task, task_id)
404+
)
405+
cleanup_task.set_name(f'cleanup_producer:{task_id}')
406+
self._track_background_task(cleanup_task)
398407

399408
async def _register_producer(
400409
self, task_id: str, producer_task: asyncio.Task
@@ -403,6 +412,29 @@ async def _register_producer(
403412
async with self._running_agents_lock:
404413
self._running_agents[task_id] = producer_task
405414

415+
def _track_background_task(self, task: asyncio.Task) -> None:
    """Hold a reference to ``task`` and report how it finishes.

    The task is stored in ``self._background_tasks`` until it completes.
    A done-callback then logs the outcome (cancellation at debug level,
    any other ``Exception`` with a traceback) and drops the task from
    the set again.

    Args:
        task: The asyncio task to keep referenced until completion.
    """
    self._background_tasks.add(task)

    def _finalize(finished: asyncio.Task) -> None:
        # Resolve the name once; both log branches report it.
        task_name = finished.get_name()
        try:
            # ``result()`` re-raises whatever the task ended with, which
            # is what routes it into the matching handler below.
            finished.result()
        except asyncio.CancelledError:
            logger.debug('Background task %s cancelled', task_name)
        except Exception:
            logger.exception('Background task %s failed', task_name)
        finally:
            # Always release the reference, whatever the outcome was.
            self._background_tasks.discard(finished)

    task.add_done_callback(_finalize)
437+
406438
async def _cleanup_producer(
407439
self,
408440
producer_task: asyncio.Task,

0 commit comments

Comments
 (0)