Streaming events
Workflows often take time to finish. They may call slow providers, branch, fan out over a batch, or wait for a human response. Streaming lets you surface progress while the run is still moving.
There are two sides to streaming:
| Side | API |
|---|---|
| Inside a step | ctx.write_event_to_stream(...) |
| Outside the workflow | handler.stream_events() |
workflow.run(...) starts the workflow and returns a WorkflowHandler. The handler is awaitable for the final result, and it also owns the event stream for that run.
To get this done, let’s bring in all the deps we need:
import asynciofrom llama_index.llms.openai import OpenAI
from workflows import ( Workflow, Context, step,)from workflows.events import ( StartEvent, StopEvent, Event,)Let’s set up some events for a simple three-step workflow, plus an event to handle streaming our progress as we go:
class FirstEvent(Event): first_output: str
class SecondEvent(Event): second_output: str response: str
class ProgressEvent(Event): msg: strAnd define a workflow class that sends events:
class MyWorkflow(Workflow): @step async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent: ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening")) return FirstEvent(first_output="First step complete.")
@step async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent: llm = OpenAI(model="gpt-4o-mini") generator = await llm.astream_complete( "Please give me the first 3 paragraphs of Moby Dick, a book in the public domain." ) full_resp = "" async for response in generator: # Allow the workflow to stream this piece of response ctx.write_event_to_stream(ProgressEvent(msg=response.delta)) full_resp += response.delta
return SecondEvent( second_output="Second step complete, full response attached", response=full_resp, )
@step async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent: ctx.write_event_to_stream(ProgressEvent(msg="Step three is happening")) return StopEvent(result="Workflow complete.")In step_one and step_three we write individual events to the event stream. In step_two we use astream_complete to produce an iterable generator of the LLM’s response, then we produce an event for each chunk of data the LLM sends back to us before returning the final response to step_three.
To actually get this output, we need to run the workflow asynchronously and listen for the events, like this:
async def main(): w = MyWorkflow(timeout=30, verbose=True) handler = w.run(first_input="Start the workflow.")
async for ev in handler.stream_events(): if isinstance(ev, ProgressEvent): print(ev.msg)
final_result = await handler print("Final result", final_result)
if __name__ == "__main__": asyncio.run(main())run schedules the workflow in the background. stream_events() yields every event written to the stream and stops when the stream delivers a StopEvent. After that, await the handler to get the final result.
A handler stream can be consumed once. If you need to broadcast workflow events to multiple clients, consume the stream once in your application and fan those events out yourself.
Handling workflow termination
Section titled “Handling workflow termination”When a workflow ends abnormally, a specific StopEvent subclass is published to the stream before the exception is raised from await handler:
WorkflowTimedOutEvent- Published when the workflow exceeds its timeout. Containstimeout(seconds) andactive_steps(list of step names that were running).WorkflowCancelledEvent- Published when the workflow is cancelled by the user.WorkflowFailedEvent- Published when a step fails permanently after exhausting retries. Containsstep_name,exception,attempts, andelapsed_seconds.
from workflows.events import ( WorkflowTimedOutEvent, WorkflowCancelledEvent, WorkflowFailedEvent,)
async for ev in handler.stream_events(): if isinstance(ev, WorkflowTimedOutEvent): print(f"Workflow timed out after {ev.timeout}s") elif isinstance(ev, WorkflowCancelledEvent): print("Workflow was cancelled") elif isinstance(ev, WorkflowFailedEvent): print(f"Step '{ev.step_name}' failed after {ev.attempts} attempts: {ev.exception}")