5. Express Agent
This section assumes you’ve completed the previous step adding HITL (Human-In-The-Loop). Here we convert the single-process workflow into a client/server model using the Express framework.
Creating the server process
Section titled “Creating the server process”We’re using the workflow code from the previous step and turn it into a server process using Express. This process will listen to start requests from the client to initiate a workflow and resume requests to continue an existing workflow.
As filename, we’re using 5-server.ts
.
1. Wrap the workflow in an Express app
Section titled “1. Wrap the workflow in an Express app”We start by creating an Express server:
import express from "express";
// Add existing workflow code here
const app = express();app.use(express.json());
// In the following steps, we will add the POST handlers here// POST /workflow/start// POST /workflow/resume
const PORT = process.env.PORT || 3000;app.listen(PORT, () => { console.log(`Server running on port ${PORT}`);});
2. Persist snapshots per request
Section titled “2. Persist snapshots per request”When a human interaction is requested, the server takes a snapshot and stores it by requestId
in a map so a later call can resume the workflow. We’re going to place this map at the header of the file:
import { v4 as uuid } from "uuid";import { SnapshotData } from "@llamaindex/workflow-core/middleware/state";
const snapshots = new Map<string, SnapshotData>();
3. Start endpoint: run and possibly pause for human
Section titled “3. Start endpoint: run and possibly pause for human”Now, let’s add the start endpoint. It creates a workflow context and listens for humanRequestEvent
to create a snapshot and early-return with a response of type waiting_for_human
that contains the request id and the current messages.
Otherwise, if no human interaction is requested, it waits until finalResponseEvent
and returns a response of type completed
that contains the final messages.
app.post("/workflow/start", async (req, res) => { const requestId = uuid(); const { messages } = req.body;
const context = workflow.createContext({ expectedToolCount: 0, messages: [], toolResponses: [], humanToolId: null, });
context.stream.on(humanRequestEvent, async () => { const snapshotData = await context.snapshot(); snapshots.set(requestId, snapshotData); res.json({ type: "waiting_for_human", requestId, messages: context.state.messages, }); });
context.sendEvent(userInputEvent.with({ messages })); await context.stream.until(finalResponseEvent).toArray();
res.json({ type: "completed", messages: context.state.messages });});
4. Resume endpoint: continue with human input
Section titled “4. Resume endpoint: continue with human input”Time to add the resume endpoint that continues a workflow that is waiting for human input.
It finds the stored snapshot by requestId
and re-creates the workflow using this previously persisted snapshot.
Then it sends a humanResponseEvent
with the user’s answer to continue the workflow. The user’s answer is passed as userInput
in the request body.
After the workflow is complete (we’re waiting for finalResponseEvent
), we delete the snapshot from the map and return a response of type completed
that contains the final messages.
app.post("/workflow/resume", async (req, res) => { const { requestId, userInput } = req.body; const snapshotData = snapshots.get(requestId); if (!snapshotData) { res.status(404).json({ error: "Request ID not found" }); return; }
const context = workflow.resume(snapshotData); context.sendEvent(humanResponseEvent.with(userInput)); await context.stream.until(finalResponseEvent).toArray();
snapshots.delete(requestId); res.json({ type: "completed", messages: context.state.messages });});
Creating the client process
Section titled “Creating the client process”As filename, we’re using 5-client.ts
. The client process simply orchestrates two HTTP calls: start and then resumes with human input.
First, we add the imports and create a helper function to send HTTP requests to the server:
import * as readline from "readline/promises";
async function makeRequest(endpoint: string, data: any) { const response = await fetch(`http://localhost:3000${endpoint}`, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify(data), }); return response.json();}
Now we can use this helper function to start the workflow:
const startResponse = await makeRequest("/workflow/start", { messages: [ { role: "user" as const, content: "What's the weather in San Francisco and what is the user's name?", }, ],});
Then we check that we’re receiving a waiting_for_human
response, otherwise we log the final messages and exit (this case happens when the workflow doesn’t require human input):
if (startResponse.type !== "waiting_for_human") { console.log( "Workflow completed immediately. Messages:", startResponse.messages, ); process.exit(0);}
Otherwise, if we receive a waiting_for_human
response, we collect the user’s name and resume the workflow with a second request to /workflow/resume
:
console.log("Workflow is waiting for human input...");console.log("Current messages:", startResponse.messages);
// Get user inputconst rl = readline.createInterface({ input: process.stdin, output: process.stdout,});
const userName = await rl.question("What is your name? ");rl.close();
// Second request: Resume the workflow with user inputconst resumeResponse = await makeRequest("/workflow/resume", { requestId: startResponse.requestId, userInput: userName,});
console.log("Final messages:", resumeResponse.messages);
Run the example
Section titled “Run the example”First, you can compare your code with the complete version:
- Server:
demo/express/5-server.ts
- Client:
demo/express/5-client.ts
Then, you can run the example. First, start the server in a terminal:
npm run tsx 5-server.ts
Then, run the client in another terminal:
npm run tsx 5-client.ts