Events
Event
Bases: DictLikeModel
Base class for all workflow events.
Events are light-weight, serializable payloads passed between steps. They support both attribute and mapping access to dynamic fields.
Examples:
Subclassing with typed fields:
from pydantic import Field
from workflows.events import Event

class CustomEv(Event):
    score: int = Field(ge=0)

e = CustomEv(score=10)
print(e.score)
Source code in workflows/events.py
InputRequiredEvent
Bases: Event
Emitted when human input is required to proceed.
When returned from a step, it is automatically written to the event stream. It does not need to be consumed by another step to pass validation; instead, the caller is expected to read it from the stream and send back a HumanResponseEvent.
Use this class directly or subclass it.
Examples:
from workflows import Workflow, step
from workflows.events import HumanResponseEvent, InputRequiredEvent, StartEvent, StopEvent

class HITLWorkflow(Workflow):
    @step
    async def ask_name(self, ev: StartEvent) -> InputRequiredEvent:
        return InputRequiredEvent(prefix="What's your name? ")

    @step
    async def return_name(self, ev: HumanResponseEvent) -> StopEvent:
        return StopEvent(result=ev.response)
Source code in workflows/events.py
HumanResponseEvent
Bases: Event
Carries a human's response for a prior input request.
If consumed by a step and not returned by another, it will still pass validation.
Examples:
from workflows import Workflow, step
from workflows.events import HumanResponseEvent, InputRequiredEvent, StartEvent, StopEvent

class HITLWorkflow(Workflow):
    @step
    async def ask_name(self, ev: StartEvent) -> InputRequiredEvent:
        return InputRequiredEvent(prefix="What's your name? ")

    @step
    async def return_name(self, ev: HumanResponseEvent) -> StopEvent:
        return StopEvent(result=ev.response)
Source code in workflows/events.py
StartEvent
Bases: Event
Implicit entry event sent to kick off a Workflow.run().
Source code in workflows/events.py
StopEvent
Bases: Event
Terminal event that signals the workflow has completed.
The result property contains the return value of the workflow run. When a
custom stop event subclass is used, the workflow result is that event
instance itself.
Examples:
# default stop event: result holds the value
return StopEvent(result={"answer": 42})
Subclassing to provide a custom result:
```python
class MyStopEv(StopEvent):
    pass

@step
async def my_step(self, ctx: Context, ev: StartEvent) -> MyStopEv:
    return MyStopEv(result={"answer": 42})
```
Source code in workflows/events.py
WorkflowTimedOutEvent
Bases: StopEvent
Published when a workflow exceeds its configured timeout.
This event is published to the event stream when a workflow times out, allowing consumers to understand why the workflow ended before the WorkflowTimeoutError exception is raised.
Examples:
async for event in handler.stream_events():
    if isinstance(event, WorkflowTimedOutEvent):
        print(f"Workflow timed out after {event.timeout}s")
        print(f"Active steps: {event.active_steps}")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float | | required |
| active_steps | list[str] | | required |
Source code in workflows/events.py
WorkflowCancelledEvent
Bases: StopEvent
Published when a workflow is cancelled by the user.
This event is published to the event stream when a workflow is cancelled via the handler or programmatically, allowing consumers to understand why the workflow ended before the WorkflowCancelledByUser exception is raised.
Examples:
async for event in handler.stream_events():
    if isinstance(event, WorkflowCancelledEvent):
        print("Workflow was cancelled by user")
Source code in workflows/events.py
WorkflowFailedEvent
Bases: StopEvent
Published when a workflow step fails permanently: the step raised and either all retries are exhausted, no retry policy permits a retry, or a catch_error handler itself raised.
Examples:
async for event in handler.stream_events():
    if isinstance(event, WorkflowFailedEvent):
        print(f"Step '{event.step_name}' failed after {event.attempts} attempts")
        print(f"{type(event.exception).__name__}: {event.exception}")
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| step_name | str | | required |
| exception | Exception | | required |
| attempts | int | | required |
| elapsed_seconds | float | | required |
Source code in workflows/events.py
StepFailedEvent
Bases: Event
Delivered to a @catch_error handler when a step exhausts its retries.
The handler may inspect the fields to decide how to recover. Returning a
StopEvent completes the workflow successfully; raising from the handler
propagates the new exception and fails the workflow.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| step_name | str | | required |
| input_event | Event | | required |
| exception | Exception | | required |
| attempts | int | | required |
| elapsed_seconds | float | | required |
| failed_at | datetime | | required |
Source code in workflows/events.py