Streaming events

Workflows often take time to finish. They may call slow providers, branch, fan out over a batch, or wait for a human response. Streaming lets you surface progress while the run is still moving.

There are two sides to streaming:

Side                    API
Inside a step           ctx.write_event_to_stream(...)
Outside the workflow    handler.stream_events()

workflow.run(...) starts the workflow and returns a WorkflowHandler. The handler is awaitable for the final result, and it also owns the event stream for that run.

Let’s start by importing the dependencies we need:

import asyncio

from llama_index.llms.openai import OpenAI
from workflows import (
    Workflow,
    Context,
    step,
)
from workflows.events import (
    StartEvent,
    StopEvent,
    Event,
)

Let’s set up some events for a simple three-step workflow, plus an event to handle streaming our progress as we go:

class FirstEvent(Event):
    first_output: str


class SecondEvent(Event):
    second_output: str
    response: str


class ProgressEvent(Event):
    msg: str

And define a workflow class that sends events:

class MyWorkflow(Workflow):
    @step
    async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening"))
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
        llm = OpenAI(model="gpt-4o-mini")
        generator = await llm.astream_complete(
            "Please give me the first 3 paragraphs of Moby Dick, a book in the public domain."
        )
        full_resp = ""
        async for response in generator:
            # Allow the workflow to stream this piece of response
            ctx.write_event_to_stream(ProgressEvent(msg=response.delta))
            full_resp += response.delta
        return SecondEvent(
            second_output="Second step complete, full response attached",
            response=full_resp,
        )

    @step
    async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step three is happening"))
        return StopEvent(result="Workflow complete.")

In step_one and step_three we write individual events to the event stream. In step_two we use astream_complete to get an async generator over the LLM’s response, write one event to the stream for each chunk the LLM sends back, and then return the full response to step_three.
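The per-chunk pattern in step_two can be tried without an API key. The following is a minimal sketch, not part of the library: fake_llm_stream is a hypothetical stand-in for llm.astream_complete(...), and emit is a hypothetical callback standing in for ctx.write_event_to_stream(ProgressEvent(msg=...)).

```python
import asyncio


async def fake_llm_stream(text, chunk_size=5):
    """Hypothetical stand-in for an LLM stream: yields the text in small pieces."""
    for i in range(0, len(text), chunk_size):
        await asyncio.sleep(0)  # give other tasks a chance to run, like real I/O would
        yield text[i:i + chunk_size]


async def stream_and_collect(chunks, emit):
    """Forward each chunk to `emit` as it arrives, then return the joined text."""
    parts = []
    async for delta in chunks:
        # In a real step this would be:
        # ctx.write_event_to_stream(ProgressEvent(msg=delta))
        emit(delta)
        parts.append(delta)
    return "".join(parts)


async def demo():
    seen = []
    full = await stream_and_collect(fake_llm_stream("Call me Ishmael."), seen.append)
    return seen, full


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Collecting the pieces in a list and joining once at the end is a common alternative to repeated string concatenation; either approach gives the same final text.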

To actually get this output, we need to run the workflow asynchronously and listen for the events, like this:

async def main():
    w = MyWorkflow(timeout=30, verbose=True)
    handler = w.run(first_input="Start the workflow.")

    async for ev in handler.stream_events():
        if isinstance(ev, ProgressEvent):
            print(ev.msg)

    final_result = await handler
    print("Final result", final_result)


if __name__ == "__main__":
    asyncio.run(main())

run schedules the workflow in the background. stream_events() yields every event written to the stream and stops when the stream delivers a StopEvent. After that, await the handler to get the final result.

A handler stream can be consumed once. If you need to broadcast workflow events to multiple clients, consume the stream once in your application and fan those events out yourself.
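One way to do that fan-out is to give each client its own asyncio.Queue and copy every event into all of them. The sketch below uses only the standard library. Broadcaster, client, and fake_stream are hypothetical names introduced for illustration, and fake_stream stands in for handler.stream_events().

```python
import asyncio

_DONE = object()  # sentinel telling subscribers the source stream has ended


class Broadcaster:
    """Hypothetical helper: copies each event from one source to many queues."""

    def __init__(self):
        self._queues = []

    def subscribe(self):
        """Register a new client and return its private queue."""
        q = asyncio.Queue()
        self._queues.append(q)
        return q

    async def pump(self, stream):
        """Consume the source exactly once, copying each event to every queue."""
        async for ev in stream:
            for q in self._queues:
                q.put_nowait(ev)
        for q in self._queues:
            q.put_nowait(_DONE)


async def client(q):
    """Drain one subscriber queue until the sentinel arrives."""
    received = []
    while (ev := await q.get()) is not _DONE:
        received.append(ev)
    return received


async def fake_stream():
    """Stand-in for handler.stream_events(); yields plain strings."""
    for msg in ("step one", "step two", "done"):
        await asyncio.sleep(0)
        yield msg


async def demo():
    b = Broadcaster()
    q1, q2 = b.subscribe(), b.subscribe()
    _, first, second = await asyncio.gather(
        b.pump(fake_stream()), client(q1), client(q2)
    )
    return first, second


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this sketch a client that subscribes after pumping has started misses the earlier events, and the queues are unbounded, so a slow client can accumulate a backlog. A production version would likely need replay or bounded queues.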

When a workflow ends abnormally, a specific StopEvent subclass is published to the stream before the exception is raised from await handler:

  • WorkflowTimedOutEvent - Published when the workflow exceeds its timeout. Contains timeout (seconds) and active_steps (list of step names that were running).
  • WorkflowCancelledEvent - Published when the workflow is cancelled by the user.
  • WorkflowFailedEvent - Published when a step fails permanently after exhausting retries. Contains step_name, exception, attempts, and elapsed_seconds.

from workflows.events import (
    WorkflowTimedOutEvent,
    WorkflowCancelledEvent,
    WorkflowFailedEvent,
)

# Run this inside an async function, with handler = w.run(...)
async for ev in handler.stream_events():
    if isinstance(ev, WorkflowTimedOutEvent):
        print(f"Workflow timed out after {ev.timeout}s")
    elif isinstance(ev, WorkflowCancelledEvent):
        print("Workflow was cancelled")
    elif isinstance(ev, WorkflowFailedEvent):
        print(f"Step '{ev.step_name}' failed after {ev.attempts} attempts: {ev.exception}")
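Because the terminal event arrives on the stream and the exception is raised only from await handler, a caller usually needs to handle both. The sketch below shows that shape with standard-library stand-ins so it runs on its own. FakeTimedOutEvent and FakeHandler are hypothetical and are not the library's classes, and the built-in TimeoutError is an assumption standing in for whatever exception the real handler raises.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeTimedOutEvent:
    """Stand-in for WorkflowTimedOutEvent; NOT the real class."""
    timeout: float
    active_steps: list


class FakeHandler:
    """Hypothetical stand-in for a handler whose run timed out."""

    async def stream_events(self):
        yield "progress: step one"
        # The terminal event is the last thing delivered on the stream.
        yield FakeTimedOutEvent(timeout=30.0, active_steps=["step_two"])

    async def _result(self):
        # Assumption: the real library raises its own exception type here.
        raise TimeoutError("stand-in for the workflow timeout exception")

    def __await__(self):
        return self._result().__await__()


async def run_and_report(handler):
    terminal = None
    async for ev in handler.stream_events():
        if isinstance(ev, FakeTimedOutEvent):
            terminal = ev  # remember why the run ended
    try:
        result = await handler
    except Exception:
        if terminal is not None:
            return f"timed out after {terminal.timeout}s in {terminal.active_steps}"
        raise  # no terminal event seen, so let the caller deal with it
    return f"ok: {result}"


if __name__ == "__main__":
    print(asyncio.run(run_and_report(FakeHandler())))
```

With the real library, the isinstance check would use the event classes imported from workflows.events, and the except clause could be narrowed to the specific exception types the handler raises.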