Skip to content

Server

WorkflowServer #

WorkflowServer(*, middleware: list[Middleware] | None = None, exception_handlers: Mapping[Any, Any] | None = None, workflow_store: AbstractWorkflowStore | None = None, persistence_backoff: list[float] = [0.5, 3], runtime: Runtime | None = None, idle_timeout: float = 60.0)

HTTP server that exposes workflows as REST APIs.

Wraps one or more Workflow instances behind an HTTP API with endpoints for running workflows, streaming events, and sending human-in-the-loop input. Includes a built-in debugging UI served at the root path.

Example:

from workflows import Workflow, step
from workflows.events import StartEvent, StopEvent
from llama_agents.server import WorkflowServer

class GreetingWorkflow(Workflow):
    # Minimal one-step workflow: read an optional "name" from the start
    # event and finish immediately with a greeting.
    @step
    async def greet(self, ev: StartEvent) -> StopEvent:
        who = ev.get("name", "World")
        return StopEvent(result=f"Hello, {who}!")

server = WorkflowServer()
server.add_workflow("greet", GreetingWorkflow())

# Run with: python -m workflows.server my_server.py
# Or programmatically:
# await server.serve(host="0.0.0.0", port=8080)

The ASGI application is available as server.app for embedding in a larger application or mounting behind a reverse proxy.

Parameters:

Name Type Description Default
middleware list[Middleware] | None

Starlette middleware to apply to the ASGI app. Defaults to a permissive CORS configuration. Passing a custom list replaces the default entirely.

None
exception_handlers Mapping[Any, Any] | None

Starlette exception handlers mapping exception types to handler callables. Defaults to JSON error responses with logging. Passing a custom mapping replaces the default entirely.

None
workflow_store AbstractWorkflowStore | None

Persistence backend for handler state, events, and ticks. Defaults to MemoryWorkflowStore. Use SqliteWorkflowStore or PostgresWorkflowStore for durable persistence across restarts.

None
persistence_backoff list[float]

Retry delays (in seconds) when writing handler state to the store fails. Each entry is a sleep duration before the next attempt. Defaults to [0.5, 3] (two retries).

[0.5, 3]
runtime Runtime | None

Custom workflow runtime. When None (the default), the server builds a runtime stack that handles persistence and idle-release automatically. Only override this if you need a custom execution backend.

None
idle_timeout float

Seconds to wait after a workflow becomes idle before releasing it from memory. The workflow is automatically reloaded when new events arrive. Defaults to 60.0.

60.0
Source code in llama_agents/server/server.py
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def __init__(
    self,
    *,
    middleware: list[Middleware] | None = None,
    exception_handlers: Mapping[Any, Any] | None = None,
    workflow_store: AbstractWorkflowStore | None = None,
    persistence_backoff: list[float] | None = None,
    runtime: Runtime | None = None,
    idle_timeout: float = 60.0,
):
    """Create a new workflow server.

    Args:
        middleware: Starlette middleware to apply to the ASGI app. Defaults
            to a permissive CORS configuration. Passing a custom list
            replaces the default entirely.
        exception_handlers: Starlette exception handlers mapping exception
            types to handler callables. Defaults to JSON error responses
            with logging. Passing a custom mapping replaces the default
            entirely.
        workflow_store: Persistence backend for handler state, events, and
            ticks. Defaults to ``MemoryWorkflowStore``. Use
            ``SqliteWorkflowStore`` or ``PostgresWorkflowStore`` for
            durable persistence across restarts.
        persistence_backoff: Retry delays (in seconds) when writing handler
            state to the store fails. Each entry is a sleep duration before
            the next attempt. ``None`` (the default) means ``[0.5, 3]``
            (two retries).
        runtime: Custom workflow runtime. When ``None`` (the default), the
            server builds a runtime stack that handles persistence and
            idle-release automatically. Only override this if you need a
            custom execution backend.
        idle_timeout: Seconds to wait after a workflow becomes idle before
            releasing it from memory. The workflow is automatically
            reloaded when new events arrive. Defaults to ``60.0``.
    """
    # Resolve the backoff schedule per instance instead of sharing one
    # module-level list as a mutable default argument.
    if persistence_backoff is None:
        persistence_backoff = [0.5, 3]
    self._workflow_store = (
        workflow_store if workflow_store is not None else MemoryWorkflowStore()
    )
    # Default runtime stack: persistence wrapped in idle-release, both
    # backed by the same store. A caller-supplied runtime replaces it.
    inner: Runtime = (
        runtime
        if runtime is not None
        else IdleReleaseDecorator(
            PersistenceDecorator(basic_runtime, store=self._workflow_store),
            store=self._workflow_store,
            idle_timeout=idle_timeout,
        )
    )
    # list(...) copies the caller's list so later mutation of the caller's
    # argument cannot change our retry schedule.
    self._runtime: ServerRuntimeDecorator = ServerRuntimeDecorator(
        inner,
        store=self._workflow_store,
        persistence_backoff=list(persistence_backoff),
    )
    self._service = _WorkflowService(
        runtime=self._runtime, store=self._workflow_store
    )

    self._api = _WorkflowAPI(
        self._service,
        middleware=middleware,
        exception_handlers=dict(exception_handlers) if exception_handlers else None,
    )
    # Expose the ASGI app for embedding/mounting behind a reverse proxy.
    self.app = self._api.app

