Skip to content

Decorators

step #

step(func: Callable[P, R]) -> StepFunction[P, R]
step(*, workflow: type['Workflow'] | None = None, num_workers: int = 4, retry_policy: RetryPolicy | None = None, skip_graph_checks: list[StepGraphCheck] | None = None) -> Callable[[Callable[P, R]], StepFunction[P, R]]
step(func: Callable[P, R] | None = None, *, workflow: type['Workflow'] | None = None, num_workers: int = 4, retry_policy: RetryPolicy | None = None, skip_graph_checks: list[StepGraphCheck] | None = None) -> Callable[[Callable[P, R]], StepFunction[P, R]] | StepFunction[P, R]

Decorate a callable to declare it as a workflow step.

The decorator inspects the function signature to infer the accepted event type, return event types, optional Context parameter (optionally with a typed state model), and any resource injections via typing.Annotated.

When applied to free functions, provide the workflow class via workflow=MyWorkflow. For instance methods, the association is automatic.

Parameters:

Name Type Description Default
workflow type[Workflow] | None

Workflow class to attach the free function step to. Not required for methods.

None
num_workers int

Number of workers for this step. Defaults to 4.

4
retry_policy RetryPolicy | None

Optional retry policy for failures.

None
skip_graph_checks list[StepGraphCheck] | None

Graph validation checks to skip for this step. Currently supports "reachability" to allow intentionally unreachable steps.

None

Returns:

Name Type Description
Callable Callable[[Callable[P, R]], StepFunction[P, R]] | StepFunction[P, R]

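The original function, annotated with internal step metadata. When step is called with keyword arguments only (for example @step(workflow=MyWorkflow)), a decorator that produces that step function is returned instead.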

Raises:

Type Description
WorkflowValidationError

If signature validation fails or when decorating a free function without specifying workflow.

Examples:

Method step:

class MyFlow(Workflow):
    @step
    async def start(self, ev: StartEvent) -> StopEvent:
        return StopEvent(result="done")

Free function step:

class MyWorkflow(Workflow):
    pass

@step(workflow=MyWorkflow)
async def generate(ev: StartEvent) -> NextEvent: ...
Source code in workflows/decorators.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def step(
    func: Callable[P, R] | None = None,
    *,
    workflow: type["Workflow"] | None = None,
    num_workers: int = 4,
    retry_policy: RetryPolicy | None = None,
    skip_graph_checks: list[StepGraphCheck] | None = None,
) -> Callable[[Callable[P, R]], StepFunction[P, R]] | StepFunction[P, R]:
    """
    Decorate a callable to declare it as a workflow step.

    The decorator inspects the function signature to infer the accepted event
    type, return event types, optional `Context` parameter (optionally with a
    typed state model), and any resource injections via `typing.Annotated`.

    When applied to free functions, provide the workflow class via
    `workflow=MyWorkflow`. For instance methods, the association is automatic.

    The decorator supports two call forms: bare (`@step`), where `func` is the
    decorated callable, and parameterized (`@step(...)`), where `func` is
    `None` and a decorator is returned.

    Args:
        func (Callable | None): The callable being decorated when used as bare
            `@step`. Left as `None` when the decorator is called with keyword
            arguments.
        workflow (type[Workflow] | None): Workflow class to attach the free
            function step to. Not required for methods.
        num_workers (int): Number of workers for this step. Defaults to 4.
        retry_policy (RetryPolicy | None): Optional retry policy for failures.
        skip_graph_checks (list[StepGraphCheck] | None): Graph validation
            checks to skip for this step. Currently supports
            ``"reachability"`` to allow intentionally unreachable steps.
            `None` is treated as an empty list.

    Returns:
        Callable: The original function, annotated with internal step metadata
            (bare `@step` form), or a decorator that produces it
            (parameterized `@step(...)` form).

    Raises:
        WorkflowValidationError: If signature validation fails or when decorating
            a free function without specifying `workflow`.

    Examples:
        Method step:

        ```python
        class MyFlow(Workflow):
            @step
            async def start(self, ev: StartEvent) -> StopEvent:
                return StopEvent(result="done")
        ```

        Free function step:

        ```python
        class MyWorkflow(Workflow):
            pass

        @step(workflow=MyWorkflow)
        async def generate(ev: StartEvent) -> NextEvent: ...
        ```
    """

    # Parameterized form: `@step(...)` returns this closure, which is then
    # called with the function being decorated.
    def decorator(func: Callable[P, R]) -> StepFunction[P, R]:
        # NOTE(review): the localns helpers presumably inspect the call stack,
        # so this call is kept directly inside `decorator` — confirm before
        # moving it into a shared helper.
        localns = _capture_decorator_localns()
        return _apply_step_decorator(
            func,
            num_workers=num_workers,
            retry_policy=retry_policy,
            workflow=workflow,
            localns=localns,
            # Normalize a missing list to an empty one.
            skip_graph_checks=skip_graph_checks or [],
        )

    if func is not None:
        # The decorator was used without parentheses, like `@step`
        # NOTE(review): a different capture helper is used here than in
        # `decorator`; presumably the relevant frame differs between the two
        # call forms — confirm against the helper definitions.
        localns = _capture_callsite_localns()
        return _apply_step_decorator(
            func,
            num_workers=num_workers,
            retry_policy=retry_policy,
            workflow=workflow,
            localns=localns,
            skip_graph_checks=skip_graph_checks or [],
        )
    return decorator

catch_error #

catch_error(func: Callable[P, R]) -> StepFunction[P, R]
catch_error(*, for_steps: list[str] | None = None, max_recoveries: int = 1) -> Callable[[Callable[P, R]], StepFunction[P, R]]
catch_error(func: Callable[P, R] | None = None, *, for_steps: list[str] | None = None, max_recoveries: int = 1) -> Callable[[Callable[P, R]], StepFunction[P, R]] | StepFunction[P, R]

Mark a method as a handler for steps that exhaust their retries.

Handlers can be scoped to specific steps via for_steps, or left as wildcards (default) to cover any step not claimed by a scoped handler. Each handler has a per-lineage recovery budget (max_recoveries): when the budget is exceeded the workflow fails instead of re-entering the handler.

A handler may return any event type — the graph validator checks that the handler's sub-graph eventually terminates at a StopEvent.

Parameters:

Name Type Description Default
for_steps list[str] | None

Step names this handler covers. None means wildcard.

None
max_recoveries int

How many times this handler may be invoked per lineage before the workflow fails. Must be >= 1. Defaults to 1.

1

Examples:

from workflows import Workflow, catch_error, step, Context
from workflows.events import StartEvent, StepFailedEvent, StopEvent

class MyFlow(Workflow):
    @step(retry_policy=...)
    async def fetch(self, ev: StartEvent) -> FetchedEvent: ...

    @catch_error(for_steps=["fetch"], max_recoveries=2)
    async def handle_fetch(self, ctx: Context, ev: StepFailedEvent) -> FallbackEvent:
        return FallbackEvent(...)

    @catch_error  # wildcard; covers any step not owned by a scoped handler
    async def handle_default(self, ctx: Context, ev: StepFailedEvent) -> StopEvent:
        return StopEvent(result={"failed": ev.step_name})
Source code in workflows/decorators.py
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
def catch_error(
    func: Callable[P, R] | None = None,
    *,
    for_steps: list[str] | None = None,
    max_recoveries: int = 1,
) -> Callable[[Callable[P, R]], StepFunction[P, R]] | StepFunction[P, R]:
    """Mark a method as a handler for steps that exhaust their retries.

    Handlers can be scoped to specific steps via `for_steps`, or left as
    wildcards (default) to cover any step not claimed by a scoped handler.
    Each handler has a per-lineage recovery budget (`max_recoveries`): when the
    budget is exceeded the workflow fails instead of re-entering the handler.

    A handler may return any event type — the graph validator checks that the
    handler's sub-graph eventually terminates at a `StopEvent`.

    The handler itself is registered as a step with a single worker and no
    retry policy.

    Args:
        func: The handler being decorated when used as bare `@catch_error`.
            Left as `None` when the decorator is called with keyword arguments.
        for_steps: Step names this handler covers. `None` means wildcard.
        max_recoveries: How many times this handler may be invoked per lineage
            before the workflow fails. Must be an integer >= 1 (booleans are
            rejected). Defaults to 1.

    Returns:
        The step function built from the handler (bare `@catch_error` form),
        or a decorator that builds it (parameterized `@catch_error(...)` form).

    Raises:
        WorkflowValidationError: If `max_recoveries` is not an integer >= 1,
            if `for_steps` is neither `None` nor a list of strings, or if the
            handler does not accept exactly `StepFailedEvent` as its event
            parameter.

    Examples:
        ```python
        from workflows import Workflow, catch_error, step, Context
        from workflows.events import StartEvent, StepFailedEvent, StopEvent

        class MyFlow(Workflow):
            @step(retry_policy=...)
            async def fetch(self, ev: StartEvent) -> FetchedEvent: ...

            @catch_error(for_steps=["fetch"], max_recoveries=2)
            async def handle_fetch(self, ctx: Context, ev: StepFailedEvent) -> FallbackEvent:
                return FallbackEvent(...)

            @catch_error  # wildcard; covers any step not owned by a scoped handler
            async def handle_default(self, ctx: Context, ev: StepFailedEvent) -> StopEvent:
                return StopEvent(result={"failed": ev.step_name})
        ```
    """

    # `bool` is a subclass of `int`, so `True` would otherwise slip through as
    # a budget of 1; reject it explicitly.
    if (
        isinstance(max_recoveries, bool)
        or not isinstance(max_recoveries, int)
        or max_recoveries < 1
    ):
        raise WorkflowValidationError(
            "@catch_error max_recoveries must be an integer >= 1"
        )
    if for_steps is not None:
        if not isinstance(for_steps, list) or not all(
            isinstance(s, str) for s in for_steps
        ):
            raise WorkflowValidationError(
                "@catch_error for_steps must be None or a list of step name strings"
            )

    def _apply(inner: Callable[P, R], localns: dict[str, Any]) -> StepFunction[P, R]:
        # Build a regular step function first, then re-tag it as a handler.
        step_fn = make_step_function(
            inner,
            num_workers=1,
            retry_policy=None,
            localns=localns,
        )
        # A handler must take StepFailedEvent and nothing else as its event.
        accepted = step_fn._step_config.accepted_events
        if len(accepted) != 1 or accepted[0] is not StepFailedEvent:
            name = getattr(inner, "__name__", repr(inner))
            raise WorkflowValidationError(
                f"@catch_error handler '{name}' must accept StepFailedEvent "
                f"as its event parameter."
            )
        step_fn._step_config.role = "catch_error"
        # Store a copy of the caller's list rather than the list itself.
        step_fn._step_config.catch_error_for_steps = (
            list(for_steps) if for_steps is not None else None
        )
        step_fn._step_config.catch_error_max_recoveries = max_recoveries
        return step_fn

    # NOTE(review): the localns helpers presumably inspect the call stack, so
    # each is invoked directly from the frame that corresponds to its call
    # form — confirm before moving these calls into `_apply`.
    if func is not None:
        # bare usage: `@catch_error`
        return _apply(func, _capture_callsite_localns())

    def decorator(inner: Callable[P, R]) -> StepFunction[P, R]:
        return _apply(inner, _capture_decorator_localns())

    return decorator