π is a simple, lightweight workflow engine, in TypeScript.
- Minimal core API (<=2kb)
- 100% Type safe
- Event-driven, stream oriented programming
- Support multiple JS runtime/framework
npm i @llamaindex/workflow-core yarn add @llamaindex/workflow-core pnpm add @llamaindex/workflow-core bun add @llamaindex/workflow-core deno add npm:@llamaindex/workflow-core
For examples, check out the demo folder.
import { workflowEvent } from "@llamaindex/workflow-core"; const startEvent = workflowEvent<string>(); const stopEvent = workflowEvent<1 | -1>();
import { createWorkflow } from "@llamaindex/workflow-core"; const convertEvent = workflowEvent(); const workflow = createWorkflow(); workflow.handle([startEvent], (start) => { return convertEvent.with(Number.parseInt(start.data, 10)); }); workflow.handle([convertEvent], (convert) => { return stopEvent.with(convert.data > 0 ? 1 : -1); });
import { pipeline } from "node:stream/promises"; const { stream, sendEvent } = workflow.createContext(); sendEvent(startEvent.with()); const result = await pipeline(stream, async function (source) { for await (const event of source) { if (stopEvent.include(event)) { return "stop received!"; } } }); console.log(result); // stop received! // or const allEvents = await stream.until(stopEvent).toArray();
There are helper functions to make working with workflows even simpler:
import { runWorkflow, runAndCollect, runWorkflowWithFilter, } from "@llamaindex/workflow-core/stream/run"; // Run workflow and get final result const result = await runWorkflow(workflow, startEvent.with("42"), stopEvent); // Run workflow and collect all events const allEvents = await runAndCollect( workflow, startEvent.with("42"), stopEvent, );
By default, we provide a simple fan-out utility to run multiple workflows in parallel
context.sendEvent
will emit a new event to current workflowcontext.stream
will return a stream of events emitted by the sub-workflow
let condition = false; workflow.handle([startEvent], async (context, start) => { const { sendEvent, stream } = context; for (let i = 0; i < 10; i++) { sendEvent(convertEvent.with(i)); } // You define the condition to stop the workflow const results = await stream .until(() => condition) .filter(convertStopEvent) .toArray(); console.log(results.length); // 10 return stopEvent.with(); }); workflow.handle([convertEvent], (convert) => { if (convert.data === 9) { condition = true; } return convertStopEvent.with(/* ... */); });
Workflow is event-driven, you can use any stream API to handle the workflow like rxjs
import { from, pipe } from "rxjs"; const { stream, sendEvent } = workflow.createContext(); from(stream) .pipe(filter((ev) => eventSource(ev) === messageEvent)) .subscribe((ev) => { console.log(ev.data); }); sendEvent(fileParseWorkflow.startEvent(directory));
Workflow can be used as middleware in any server framework, like express
, hono
, fastify
, etc.
import { Hono } from "hono"; import { serve } from "@hono/node-server"; import { createHonoHandler } from "@llamaindex/workflow-core/interrupter/hono"; import { agentWorkflow, startEvent, stopEvent, } from "../workflows/tool-call-agent.js"; const app = new Hono(); app.post( "/workflow", createHonoHandler( agentWorkflow, async (ctx) => startEvent(await ctx.req.text()), stopEvent, ), ); serve(app, ({ port }) => { console.log(`Server started at http://localhost:${port}`); });
You can use signal
in the context parameter to handle error
workflow.handle([convertEvent], (context) => { const { signal } = context; signal.onabort = () => { console.error("error in convert event:", abort.reason); }; });
Workflow handlers receive the context as the first parameter, providing access to sendEvent
, stream
, and signal
.
workflow.handle([startEvent], async (context) => { const { sendEvent, stream, signal } = context; // Use context properties directly sendEvent(processEvent.with()); });
Adding a state
property to the workflow context, which returns a state object, each state is linked to the workflow context.
import { createStatefulMiddleware } from "@llamaindex/workflow-core/middleware/state"; const { withState } = createStatefulMiddleware(() => ({ pendingTasks: new Set<Promise<unknown>>(), })); const workflow = withState(createWorkflow()); workflow.handle([startEvent], (context) => { const { state } = context; state.pendingTasks.add( new Promise((resolve) => { setTimeout(() => { resolve(); }, 100); }), ); }); const { state } = workflow.createContext();
You can also create a state with input:
const { withState } = createStatefulMiddleware((input: { id: string }) => ({ id: input.id, })); const workflow = withState(createWorkflow()); const { state } = workflow.createContext({ id: "1" });
withState
also supports snapshot, you can use snapshot
to save the state of the workflow, and resume
to restore the state of the workflow.
const { snapshot, resume } = workflow.createContext(); // create snapshot const snapshotData = await snapshot(); // resume workflow from snapshot const { stream, sendEvent } = workflow.resume(snapshotData); sendEvent(humanResponseEvent.with("hello"));
Make first parameter of handler
to be sendEvent
and its type safe and runtime safe when you create a workflow using withValidation
.
// before: workflow.handle([startEvent], (start) => {}); // after: workflow.handle([startEvent], (sendEvent, start) => {});
import { withValidation } from "@llamaindex/workflow-core/middleware/validation"; const startEvent = workflowEvent<void, "start">(); const disallowedEvent = workflowEvent<void, "disallowed">({ debugLabel: "disallowed", }); const parseEvent = workflowEvent<string, "parse">(); const stopEvent = workflowEvent<number, "stop">(); const workflow = withValidation(createWorkflow(), [ [[startEvent], [stopEvent]], [[startEvent], [parseEvent]], ]); workflow.strictHandle([startEvent], (sendEvent, start) => { sendEvent( disallowedEvent.with(), // <-- β Type Check Failed, Runtime Error ); sendEvent(parseEvent.with("")); // <-- β
sendEvent(stopEvent.with(1)); // <-- β
});
Adds tracing capabilities to your workflow, allowing you to monitor/decorate handler and debug event flows easily.
When enabled, it collects events based on the directed graph of the runtime and provide lifecycle hooks for each handler.
import { withTraceEvents, runOnce, } from "@llamaindex/workflow-core/middleware/trace-events"; const workflow = withTraceEvents(createWorkflow()); workflow.handle( [messageEvent], runOnce(() => { console.log("This message handler will only run once"); }), ); workflow.handle([startEvent], (context) => { context.sendEvent(messageEvent.with()); context.sendEvent(messageEvent.with()); }); { const { sendEvent } = workflow.createContext(); sendEvent(startEvent.with()); sendEvent(messageEvent.with()); // This message handler will only run once! } { const { sendEvent } = workflow.createContext(); // For each new context, the decorator is isolated. sendEvent(startEvent.with()); sendEvent(messageEvent.with()); // This message handler will only run once! }
You can use substream
to create a substream from the workflow context, which will only emit events that are emitted by the target event.
const ev = startEvent.with(); const { sendEvent, stream } = workflow.createContext(); sendEvent(ev); sendEvent(messageEvent.with()); // <- this will not be included in the substream const substream = workflow.substream(ev, stream);
This is helpful when you have async requests, and you want to track the events that are emitted by the target event.
For example:
-
Parallel requests
without substream
workflow.handle([startEvent], async (context, { data: uuid }) => { const { sendEvent, stream } = context; const ev = networkRequestEvent.with(uuid); sendEvent(networkRequestEvent); // you need bypass uuid to all events to get the correct response const responses = await collect( filter(workflow.substream(ev, stream), (ev) => ev.data === uuid), ); }); sendEvent(startEvent.with(crypto.randomUUID())); sendEvent(startEvent.with(crypto.randomUUID()));
workflow.handle([startEvent], async (context) => { const { sendEvent, stream } = context; const ev = networkRequestEvent.with(); sendEvent(networkRequestEvent); const responses = await collect(workflow.substream(ev, stream)); }); sendEvent(startEvent.with()); sendEvent(startEvent.with());
You can create your own handler decorator to modify the behavior of the handler.
import { createHandlerDecorator } from "@llamaindex/workflow-core/middleware/trace-events"; const noop: (...args: any[]) => void = function noop() {}; export const runOnce = createHandlerDecorator({ debugLabel: "onceHook", getInitialValue: () => false, onBeforeHandler: (handler, handlerContext, tracked) => tracked ? noop : handler, onAfterHandler: () => true, });
The HandlerContext
includes the runtime information of the handler in the directed graph of the workflow.
type BaseHandlerContext = { // ... some other properties are hidden handler: Handler<WorkflowEvent<any>[], any>; inputEvents: WorkflowEvent<any>[]; // events data that are accepted by the handler inputs: WorkflowEventData<any>[]; // events data that are emitted by the handler outputs: WorkflowEventData<any>[]; //#region linked list data structure prev: HandlerContext; next: Set<HandlerContext>; root: HandlerContext; //#endregion }; type SyncHandlerContext = BaseHandlerContext & { async: false; pending: null; }; type AsyncHandlerContext = BaseHandlerContext & { async: true; pending: Promise<WorkflowEventData<any> | void> | null; }; type HandlerContext = AsyncHandlerContext | SyncHandlerContext;
For example, when you send two startEvent
events, and send messageEvent
twice (once in the handler and once in the global), the HandlerContext
from root to leaf is:
let once = false; workflow.handle([startEvent], (context) => { const { sendEvent } = context; if (once) { return; } once = true; sendEvent(messageEvent.with()); }); const { sendEvent } = workflow.createContext(); sendEvent(startEvent.with()); sendEvent(startEvent.with()); sendEvent(messageEvent.with());
rootHandlerContext(0) βββ startEventContext(0) β βββ messageEventContext(0) βββ startEventContext(1) βββ messageEventContext(1)
You can use any directed graph library to visualize the directed graph of the workflow.