add_workflow #

add_workflow(name: str, workflow: Workflow, additional_events: list[type[Event]] | None = None) -> None

Register a workflow under the given name.

The workflow becomes available at /workflows/{name}/run and /workflows/{name}/run-nowait.

Parameters:

Name Type Description Default
name str

URL-safe name for the workflow.

required
workflow Workflow

The workflow instance to serve.

required
additional_events list[type[Event]] | None

Extra event types to expose in the debugger UI and Send Event functionality. Use this for events that aren't discoverable from step signatures alone (e.g. events consumed via ctx.wait_for_event()).

None
Source code in llama_agents/server/server.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def add_workflow(
    self,
    name: str,
    workflow: Workflow,
    additional_events: list[type[Event]] | None = None,
) -> None:
    """Register *workflow* so it is served under *name*.

    Once registered, the workflow is reachable at
    ``/workflows/{name}/run`` and ``/workflows/{name}/run-nowait``.

    Args:
        name: URL-safe identifier used in the endpoint paths.
        workflow: The workflow instance to serve.
        additional_events: Extra event types to surface in the debugger UI
            and the ``Send Event`` functionality, for events that step
            signatures alone cannot reveal (e.g. events consumed via
            ``ctx.wait_for_event()``).
    """
    # Rebind the workflow to this server's chosen name and runtime stack.
    workflow._switch_workflow_name(name)
    workflow._switch_runtime(self._runtime)

    if additional_events is None:
        return
    self._api.register_additional_events(name, additional_events)

get_workflows #

get_workflows() -> dict[str, Workflow]

Return registered workflows as a dict by name. Only available after start().

Source code in llama_agents/server/server.py
151
152
153
154
155
156
157
def get_workflows(self) -> dict[str, Workflow]:
    """Return registered workflows as a dict by name. Only available after start()."""
    registered: dict[str, Workflow] = {}
    for workflow_name in self._service.get_workflow_names():
        workflow = self._service.get_workflow(workflow_name)
        # Names without a resolvable workflow are silently skipped.
        if workflow is not None:
            registered[workflow_name] = workflow
    return registered

start async #

start() -> WorkflowServer

Resumes workflows that were still running (not yet complete) at the last shutdown.

Idle workflows are not resumed — they remain released and are loaded on demand when events arrive for them.

Source code in llama_agents/server/server.py
163
164
165
166
167
168
169
170
async def start(self) -> WorkflowServer:
    """Resume workflows that were still running at the last shutdown.

    Idle workflows are left released; they are loaded on demand when new
    events arrive for them.
    """
    await self._service.start()
    # Return self so callers can chain: server = await WorkflowServer().start()
    return self

contextmanager async #

contextmanager() -> AsyncGenerator[WorkflowServer, None]

Use this server as an async context manager that starts it on entry and stops it on exit.

Source code in llama_agents/server/server.py
172
173
174
175
176
177
178
179
@asynccontextmanager
async def contextmanager(self) -> AsyncGenerator[WorkflowServer, None]:
    """Async context manager that starts the server on entry and stops it on exit."""
    started = await self.start()
    try:
        yield started
    finally:
        # Stop even if the managed block raised.
        await self.stop()

stop async #

stop() -> None

Gracefully shut down all running workflow handlers.

Source code in llama_agents/server/server.py
181
182
183
async def stop(self) -> None:
    """Delegate a graceful shutdown of all running workflow handlers to the service."""
    await self._service.stop()

serve async #

serve(host: str = 'localhost', port: int = 80, uvicorn_config: dict[str, Any] | None = None) -> None

Start the HTTP server and block until shutdown.

Calls start() internally before serving.

Parameters:

Name Type Description Default
host str

Bind address. Defaults to "localhost".

'localhost'
port int

Bind port. Defaults to 80.

80
uvicorn_config dict[str, Any] | None

Additional keyword arguments forwarded to uvicorn.Config (e.g. root_path, log_level).

None
Source code in llama_agents/server/server.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
async def serve(
    self,
    host: str = "localhost",
    port: int = 80,
    uvicorn_config: dict[str, Any] | None = None,
) -> None:
    """Run the HTTP server and block until shutdown.

    ``start()`` is called internally before serving.

    Args:
        host: Bind address. Defaults to ``"localhost"``.
        port: Bind port. Defaults to ``80``.
        uvicorn_config: Additional keyword arguments forwarded to
            ``uvicorn.Config`` (e.g. ``root_path``, ``log_level``).
    """
    extra_config = dict(uvicorn_config) if uvicorn_config else {}

    server = uvicorn.Server(
        uvicorn.Config(self.app, host=host, port=port, **extra_config)
    )
    # Lazy %-style args keep formatting out of the hot path when the
    # level is disabled; the rendered message is unchanged.
    logger.info(
        "Starting Workflow server at http://%s:%s%s",
        host,
        port,
        extra_config.get("root_path", "/"),
    )

    await server.serve()