Skip to content

LlamaCloud API & Clients Guide

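This guide walks through the core workflow with the Python client: installing the package, configuring the client, uploading a file, creating a data source and a data sink, defining a pipeline, adding files, data sources, and documents to it, and finally checking the pipeline's status and jobs and running a search.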

Install API client package

pip install llama-cloud

Import and configure client

from llama_cloud.client import LlamaCloud
# Create the API client; replace the placeholder with your LlamaCloud API key.
client = LlamaCloud(token='<llama-cloud-api-key>')

# Upload a local file. The upload call must sit inside the `with` block so
# the handle is still open while it is passed to the client; the handle is
# closed automatically when the block exits.
with open('test.pdf', 'rb') as f:
    file = client.files.upload_file(upload_file=f)
from llama_cloud.types import CloudS3DataSource
# Describe an S3 data source and create it through the client.
s3_component = CloudS3DataSource(bucket='test-bucket')
ds = {
    'name': 's3',
    'source_type': 'S3',
    'component': s3_component,
}
data_source = client.data_sources.create_data_source(request=ds)
from llama_cloud.types import CloudPineconeVectorStore
# Describe a Pinecone data sink and create it through the client.
pinecone_component = CloudPineconeVectorStore(
    api_key='test-key',
    index_name='test-index',
)
ds = {
    'name': 'pinecone',
    'sink_type': 'PINECONE',
    'component': pinecone_component,
}
data_sink = client.data_sinks.create_data_sink(request=ds)
# Embedding config: used as the 'embedding_config' entry of the pipeline
# request. The values marked "editable" are placeholders to replace.
openai_embedding_component = {
    'api_key': '<YOUR_API_KEY_HERE>',  # editable
    'model_name': 'text-embedding-ada-002',  # editable
}
embedding_config = {
    'type': 'OPENAI_EMBEDDING',
    'component': openai_embedding_component,
}
# Transformation auto config: used as the 'transform_config' entry of the
# pipeline request. The values marked "editable" can be adjusted.
auto_transform_settings = {
    'chunk_size': 1024,  # editable
    'chunk_overlap': 20,  # editable
}
transform_config = {
    'mode': 'auto',
    'config': auto_transform_settings,
}
# Assemble the pipeline request from the embedding config, the transform
# config, and the id of the data sink, then upsert it through the client.
# `pipeline` ends up bound to the object returned by the upsert call.
pipeline_request = {
    'name': 'test-pipeline',
    'embedding_config': embedding_config,
    'transform_config': transform_config,
    'data_sink_id': data_sink.id,
}
pipeline = client.pipelines.upsert_pipeline(request=pipeline_request)
# Add the uploaded file to the pipeline, referencing it by id.
file_entry = {
    'file_id': file.id,
    # Optional: an example of how to add custom metadata to your files.
    'custom_metadata': {
        'document_type': 'INVOICE',
    },
}
files = [file_entry]
pipeline_files = client.pipelines.add_files_to_pipeline(pipeline.id, request=files)
# Add the data source to the pipeline, referencing it by id.
data_source_entry = {
    'data_source_id': data_source.id,
    # Optional: scheduled sync frequency in seconds. In this case, every 12 hours.
    'sync_interval': 43200.0,
}
data_sources = [data_source_entry]
pipeline_data_sources = client.pipelines.add_data_sources_to_pipeline(
    pipeline.id,
    request=data_sources,
)
from llama_cloud.types import CloudDocumentCreate
# Build one document with text and metadata, then send it to the pipeline
# as a batch of one. `documents` ends up bound to the value returned by the
# batch-create call.
new_document = CloudDocumentCreate(
    text='test-text',
    metadata={'test-key': 'test-val'},
)
documents = client.pipelines.create_batch_pipeline_documents(
    pipeline.id,
    request=[new_document],
)
# Query the pipeline: its status, its jobs, and a search over its contents.
pipeline_id = pipeline.id
pipelines_api = client.pipelines
status = pipelines_api.get_pipeline_status(pipeline_id)
jobs = pipelines_api.list_pipeline_jobs(pipeline_id)
results = pipelines_api.run_search(pipeline_id, query='test-query')