Workflows
A `Workflow` in LlamaIndex is a lightweight, event-driven abstraction used to chain together several events. Workflows are made up of handlers, each responsible for processing specific event types and emitting new events.
Workflows are designed to be flexible and can be used to build agents, RAG flows, extraction flows, or anything else you want to implement.
```bash
npm i @llamaindex/workflow @llamaindex/openai
```
Getting Started
Let’s explore a simple workflow example where a joke is generated and then critiqued and iterated on.
There are a few moving pieces here, so let’s go through this step by step.
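The snippets that follow assume a few shared imports and an LLM instance, which come from the packages installed above. A minimal setup might look like this sketch (the model name is only an example; any LlamaIndex-supported LLM with a `complete()` method works):

```typescript
import {
  createStatefulMiddleware,
  createWorkflow,
  workflowEvent,
} from "@llamaindex/workflow";
import { openai } from "@llamaindex/openai";

// LLM used by the handlers below; the model choice here is arbitrary
const llm = openai({ model: "gpt-4o-mini" });
```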
Defining Workflow Events
```typescript
const startEvent = workflowEvent<string>(); // Input topic for joke
const jokeEvent = workflowEvent<{ joke: string }>(); // Intermediate joke
const critiqueEvent = workflowEvent<{ joke: string; critique: string }>(); // Intermediate critique
const resultEvent = workflowEvent<{ joke: string; critique: string }>(); // Final joke + critique
```

Events are defined using the `workflowEvent` function and contain arbitrary data provided as a generic type. In this example, we have four events:

- `startEvent`: Takes a string input (the joke topic)
- `jokeEvent`: Contains an object with a `joke` property
- `critiqueEvent`: Contains both the joke and its critique, used for the feedback loop
- `resultEvent`: Contains the final joke and critique after any iterations
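An event definition is used both to create and to recognize event instances: calling `.with(...)` produces an instance whose `data` has the declared type, and `include()` (used later when consuming the stream) checks whether an event is of that type. A tiny sketch:

```typescript
// Create a typed event instance
const ev = jokeEvent.with({ joke: "Why did the pirate cross the road?" });

console.log(ev.data.joke); // data is typed as { joke: string }
console.log(jokeEvent.include(ev)); // true: ev was created from jokeEvent
```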
Setting up the Workflow with Stateful Middleware
```typescript
const { withState, getContext } = createStatefulMiddleware(() => ({
  numIterations: 0,
  maxIterations: 3,
}));

const jokeFlow = withState(createWorkflow());
```
Our workflow is implemented using the `createWorkflow()` function, enhanced with the `withState` middleware. This middleware provides shared state across all handlers, which in this case tracks:

- `numIterations`: Counts how many iterations of joke improvement we’ve done
- `maxIterations`: Sets a limit to prevent infinite loops
This state will be accessible within handlers via `getContext().state`.
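As a quick sketch of the pattern (the real usage appears in the third handler below), any handler registered on `jokeFlow` can read and mutate this shared object:

```typescript
// Inside any handler registered on jokeFlow:
const { state } = getContext();
state.numIterations += 1; // update the shared counter
if (state.numIterations >= state.maxIterations) {
  // time to stop looping and emit the final result
}
```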
Adding Handlers with Loops
We have three key handlers in our workflow:
- The first handler processes the `startEvent`, generates an initial joke, and emits a `jokeEvent`:

```typescript
jokeFlow.handle([startEvent], async (event) => {
  // Prompt the LLM to write a joke
  const prompt = `Write your best joke about ${event.data}. Write the joke between <joke> and </joke> tags.`;
  const response = await llm.complete({ prompt });

  // Parse the joke from the response
  const joke = response.text.match(/<joke>([\s\S]*?)<\/joke>/)?.[1]?.trim() ?? response.text;

  return jokeEvent.with({ joke: joke });
});
```
- The second handler handles the `jokeEvent`, critiques the joke, and either:
  - Emits a `critiqueEvent` if the joke needs improvement
  - Emits a `resultEvent` if the joke is good enough

```typescript
jokeFlow.handle([jokeEvent], async (event) => {
  // Prompt the LLM to critique the joke
  const prompt = `Give a thorough critique of the following joke. If the joke needs improvement, put "IMPROVE" somewhere in the critique: ${event.data.joke}`;
  const response = await llm.complete({ prompt });

  // If the critique includes "IMPROVE", keep iterating, else, return the result
  if (response.text.includes("IMPROVE")) {
    return critiqueEvent.with({
      joke: event.data.joke,
      critique: response.text,
    });
  }

  return resultEvent.with({ joke: event.data.joke, critique: response.text });
});
```
- The third handler processes the `critiqueEvent`, generates an improved joke based on the critique, and either:
  - Loops back to the joke evaluation (if under the iteration limit)
  - Emits the final `resultEvent` (if the iteration limit is reached)

```typescript
jokeFlow.handle([critiqueEvent], async (event) => {
  // Keep track of the number of iterations
  const state = getContext().state;
  state.numIterations++;

  // Write a new joke based on the previous joke and critique
  const prompt = `Write a new joke based on the following critique and the original joke. Write the joke between <joke> and </joke> tags.\n\nJoke: ${event.data.joke}\n\nCritique: ${event.data.critique}`;
  const response = await llm.complete({ prompt });

  // Parse the joke from the response
  const joke = response.text.match(/<joke>([\s\S]*?)<\/joke>/)?.[1]?.trim() ?? response.text;

  // If we've done less than the max number of iterations, keep iterating,
  // else, return the result
  if (state.numIterations < state.maxIterations) {
    return jokeEvent.with({ joke: joke });
  }

  return resultEvent.with({ joke: joke, critique: event.data.critique });
});
```
Running the Workflow
```typescript
async function main() {
  const { stream, sendEvent } = jokeFlow.createContext();
  sendEvent(startEvent.with("pirates"));

  let result: { joke: string; critique: string } | undefined;

  for await (const event of stream) {
    // console.log(event.data); optionally log the event data
    if (resultEvent.include(event)) {
      result = event.data;
      break; // Stop when we get the final result
    }
  }

  console.log(result);
}
```
To run the workflow, we:
- Create a workflow context with `createContext()`
- Trigger the initial event with `sendEvent()`
- Listen to the event stream and process events as they arrive
- Use `include()` to check if an event is of a specific type
- Break the loop when we receive our final result
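The snippet above only defines `main()`; to actually execute the workflow, invoke it, for example:

```typescript
main().catch(console.error);
```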
Using Stream Utilities
The `stream` returned by `createContext` contains utility functions to make working with event streams easier:
```typescript
// Create a workflow context and send the initial event
const { stream, sendEvent } = jokeFlow.createContext();
sendEvent(startEvent.with("pirates"));

// Collect all events until we get a resultEvent
const allEvents = await stream.until(resultEvent).toArray();

// The last event will be the resultEvent
const finalEvent = allEvents.at(-1);
console.log(finalEvent.data); // Output the joke and critique
```
The stream utilities make it easier to work with the asynchronous event flow. In this example, we use:
- `toArray`: Aggregates all events into an array
- `until`: Creates a stream that emits events until a condition is met (in this case, until a `resultEvent` is received)

You can combine these utilities with other stream operators like `filter` and `map` to create powerful processing pipelines.
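For example, you could collect only the intermediate jokes produced while the workflow runs. This is a sketch only: it assumes `filter` accepts an event type and `map` a per-event transform, so check the Workflows documentation for the exact signatures:

```typescript
const { stream, sendEvent } = jokeFlow.createContext();
sendEvent(startEvent.with("pirates"));

// Stop at the final result, keep only jokeEvents, and extract the joke text
const jokes = await stream
  .until(resultEvent)
  .filter(jokeEvent)
  .map((event) => event.data.joke)
  .toArray();

console.log(jokes); // All intermediate jokes generated along the way
```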
Next Steps
To learn more about workflows, check out the Workflows documentation.