Fan-In/Fan-Out
Fan-In/Fan-out (Parallelism) Pattern
Section titled “Fan-In/Fan-out (Parallelism) Pattern”One of the most powerful features of workflows is the ability to run tasks in parallel. The fan-in/fan-out pattern allows you to emit multiple events simultaneously and process them concurrently, then aggregate the results.
This works by:
- Emitting multiple events to be processed in parallel
- Collecting results as they come in
- Completing once all parallel tasks are finished
In order to implement the fan-in/fan-out pattern, we can follow these steps:
1. Define events and workflow
First, we define the events that will flow through our workflow:
import { createWorkflow, workflowEvent,} from "@llamaindex/workflow-core";import { createStatefulMiddleware } from "@llamaindex/workflow-core/middleware/state";
// Define the events we'll useconst startEvent = workflowEvent<string>(); // Triggers the fan-out processconst processItemEvent = workflowEvent<number>(); // Individual items to processconst resultEvent = workflowEvent<string>(); // Results from processed itemsconst completeEvent = workflowEvent<string[]>(); // Final aggregated results
Next, we create our workflow and set up tracking variables in the state:
const { withState } = createStatefulMiddleware(() => ({ itemsToProcess: 10, itemsProcessed: 0, processResults: [] as string[],}));const workflow = withState(createWorkflow());
2. Create the fan-out handler
This handler receives the start event and fans out multiple processing tasks:
workflow.handle([startEvent], async (context, start) => { const { sendEvent, state} = context; state.itemsProcessed = 0; // Reset counter for this execution
// Fan out: emit multiple events to be processed in parallel for (let i = 0; i < state.itemsToProcess; i++) { sendEvent(processItemEvent.with(i)); }});
The for loop emits all processItemEvent events in parallel, enabling concurrent processing. Results are collected as they arrive using stream.filter().until().toArray()
, with the until()
condition ensuring that processing continues until all items have been handled. This approach is non-blocking, as the handler completes immediately after initiating the parallel processing.
3. The processing handler
This handler processes each individual item:
workflow.handle([processItemEvent], async (context, event) => { const { sendEvent, state } = context;
// Simulate some async work (like API calls, database operations, etc.) await new Promise((resolve) => setTimeout(resolve, Math.random() * 100));
// Process the item const processedValue = `Processed: ${event.data}`;
// Update the shared counter after processing completes state.itemsProcessed++;
// Return the result event sendEvent(resultEvent.with(processedValue));});
Each item is processed independently and can perform asynchronous operations, such as API calls or database queries. Because a random delay is introduced for each item, they complete at different times, demonstrating true parallelism. After processing, the shared counter itemsProcessed
is incremented to track progress, and a resultEvent
is emitted with the processed data. This ensures that results are collected as soon as each item finishes, enabling efficient aggregation and real-time monitoring of workflow progress.
4. Add a Fan-In Handler
In the last step of our workflow, we need to collect the results from the processing steps and store them, so that we will be able to return them once all the events have been consumed (sending a completeEvent
):
workflow.handle([resultEvent], async (context, event) => { const { sendEvent, state } = context;
// store the processed message state.processResults.push(event.data)
// return completeEvent if the processing is completed if (state.itemsProcessed === state.itemsToProcess) { sendEvent(completeEvent.with(state.processResults)) }});
You will notice that the order of the processed items in the final result varies with each execution, since in the processing step we simulate a parallel processing behavior by introducing random time-outs.
4. Running the Workflow
Here’s how to execute the fan-in/fan-out workflow and observe the results:
async function runFanOut() { console.log("Running fan-out"); const { stream, sendEvent } = workflow.createContext();
// Start the fan-out process sendEvent(startEvent.with("Start fan-out"));
// Listen to all events as they occur for await (const event of stream) { if (processItemEvent.include(event)) { console.log(`Processing item: ${event.data}`); } else if (resultEvent.include(event)) { console.log(`Result received: ${event.data}`); } else if (completeEvent.include(event)) { console.log("Final aggregated results:", event.data); break; // All done! } }}
runFanOut().catch(console.error);