Managing events
Waiting for Multiple Events
Section titled “Waiting for Multiple Events”The context does more than just hold data, it also provides utilities to buffer and wait for multiple events.
For example, you might have a step that waits for a query and retrieved nodes before synthesizing a response:
from llama_index.core import get_response_synthesizer
@stepasync def synthesize( self, ctx: Context, ev: QueryEvent | RetrieveEvent) -> StopEvent | None: data = ctx.collect_events(ev, [QueryEvent, RetrieveEvent]) # check if we can run if data is None: return None
# unpack -- data is returned in order query_event, retrieve_event = data
# run response synthesis synthesizer = get_response_synthesizer() response = synthesizer.synthesize( query_event.query, nodes=retrieve_event.nodes )
return StopEvent(result=response)
Using ctx.collect_events()
we can buffer and wait for ALL expected events to arrive. This function will only return data (in the requested order) once all events have arrived.
Manually Triggering Events
Section titled “Manually Triggering Events”Normally, events are triggered by returning another event during a step. However, events can also be manually dispatched using the ctx.send_event(event)
method within a workflow.
Here is a short toy example showing how this would be used:
from llama_index.core.workflow import step, Context, Event, Workflow
class MyEvent(Event): pass
class MyEventResult(Event): result: str
class GatherEvent(Event): pass
class MyWorkflow(Workflow): @step async def dispatch_step( self, ctx: Context, ev: StartEvent ) -> MyEvent | GatherEvent: ctx.send_event(MyEvent()) ctx.send_event(MyEvent())
return GatherEvent()
@step async def handle_my_event(self, ev: MyEvent) -> MyEventResult: return MyEventResult(result="result")
@step async def gather( self, ctx: Context, ev: GatherEvent | MyEventResult ) -> StopEvent | None: # wait for events to finish events = ctx.collect_events(ev, [MyEventResult, MyEventResult]) if not events: return None
return StopEvent(result=events)
Streaming Events
Section titled “Streaming Events”You can also iterate over events as they come in. This is useful for streaming purposes, showing progress, or for debugging. The handler object will emit events that are explicitly written to the stream using ctx.write_event_to_stream()
:
class ProgressEvent(Event): msg: str
class MyWorkflow(Workflow): @step async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent: ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening")) return FirstEvent(first_output="First step complete.")
You can then pick up the events like this:
w = MyWorkflow(...)
handler = w.run(topic="Pirates")
async for event in handler.stream_events(): print(event)
result = await handler
Human-in-the-loop
Section titled “Human-in-the-loop”Since workflows are so flexible, there are many possible ways to implement human-in-the-loop patterns.
The easiest way to implement a human-in-the-loop is to use the InputRequiredEvent
and HumanResponseEvent
events during event streaming.
from llama_index.core.workflow import InputRequiredEvent, HumanResponseEvent
class HumanInTheLoopWorkflow(Workflow): @step async def step1(self, ev: StartEvent) -> InputRequiredEvent: return InputRequiredEvent(prefix="Enter a number: ")
@step async def step2(self, ev: HumanResponseEvent) -> StopEvent: return StopEvent(result=ev.response)
# workflow should work with streamingworkflow = HumanInTheLoopWorkflow()
handler = workflow.run()async for event in handler.stream_events(): if isinstance(event, InputRequiredEvent): # here, we can handle human input however you want # this means using input(), websockets, accessing async state, etc. # here, we just use input() response = input(event.prefix) handler.ctx.send_event(HumanResponseEvent(response=response))
final_result = await handler
Here, the workflow will wait until the HumanResponseEvent
is emitted.
Also note that you can break out of the loop, and resume it later. This is useful if you want to pause the workflow to wait for a human response, but continue the workflow later.
handler = workflow.run()async for event in handler.stream_events(): if isinstance(event, InputRequiredEvent): break
# now we handle the human responseresponse = input(event.prefix)handler.ctx.send_event(HumanResponseEvent(response=response))
# now we resume the workflow streamingasync for event in handler.stream_events(): continue
final_result = await handler