Skip to content

Events

Event #

Bases: DictLikeModel

Base class for all workflow events.

Events are light-weight, serializable payloads passed between steps. They support both attribute and mapping access to dynamic fields.

Examples:

Subclassing with typed fields:

from pydantic import Field

class CustomEv(Event):
    score: int = Field(ge=0)

e = CustomEv(score=10)
print(e.score)
See Also
Source code in workflows/events.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
class Event(DictLikeModel):
    """
    Base class for all workflow events.

    Events are light-weight, serializable payloads passed between steps.
    They support both attribute and mapping access to dynamic fields.

    Examples:
        Subclassing with typed fields:

        ```python
        from pydantic import Field

        class CustomEv(Event):
            score: int = Field(ge=0)

        e = CustomEv(score=10)
        print(e.score)
        ```

    See Also:
        - [StartEvent][workflows.events.StartEvent]
        - [StopEvent][workflows.events.StopEvent]
        - [InputRequiredEvent][workflows.events.InputRequiredEvent]
        - [HumanResponseEvent][workflows.events.HumanResponseEvent]
    """

    def __init__(self, **params: Any):
        super().__init__(**params)

InputRequiredEvent #

Bases: Event

Emitted when human input is required to proceed.

Automatically written to the event stream if returned from a step.

If returned from a step, it does not need to be consumed by other steps and will pass validation. It's expected that the caller will respond to this event and send back a HumanResponseEvent.

Use this directly or subclass it.

Typical flow: a step returns InputRequiredEvent, callers consume it from the stream and send back a HumanResponseEvent.

Examples:

from workflows.events import InputRequiredEvent, HumanResponseEvent

class HITLWorkflow(Workflow):
    @step
    async def my_step(self, ev: StartEvent) -> InputRequiredEvent:
        return InputRequiredEvent(prefix="What's your name? ")

    @step
    async def my_step(self, ev: HumanResponseEvent) -> StopEvent:
        return StopEvent(result=ev.response)
Source code in workflows/events.py
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
class InputRequiredEvent(Event):
    """Emitted when human input is required to proceed.

    Automatically written to the event stream if returned from a step.

    If returned from a step, it does not need to be consumed by other steps and will pass validation.
    It's expected that the caller will respond to this event and send back a [HumanResponseEvent][workflows.events.HumanResponseEvent].

    Use this directly or subclass it.

    Typical flow: a step returns `InputRequiredEvent`, callers consume it from
    the stream and send back a [HumanResponseEvent][workflows.events.HumanResponseEvent].

    Examples:
        ```python
        from workflows.events import InputRequiredEvent, HumanResponseEvent

        class HITLWorkflow(Workflow):
            @step
            async def my_step(self, ev: StartEvent) -> InputRequiredEvent:
                return InputRequiredEvent(prefix="What's your name? ")

            @step
            async def my_step(self, ev: HumanResponseEvent) -> StopEvent:
                return StopEvent(result=ev.response)
        ```
    """

HumanResponseEvent #

Bases: Event

Carries a human's response for a prior input request.

If consumed by a step and not returned by another, it will still pass validation.

Examples:

from workflows.events import InputRequiredEvent, HumanResponseEvent

class HITLWorkflow(Workflow):
    @step
    async def my_step(self, ev: StartEvent) -> InputRequiredEvent:
        return InputRequiredEvent(prefix="What's your name? ")

    @step
    async def my_step(self, ev: HumanResponseEvent) -> StopEvent:
        return StopEvent(result=ev.response)
Source code in workflows/events.py
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
class HumanResponseEvent(Event):
    """Carries a human's response for a prior input request.

    If consumed by a step and not returned by another, it will still pass validation.

    Examples:
        ```python
        from workflows.events import InputRequiredEvent, HumanResponseEvent

        class HITLWorkflow(Workflow):
            @step
            async def my_step(self, ev: StartEvent) -> InputRequiredEvent:
                return InputRequiredEvent(prefix="What's your name? ")

            @step
            async def my_step(self, ev: HumanResponseEvent) -> StopEvent:
                return StopEvent(result=ev.response)
        ```
    """

StartEvent #

Bases: Event

Implicit entry event sent to kick off a Workflow.run().

Source code in workflows/events.py
257
258
class StartEvent(Event):
    """Implicit entry event sent to kick off a `Workflow.run()`."""

StopEvent #

Bases: Event

Terminal event that signals the workflow has completed.

The result property contains the return value of the workflow run. When a custom stop event subclass is used, the workflow result is that event instance itself.

Examples:

# default stop event: result holds the value
return StopEvent(result={"answer": 42})

Subclassing to provide a custom result:

