Resources

Resources are external dependencies such as memory, LLMs, query engines or chat history instances that will be injected into workflow steps at runtime.

Resources are a powerful way of binding workflow steps to Python objects that we would otherwise need to create by hand every time. For performance reasons, resources are cached for a workflow by default, meaning the same resource instance is passed to every step where it's injected. It's important to master this concept, because cached and non-cached resources can lead to unexpected behaviour. Let's see it in detail.

First of all, to use resources within our code, we need to import Resource from the resource submodule:

from llama_index.core.workflow.resource import Resource
from llama_index.core.workflow import (
    Event,
    step,
    StartEvent,
    StopEvent,
    Workflow,
)

Resource wraps a function or callable that must return an object of the same type as the one in the resource definition. Let's see an example:

from typing import Annotated
from llama_index.core.memory import Memory

def get_memory(*args, **kwargs) -> Memory:
    return Memory.from_defaults("user_id_123", token_limit=60000)

resource = Annotated[Memory, Resource(get_memory)]

In the example above, Annotated[Memory, Resource(get_memory)] defines a resource of type Memory that will be provided at runtime by the get_memory() function. A resource defined like this can be injected into a step by passing it as a method parameter:

import random
from typing import Union
from llama_index.core.llms import ChatMessage

RANDOM_MESSAGES = [
    "Hello World!",
    "Python is awesome!",
    "Resources are great!",
]

class CustomStartEvent(StartEvent):
    message: str

class SecondEvent(Event):
    message: str

class ThirdEvent(Event):
    message: str

class WorkflowWithMemory(Workflow):
    @step
    async def first_step(
        self,
        ev: CustomStartEvent,
        memory: Annotated[Memory, Resource(get_memory)],
    ) -> SecondEvent:
        await memory.aput(
            ChatMessage.from_str(
                role="user", content="First step: " + ev.message
            )
        )
        return SecondEvent(message=RANDOM_MESSAGES[random.randint(0, 2)])

    @step
    async def second_step(
        self, ev: SecondEvent, memory: Annotated[Memory, Resource(get_memory)]
    ) -> Union[ThirdEvent, StopEvent]:
        await memory.aput(
            ChatMessage(role="assistant", content="Second step: " + ev.message)
        )
        if random.randint(0, 1) == 0:
            return ThirdEvent(message=RANDOM_MESSAGES[random.randint(0, 2)])
        else:
            messages = await memory.aget_all()
            return StopEvent(result=messages)

    @step
    async def third_step(
        self, ev: ThirdEvent, memory: Annotated[Memory, Resource(get_memory)]
    ) -> StopEvent:
        await memory.aput(
            ChatMessage(role="user", content="Third step: " + ev.message)
        )
        messages = await memory.aget_all()
        return StopEvent(result=messages)

As you can see, each step has access to the memory resource and can write to it. It's important to note that get_memory() will be called only once, and the same memory instance will be injected into the different steps. We can verify this by running the workflow:

wf = WorkflowWithMemory(disable_validation=True)

async def main():
    messages = await wf.run(
        start_event=CustomStartEvent(message="Happy birthday!")
    )
    for m in messages:
        print(m.blocks[0].text)

if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

A possible result might be:

First step: Happy birthday!
Second step: Python is awesome!
Third step: Hello World!

This shows that each step added its message to the same shared memory, which is exactly what we were expecting!
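We can even verify that the factory runs only once per workflow by instrumenting get_memory with a module-level counter (a minimal sketch; call_count is a name introduced here purely for illustration):

call_count = 0

def get_memory(*args, **kwargs) -> Memory:
    global call_count
    call_count += 1  # incremented once per factory invocation
    return Memory.from_defaults("user_id_123", token_limit=60000)

After a full run of WorkflowWithMemory, call_count equals 1, even though the resource is injected into three different steps.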

Note that resources are preserved across steps of the same workflow instance, but not across different workflows. If we were to run two WorkflowWithMemory instances, get_memory would be called once for each workflow, and as a result their memories would be separate and independent:

wf1 = WorkflowWithMemory(disable_validation=True)
wf2 = WorkflowWithMemory(disable_validation=True)

async def main():
    messages1 = await wf1.run(
        start_event=CustomStartEvent(message="Happy birthday!")
    )
    messages2 = await wf2.run(
        start_event=CustomStartEvent(message="Happy New Year!")
    )
    for m in messages1:
        print(m.blocks[0].text)
    print("===================")
    for m in messages2:
        print(m.blocks[0].text)

if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

This is a possible output:

First step: Happy birthday!
Second step: Resources are great!
===================
First step: Happy New Year!
Second step: Python is awesome!

If we pass cache=False to Resource when defining a resource, the wrapped function is called every time the resource is injected into a step. This behaviour can be desirable at times. Let's see a simple example using a custom Counter class:

from pydantic import BaseModel, Field

class Counter(BaseModel):
    counter: int = Field(description="A simple counter", default=0)

    async def increment(self) -> None:
        self.counter += 1

def get_counter() -> Counter:
    return Counter()

class SecondEvent(Event):
    count: int

class WorkflowWithCounter(Workflow):
    @step
    async def first_step(
        self,
        ev: StartEvent,
        counter: Annotated[Counter, Resource(get_counter, cache=False)],
    ) -> SecondEvent:
        await counter.increment()
        return SecondEvent(count=counter.counter)

    @step
    async def second_step(
        self,
        ev: SecondEvent,
        counter: Annotated[Counter, Resource(get_counter, cache=False)],
    ) -> StopEvent:
        print("Counter at first step: ", ev.count)
        await counter.increment()
        print("Counter at second step: ", counter.counter)
        return StopEvent(result="End of Workflow")
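
To try it out, we can reuse the asyncio pattern from the earlier examples (a minimal sketch; the plain StartEvent here needs no custom fields, so run() takes no arguments):

wf = WorkflowWithCounter(disable_validation=True)

async def main():
    await wf.run()

if __name__ == "__main__":
    import asyncio

    asyncio.run(main())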

If we now run this workflow, the output will be:

Counter at first step: 1
Counter at second step: 1

Since caching is disabled, get_counter() is called anew for each injection: each step receives a fresh Counter instance, so both increments start from zero and the counter never goes past 1.

A note about stateful and stateless resources

As we have seen, cached resources are expected to be stateful: they maintain their state across different steps and across runs of the same workflow instance. However, disabling caching does not by itself make a resource stateless. Let's see an example:

global_mem = Memory.from_defaults("global_id", token_limit=60000)

def get_memory(*args, **kwargs) -> Memory:
    return global_mem

If we disable caching with Annotated[Memory, Resource(get_memory, cache=False)], the function get_memory will be called multiple times, but it always returns the same resource instance. Such a resource should be considered stateful, regardless of its caching behaviour.
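
By contrast, a resource behaves statelessly only when caching is disabled and the factory builds a new object on every call, as get_counter did above. A minimal sketch (get_fresh_memory is an illustrative name, not part of the library):

def get_fresh_memory(*args, **kwargs) -> Memory:
    # A new Memory is constructed on every call, so with cache=False
    # each injection receives its own independent, empty instance.
    return Memory.from_defaults("per_step_id", token_limit=60000)

fresh_memory = Annotated[Memory, Resource(get_fresh_memory, cache=False)]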

Now that we’ve mastered resources, let’s take a look at observability and debugging in workflows.