Agent Data (Python)

See the Agent Data Overview for concepts, constraints, and environment details.

pip install llama-cloud-services llama-cloud

The Python SDK offers AsyncAgentDataClient with automatic retries and type validation.

from pydantic import BaseModel

from llama_cloud_services.beta.agent_data import AsyncAgentDataClient


class ExtractedPerson(BaseModel):
    name: str
    age: int
    email: str


client = AsyncAgentDataClient(
    type=ExtractedPerson,
    collection="extracted_people",
    agent_url_id="person-extraction-agent",
)

Retry behavior: network errors (timeouts, connect errors, retriable HTTP status) are retried up to 3 times with exponential backoff.
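This policy can be pictured as a plain exponential-backoff loop. A minimal sketch of the idea, not the SDK's internals; `with_retries`, `max_retries`, and `base_delay` are illustrative names:

```python
import random
import time


def with_retries(fn, max_retries=3, base_delay=0.5):
    """Call fn(), retrying transient errors with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except ConnectionError:
            if attempt == max_retries:
                raise  # out of retries: surface the error
            # Sleep ~0.5s, 1s, 2s, ... plus a little jitter.
            time.sleep(base_delay * 2**attempt + random.uniform(0, 0.1))
```

Because the client retries for you, transient network failures usually surface only after all attempts are exhausted.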

person = ExtractedPerson(name="John Doe", age=30, email="john@example.com")
created = await client.create_item(person)
fetched = await client.get_item(created.id)
updated = await client.update_item(
    created.id,
    ExtractedPerson(name="Jane", age=31, email="jane@example.com"),
)
await client.delete_item(updated.id)

Notes:

  • Updates overwrite the entire data object.
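Because an update replaces the whole object, a read-modify-write must send every field, not just the changed one. A minimal sketch of the difference using plain dicts (the overwrite the API performs versus the merge you must build yourself):

```python
stored = {"name": "John Doe", "age": 30, "email": "john@example.com"}

# What update_item does: the payload replaces the object wholesale,
# so fields you do not send are lost.
overwrite = {"age": 31}

# What you must construct yourself before updating: the full object
# with only the changed field bumped.
merged = {**stored, "age": 31}
```

In practice: fetch the item, copy it with the changed fields applied, and send the complete object back.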

You can filter on data fields and on the top-level created_at/updated_at timestamps. Sort with a comma-delimited list of fields; the data. prefix is required when sorting by data fields. The default page size is 50 (maximum 1000).

results = await client.search(
    filter={
        # Data fields
        "age": {"gte": 21, "lt": 65},
        "status": {"eq": "active"},
        "tags": {"includes": ["python", "ml"]},
        # Top-level timestamps (ISO strings accepted)
        "created_at": {"gte": "2024-01-01T00:00:00Z"},
    },
    order_by="data.name desc, created_at",
    page_size=50,
    offset=0,
    include_total=True,  # ask only on the first page if needed
)
for item in results.items:
    print(item.data)
print(results.has_more, results.total)

Filter operators (per-field):

  • eq, gt, gte, lt, lte, includes (IN). Datetime filters accept ISO strings; the server converts them to timestamps.
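The operators behave like ordinary comparisons. A pure-Python sketch of how one per-field clause could be evaluated, purely illustrative and not the server's implementation:

```python
import operator

OPS = {
    "eq": operator.eq,
    "gt": operator.gt,
    "gte": operator.ge,
    "lt": operator.lt,
    "lte": operator.le,
    # "includes" is an IN test: the field value must be one of the given values
    "includes": lambda value, allowed: value in allowed,
}


def matches(item: dict, filter_: dict) -> bool:
    """True when every per-field clause holds for the item."""
    return all(
        OPS[op](item.get(field), operand)
        for field, clause in filter_.items()
        for op, operand in clause.items()
    )
```

For example, `{"age": {"gte": 21, "lt": 65}}` holds for an item with `age == 30` but not `age == 70`.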

Sorting:

  • Example: "data.name desc, created_at".
  • If no sort is provided, results default to created_at desc.
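A spec like "data.name desc, created_at" reads as an ordered list of (field, direction) pairs. A small illustrative sketch of parsing and applying one, with the sort fields stored as flat dict keys for simplicity:

```python
def parse_order_by(spec: str) -> list[tuple[str, bool]]:
    """Split 'data.name desc, created_at' into (field, descending) pairs."""
    keys = []
    for part in spec.split(","):
        tokens = part.split()
        descending = len(tokens) > 1 and tokens[1].lower() == "desc"
        keys.append((tokens[0], descending))
    return keys


def sort_items(items: list[dict], spec: str) -> list[dict]:
    # Apply keys right-to-left so the leftmost field wins (sorted() is stable).
    for field, descending in reversed(parse_order_by(spec)):
        items = sorted(items, key=lambda it: it[field], reverse=descending)
    return items
```

The leftmost field is the primary sort key; later fields break ties.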

Pagination:

  • Use offset and page_size. The server may return has_more and a next_page_token (SDK exposes has_more).
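Offset paging amounts to slicing the result set in page_size chunks and stopping when has_more goes false. A minimal self-contained sketch of that loop (the real calls go through client.search with offset advancing each page):

```python
def paginate(items: list, page_size: int = 50):
    """Yield (page, has_more) pairs the way offset/page_size paging works."""
    offset = 0
    while True:
        page = items[offset : offset + page_size]
        has_more = offset + page_size < len(items)
        yield page, has_more
        if not has_more:
            break
        offset += page_size
```

A consumer keeps requesting with `offset += page_size` until `has_more` is false.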

Group data by one or more data fields, optionally count items per group, and/or fetch the first item per group.

agg = await client.aggregate(
    filter={"status": {"eq": "active"}},
    group_by=["department", "role"],
    count=True,
    first=True,  # return earliest item per group by created_at
    order_by="data.department asc, data.role asc",
    page_size=100,
)
for group in agg.items:  # items are groups
    print(group.group_key)   # {"department": "Sales", "role": "AE"}
    print(group.count)       # optional
    print(group.first_item)  # optional dict

Details:

  • group_by: dot-style data paths (e.g., "department", "contact.email").
  • count: adds a count per group.
  • first: returns the first data item per group (earliest created_at).
  • order_by: uses the same semantics as search (applies to group key expressions).
  • Pagination uses offset and page_size similarly to search.
  • Filter keys target data fields; only created_at/updated_at are top-level.
  • Sort with comma-separated specs; prefix data fields in order_by (e.g., "data.name desc, created_at").
  • Default page_size is 50 (max 1000). Request include_total=True on the first page only.
  • The _public agent is required in local development; use collections to separate apps. A non-_public agent_url_id requires an existing deployment and access to it.
  • get_item returns 404 if not found; update_item overwrites the entire data object.
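The group/count/first semantics above can be mimicked in plain Python. An illustrative sketch, assuming items arrive ordered by created_at ascending so the first item seen per group is the earliest:

```python
def aggregate_items(items, group_by, count=False, first=False):
    """Group dicts by the given fields; optionally count and keep the first item."""
    groups = {}
    for item in items:
        key = tuple(item[f] for f in group_by)
        g = groups.setdefault(key, {"group_key": dict(zip(group_by, key))})
        if count:
            g["count"] = g.get("count", 0) + 1
        if first:
            g.setdefault("first_item", item)  # earliest, given the ordering
    return list(groups.values())
```

Each returned group carries its group_key plus whichever of count and first_item were requested, mirroring the shape of agg.items.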