```python class MyStopEv(StopEvent): pass

@step async def my_step(self, ctx: Context, ev: StartEvent) -> MyStopEv: return MyStopEv(result={"answer": 42})

Source code in workflows/events.py
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
class StopEvent(Event):
    """Terminal event that signals the workflow has completed.

    The `result` property contains the return value of the workflow run. When a
    custom stop event subclass is used, the workflow result is that event
    instance itself.

    Examples:
        ```python
        # default stop event: result holds the value
        return StopEvent(result={"answer": 42})
        ```

        Subclassing to provide a custom result:

        ```python
        class MyStopEv(StopEvent):
            pass

        @step
        async def my_step(self, ctx: Context, ev: StartEvent) -> MyStopEv:
            return MyStopEv(result={"answer": 42})
    """

    _result: Any = PrivateAttr(default=None)

    def __init__(self, result: Any = None, **kwargs: Any) -> None:
        # forces the user to provide a result
        super().__init__(_result=result, **kwargs)

    def _get_result(self) -> Any:
        """This can be overridden by subclasses to return the desired result."""
        return self._result

    @property
    def result(self) -> Any:
        return self._get_result()

    @model_serializer(mode="wrap")
    def custom_model_dump(self, handler: Any) -> dict[str, Any]:
        data = handler(self)
        # include _result in serialization for base StopEvent
        if self._result is not None:
            data["result"] = self._result
        return data

    def __repr__(self) -> str:
        dict_items = {**self._data, **self.model_dump()}
        # Format as key=value pairs
        parts = [f"{k}={v!r}" for k, v in dict_items.items()]
        dict_str = ", ".join(parts)
        return f"{self.__class__.__name__}({dict_str})"

    def __str__(self) -> str:
        return str(self._result)

WorkflowTimedOutEvent #

Bases: StopEvent

Published when a workflow exceeds its configured timeout.

This event is published to the event stream when a workflow times out, allowing consumers to understand why the workflow ended before the WorkflowTimeoutError exception is raised.

Attributes:

Name Type Description

Examples:

async for event in handler.stream_events():
    if isinstance(event, WorkflowTimedOutEvent):
        print(f"Workflow timed out after {event.timeout}s")
        print(f"Active steps: {event.active_steps}")

Parameters:

Name Type Description Default
timeout float
required
active_steps list[str]
required
Source code in workflows/events.py
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
class WorkflowTimedOutEvent(StopEvent):
    """Published when a workflow exceeds its configured timeout.

    This event is published to the event stream when a workflow times out,
    allowing consumers to understand why the workflow ended before the
    WorkflowTimeoutError exception is raised.

    Attributes:
        timeout: The timeout duration in seconds that was exceeded.
        active_steps: List of step names that were still active when the timeout occurred.

    Examples:
        ```python
        async for event in handler.stream_events():
            if isinstance(event, WorkflowTimedOutEvent):
                print(f"Workflow timed out after {event.timeout}s")
                print(f"Active steps: {event.active_steps}")
        ```
    """

    timeout: float
    active_steps: list[str]

WorkflowCancelledEvent #

Bases: StopEvent

Published when a workflow is cancelled by the user.

This event is published to the event stream when a workflow is cancelled via the handler or programmatically, allowing consumers to understand why the workflow ended before the WorkflowCancelledByUser exception is raised.

Examples:

async for event in handler.stream_events():
    if isinstance(event, WorkflowCancelledEvent):
        print("Workflow was cancelled by user")
Source code in workflows/events.py
342
343
344
345
346
347
348
349
350
351
352
353
354
355
class WorkflowCancelledEvent(StopEvent):
    """Published when a workflow is cancelled by the user.

    This event is published to the event stream when a workflow is cancelled
    via the handler or programmatically, allowing consumers to understand why
    the workflow ended before the WorkflowCancelledByUser exception is raised.

    Examples:
        ```python
        async for event in handler.stream_events():
            if isinstance(event, WorkflowCancelledEvent):
                print("Workflow was cancelled by user")
        ```
    """

WorkflowFailedEvent #

Bases: StopEvent

Published when a workflow step fails permanently.

Published when a step fails and all retries are exhausted (or no retry policy permits a retry, or a catch_error handler itself raised).

Attributes:

Name Type Description

Examples:

async for event in handler.stream_events():
    if isinstance(event, WorkflowFailedEvent):
        print(f"Step '{event.step_name}' failed after {event.attempts} attempts")
        print(f"{type(event.exception).__name__}: {event.exception}")

Parameters:

Name Type Description Default
step_name str
required
exception Exception
required
attempts int
required
elapsed_seconds float
required
Source code in workflows/events.py
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
class WorkflowFailedEvent(StopEvent):
    """Published when a workflow step fails permanently.

    Published when a step fails and all retries are exhausted (or no retry
    policy permits a retry, or a catch_error handler itself raised).

    Attributes:
        step_name: The name of the step that failed.
        exception: The raised exception. ``__traceback__`` is present only
            in-process; ``None`` after a replay.
        attempts: The total number of attempts made before giving up.
        elapsed_seconds: Time in seconds from first attempt to final failure.

    Examples:
        ```python
        async for event in handler.stream_events():
            if isinstance(event, WorkflowFailedEvent):
                print(f"Step '{event.step_name}' failed after {event.attempts} attempts")
                print(f"{type(event.exception).__name__}: {event.exception}")
        ```
    """

    step_name: str
    exception: SerializableException
    attempts: int
    elapsed_seconds: float

StepFailedEvent #

Bases: Event

Delivered to a @catch_error handler when a step exhausts its retries.

The handler may inspect the fields to decide how to recover. Returning a StopEvent completes the workflow successfully; raising from the handler propagates the new exception and fails the workflow.

Attributes:

Name Type Description

Parameters:

Name Type Description Default
step_name str
required
input_event Event
required
exception Exception
required
attempts int
required
elapsed_seconds float
required
failed_at datetime
required
Source code in workflows/events.py
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
class StepFailedEvent(Event):
    """Delivered to a `@catch_error` handler when a step exhausts its retries.

    The handler may inspect the fields to decide how to recover. Returning a
    `StopEvent` completes the workflow successfully; raising from the handler
    propagates the new exception and fails the workflow.

    Attributes:
        step_name: The name of the step that failed.
        input_event: The triggering event instance that caused the failure.
        exception: The raised exception. ``__traceback__`` is present in-process
            but ``None`` after the event has crossed a serialization boundary
            (e.g., a replay).
        attempts: Total number of attempts made before giving up.
        elapsed_seconds: Seconds from first attempt to final failure.
        failed_at: Timezone-aware UTC datetime of the final failure.
    """

    step_name: str
    input_event: SerializableEvent
    exception: SerializableException
    attempts: int
    elapsed_seconds: float
    failed_at: datetime