Skip to content

Conversation

@dsfaccini
Copy link
Collaborator

@dsfaccini dsfaccini commented Dec 11, 2025

Problem

Sync history processors fail inside Temporal workflows because anyio.to_thread.run_sync tries to create threads, which Temporal's sandboxed event loop doesn't support. This causes NotImplementedError.

Solution

Use a ContextVar to control when run_in_executor() should execute sync functions directly (blocking) vs using threading. This follows the approach discussed in the issue comments and approved by @DouweM.

Changes

  • _utils.py: Added _prefer_blocking_execution ContextVar, removed Temporal detection code
  • temporal/_agent.py: Set ContextVar in _temporal_overrides() context manager
  • test_utils.py: Updated test to use ContextVar instead of mocking Temporal internals
  • test_temporal.py: Added integration test for sync history processors in workflows

Testing

  • Unit test verifies ContextVar controls blocking behavior
  • Integration test validates sync history processors work in Temporal workflows
@github-actions
Copy link

github-actions bot commented Dec 11, 2025

Docs Preview

commit: f331873
Preview URL: https://b5b2951d-pydantic-ai-previews.pydantic.workers.dev
with super().override(model=self._model, toolsets=self._toolsets, tools=[]):
token = self._temporal_overrides_active.set(True)
temporal_active_token = self._temporal_overrides_active.set(True)
blocking_token = _prefer_blocking_execution.set(True)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make this a contextmanager exposed by the utils module, so that we don't deal with the contextvar directly here. See the current_run_context feature in #3537

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we can add that contextmanager to the with statement above

return messages[1:] if len(messages) > 1 else messages


agent_with_sync_history_processor = Agent(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this to just ahead of the workflow that uses it, to keep the test file consistently ordered

simple_temporal_agent = TemporalAgent(simple_agent, activity_config=BASE_ACTIVITY_CONFIG)


def drop_first_message_sync(messages: list[ModelMessage]) -> list[ModelMessage]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make this an inline lambda as it's very simple

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pyright always complains about lambdas, and the length causes a line break, so I'd prefer to keep the extra 1-2 lines with proper types and a nice name



async def test_run_in_executor_with_blocking_execution_enabled() -> None:
from pydantic_ai._utils import _prefer_blocking_execution # pyright: ignore[reportPrivateUsage]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above; let's not use the private var, but a new public contextmanager

_P = ParamSpec('_P')
_R = TypeVar('_R')

_prefer_blocking_execution: ContextVar[bool] = ContextVar('_prefer_blocking_execution', default=False)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"prefer" makes it sound like this expresses a preference, that may or may not be respected. So "enable" or no prefix at all would be better



@contextmanager
def blocking_execution() -> Iterator[None]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with _utils.blocking_execution() makes it seems like all execution will be blocking, but that's not true because of async etc. So maybe we can rename this to something like with disable_threads()?


@contextmanager
def _temporal_overrides(self) -> Iterator[None]:
from pydantic_ai._utils import blocking_execution
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To the top please! Imports inside methods are exclusively for circular import issues, and files that depend on optional packages

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good to know!

@DouweM DouweM changed the title Fix sync history processors in Temporal workflows using ContextVar Fix using sync history processors, instructions functions, and output functions with TemporalAgent Dec 16, 2025
@DouweM DouweM merged commit 848e381 into main Dec 16, 2025
30 checks passed
@DouweM DouweM deleted the fix-sync-history-processor-anyio-worker-exception branch December 16, 2025 23:19
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment