Fan-In/Fan-Out

One of the most powerful features of workflows is the ability to run tasks in parallel. The fan-in/fan-out pattern allows you to emit multiple events simultaneously and process them concurrently, then aggregate the results.

This works by:

  1. Emitting multiple events to be processed in parallel
  2. Collecting results as they come in
  3. Completing once all parallel tasks are finished
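For orientation, the same three steps can be sketched with plain promises, independent of any workflow library. This is only an illustration of the pattern; `fanOutFanIn` and `worker` are hypothetical names that do not appear in `@llamaindex/workflow-core`.

```typescript
// Plain-promise sketch of fan-out/fan-in (no workflow library involved).
// `fanOutFanIn` and `worker` are hypothetical names used for illustration.
async function fanOutFanIn<T, R>(
  items: readonly T[],
  worker: (item: T) => Promise<R>,
): Promise<R[]> {
  // 1. Fan out: start one task per item without awaiting each in turn.
  const pending = items.map((item) => worker(item));
  // 2 + 3. Fan in: resolve once every task has finished.
  // Promise.all returns results in input order, not completion order.
  return Promise.all(pending);
}
```

For example, `fanOutFanIn([1, 2, 3], async (n) => n * 2)` resolves to `[2, 4, 6]`. The event-based version built in the steps that follow achieves the same effect, but lets other handlers and observers react to each intermediate event.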

In order to implement the fan-in/fan-out pattern, we can follow these steps:

1. Define events and workflow

First, we define the events that will flow through our workflow:

import {
  createWorkflow,
  workflowEvent,
} from "@llamaindex/workflow-core";
import { createStatefulMiddleware } from "@llamaindex/workflow-core/middleware/state";

// Define the events we'll use
const startEvent = workflowEvent<string>(); // Triggers the fan-out process
const processItemEvent = workflowEvent<number>(); // Individual items to process
const resultEvent = workflowEvent<string>(); // Results from processed items
const completeEvent = workflowEvent<string[]>(); // Final aggregated results

Next, we create our workflow and set up tracking variables in the state:

const { withState } = createStatefulMiddleware(() => ({
  itemsToProcess: 10,
  itemsProcessed: 0,
  processResults: [] as string[],
}));
const workflow = withState(createWorkflow());

2. Create the fan-out handler

This handler receives the start event and fans out multiple processing tasks:

workflow.handle([startEvent], async (context, start) => {
  const { sendEvent, state } = context;
  state.itemsProcessed = 0; // Reset counter for this execution
  // Fan out: emit multiple events to be processed in parallel
  for (let i = 0; i < state.itemsToProcess; i++) {
    sendEvent(processItemEvent.with(i));
  }
});

The for loop emits one processItemEvent per item without waiting for any of them to be handled, so the processing handler can work on all items concurrently. The fan-out handler is non-blocking: it returns as soon as the events have been sent. In this example the results are gathered later by the fan-in handler (step 4), which emits a completeEvent once every item has been handled.
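The non-blocking behaviour can be pictured with a toy model. The sketch below is not how `@llamaindex/workflow-core` is implemented; the local `sendEvent`, the `queue`, and `simulateFanOut` are hypothetical stand-ins that only show why a handler can return before any emitted item has been processed.

```typescript
// Toy model of non-blocking emission; NOT the library's implementation.
type Task = () => void;

function simulateFanOut(itemCount: number): string[] {
  const log: string[] = [];
  const queue: Task[] = []; // stands in for the workflow's event stream

  // Hypothetical stand-in for sendEvent: enqueue work, do not run it.
  const sendEvent = (task: Task): void => {
    queue.push(task);
  };

  // The "handler": emit one task per item, then return right away.
  for (let i = 0; i < itemCount; i++) {
    sendEvent(() => {
      log.push(`processed ${i}`);
    });
  }
  log.push("handler returned");

  // Later, the runtime drains the queue and runs the emitted work.
  while (queue.length > 0) {
    const task = queue.shift();
    if (task) task();
  }
  return log;
}
```

Calling `simulateFanOut(2)` returns `["handler returned", "processed 0", "processed 1"]`: the emitting code finishes first and the emitted work runs afterwards.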

3. The processing handler

This handler processes each individual item:

workflow.handle([processItemEvent], async (context, event) => {
  const { sendEvent, state } = context;
  // Simulate some async work (like API calls, database operations, etc.)
  await new Promise((resolve) => setTimeout(resolve, Math.random() * 100));
  // Process the item
  const processedValue = `Processed: ${event.data}`;
  // Update the shared counter after processing completes
  state.itemsProcessed++;
  // Emit the result event
  sendEvent(resultEvent.with(processedValue));
});

Each item is processed independently and can perform asynchronous operations, such as API calls or database queries. Because each item waits for a random delay, the items finish at different times and in no fixed order, which shows that the handlers run concurrently rather than one after another. After processing, the handler increments the shared counter itemsProcessed to track progress and emits a resultEvent with the processed data, so each result becomes available for aggregation as soon as its item finishes.
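The effect of the per-item delay can be reproduced in isolation. The sketch below uses fixed delays instead of random ones so that the outcome is predictable; `delay` and `completionOrder` are illustrative helpers and not part of the workflow library.

```typescript
// Sketch: tasks started together finish in order of their delay,
// not in the order they were started. Names here are illustrative.
function delay(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

async function completionOrder(
  delaysMs: readonly number[],
): Promise<number[]> {
  const finished: number[] = [];
  await Promise.all(
    delaysMs.map(async (ms, index) => {
      await delay(ms);
      finished.push(index); // record the moment this task completes
    }),
  );
  return finished;
}
```

With delays of 60, 0 and 30 ms, `completionOrder([60, 0, 30])` resolves to `[1, 2, 0]`: the task with the shortest delay finishes first, regardless of when it was started.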

4. Add a fan-in handler

The last handler collects the results from the processing step and stores them in the state. Once every item has been handled, it sends a completeEvent carrying the aggregated results:

workflow.handle([resultEvent], async (context, event) => {
  const { sendEvent, state } = context;
  // Store the processed message
  state.processResults.push(event.data);
  // Send completeEvent once every result has been stored. Counting the
  // stored results (rather than itemsProcessed, which is incremented
  // before the result is sent) avoids completing with a partial list.
  if (state.processResults.length === state.itemsToProcess) {
    sendEvent(completeEvent.with(state.processResults));
  }
});

The order of the processed items in the final result varies from run to run, because the random delays in the processing step make the items finish in a different order each time.
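If the final list should follow the input order instead, one option is to tag each result with the index of its item and sort when the last one arrives. The sketch below is a standalone illustration under that assumption; `IndexedResult` and `FanInCollector` are hypothetical names, and the workflow above would need its result event to carry an index for this to apply.

```typescript
// Hypothetical result shape: each result carries the index of its input item.
interface IndexedResult<T> {
  index: number;
  value: T;
}

class FanInCollector<T> {
  private readonly results: IndexedResult<T>[] = [];
  private readonly expected: number;
  private done = false;

  constructor(expected: number) {
    this.expected = expected;
  }

  // Returns the values in input order exactly once, when the last expected
  // result arrives; returns undefined for every other call.
  add(result: IndexedResult<T>): T[] | undefined {
    if (this.done) return undefined;
    this.results.push(result);
    if (this.results.length < this.expected) return undefined;
    this.done = true;
    return [...this.results]
      .sort((a, b) => a.index - b.index)
      .map((r) => r.value);
  }
}
```

Because completion is derived from the number of stored results, the collector reports the aggregated list a single time, no matter in which order the results come in.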

5. Run the workflow

Here’s how to execute the fan-in/fan-out workflow and observe the results:

async function runFanOut() {
  console.log("Running fan-out");
  const { stream, sendEvent } = workflow.createContext();
  // Start the fan-out process
  sendEvent(startEvent.with("Start fan-out"));
  // Listen to all events as they occur
  for await (const event of stream) {
    if (processItemEvent.include(event)) {
      console.log(`Processing item: ${event.data}`);
    } else if (resultEvent.include(event)) {
      console.log(`Result received: ${event.data}`);
    } else if (completeEvent.include(event)) {
      console.log("Final aggregated results:", event.data);
      break; // All done!
    }
  }
}
runFanOut().catch(console.error);
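The loop above only exits when a completeEvent arrives, so a caller may want an upper bound on how long it waits. A minimal sketch of such a guard follows; `withTimeout` is a generic, hypothetical helper and not something provided by `@llamaindex/workflow-core`.

```typescript
// Generic helper (not part of @llamaindex/workflow-core): reject if
// `promise` has not settled within `ms` milliseconds.
function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new Error(`Timed out after ${ms} ms`)),
      ms,
    );
    promise.then(
      (value) => {
        clearTimeout(timer); // settled in time: cancel the pending timeout
        resolve(value);
      },
      (error: unknown) => {
        clearTimeout(timer);
        reject(error);
      },
    );
  });
}
```

It could be applied as `withTimeout(runFanOut(), 5000).catch(console.error);`. Note that the helper only stops waiting; it does not cancel work that is still running inside the workflow.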