Skip to content

Handler

WorkflowHandler #

Bases: Future[RunResultT]

Source code in workflows/handler.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
class WorkflowHandler(asyncio.Future[RunResultT]):
    def __init__(
        self,
        *args: Any,
        ctx: Context | None = None,
        run_id: str | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.run_id = run_id
        self._ctx = ctx

    @property
    def ctx(self) -> Context | None:
        return self._ctx

    def __str__(self) -> str:
        return str(self.result())

    def is_done(self) -> bool:
        return self.done()

    async def stream_events(self) -> AsyncGenerator[Event, None]:
        if self.ctx is None:
            raise ValueError("Context is not set!")

        while True:
            ev = await self.ctx.streaming_queue.get()

            yield ev

            if isinstance(ev, StopEvent):
                break

    async def run_step(self) -> list[Event] | None:
        """
        Runs the next workflow step and returns the output Event.

        If return is None, then the workflow is considered done.

        Examples:
            ```python
            handler = workflow.run(stepwise=True)
            while not handler.is_done():
                ev = await handler.run_step()
                handler.ctx.send_event(ev)

            result = handler.result()
            print(result)
            ```

        """
        # since event is sent before calling this method, we need to unblock the event loop
        await asyncio.sleep(0)

        if self.ctx is None:
            raise ValueError("Context must be set to run a workflow step-wise!")

        if not self.ctx.stepwise:
            raise ValueError(
                "Workflow must be created passing stepwise=True to call this method."
            )

        try:
            # Reset the events collected in current step
            self.ctx._step_events_holding = None

            # Unblock all pending steps
            for flag in self.ctx._step_flags.values():
                flag.set()

            # Yield back control to the event loop to give an unblocked step
            # the chance to run (we won't actually sleep here).
            await asyncio.sleep(0)

            # check if we're done, or if a step raised error
            we_done = False
            exception_raised = None
            retval = None
            for t in self.ctx._tasks:
                # Check if we're done
                if not t.done():
                    continue

                we_done = True
                e = t.exception()
                if type(e) is not WorkflowDone:
                    exception_raised = e

            if we_done:
                await self.ctx.shutdown()

                if exception_raised:
                    raise exception_raised

                if not self.done():
                    self.set_result(self.ctx.get_result())
            else:
                # Continue with running next step. Make sure we wait for the
                # step function to return before proceeding.
                in_progress = len(await self.ctx.running_steps())
                while in_progress:
                    await asyncio.sleep(BUSY_WAIT_DELAY)
                    in_progress = len(await self.ctx.running_steps())

                # notify unblocked task that we're ready to accept next event
                async with self.ctx._step_condition:
                    self.ctx._step_condition.notify()

                # Wait to be notified that the new_ev has been written
                async with self.ctx._step_event_written:
                    await self.ctx._step_event_written.wait()
                    retval = self.ctx.get_holding_events()
        except Exception as e:
            if not self.is_done():  # Avoid InvalidStateError edge case
                self.set_exception(e)
            raise

        return retval

    async def cancel_run(self) -> None:
        """Method to cancel a Workflow execution."""
        if self.ctx:
            self.ctx._cancel_flag.set()
            await asyncio.sleep(0)

run_step async #

run_step() -> list[Event] | None

Runs the next workflow step and returns the output Event.

If return is None, then the workflow is considered done.

Examples:

handler = workflow.run(stepwise=True)
while not handler.is_done():
    ev = await handler.run_step()
    handler.ctx.send_event(ev)

result = handler.result()
print(result)
Source code in workflows/handler.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
async def run_step(self) -> list[Event] | None:
    """
    Runs the next workflow step and returns the output Event.

    If return is None, then the workflow is considered done.

    Examples:
        ```python
        handler = workflow.run(stepwise=True)
        while not handler.is_done():
            ev = await handler.run_step()
            handler.ctx.send_event(ev)

        result = handler.result()
        print(result)
        ```

    """
    # since event is sent before calling this method, we need to unblock the event loop
    await asyncio.sleep(0)

    if self.ctx is None:
        raise ValueError("Context must be set to run a workflow step-wise!")

    if not self.ctx.stepwise:
        raise ValueError(
            "Workflow must be created passing stepwise=True to call this method."
        )

    try:
        # Reset the events collected in current step
        self.ctx._step_events_holding = None

        # Unblock all pending steps
        for flag in self.ctx._step_flags.values():
            flag.set()

        # Yield back control to the event loop to give an unblocked step
        # the chance to run (we won't actually sleep here).
        await asyncio.sleep(0)

        # check if we're done, or if a step raised error
        we_done = False
        exception_raised = None
        retval = None
        for t in self.ctx._tasks:
            # Check if we're done
            if not t.done():
                continue

            we_done = True
            e = t.exception()
            if type(e) is not WorkflowDone:
                exception_raised = e

        if we_done:
            await self.ctx.shutdown()

            if exception_raised:
                raise exception_raised

            if not self.done():
                self.set_result(self.ctx.get_result())
        else:
            # Continue with running next step. Make sure we wait for the
            # step function to return before proceeding.
            in_progress = len(await self.ctx.running_steps())
            while in_progress:
                await asyncio.sleep(BUSY_WAIT_DELAY)
                in_progress = len(await self.ctx.running_steps())

            # notify unblocked task that we're ready to accept next event
            async with self.ctx._step_condition:
                self.ctx._step_condition.notify()

            # Wait to be notified that the new_ev has been written
            async with self.ctx._step_event_written:
                await self.ctx._step_event_written.wait()
                retval = self.ctx.get_holding_events()
    except Exception as e:
        if not self.is_done():  # Avoid InvalidStateError edge case
            self.set_exception(e)
        raise

    return retval

cancel_run async #

cancel_run() -> None

Method to cancel a Workflow execution.

Source code in workflows/handler.py
136
137
138
139
140
async def cancel_run(self) -> None:
    """Method to cancel a Workflow execution."""
    if self.ctx:
        self.ctx._cancel_flag.set()
        await asyncio.sleep(0)