Multi-Agent Workflow with Weaviate QueryAgent
In this example, we will build a LlamaIndex Agent Workflow, ending up with a multi-agent Docs Assistant capable of:
- Writing new content to a “LlamaIndexDocs” collection in Weaviate
- Writing new content to a “WeaviateDocs” collection in Weaviate
- Using the Weaviate `QueryAgent` to answer questions based on the contents of these collections.
The `QueryAgent` is a full agent product by Weaviate, capable of regular search as well as aggregations over the collections you give it access to. Our ‘orchestrator’ agent will decide when to invoke the Weaviate QueryAgent, leaving the job of creating Weaviate-specific search queries to it.
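To make that division of labor concrete, here is a minimal sketch of what calling the QueryAgent on its own looks like. It assumes a connected Weaviate client and existing collections, both of which we set up later in this walkthrough:

```python
# Minimal sketch: using the Weaviate QueryAgent directly.
# Assumes `client` is a connected Weaviate client and that the two
# collections already exist (we create both further below).
from weaviate.agents.query import QueryAgent

qa = QueryAgent(client=client, collections=["LlamaIndexDocs", "WeaviateDocs"])
response = qa.run("How do I connect to Weaviate Cloud?")
print(response.final_answer)  # the agent's natural-language answer
```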
Things you will need:
- An OpenAI API key (or switch to another provider and adjust the code below)
- A Weaviate sandbox (this is free)
- Your Weaviate sandbox URL and API key
Install & Import Dependencies

```python
!pip install llama-index-core llama-index-utils-workflow weaviate-client[agents] llama-index-llms-openai llama-index-readers-web
```
```python
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)
from llama_index.utils.workflow import draw_all_possible_flows
from llama_index.readers.web import SimpleWebPageReader
from llama_index.core.llms import ChatMessage
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
from llama_index.core.agent.workflow import FunctionAgent

from enum import Enum
from pydantic import BaseModel, Field
from typing import List, Union
import json

import weaviate
from weaviate.auth import Auth
from weaviate.agents.query import QueryAgent
from weaviate.classes.config import Configure, Property, DataType

import os
from getpass import getpass
```
Set up Weaviate

To use the Weaviate Query Agent, first create a Weaviate Cloud account 👇
- Create a Serverless Weaviate Cloud account and set up a free Sandbox
- Go to ‘Embedding’ and enable it. By default, this will use `Snowflake/snowflake-arctic-embed-l-v2.0` as the embedding model
- Take note of the `WEAVIATE_URL` and `WEAVIATE_API_KEY` to connect to your cluster below
Info: We recommend using Weaviate Embeddings so you do not have to provide any extra keys for external embedding providers.
if "WEAVIATE_API_KEY" not in os.environ: os.environ["WEAVIATE_API_KEY"] = getpass("Add Weaviate API Key")if "WEAVIATE_URL" not in os.environ: os.environ["WEAVIATE_URL"] = getpass("Add Weaviate URL")
client = weaviate.connect_to_weaviate_cloud( cluster_url=os.environ.get("WEAVIATE_URL"), auth_credentials=Auth.api_key(os.environ.get("WEAVIATE_API_KEY")),)
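As an optional sanity check, the v4 Python client can confirm the connection before we move on:

```python
# Optional: verify that the cluster is reachable before continuing.
assert client.is_ready()
```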
Create WeaviateDocs and LlamaIndexDocs Collections

The helper function below creates a “WeaviateDocs” and a “LlamaIndexDocs” collection in Weaviate, deleting them first if they already exist. It also sets up a `QueryAgent` that has access to both of these collections.
The Weaviate `QueryAgent` is designed to query Weaviate collections for both regular search and aggregations, and it handles the burden of creating the Weaviate-specific queries internally. The agent uses the collection descriptions, as well as the property descriptions, while formulating the queries.
```python
def fresh_setup_weaviate(client):
    if client.collections.exists("WeaviateDocs"):
        client.collections.delete("WeaviateDocs")
    client.collections.create(
        "WeaviateDocs",
        description="A dataset with the contents of Weaviate technical Docs and website",
        vectorizer_config=Configure.Vectorizer.text2vec_weaviate(),
        properties=[
            Property(
                name="url",
                data_type=DataType.TEXT,
                description="the source URL of the webpage",
            ),
            Property(
                name="text",
                data_type=DataType.TEXT,
                description="the content of the webpage",
            ),
        ],
    )

    if client.collections.exists("LlamaIndexDocs"):
        client.collections.delete("LlamaIndexDocs")
    client.collections.create(
        "LlamaIndexDocs",
        description="A dataset with the contents of LlamaIndex technical Docs and website",
        vectorizer_config=Configure.Vectorizer.text2vec_weaviate(),
        properties=[
            Property(
                name="url",
                data_type=DataType.TEXT,
                description="the source URL of the webpage",
            ),
            Property(
                name="text",
                data_type=DataType.TEXT,
                description="the content of the webpage",
            ),
        ],
    )

    agent = QueryAgent(
        client=client, collections=["LlamaIndexDocs", "WeaviateDocs"]
    )
    return agent
```
Write Contents of Webpage to the Collections

The helper function below uses the `SimpleWebPageReader` to write the contents of a webpage to the relevant Weaviate collection.
```python
def write_webpages_to_weaviate(client, urls: list[str], collection_name: str):
    documents = SimpleWebPageReader(html_to_text=True).load_data(urls)
    collection = client.collections.get(collection_name)
    with collection.batch.dynamic() as batch:
        for doc in documents:
            batch.add_object(properties={"url": doc.id_, "text": doc.text})
```
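For example, once the collections exist (see `fresh_setup_weaviate` above), you could seed one directly; the URL here is one we also use later in this walkthrough:

```python
# Example usage of the helper: writes one page into LlamaIndexDocs.
write_webpages_to_weaviate(
    client,
    urls=["https://docs.llamaindex.ai/en/stable/understanding/workflows/"],
    collection_name="LlamaIndexDocs",
)
```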
Create a Function Calling Agent

Now that we have the relevant functions to write to a collection and the `QueryAgent` at hand, we can start by using the `FunctionAgent`, which is a simple tool-calling agent.
if "OPENAI_API_KEY" not in os.environ: os.environ["OPENAI_API_KEY"] = getpass("openai-key")
weaviate_agent = fresh_setup_weaviate(client)
llm = OpenAI(model="gpt-4o-mini")
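As noted at the start, you can switch LLM providers here; for instance (assuming the `llama-index-llms-anthropic` package is installed), something like:

```python
# Hypothetical provider swap: any LLM integration supported by LlamaIndex works.
# from llama_index.llms.anthropic import Anthropic
# llm = Anthropic(model="claude-3-5-sonnet-latest")
```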
```python
def write_to_weaviate_collection(urls: list[str]):
    """Useful for writing new content to the WeaviateDocs collection"""
    write_webpages_to_weaviate(client, urls, "WeaviateDocs")


def write_to_li_collection(urls: list[str]):
    """Useful for writing new content to the LlamaIndexDocs collection"""
    write_webpages_to_weaviate(client, urls, "LlamaIndexDocs")


def query_agent(query: str) -> str:
    """Useful for asking questions about Weaviate and LlamaIndex"""
    response = weaviate_agent.run(query)
    return response.final_answer
```
```python
agent = FunctionAgent(
    tools=[write_to_weaviate_collection, write_to_li_collection, query_agent],
    llm=llm,
    system_prompt="""You are a helpful assistant that can write the
    contents of urls to WeaviateDocs and LlamaIndexDocs collections,
    as well as forwarding questions to a QueryAgent""",
)
```
```python
response = await agent.run(
    user_msg="Can you save https://docs.llamaindex.ai/en/stable/examples/agent/agent_workflow_basic/"
)
print(str(response))
```
```python
response = await agent.run(
    user_msg="""What are llama index workflows? And can you save these to weaviate docs:
    https://weaviate.io/blog/what-are-agentic-workflows and
    https://weaviate.io/blog/ai-agents"""
)
print(str(response))
```
Llama Index workflows refer to orchestrations involving one or more AI agents within the LlamaIndex framework. These workflows manage complex tasks dynamically by leveraging components such as large language models (LLMs), tools, and memory states. Key features of Llama Index workflows include:
- Support for single or multiple agents managed within an AgentWorkflow orchestrator.
- Ability to maintain state across runs via serializable context objects.
- Integration of external tools with type annotations, including asynchronous functions.
- Streaming of intermediate outputs and event-based interactions.
- Human-in-the-loop capabilities to confirm or guide agent actions during workflow execution.
These workflows enable agents to execute sequences of operations, call external tools asynchronously, maintain conversation or task states, stream partial results, and incorporate human inputs when necessary. They embody dynamic, agent-driven sequences of task decomposition, tool use, and reflection, allowing AI systems to plan, act, and improve iteratively toward specific goals.
I have also saved the contents from the provided URLs to the WeaviateDocs collection.
```python
response = await agent.run(
    user_msg="How many docs do I have in the weaviate and llamaindex collections in total?"
)
print(str(response))
```
You have a total of 2 documents in the WeaviateDocs collection and 1 document in the LlamaIndexDocs collection. In total, that makes 3 documents across both collections.
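You can double-check those numbers without going through the agent, using the v4 client's aggregate API (a sketch, assuming the collections created above):

```python
# Sketch: count objects per collection directly with the Weaviate client.
for name in ["WeaviateDocs", "LlamaIndexDocs"]:
    total = client.collections.get(name).aggregate.over_all(total_count=True)
    print(name, total.total_count)
```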
```python
weaviate_agent = fresh_setup_weaviate(client)
```
Create a Workflow with Branches

Simple Example: Create Events

A LlamaIndex Workflow has 2 fundamentals:

- An Event
- A Step

A step may return an event, and an event may trigger a step!

For our use case, we can imagine that there are 4 events:
```python
class EvaluateQuery(Event):
    query: str


class WriteLlamaIndexDocsEvent(Event):
    urls: list[str]


class WriteWeaviateDocsEvent(Event):
    urls: list[str]


class QueryAgentEvent(Event):
    query: str
```
Simple Example: A Branching Workflow (that does nothing yet)

```python
class DocsAssistantWorkflow(Workflow):
    @step
    async def start(self, ctx: Context, ev: StartEvent) -> EvaluateQuery:
        return EvaluateQuery(query=ev.query)

    @step
    async def evaluate_query(
        self, ctx: Context, ev: EvaluateQuery
    ) -> QueryAgentEvent | WriteLlamaIndexDocsEvent | WriteWeaviateDocsEvent | StopEvent:
        if ev.query == "llama":
            return WriteLlamaIndexDocsEvent(urls=[ev.query])
        if ev.query == "weaviate":
            return WriteWeaviateDocsEvent(urls=[ev.query])
        if ev.query == "question":
            return QueryAgentEvent(query=ev.query)
        return StopEvent()

    @step
    async def write_li_docs(
        self, ctx: Context, ev: WriteLlamaIndexDocsEvent
    ) -> StopEvent:
        print("Got a request to write something to LlamaIndexDocs")
        return StopEvent()

    @step
    async def write_weaviate_docs(
        self, ctx: Context, ev: WriteWeaviateDocsEvent
    ) -> StopEvent:
        print("Got a request to write something to WeaviateDocs")
        return StopEvent()

    @step
    async def query_agent(
        self, ctx: Context, ev: QueryAgentEvent
    ) -> StopEvent:
        print("Got a request to forward a query to the QueryAgent")
        return StopEvent()
```
```python
workflow_that_does_nothing = DocsAssistantWorkflow()

# draw_all_possible_flows(workflow_that_does_nothing)

print(
    await workflow_that_does_nothing.run(start_event=StartEvent(query="llama"))
)
```
Got a request to write something to LlamaIndexDocs
None
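Each hard-coded query exercises exactly one branch, which you can confirm by running the other options:

```python
# The other hard-coded queries route to the remaining branches.
print(await workflow_that_does_nothing.run(start_event=StartEvent(query="weaviate")))
print(await workflow_that_does_nothing.run(start_event=StartEvent(query="question")))
# Any other query falls through to the bare StopEvent.
print(await workflow_that_does_nothing.run(start_event=StartEvent(query="hello")))
```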
Classify the Query with Structured Outputs

```python
class SaveToLlamaIndexDocs(BaseModel):
    """The URLs to parse and save into a llama-index specific docs collection."""

    llama_index_urls: List[str] = Field(default_factory=list)


class SaveToWeaviateDocs(BaseModel):
    """The URLs to parse and save into a weaviate specific docs collection."""

    weaviate_urls: List[str] = Field(default_factory=list)


class Ask(BaseModel):
    """The natural language questions that can be asked to a Q&A agent."""

    queries: List[str] = Field(default_factory=list)


class Actions(BaseModel):
    """Actions to take based on the latest user message."""

    actions: List[
        Union[SaveToLlamaIndexDocs, SaveToWeaviateDocs, Ask]
    ] = Field(default_factory=list)
```
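Before wiring these models into a workflow, it helps to see the structured-output call in isolation. A minimal sketch (the user message here is illustrative):

```python
# Sketch: classify one message into Actions using a structured LLM.
sllm = OpenAI(model="gpt-4o-mini").as_structured_llm(Actions)
response = sllm.chat(
    [
        ChatMessage(
            role="user",
            content="Save https://weaviate.io/blog/graph-rag and tell me what GraphRAG is",
        )
    ]
)
print(response.raw.actions)
# e.g. [SaveToWeaviateDocs(weaviate_urls=[...]), Ask(queries=[...])]
```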
Create a Workflow

Let’s create a workflow that still does nothing, but now converts the incoming user query to our structure. Based on the contents of that structure, the workflow will decide which step to run.

Notice how whichever step runs first will return a `StopEvent`… This is good, but maybe we can improve that later!
```python
from llama_index.llms.openai import OpenAIResponses
```
```python
class DocsAssistantWorkflow(Workflow):
    def __init__(self, *args, **kwargs):
        self.llm = OpenAIResponses(model="gpt-4.1-mini")
        self.system_prompt = """You are a docs assistant. You evaluate incoming queries and break them down to subqueries when needed.
        You decide on the next best course of action. Overall, here are the options:
        - You can write the contents of a URL to llamaindex docs (if it's a llamaindex url)
        - You can write the contents of a URL to weaviate docs (if it's a weaviate url)
        - You can answer a question about llamaindex and weaviate using the QueryAgent"""
        super().__init__(*args, **kwargs)

    @step
    async def start(self, ev: StartEvent) -> EvaluateQuery:
        return EvaluateQuery(query=ev.query)

    @step
    async def evaluate_query(
        self, ev: EvaluateQuery
    ) -> QueryAgentEvent | WriteLlamaIndexDocsEvent | WriteWeaviateDocsEvent:
        sllm = self.llm.as_structured_llm(Actions)
        response = await sllm.achat(
            [
                ChatMessage(role="system", content=self.system_prompt),
                ChatMessage(role="user", content=ev.query),
            ]
        )
        actions = response.raw.actions
        print(actions)
        for action in actions:
            if isinstance(action, SaveToLlamaIndexDocs):
                return WriteLlamaIndexDocsEvent(urls=action.llama_index_urls)
            elif isinstance(action, SaveToWeaviateDocs):
                return WriteWeaviateDocsEvent(urls=action.weaviate_urls)
            elif isinstance(action, Ask):
                for query in action.queries:
                    return QueryAgentEvent(query=query)

    @step
    async def write_li_docs(self, ev: WriteLlamaIndexDocsEvent) -> StopEvent:
        print(f"Writing {ev.urls} to LlamaIndex Docs")
        return StopEvent()

    @step
    async def write_weaviate_docs(
        self, ev: WriteWeaviateDocsEvent
    ) -> StopEvent:
        print(f"Writing {ev.urls} to Weaviate Docs")
        return StopEvent()

    @step
    async def query_agent(self, ev: QueryAgentEvent) -> StopEvent:
        print(f"Sending '{ev.query}' to agent")
        return StopEvent()
```
```python
everything_docs_agent_beta = DocsAssistantWorkflow()


async def run_docs_agent_beta(query: str):
    print(
        await everything_docs_agent_beta.run(
            start_event=StartEvent(query=query)
        )
    )
```
```python
await run_docs_agent_beta(
    """Can you save https://www.llamaindex.ai/blog/get-citations-and-reasoning-for-extracted-data-in-llamaextract and
    https://www.llamaindex.ai/blog/llamaparse-update-may-2025-new-models-skew-detection-and-more??"""
)
```
[SaveToLlamaIndexDocs(llama_index_urls=['https://www.llamaindex.ai/blog/get-citations-and-reasoning-for-extracted-data-in-llamaextract', 'https://www.llamaindex.ai/blog/llamaparse-update-may-2025-new-models-skew-detection-and-more'])]
Writing ['https://www.llamaindex.ai/blog/get-citations-and-reasoning-for-extracted-data-in-llamaextract', 'https://www.llamaindex.ai/blog/llamaparse-update-may-2025-new-models-skew-detection-and-more'] to LlamaIndex Docs
None
```python
await run_docs_agent_beta(
    "How many documents do we have in the LlamaIndexDocs collection now?"
)
```
[Ask(queries=['How many documents are in the LlamaIndexDocs collection?'])]
Sending 'How many documents are in the LlamaIndexDocs collection?' to agent
None
```python
await run_docs_agent_beta("What are LlamaIndex workflows?")
```
[Ask(queries=['What are LlamaIndex workflows?'])]
Sending 'What are LlamaIndex workflows?' to agent
None
```python
await run_docs_agent_beta(
    "Can you save https://weaviate.io/blog/graph-rag and https://weaviate.io/blog/genai-apps-with-weaviate-and-databricks??"
)
```
[SaveToWeaviateDocs(weaviate_urls=['https://weaviate.io/blog/graph-rag', 'https://weaviate.io/blog/genai-apps-with-weaviate-and-databricks'])]
Writing ['https://weaviate.io/blog/graph-rag', 'https://weaviate.io/blog/genai-apps-with-weaviate-and-databricks'] to Weaviate Docs
None
Run Multiple Branches & Put It All Together

In these cases, it makes sense to run multiple branches. A single step can trigger multiple events at once! We can `send_event` via the context 👇
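Concretely, one step emits several events with `ctx.send_event(...)` (returning nothing itself), and a final step buffers incoming events with `ctx.collect_events(...)` until all branches have reported back. Here is a stripped-down sketch of just that mechanic, independent of our docs assistant:

```python
# Sketch of the fan-out / collect pattern used in the workflow below.
from llama_index.core.workflow import (
    Context,
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)


class WorkItem(Event):
    payload: str


class WorkDone(Event):
    result: str


class FanOutWorkflow(Workflow):
    @step
    async def dispatch(self, ctx: Context, ev: StartEvent) -> WorkItem | None:
        items = ["a", "b", "c"]
        await ctx.store.set("num_events", len(items))
        for item in items:
            ctx.send_event(WorkItem(payload=item))  # one event per branch

    @step
    async def work(self, ctx: Context, ev: WorkItem) -> WorkDone:
        return WorkDone(result=ev.payload.upper())

    @step
    async def collect(self, ctx: Context, ev: WorkDone) -> StopEvent | None:
        num_events = await ctx.store.get("num_events")
        events = ctx.collect_events(ev, [WorkDone] * num_events)
        if events is None:
            return None  # still waiting on other branches
        return StopEvent(result=[e.result for e in events])


# result = await FanOutWorkflow(timeout=None).run()  # -> ['A', 'B', 'C']
```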
```python
class ActionCompleted(Event):
    result: str
```
```python
class DocsAssistantWorkflow(Workflow):
    def __init__(self, *args, **kwargs):
        self.llm = OpenAIResponses(model="gpt-4.1-mini")
        self.system_prompt = """You are a docs assistant. You evaluate incoming queries and break them down to subqueries when needed.
        You decide on the next best course of action. Overall, here are the options:
        - You can write the contents of a URL to llamaindex docs (if it's a llamaindex url)
        - You can write the contents of a URL to weaviate docs (if it's a weaviate url)
        - You can answer a question about llamaindex and weaviate using the QueryAgent"""
        super().__init__(*args, **kwargs)

    @step
    async def start(self, ctx: Context, ev: StartEvent) -> EvaluateQuery:
        return EvaluateQuery(query=ev.query)

    @step
    async def evaluate_query(
        self, ctx: Context, ev: EvaluateQuery
    ) -> QueryAgentEvent | WriteLlamaIndexDocsEvent | WriteWeaviateDocsEvent | None:
        sllm = self.llm.as_structured_llm(Actions)
        response = await sllm.achat(
            [
                ChatMessage(role="system", content=self.system_prompt),
                ChatMessage(role="user", content=ev.query),
            ]
        )
        actions = response.raw.actions
        await ctx.store.set("num_events", len(actions))
        await ctx.store.set("results", [])
        print(actions)
        for action in actions:
            if isinstance(action, SaveToLlamaIndexDocs):
                ctx.send_event(
                    WriteLlamaIndexDocsEvent(urls=action.llama_index_urls)
                )
            elif isinstance(action, SaveToWeaviateDocs):
                ctx.send_event(
                    WriteWeaviateDocsEvent(urls=action.weaviate_urls)
                )
            elif isinstance(action, Ask):
                for query in action.queries:
                    ctx.send_event(QueryAgentEvent(query=query))

    @step
    async def write_li_docs(
        self, ctx: Context, ev: WriteLlamaIndexDocsEvent
    ) -> ActionCompleted:
        print(f"Writing {ev.urls} to LlamaIndex Docs")
        write_webpages_to_weaviate(
            client, urls=ev.urls, collection_name="LlamaIndexDocs"
        )
        results = await ctx.store.get("results")
        results.append(f"Wrote {ev.urls} to LlamaIndex Docs")
        return ActionCompleted(result=f"Writing {ev.urls} to LlamaIndex Docs")

    @step
    async def write_weaviate_docs(
        self, ctx: Context, ev: WriteWeaviateDocsEvent
    ) -> ActionCompleted:
        print(f"Writing {ev.urls} to Weaviate Docs")
        write_webpages_to_weaviate(
            client, urls=ev.urls, collection_name="WeaviateDocs"
        )
        results = await ctx.store.get("results")
        results.append(f"Wrote {ev.urls} to Weaviate Docs")
        return ActionCompleted(result=f"Writing {ev.urls} to Weaviate Docs")

    @step
    async def query_agent(
        self, ctx: Context, ev: QueryAgentEvent
    ) -> ActionCompleted:
        print(f"Sending {ev.query} to agent")
        response = weaviate_agent.run(ev.query)
        results = await ctx.store.get("results")
        results.append(f"QueryAgent responded with:\n {response.final_answer}")
        return ActionCompleted(result=f"Sending '{ev.query}' to agent")

    @step
    async def collect(
        self, ctx: Context, ev: ActionCompleted
    ) -> StopEvent | None:
        num_events = await ctx.store.get("num_events")
        evs = ctx.collect_events(ev, [ActionCompleted] * num_events)
        if evs is None:
            return None
        return StopEvent(result=[ev.result for ev in evs])
```
```python
everything_docs_agent = DocsAssistantWorkflow(timeout=None)


async def run_docs_agent(query: str):
    handler = everything_docs_agent.run(start_event=StartEvent(query=query))
    result = await handler
    for response in await handler.ctx.store.get("results"):
        print(response)
```
```python
await run_docs_agent(
    "Can you save https://docs.llamaindex.ai/en/stable/understanding/workflows/ and https://docs.llamaindex.ai/en/stable/understanding/workflows/branches_and_loops/"
)
```
[SaveToLlamaIndexDocs(llama_index_urls=['https://docs.llamaindex.ai/en/stable/understanding/workflows/']), SaveToLlamaIndexDocs(llama_index_urls=['https://docs.llamaindex.ai/en/stable/understanding/workflows/branches_and_loops/'])]
Writing ['https://docs.llamaindex.ai/en/stable/understanding/workflows/'] to LlamaIndex Docs
Writing ['https://docs.llamaindex.ai/en/stable/understanding/workflows/branches_and_loops/'] to LlamaIndex Docs
Wrote ['https://docs.llamaindex.ai/en/stable/understanding/workflows/'] to LlamaIndex Docs
Wrote ['https://docs.llamaindex.ai/en/stable/understanding/workflows/branches_and_loops/'] to LlamaIndex Docs
```python
await run_docs_agent(
    "How many documents do we have in the LlamaIndexDocs collection now?"
)
```
[Ask(queries=['How many documents are in the LlamaIndexDocs collection?'])]
Sending How many documents are in the LlamaIndexDocs collection? to agent
QueryAgent responded with:
 The LlamaIndexDocs collection contains 2 documents, specifically related to workflows and branches and loops within the documentation.
```python
await run_docs_agent(
    "What are LlamaIndex workflows? And can you save https://weaviate.io/blog/graph-rag"
)
```
[Ask(queries=['What are LlamaIndex workflows?'])]
Sending What are LlamaIndex workflows? to agent
QueryAgent responded with:
 LlamaIndex workflows are an event-driven, step-based framework designed to control and manage the execution flow of complex applications, particularly those involving generative AI. They break an application into discrete Steps, each triggered by Events and capable of emitting further Events, allowing for complex logic involving loops, branches, and parallel execution.
In a LlamaIndex workflow, steps perform functions ranging from simple tasks to complex agents, with inputs and outputs communicated via Events. This event-driven model facilitates maintainability and clarity, overcoming limitations of previous approaches like directed acyclic graphs (DAGs) which struggled with complex flows involving loops and branching.
Key features include:

- **Loops:** Steps can return events that loop back to previous steps to enable iterative processes.
- **Branches:** Workflows can branch into different paths based on conditions, allowing for multiple distinct sequences of steps.
- **Parallelism:** Multiple branches or steps can run concurrently and synchronize their results.
- **State Maintenance:** Workflows support maintaining state and context throughout execution.
- **Observability and Debugging:** Supported by various components and callbacks for monitoring.
An example workflow might involve judging whether a query is of sufficient quality, looping to improve it if not, then concurrently executing different retrieval-augmented generation (RAG) strategies, and finally judging their responses to produce a single output.
Workflows are especially useful as applications grow in complexity, enabling developers to organize and control intricate AI logic more naturally and efficiently than traditional graph-based methods. For simpler pipelines, LlamaIndex suggests using workflows optionally, but for advanced agentic applications, workflows provide a flexible and powerful control abstraction.
```python
await run_docs_agent("How do I use loops in llamaindex workflows?")
```
[Ask(queries=['How to use loops in llamaindex workflows'])]
Sending How to use loops in llamaindex workflows to agent
QueryAgent responded with:
 In LlamaIndex workflows, loops are implemented using an event-driven approach where you define custom event types and steps that emit events to control the workflow's execution flow. To create a loop, you define a custom event (e.g., `LoopEvent`) and a workflow step that can return either the event continuing the loop or another event to proceed. For example, a workflow step might randomly decide to either loop back (emit `LoopEvent` again) or continue to a next step emitting a different event.
This allows creating flexible looping behaviors where any step can loop back to any other step by returning the corresponding event instances. The approach leverages Python's async functions decorated with `@step`, which process events and return the next event(s), enabling both loops and conditional branching in workflows.
Thus, loops in LlamaIndex workflows are event-based, using custom event types and the return of events from steps to signal iterations until a condition is met.
Example:
```python
from llama_index.core.workflow import Workflow, Event, StartEvent, StopEvent, step
import random


class LoopEvent(Event):
    loop_output: str


class FirstEvent(Event):
    first_output: str


class MyWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent | LoopEvent) -> FirstEvent | LoopEvent:
        if random.randint(0, 1) == 0:
            print("Bad thing happened")
            return LoopEvent(loop_output="Back to step one.")
        else:
            print("Good thing happened")
            return FirstEvent(first_output="First step complete.")

    # ... other steps ...


# Running this workflow will cause step_one to loop randomly until it proceeds.
```
You can combine loops with branching and parallel execution in workflows to build complex control flows. For detailed guidance and examples, consult the LlamaIndex documentation under "Branches and Loops" and the "Workflows" guides.
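The agent's example elides the remaining steps. As a reference, here is a self-contained sketch that completes the loop (the `step_two` body is an assumption, included only so the workflow runs end to end):

```python
# Hypothetical completion of the loop example above; `step_two` is assumed.
import random

from llama_index.core.workflow import Event, StartEvent, StopEvent, Workflow, step


class LoopEvent(Event):
    loop_output: str


class FirstEvent(Event):
    first_output: str


class MyWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent | LoopEvent) -> FirstEvent | LoopEvent:
        # Either loop back to this step (LoopEvent) or move on (FirstEvent).
        if random.randint(0, 1) == 0:
            return LoopEvent(loop_output="Back to step one.")
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ev: FirstEvent) -> StopEvent:
        # Assumed final step: stop once the loop has been escaped.
        return StopEvent(result=ev.first_output)


# result = await MyWorkflow(timeout=None).run()
# print(result)  # "First step complete."
```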