5. Express Agent

This section assumes you’ve completed the previous step, which added HITL (human-in-the-loop). Here we convert the single-process workflow into a client/server model using the Express framework.

We take the workflow code from the previous step and turn it into a server process using Express. The server listens for two kinds of requests from the client: start requests, which initiate a workflow, and resume requests, which continue an existing one. The file is named 5-server.ts.

We start by creating an Express server:

import express from "express";
// Add existing workflow code here
const app = express();
app.use(express.json());
// In the following steps, we will add the POST handlers here
// POST /workflow/start
// POST /workflow/resume
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Server running on port ${PORT}`);
});

When the workflow requests human interaction, the server takes a snapshot and stores it in a map keyed by requestId, so that a later call can resume the workflow. We place this map at the top of the file:

import { v4 as uuid } from "uuid";
import { SnapshotData } from "@llamaindex/workflow-core/middleware/state";
const snapshots = new Map<string, SnapshotData>();
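The lifecycle of an entry in this map (store on pause, look up on resume, delete on completion) can be sketched in isolation. This is only an illustration: the `Snapshot` type is a hypothetical stand-in for `SnapshotData`, the helper names `park`, `lookup`, and `forget` are not part of any library, and `randomUUID` from `node:crypto` is used in place of the `uuid` package so the sketch has no dependencies.

```typescript
import { randomUUID } from "node:crypto";

// Hypothetical stand-in for the library's SnapshotData type.
type Snapshot = Record<string, unknown>;

// Paused workflows, keyed by the request id handed to the client.
const pending = new Map<string, Snapshot>();

// On pause: remember the snapshot under a fresh id and return that id.
function park(snapshot: Snapshot): string {
  const requestId = randomUUID();
  pending.set(requestId, snapshot);
  return requestId;
}

// On resume: find the snapshot; undefined means the id is unknown.
function lookup(requestId: string): Snapshot | undefined {
  return pending.get(requestId);
}

// On completion: the snapshot is no longer needed.
function forget(requestId: string): void {
  pending.delete(requestId);
}

const id = park({ step: "waiting" });
console.log(lookup(id)); // the stored snapshot
forget(id);
console.log(lookup(id)); // undefined
```

Because the map lives in process memory, pending snapshots are lost when the server restarts, which is acceptable for a tutorial but worth keeping in mind.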

3. Start endpoint: run and possibly pause for human

Now, let’s add the start endpoint. It creates a workflow context and listens for humanRequestEvent. When that event fires, the handler takes a snapshot and returns early with a response of type waiting_for_human that contains the request id and the current messages. If no human interaction is requested, the endpoint waits for finalResponseEvent and returns a response of type completed that contains the final messages.

app.post("/workflow/start", async (req, res) => {
  const requestId = uuid();
  const { messages } = req.body;
  const context = workflow.createContext({
    expectedToolCount: 0,
    messages: [],
    toolResponses: [],
    humanToolId: null,
  });
  // If the workflow asks for human input, snapshot it and return early
  context.stream.on(humanRequestEvent, async () => {
    const snapshotData = await context.snapshot();
    snapshots.set(requestId, snapshotData);
    res.json({
      type: "waiting_for_human",
      requestId,
      messages: context.state.messages,
    });
  });
  context.sendEvent(userInputEvent.with({ messages }));
  // Otherwise, wait for the final response
  await context.stream.until(finalResponseEvent).toArray();
  // Guard against replying twice if the early return above already responded
  if (!res.headersSent) {
    res.json({ type: "completed", messages: context.state.messages });
  }
});
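The two response shapes returned by the server can be described as a discriminated union, which lets the client narrow on the type field. The sketch below is an assumption about how one might type them: `ChatMessage`, `WorkflowResponse`, and `nextStep` are illustrative names, and the real message type comes from the workflow state defined in the previous step.

```typescript
// Assumed message shape; the real type comes from the workflow state.
type ChatMessage = { role: string; content: string };

type WaitingForHuman = {
  type: "waiting_for_human";
  requestId: string;
  messages: ChatMessage[];
};

type Completed = {
  type: "completed";
  messages: ChatMessage[];
};

type WorkflowResponse = WaitingForHuman | Completed;

// Narrowing on `type` tells the compiler whether `requestId` exists.
function nextStep(response: WorkflowResponse): string {
  switch (response.type) {
    case "waiting_for_human":
      return `resume with requestId ${response.requestId}`;
    case "completed":
      return `done after ${response.messages.length} message(s)`;
  }
}

console.log(
  nextStep({ type: "waiting_for_human", requestId: "abc", messages: [] }),
);
console.log(
  nextStep({ type: "completed", messages: [{ role: "assistant", content: "Hi" }] }),
);
```

Sharing a type like this between server and client would catch a misspelled type string at compile time rather than at runtime.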

4. Resume endpoint: continue with human input

Time to add the resume endpoint that continues a workflow that is waiting for human input. It finds the stored snapshot by requestId and re-creates the workflow using this previously persisted snapshot.

Then it sends a humanResponseEvent with the user’s answer to continue the workflow. The user’s answer is passed as userInput in the request body.

After the workflow is complete (we’re waiting for finalResponseEvent), we delete the snapshot from the map and return a response of type completed that contains the final messages.

app.post("/workflow/resume", async (req, res) => {
  const { requestId, userInput } = req.body;
  // Look up the snapshot stored by the start endpoint
  const snapshotData = snapshots.get(requestId);
  if (!snapshotData) {
    res.status(404).json({ error: "Request ID not found" });
    return;
  }
  // Re-create the workflow from the snapshot and pass in the human's answer
  const context = workflow.resume(snapshotData);
  context.sendEvent(humanResponseEvent.with(userInput));
  await context.stream.until(finalResponseEvent).toArray();
  // The workflow has finished, so the snapshot is no longer needed
  snapshots.delete(requestId);
  res.json({ type: "completed", messages: context.state.messages });
});
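The handler above reads requestId and userInput from the request body without checking them. As a possible hardening step, a small validator could run first. This is a sketch under the assumption that both fields are strings (the client in this tutorial sends a name typed by the user); `ResumeBody` and `parseResumeBody` are hypothetical names, not part of Express or the workflow library.

```typescript
type ResumeBody = { requestId: string; userInput: string };

// Returns the validated body, or null when a field is missing or not a string.
function parseResumeBody(body: unknown): ResumeBody | null {
  if (typeof body !== "object" || body === null) return null;
  const { requestId, userInput } = body as Record<string, unknown>;
  if (typeof requestId !== "string" || requestId.length === 0) return null;
  if (typeof userInput !== "string") return null;
  return { requestId, userInput };
}

console.log(parseResumeBody({ requestId: "abc", userInput: "Ada" }));
console.log(parseResumeBody({ requestId: 42 })); // null
```

Inside the handler, a null result could be answered with a 400 status before the snapshot lookup takes place.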

The client lives in a file named 5-client.ts. It orchestrates two HTTP calls: one to start the workflow and one to resume it with the human input.

First, we add the imports and create a helper function to send HTTP requests to the server:

import * as readline from "readline/promises";
async function makeRequest(endpoint: string, data: any) {
  const response = await fetch(`http://localhost:3000${endpoint}`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(data),
  });
  return response.json();
}
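The helper above parses the body regardless of the HTTP status, so a 404 from /workflow/resume would look like a normal response to the caller. A variant that fails loudly on non-2xx statuses might look like the following sketch. The name `postJson`, the `FetchLike` type, and the injectable `fetchImpl` parameter are assumptions added for illustration and testability; the sketch relies on the global `fetch` and `Response` available in recent Node.js versions.

```typescript
// Minimal fetch-like signature so a stub can be passed in for testing.
type FetchLike = (url: string, init: RequestInit) => Promise<Response>;

async function postJson(
  endpoint: string,
  data: unknown,
  fetchImpl: FetchLike = fetch,
  baseUrl = "http://localhost:3000",
): Promise<unknown> {
  const response = await fetchImpl(`${baseUrl}${endpoint}`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(data),
  });
  if (!response.ok) {
    // Include the body text, since the server reports errors as JSON there.
    const detail = await response.text();
    throw new Error(`POST ${endpoint} failed with ${response.status}: ${detail}`);
  }
  return response.json();
}
```

Called as `await postJson("/workflow/resume", { requestId, userInput })`, it rejects with the server's error text when the request id is unknown instead of returning the error object as if it were a result.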

Now we can use this helper function to start the workflow:

const startResponse = await makeRequest("/workflow/start", {
  messages: [
    {
      role: "user" as const,
      content:
        "What's the weather in San Francisco and what is the user's name?",
    },
  ],
});

Then we check whether the response is of type waiting_for_human. If it is not, the workflow did not require human input, so we log the final messages and exit:

if (startResponse.type !== "waiting_for_human") {
  console.log(
    "Workflow completed immediately. Messages:",
    startResponse.messages,
  );
  process.exit(0);
}

Otherwise, if we receive a waiting_for_human response, we collect the user’s name and resume the workflow with a second request to /workflow/resume:

console.log("Workflow is waiting for human input...");
console.log("Current messages:", startResponse.messages);
// Get user input
const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
});
const userName = await rl.question("What is your name? ");
rl.close();
// Second request: Resume the workflow with user input
const resumeResponse = await makeRequest("/workflow/resume", {
  requestId: startResponse.requestId,
  userInput: userName,
});
console.log("Final messages:", resumeResponse.messages);

First, you can compare your code with the complete version:

To run the example, start the server in a terminal:

npm run tsx 5-server.ts

Then, run the client in another terminal:

npm run tsx 5-client.ts