Index API & Clients Guide

This guide walks through the core workflow for working with Index programmatically: uploading files, creating data sources and data sinks, building an index (pipeline), and retrieving from it.

Install the API client package:

pip install "llama-cloud>=1.0"

Import and configure the client (you can use either the sync or async client):

from llama_cloud import LlamaCloud, AsyncLlamaCloud
client = LlamaCloud(api_key='<llama-cloud-api-key>')
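
The async client works the same way, with each call awaited. A minimal sketch, assuming AsyncLlamaCloud mirrors the sync client's method names:

import asyncio

async def main():
    async_client = AsyncLlamaCloud(api_key='<llama-cloud-api-key>')
    # Each call shown in this guide becomes an awaited call, e.g.:
    file_obj = await async_client.files.create(file="/path/to/doc1.pdf", purpose="user_data")
    print(file_obj.id)

asyncio.run(main())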

# Upload a file; its id is used later when adding files to the index
file_obj = client.files.create(file="/path/to/doc1.pdf", purpose="user_data")
print(file_obj.id)
# Create an S3 data source that the index can sync files from
from llama_cloud.types.data_source_create_params import ComponentCloudS3DataSource

data_source = client.data_sources.create(
    name="my-s3-data-source",
    component=ComponentCloudS3DataSource(
        bucket="my-bucket",
        aws_access_id="my-access-id",
        prefix="documents/",
    ),
    source_type="S3",
    project_id="my-project-id",
)
# Create a Pinecone data sink where the embedded chunks will be stored
from llama_cloud.types.data_sink_create_params import ComponentCloudPineconeVectorStore

data_sink = client.data_sinks.create(
    name="my-pinecone-data-sink",
    component=ComponentCloudPineconeVectorStore(
        api_key="my-pinecone-api-key",
        index_name="my-pinecone-index",
    ),
    sink_type="PINECONE",
)
print(data_sink.id)
# Embedding config
embedding_config = {
    'type': 'OPENAI_EMBEDDING',
    'component': {
        'api_key': '<YOUR_API_KEY_HERE>',  # editable
        'model_name': 'text-embedding-ada-002'  # editable
    }
}

# Transformation auto config
transform_config = {
    'mode': 'auto',
    'config': {
        'chunk_size': 1024,  # editable
        'chunk_overlap': 20  # editable
    }
}
# Create (or update) the index pipeline, reusing the configs defined above
pipeline = client.pipelines.upsert(
    name="my-first-index",
    project_id="my-project-id",
    data_sink_id=data_sink.id,
    embedding_config=embedding_config,
    transform_config=transform_config,
    llama_parse_parameters={},
)
print(pipeline.id)
# Add the uploaded file to the index, with optional custom metadata
files = client.pipelines.files.create(
    pipeline_id=pipeline.id,
    body=[
        {
            "file_id": file_obj.id,
            "custom_metadata": {"category": "invoices"},
        }
    ]
)
# Attach the S3 data source to the index; sync_interval schedules periodic syncs
client.pipelines.data_sources.update_data_sources(
    pipeline_id=pipeline.id,
    body=[
        {
            "data_source_id": data_source.id,
            "sync_interval": 43200.0  # Optional, sync frequency in seconds (here, every 12 hours)
        },
    ]
)
# Add raw text documents directly to the index
documents = client.pipelines.documents.create(
    pipeline_id=pipeline.id,
    body=[
        {
            "text": "This is my first document to be indexed.",
            "metadata": {"source": "generated"},
        }
    ],
)
print(f"Uploaded {len(documents)} documents to the index.")
# Wait for indexing to complete
import time

status_resp = client.pipelines.get_status(pipeline_id=pipeline.id)
while status_resp.status in ("NOT_STARTED", "IN_PROGRESS"):
    time.sleep(5)
    status_resp = client.pipelines.get_status(pipeline_id=pipeline.id)
print(f"Indexing completed with status: {status_resp.status}")
# Run a retrieval query against the index
results = client.pipelines.retrieve(
    pipeline_id=pipeline.id,
    query="Some query",
    dense_similarity_top_k=20,
    sparse_similarity_top_k=20,
    alpha=0.5,
    enable_reranking=True,
    rerank_top_n=5,
)
for n in results.retrieval_nodes:
    print(f"Score: {n.score}, Text: {n.node.text}")