Skip to content
LlamaAgents
Agent Workflows

Writing durable workflows

Workflows are ephemeral by default. Once run() returns, the state is gone, and the next run() starts fresh. But a workflow is often where you’ve written an ad hoc concurrent process. You fan out over a hundred documents, call an LLM per chunk, and collect the results. That is the kind of run you don’t want to start over from zero when the process is killed halfway through.

This page shows how to checkpoint that run and resume it after a restart.

A run advances by reducing events in a controlled way, so its state is well defined at every step boundary. That state is the events still in flight plus the state store. Context.to_dict() serializes it, Context.from_dict() rebuilds it, and run(ctx=...) continues from there, even in a different process:

w = MyWorkflow()
handler = w.run()
result = await handler
# Persist the snapshot
db.save("my-run", json.dumps(handler.ctx.to_dict()))
# ...new process...
w = MyWorkflow()
ctx = Context.from_dict(w, json.loads(db.load("my-run")))
result = await w.run(ctx=ctx) # continues with the restored state

This survives a restart, but not a crash during the run. You only have the state as of the last to_dict(). To survive a crash, snapshot as the run progresses.

On resume, the restored run re-dispatches the events that were still pending and rebuilds the partial fan-in buffers. Completed steps don’t re-run, because their output is already in the restored state. A step that was mid-execution when you captured the state is rewound and runs again from the top. Resume is at-least-once, and step side effects need to be safe to repeat.

There is no built-in checkpointer to enable. The workflow stream emits an internal event every time a step changes state, and you snapshot when you see one finish. stream_events(expose_internal=True) surfaces those internal events, and StepStateChanged with StepState.NOT_RUNNING marks a step finishing:

from workflows.events import StepStateChanged, StepState
handler = w.run()
async for ev in handler.stream_events(expose_internal=True):
if isinstance(ev, StepStateChanged) and ev.step_state == StepState.NOT_RUNNING:
db.save("my-run", json.dumps(handler.ctx.to_dict()))
result = await handler

To resume after a crash, load the last snapshot and pass it to run():

w = MyWorkflow()
ctx = Context.from_dict(w, json.loads(db.load("my-run")))
result = await w.run(ctx=ctx)

In a busy run, you usually don’t need to write on every boundary. Each snapshot is a serialize and a write, so throttle it if the workflow is noisy. A crash then costs you at most that interval of redone work.

The snapshot is the state at the moment you take it. Other workers may have advanced since the event that triggered the write, but the state is still consistent and resumable.

A concurrent fan-out that survives a restart

Section titled “A concurrent fan-out that survives a restart”

A workflow that fans out work, runs items concurrently, and collects the results. Checkpointed so a kill mid-run doesn’t redo completed items:

import json, os
from typing import Annotated
from workflows import Context, Workflow, step
from workflows.events import Event, StartEvent, StepState, StepStateChanged, StopEvent
from workflows.resource import Resource
class WorkItem(Event):
item_id: int
class WorkDone(Event):
item_id: int
result: str
# Heavy inputs come from a Resource (see below). They aren't serialized into
# the snapshot; they're re-created on resume.
def get_client() -> MyApiClient:
return MyApiClient(...)
class EnrichBatch(Workflow):
@step
async def dispatch(self, ev: StartEvent) -> list[WorkItem]:
return [WorkItem(item_id=item_id) for item_id in ev.item_ids]
@step(num_workers=8)
async def enrich(
self,
ev: WorkItem,
client: Annotated[MyApiClient, Resource(get_client)],
) -> WorkDone:
result = await client.enrich(ev.item_id) # the expensive, repeatable work
return WorkDone(item_id=ev.item_id, result=result)
@step
async def collect(self, events: list[WorkDone]) -> StopEvent:
return StopEvent(result={ev.item_id: ev.result for ev in events})

Drive it with the checkpoint loop, and resume from the last snapshot if the process died:

CKPT = "enrich-batch.json"
async def run_batch(item_ids):
wf = EnrichBatch()
if os.path.exists(CKPT):
# Resume from the snapshot. The pending work and partial fan-in state
# are already in the context, so don't send the StartEvent again.
handler = wf.run(ctx=Context.from_dict(wf, json.load(open(CKPT))))
else:
handler = wf.run(item_ids=item_ids)
async for ev in handler.stream_events(expose_internal=True):
if isinstance(ev, StepStateChanged) and ev.step_state == StepState.NOT_RUNNING \
and ev.name == "enrich":
json.dump(handler.ctx.to_dict(), open(CKPT, "w"))
return await handler

Say this is killed after 60 of 100 items. The finished enrich calls are part of the restored workflow state, and the pending WorkItems are still pending. On resume, dispatch does not re-run, the completed items are not re-enriched, and the list fan-in fires once all 100 WorkDones are present across both runs.

The in-flight work does get repeated. At any moment up to num_workers enrich calls are running but not finished, and each of those is rewound and re-enriched on resume. With num_workers=8 that is up to 8 items run again. It is wasted compute, not wrong output, and it is why enrich has to be safe to repeat.

The snapshot is only as cheap and reliable as what you put in events and state.

Workflow instance attributes are not part of the snapshot. They can be useful for ordinary Python objects that live as long as the workflow instance does, but they are not durable state. If a value needs to survive restart, put it in an event or the state store.

Keep heavy or non-serializable inputs in a Resource, not in events or state. A Resource factory is resolved once, cached on the workflow, and never serialized into the context. On resume it is re-created by calling the factory again. So put API clients, model handles, and large reference data in resources, and let your events carry small identifiers instead. This keeps the snapshot small, and keeps you clear of the serialization rule below.

Everything on an in-flight event and in the state store has to be serializable. Snapshots use a JSON serializer, Pydantic models included. If it hits a value it can’t encode, to_dict() raises and the whole snapshot fails, not just that field. So don’t put raw bytes or open connections on events.

The checkpoint loop is durability you control. You decide when to snapshot, and resume is a couple of lines. If you’d rather not manage that, use a runtime plugin that owns persistence and recovery.

The default runtime runs workflows in-process. A plugin runtime can swap in a different execution model without changing the workflow code. For example, the DBOS runtime journals step transitions to a database, so a crashed workflow can resume without your checkpoint loop. WorkflowServer can run workflows with the same runtime plugin, so served workflows get the same persistence and recovery behavior.

Note for AI agents: this documentation is built for programmatic access. - Overview of all docs: https://developers.llamaindex.ai/llms.txt - Any page is available as raw Markdown by appending index.md to its URL — e.g. https://developers.llamaindex.ai/llamaparse/parse/getting_started/index.md - Agent-friendly REST search APIs live under https://developers.llamaindex.ai/api/ — search (BM25 full-text), grep (regex), read (fetch a page), and list (browse the doc tree). See https://developers.llamaindex.ai/llms.txt for parameters. - A hosted documentation MCP server is available at https://developers.llamaindex.ai/mcp. If you support MCP, you can ask the user to install it for browsing these docs directly (an alternative to the REST API). Setup: https://developers.llamaindex.ai/python/shared/mcp/