Workflows cookbook: walking through all features of Workflows
First, we install our dependencies. Core contains most of what we need; the OpenAI package handles LLM access, and utils-workflow provides the visualization capabilities we’ll use later on.
```python
!pip install --upgrade llama-index-core llama-index-llms-openai llama-index-utils-workflow
```
Then we bring in the dependencies we just installed:

```python
from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Context,
)
import random
from llama_index.utils.workflow import (
    draw_all_possible_flows,
    draw_most_recent_execution,
)
from llama_index.llms.openai import OpenAI
```
Set up our OpenAI key, so we can do actual LLM things.
```python
import os

os.environ["OPENAI_API_KEY"] = "sk-proj-..."
```
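Hardcoding a key is fine in a scratch notebook, but a safer pattern is to prompt for it only when the environment doesn’t already have one. Here’s a minimal sketch using the standard library’s getpass; the `ensure_openai_key` helper is our own invention, not part of LlamaIndex:

```python
import os
from getpass import getpass


def ensure_openai_key() -> str:
    """Prompt for the key only if the environment doesn't already have it."""
    if not os.environ.get("OPENAI_API_KEY"):
        os.environ["OPENAI_API_KEY"] = getpass("OpenAI API key: ")
    return os.environ["OPENAI_API_KEY"]
```

This keeps the key out of the notebook file itself, which matters if you share or commit it.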
Workflow basics
Let’s start with the most basic possible workflow: it just starts, does one thing, and stops. There’s no reason to have a real workflow if your task is this simple, but we’re just demonstrating how they work.
```python
from llama_index.llms.openai import OpenAI


class OpenAIGenerator(Workflow):
    @step
    async def generate(self, ev: StartEvent) -> StopEvent:
        llm = OpenAI(model="gpt-4o")
        response = await llm.acomplete(ev.query)
        return StopEvent(result=str(response))
```
```python
w = OpenAIGenerator(timeout=10, verbose=False)
result = await w.run(query="What's LlamaIndex?")
print(result)
```
```
LlamaIndex, formerly known as GPT Index, is a data framework designed to facilitate the connection between large language models (LLMs) and external data sources. It provides tools to index various data types, such as documents, databases, and APIs, enabling LLMs to interact with and retrieve information from these sources more effectively. The framework supports the creation of indices that can be queried by LLMs, enhancing their ability to access and utilize external data in a structured manner. This capability is particularly useful for applications requiring the integration of LLMs with specific datasets or knowledge bases.
```
One of the neat things about Workflows is that we can use pyvis to visualize them. Let’s see what that looks like for this very simple flow.
```python
draw_all_possible_flows(OpenAIGenerator, filename="trivial_workflow.html")
```
Not a lot to see here, yet! The start event goes to generate() and then straight to StopEvent.
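To build intuition for what the engine is doing, here’s a toy, pure-Python sketch of the dispatch idea: route each emitted event to the step registered for its type until a StopEvent appears. This is an illustration of the concept only, not LlamaIndex’s actual implementation — the `StartEvent`, `StopEvent`, and `run` names below are local stand-ins, not the imported ones:

```python
class StartEvent:
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)


class StopEvent:
    def __init__(self, result=None):
        self.result = result


def run(steps, start):
    # Keep routing events to the step registered for their type
    # until something emits a StopEvent.
    ev = start
    while not isinstance(ev, StopEvent):
        ev = steps[type(ev)](ev)
    return ev.result


def generate(ev):
    # Stand-in for the LLM call in the real workflow
    return StopEvent(result=f"You asked: {ev.query}")


print(run({StartEvent: generate}, StartEvent(query="What's LlamaIndex?")))
```

The real library adds async execution, validation, timeouts, and more, but the core routing loop is the same shape.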
Loops and branches
Let’s go to a more interesting example, demonstrating our ability to loop:
```python
class FailedEvent(Event):
    error: str


class QueryEvent(Event):
    query: str
```
```python
class LoopExampleFlow(Workflow):
    @step
    async def answer_query(
        self, ev: StartEvent | QueryEvent
    ) -> FailedEvent | StopEvent:
        query = ev.query
        # try to answer the query
        random_number = random.randint(0, 1)
        if random_number == 0:
            return FailedEvent(error="Failed to answer the query.")
        else:
            return StopEvent(result="The answer to your query")

    @step
    async def improve_query(self, ev: FailedEvent) -> QueryEvent | StopEvent:
        # improve the query or decide it can't be fixed
        random_number = random.randint(0, 1)
        if random_number == 0:
            return QueryEvent(query="Here's a better query.")
        else:
            return StopEvent(result="Your query can't be fixed.")
```
We’re using random numbers to simulate LLM actions here so that we can get reliably interesting behavior.
answer_query() accepts a start event. It can then do 2 things:
- it can answer the query and emit a StopEvent, which returns the result
- it can decide the query was bad and emit a FailedEvent
improve_query() accepts a FailedEvent. It can also do 2 things:
- it can decide the query can’t be improved and emit a StopEvent, which returns failure
- it can present a better query and emit a QueryEvent, which creates a loop back to answer_query()
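The same control flow can be sketched in plain Python, which makes the loop easy to see. This is an illustration only: the function names are ours, and a seeded `random.Random` stands in for the coin flips above.

```python
import random


def answer_or_fail(rng):
    # Coin flip standing in for "can the LLM answer this query?"
    return None if rng.randint(0, 1) == 0 else "The answer to your query"


def improve_or_give_up(rng):
    # Coin flip standing in for "can the query be improved?"
    return "Here's a better query." if rng.randint(0, 1) == 0 else None


def run_loop(rng):
    while True:
        answer = answer_or_fail(rng)
        if answer is not None:
            return answer  # like a StopEvent carrying a result
        if improve_or_give_up(rng) is None:
            return "Your query can't be fixed."  # StopEvent carrying failure
        # otherwise: like a QueryEvent looping back to answer_or_fail


print(run_loop(random.Random(42)))
```

The workflow version expresses exactly this loop, but as typed events routed between steps rather than direct function calls.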
We can also visualize this more complicated workflow:
```python
draw_all_possible_flows(LoopExampleFlow, filename="loop_workflow.html")
```
We’ve set verbose=True here so we can see exactly which events are triggered. You can see the output conveniently demonstrates looping and then answering.
```python
l = LoopExampleFlow(timeout=10, verbose=True)
result = await l.run(query="What's LlamaIndex?")
print(result)
```
```
Running step answer_query
Step answer_query produced event FailedEvent
Running step improve_query
Step improve_query produced event StopEvent
Your query can't be fixed.
```
Maintaining state between events
There is a global state which allows you to keep arbitrary data or functions around for use by all event handlers.
```python
class GlobalExampleFlow(Workflow):
    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> QueryEvent:
        # load our data here
        await ctx.store.set("some_database", ["value1", "value2", "value3"])
        return QueryEvent(query=ev.query)

    @step
    async def query(self, ctx: Context, ev: QueryEvent) -> StopEvent:
        # use our data with our query
        data = await ctx.store.get("some_database")
        result = f"The answer to your query is {data[1]}"
        return StopEvent(result=result)
```
```python
g = GlobalExampleFlow(timeout=10, verbose=True)
result = await g.run(query="What's LlamaIndex?")
print(result)
```
```
Running step setup
Step setup produced event QueryEvent
Running step query
Step query produced event StopEvent
The answer to your query is value2
```
Of course, this flow is essentially still linear. A more realistic example would be if your start event could either be a query or a data population event, and you needed to wait for the data to arrive before answering queries. Let’s set that up to see what it looks like:
```python
class WaitExampleFlow(Workflow):
    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> StopEvent:
        if hasattr(ev, "data"):
            await ctx.store.set("data", ev.data)

        return StopEvent(result=None)

    @step
    async def query(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
        if hasattr(ev, "query"):
            # do we have any data?
            data = await ctx.store.get("data", default=None)
            if data is not None:
                return StopEvent(result=f"Got the data {data}")
            else:
                # there's no data yet
                return None
        else:
            # this isn't a query
            return None
```
```python
w = WaitExampleFlow(verbose=True)
result = await w.run(query="Can I kick it?")
if result is None:
    print("No you can't")
print("---")
result = await w.run(data="Yes you can")
print("---")
result = await w.run(query="Can I kick it?")
print(result)
```
```
Running step query
Step query produced no event
Running step setup
Step setup produced event StopEvent
No you can't
---
Running step query
Step query produced no event
Running step setup
Step setup produced event StopEvent
---
Running step query
Step query produced event StopEvent
Running step setup
Step setup produced event StopEvent
Got the data Yes you can
```
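Stripped of the workflow machinery, the waiting pattern reduces to “return None until the data you depend on exists.” A toy sketch of that idea (our own class, not a LlamaIndex API):

```python
class WaitingStore:
    """Toy stand-in for the context store used by WaitExampleFlow."""

    def __init__(self):
        self._data = None

    def set_data(self, value):
        self._data = value

    def query(self, q):
        if self._data is None:
            return None  # no data yet, so produce no result
        return f"Got the data {self._data}"


store = WaitingStore()
print(store.query("Can I kick it?"))  # no data stored yet
store.set_data("Yes you can")
print(store.query("Can I kick it?"))
```

In the real workflow, "produce no result" means the step returns None instead of an event, so the engine simply doesn’t run anything further for that input.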
Let’s visualize how this flow works:
```python
draw_all_possible_flows(WaitExampleFlow, filename="wait_workflow.html")
```
Waiting for one or more events
Because waiting for events is such a common pattern, the context object has a convenience function, collect_events(). It will capture events and store them, returning None until all the events it requires have been collected. Those events will be attached to the output of collect_events in the order that they were specified. Let’s see this in action:
```python
class InputEvent(Event):
    input: str


class SetupEvent(Event):
    error: bool


class QueryEvent(Event):
    query: str
```
```python
class CollectExampleFlow(Workflow):
    @step
    async def setup(self, ctx: Context, ev: StartEvent) -> SetupEvent:
        # generically start everything up (only once per run)
        if not await ctx.store.get("setup_done", default=False):
            await ctx.store.set("setup_done", True)
            print("I got set up")
        return SetupEvent(error=False)

    @step
    async def collect_input(self, ev: StartEvent) -> InputEvent | None:
        if hasattr(ev, "input"):
            # perhaps validate the input
            print("I got some input")
            return InputEvent(input=ev.input)

    @step
    async def parse_query(self, ev: StartEvent) -> QueryEvent | None:
        if hasattr(ev, "query"):
            # parse the query in some way
            print("I got a query")
            return QueryEvent(query=ev.query)

    @step
    async def run_query(
        self, ctx: Context, ev: InputEvent | SetupEvent | QueryEvent
    ) -> StopEvent | None:
        ready = ctx.collect_events(ev, [QueryEvent, InputEvent, SetupEvent])
        if ready is None:
            print("Not enough events yet")
            return None

        # run the query
        print("Now I have all the events")
        print(ready)

        result = f"Ran query '{ready[0].query}' on input '{ready[1].input}'"
        return StopEvent(result=result)
```
```python
c = CollectExampleFlow()
result = await c.run(input="Here's some input", query="Here's my question")
print(result)
```
```
I got set up
I got some input
I got a query
Not enough events yet
Not enough events yet
Now I have all the events
[QueryEvent(query="Here's my question"), InputEvent(input="Here's some input"), SetupEvent(error=False)]
Ran query 'Here's my question' on input 'Here's some input'
```
You can see each of the events getting triggered, as well as collect_events repeatedly returning None until enough events have arrived. Let’s see what this looks like in a flow diagram:
```python
draw_all_possible_flows(CollectExampleFlow, "collect_workflow.html")
```
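The pattern collect_events implements is simple to sketch: buffer incoming events by type, and return them in the requested order once at least one of each type has arrived. The class below illustrates that idea only; it is not LlamaIndex’s implementation.

```python
from collections import defaultdict


class EventCollector:
    """Buffers events until one of each expected type has been seen."""

    def __init__(self):
        self._buffer = defaultdict(list)

    def collect(self, ev, expected_types):
        self._buffer[type(ev)].append(ev)
        if all(self._buffer[t] for t in expected_types):
            # Pop one event of each type, in the order they were requested
            return [self._buffer[t].pop(0) for t in expected_types]
        return None


class Ping:
    pass


class Pong:
    pass


c = EventCollector()
print(c.collect(Ping(), [Pong, Ping]))  # None: still waiting on a Pong
print(c.collect(Pong(), [Pong, Ping]))  # one Pong and one Ping, in that order
```

Note how the return order follows the list you pass in, not the arrival order — the same guarantee the run_query step above relies on when it indexes ready[0] and ready[1].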
This concludes our tour of creating, running and visualizing workflows! Check out the docs and examples to learn more.