Human in the Loop
Since workflows are so flexible, there are many possible ways to implement human-in-the-loop patterns.
The easiest way to implement a human-in-the-loop is to use the InputRequiredEvent and HumanResponseEvent events during event streaming.
from workflows.events import InputRequiredEvent, HumanResponseEvent class HumanInTheLoopWorkflow(Workflow): @step async def step1(self, ev: StartEvent) -> InputRequiredEvent: return InputRequiredEvent(prefix="Enter a number: ") @step async def step2(self, ev: HumanResponseEvent) -> StopEvent: return StopEvent(result=ev.response) # workflow should work with streamingworkflow = HumanInTheLoopWorkflow() handler = workflow.run()async for event in handler.stream_events(): if isinstance(event, InputRequiredEvent): # here, we can handle human input however you want # this means using input(), websockets, accessing async state, etc. # here, we just use input() response = input(event.prefix) handler.ctx.send_event(HumanResponseEvent(response=response)) final_result = await handlerHere, the workflow will wait until the HumanResponseEvent is emitted.
If needed, you can also subclass these two events to add custom payloads.
Stopping/Resuming Between Human Responses
Section titled “Stopping/Resuming Between Human Responses”Also note that you can break out of the loop, and resume it later. This is useful if you want to pause the workflow to wait for a human response, but continue the workflow later.
handler = workflow.run()async for event in handler.stream_events(): if isinstance(event, InputRequiredEvent): # Serialize the context, store it anywhere as a JSON blob ctx_dict = handler.ctx.to_dict() await handler.cancel_run() break ... # now we handle the human response once it comes inresponse = input(event.prefix) restored_ctx = Context.from_dict(workflow, ctx_dict)handler = workflow.run(ctx=restored_ctx) # Send the event to resume the workflowhandler.ctx.send_event(HumanResponseEvent(response=response)) # now we resume the workflow streaming with our restored contextasync for event in handler.stream_events(): continue final_result = await handler