Skip to content

Production Extraction: Batch Processing, Polling, and Latency Management

This cookbook covers production patterns for LlamaExtract V2: extracting from single files, processing batches concurrently, composing parse-then-extract workflows, handling latency, and managing schemas programmatically.

Every example uses the V2 Extract API (client.extract), which accepts a document_input_value — either a File ID or Parse Job ID.

pip install llama-cloud>=1.0
import os
# AsyncLlamaCloud is imported for the async examples later in this cookbook;
# the synchronous client is used for the single-file walkthrough below.
from llama_cloud import LlamaCloud, AsyncLlamaCloud
# Reads the API key from the environment; raises KeyError if it is unset.
client = LlamaCloud(api_key=os.environ["LLAMA_CLOUD_API_KEY"])

Upload a file, define a schema, and extract structured data.

from pydantic import BaseModel, Field
from typing import Optional
import time

# Define your schema
class InvoiceData(BaseModel):
    vendor_name: str = Field(description="Name of the vendor or supplier")
    invoice_number: str = Field(description="Unique invoice identifier")
    total_amount: float = Field(description="Total amount due")
    currency: str = Field(description="Currency code (e.g. USD, EUR)")
    due_date: Optional[str] = Field(None, description="Payment due date")

# Upload the source document; extraction jobs reference it by file ID.
file_obj = client.files.create(
    file="./invoices/invoice_001.pdf",
    purpose="extract",
)

# Create an extraction job against the uploaded file.
job = client.extract.create(
    document_input_value=file_obj.id,
    config={
        "extract_options": {
            "data_schema": InvoiceData.model_json_schema(),
            "extraction_target": "per_doc",
            "tier": "cost_effective",
        },
    },
)

# Poll every 2 seconds until the job reaches a terminal state.
TERMINAL_STATES = ("COMPLETED", "FAILED", "CANCELLED")
while True:
    if job.status in TERMINAL_STATES:
        break
    time.sleep(2)
    job = client.extract.get(job.id)
    print(f"Status: {job.status}")

if job.status == "COMPLETED":
    invoice = InvoiceData.model_validate(job.extract_result)
    print(f"Vendor: {invoice.vendor_name}")
    print(f"Total: {invoice.currency} {invoice.total_amount}")
else:
    print(f"Job failed: {job.error_message}")

2. Batch Extraction from Multiple Documents

Section titled “2. Batch Extraction from Multiple Documents”

When you need to extract from many files, upload them all first, then create extraction jobs concurrently and poll them in parallel.

import asyncio
import time
from pathlib import Path
from pydantic import BaseModel, Field
from typing import Optional
from llama_cloud import AsyncLlamaCloud
# NOTE(review): constructed without api_key — presumably reads
# LLAMA_CLOUD_API_KEY from the environment like the sync client; confirm.
async_client = AsyncLlamaCloud()
class InvoiceData(BaseModel):
    """Structured fields to extract from each invoice document."""

    vendor_name: str = Field(description="Name of the vendor or supplier")
    invoice_number: str = Field(description="Unique invoice identifier")
    total_amount: float = Field(description="Total amount due")
    currency: str = Field(description="Currency code (e.g. USD, EUR)")
    due_date: Optional[str] = Field(None, description="Payment due date")
# Shared extraction config: one result per document on the cheaper tier.
EXTRACT_CONFIG = {
    "extract_options": {
        "data_schema": InvoiceData.model_json_schema(),
        "extraction_target": "per_doc",
        "tier": "cost_effective",
    },
}
async def upload_files(file_paths: list[Path]) -> list[str]:
    """Upload every file concurrently and return the resulting file IDs."""

    async def _upload(path: Path) -> str:
        # Each upload is an independent request, so they can all run at once.
        created = await async_client.files.create(
            file=str(path), purpose="extract"
        )
        return created.id

    uploads = [_upload(p) for p in file_paths]
    return list(await asyncio.gather(*uploads))
async def extract_one(file_id: str) -> dict:
    """Run an extraction job for *file_id* and poll it until it finishes.

    Returns {"file_id", "data"} on success or {"file_id", "error"} otherwise.
    """
    job = await async_client.extract.create(
        document_input_value=file_id,
        config=EXTRACT_CONFIG,
    )
    terminal = ("COMPLETED", "FAILED", "CANCELLED")
    # Poll every 2 seconds until the job reaches a terminal state.
    while job.status not in terminal:
        await asyncio.sleep(2)
        job = await async_client.extract.get(job.id)
    if job.status != "COMPLETED":
        return {"file_id": file_id, "error": job.error_message}
    return {"file_id": file_id, "data": job.extract_result}
async def batch_extract(file_paths: list[Path], concurrency: int = 10) -> list[dict]:
    """Extract from multiple files with bounded concurrency."""
    file_ids = await upload_files(file_paths)
    gate = asyncio.Semaphore(concurrency)

    async def _bounded(fid: str) -> dict:
        # The semaphore caps how many extract jobs are in flight at once.
        async with gate:
            return await extract_one(fid)

    tasks = [_bounded(fid) for fid in file_ids]
    return list(await asyncio.gather(*tasks))
# Usage
# Usage
async def main():
    """Extract every PDF under ./invoices and report per-file outcomes."""
    invoice_files = list(Path("./invoices").glob("*.pdf"))
    print(f"Processing {len(invoice_files)} files...")
    results = await batch_extract(invoice_files, concurrency=10)
    # Partition results by which key each dict carries.
    succeeded: list[dict] = []
    failed: list[dict] = []
    for r in results:
        if "data" in r:
            succeeded.append(r)
        if "error" in r:
            failed.append(r)
    print(f"Completed: {len(succeeded)}, Failed: {len(failed)}")
    for result in succeeded:
        invoice = InvoiceData.model_validate(result["data"])
        print(f" {invoice.vendor_name}: {invoice.currency} {invoice.total_amount}")

asyncio.run(main())
  • Concurrency: Start with 10 concurrent jobs and adjust based on your plan limits. If you see THROTTLED status, reduce concurrency.
  • Credits: cost_effective tier costs 4 credits per page. agentic tier costs 15 credits per page. Plan your budget for large batches.
  • File upload: The file upload endpoint has its own rate limits. Upload files before creating extract jobs to separate the two bottlenecks.

3. Parse-Then-Extract: Composable Workflow

Section titled “3. Parse-Then-Extract: Composable Workflow”

When you need fine-grained control over parsing, or want to parse once and extract with different schemas, use the parse-then-extract pattern. Parse the document first, then pass the parse_job_id to extraction.

This avoids re-parsing the same document for each extraction configuration, saving both time and credits.

from pydantic import BaseModel, Field
from typing import Optional
import time

# Step 1: Parse the document once; multiple extractions can reuse this result.
parse_job = client.parsing.create(
    tier="agentic",
    version="latest",
    upload_file="./contracts/master_agreement.pdf",
)
# Wait for parse to complete; verbose=True logs progress while waiting.
parse_result = client.parsing.wait_for_completion(
    parse_job.id, verbose=True
)
print(f"Parse complete: {parse_result.status}")
# Step 2: Extract with the first schema (contract metadata)
class ContractMetadata(BaseModel):
    """Core contract metadata fields to extract."""

    parties: list[str] = Field(description="Names of all contracting parties")
    effective_date: str = Field(description="Contract effective date")
    termination_date: Optional[str] = Field(None, description="Contract end date")
    governing_law: str = Field(description="Governing law jurisdiction")
# Extract contract metadata from the existing parse, citing source passages.
metadata_job = client.extract.create(
    document_input_value=parse_job.id,
    config={
        "extract_options": {
            "data_schema": ContractMetadata.model_json_schema(),
            "extraction_target": "per_doc",
            "tier": "agentic",
            "cite_sources": True,
        },
    },
)
# Poll until the job reaches a terminal state.
while metadata_job.status not in ("COMPLETED", "FAILED", "CANCELLED"):
    time.sleep(2)
    metadata_job = client.extract.get(metadata_job.id)
# Guard: on FAILED/CANCELLED there is no result to validate — fail fast
# instead of passing a missing extract_result to model_validate.
if metadata_job.status != "COMPLETED":
    raise RuntimeError(
        f"Metadata extraction ended with status {metadata_job.status}: "
        f"{metadata_job.error_message}"
    )
metadata = ContractMetadata.model_validate(metadata_job.extract_result)
print(f"Parties: {metadata.parties}")
print(f"Governing law: {metadata.governing_law}")
# Step 3: Extract with a second schema (financial terms) from the SAME parse
class FinancialTerms(BaseModel):
    """Financial terms; all optional since many contracts omit them."""

    total_value: Optional[float] = Field(None, description="Total contract value")
    payment_schedule: Optional[str] = Field(None, description="Payment frequency and terms")
    penalties: Optional[str] = Field(None, description="Late payment or breach penalties")
financial_job = client.extract.create(
    document_input_value=parse_job.id,  # Reuse the same parse result
    config={
        "extract_options": {
            "data_schema": FinancialTerms.model_json_schema(),
            "extraction_target": "per_doc",
            "tier": "agentic",
        },
    },
)
# Poll until the job reaches a terminal state.
while financial_job.status not in ("COMPLETED", "FAILED", "CANCELLED"):
    time.sleep(2)
    financial_job = client.extract.get(financial_job.id)
# Guard: on FAILED/CANCELLED there is no result to validate — fail fast
# instead of passing a missing extract_result to model_validate.
if financial_job.status != "COMPLETED":
    raise RuntimeError(
        f"Financial terms extraction ended with status {financial_job.status}: "
        f"{financial_job.error_message}"
    )
terms = FinancialTerms.model_validate(financial_job.extract_result)
print(f"Contract value: {terms.total_value}")
print(f"Payment schedule: {terms.payment_schedule}")

Extract from specific pages using target_pages. This is useful for long documents where you only need data from certain pages.

# Restrict extraction to specific pages; you are billed only for these pages.
job = client.extract.create(
    document_input_value=file_obj.id,
    config={
        "extract_options": {
            "data_schema": InvoiceData.model_json_schema(),
            "extraction_target": "per_doc",
            "tier": "cost_effective",
            "target_pages": "1,3,5-7",  # Pages 1, 3, 5, 6, 7
        },
    },
)

Pages are 1-indexed. Supported formats: "1", "1,3", "1-5", "1,3,5-7,9". You are only billed for pages extracted.

By default, extract_metadata (usage stats, citations, field-level confidence) is not included in the response. Use the expand query parameter to request it.

# Get job with metadata; extract_metadata is omitted unless explicitly expanded.
job = client.extract.get(job.id, expand=["extract_metadata"])
if job.extract_metadata:
    usage = job.extract_metadata.usage
    print(f"Pages extracted: {usage.num_pages_extracted}")
    print(f"Output tokens: {usage.num_output_tokens}")
| Scenario | Approach |
| --- | --- |
| One file, one schema | Use document_input_value with the file ID directly |
| One file, multiple schemas | Parse once, then extract with each schema via document_input_value with the parse job ID |
| Need to inspect parse quality before extracting | Parse first, review the result, then extract |
| Batch of files with the same schema | Use file IDs with the batch pattern from Section 2 |
| Only need specific pages | Add target_pages to extract_options |
| Tier | Credits/Page | Typical Latency | Best For |
| --- | --- | --- | --- |
| cost_effective | 4 | 5-30 seconds | High-volume, simpler documents |
| agentic | 15 | 15-90 seconds | Complex documents, higher accuracy |

Latency scales with document length. A 100-page PDF on the agentic tier will take longer than a 2-page invoice.

Wrap your polling loop with a timeout to avoid waiting indefinitely.

import time


def extract_with_timeout(
    client,
    file_id: str,
    config: dict,
    timeout_seconds: float = 300,
    poll_interval: float = 2.0,
) -> dict:
    """Extract with a client-side timeout.

    Creates an extraction job for *file_id*, polls it every *poll_interval*
    seconds, and raises TimeoutError once *timeout_seconds* have elapsed
    without reaching a terminal state. Raises RuntimeError when the job ends
    in a non-COMPLETED terminal state; otherwise returns the extract result.
    """
    job = client.extract.create(
        document_input_value=file_id,
        config=config,
    )
    # Same check as "elapsed > timeout_seconds", expressed as a deadline.
    deadline = time.monotonic() + timeout_seconds
    while job.status not in ("COMPLETED", "FAILED", "CANCELLED"):
        if time.monotonic() > deadline:
            raise TimeoutError(
                f"Extraction job {job.id} did not complete within "
                f"{timeout_seconds}s (last status: {job.status})"
            )
        time.sleep(poll_interval)
        job = client.extract.get(job.id)
    if job.status != "COMPLETED":
        raise RuntimeError(f"Job {job.id} ended with status: {job.status} - {job.error_message}")
    return job.extract_result
# Usage
# Usage
try:
    result = extract_with_timeout(
        client,
        file_id=file_obj.id,
        config=EXTRACT_CONFIG,
        timeout_seconds=120,
    )
    print("Extracted:", result)
except TimeoutError as e:
    # The client-side deadline expired; the job may still finish server-side.
    print(f"Timed out: {e}")
except RuntimeError as e:
    # The job reached a terminal non-COMPLETED state (FAILED/CANCELLED).
    print(f"Failed: {e}")

When the system is under load, your job may enter THROTTLED status before transitioning to RUNNING. This is normal. The job will proceed once capacity is available. Do not cancel and retry throttled jobs. That creates more load and pushes your job to the back of the queue.

PENDING → THROTTLED → RUNNING → COMPLETED

Webhook Integration (Alternative to Polling)

Section titled “Webhook Integration (Alternative to Polling)”

For production systems where you don’t want long-lived polling connections, configure webhooks to receive notifications when jobs complete.

# Register webhook callbacks at job creation so no polling loop is needed.
job = client.extract.create(
    document_input_value=file_obj.id,
    config=EXTRACT_CONFIG,
    webhook_configurations=[
        {
            "webhook_url": "https://your-api.example.com/webhooks/extract",
            # Subscribe to both outcomes so failures are not silently dropped.
            "webhook_events": ["extract.success", "extract.error"],
            # Sent with each callback so your endpoint can authenticate it.
            "webhook_headers": {
                "Authorization": "Bearer your-webhook-secret",
            },
        }
    ],
)
# No polling needed. Your webhook endpoint receives:
# POST https://your-api.example.com/webhooks/extract
# {
#   "event": "extract.success",
#   "job_id": "...",
#   "status": "COMPLETED",
#   ...
# }

Don’t want to write JSON schema by hand? Describe what you need in plain English and let the API generate a schema.

# Generate a JSON schema from a plain-English description of the fields.
generated = client.extract.generate_schema(
    prompt="Extract the company name, CEO name, founding year, and headquarters city from this annual report",
)
print("Generated schema:", generated.data_schema)
print("Suggested config name:", generated.name)
# Use the generated schema directly in an extraction job
job = client.extract.create(
    document_input_value=file_obj.id,
    config={
        "extract_options": {
            "data_schema": generated.data_schema,
            "extraction_target": "per_doc",
            "tier": "cost_effective",
        },
    },
)

Upload a representative file and let the API analyze it to suggest a schema.

# Upload a representative sample so the API can infer a schema from it.
sample_file = client.files.create(
    file="./invoices/sample_invoice.pdf",
    purpose="extract",
)
# Combining file_id with a prompt grounds the generated schema in real content.
generated = client.extract.generate_schema(
    file_id=sample_file.id,
    prompt="Extract all invoice fields including line items",
)
print("Generated schema:", generated.data_schema)

Catch schema errors before running extraction jobs.

from pydantic import BaseModel, Field


class MySchema(BaseModel):
    """Minimal example schema used to demonstrate server-side validation."""

    name: str = Field(description="Person's full name")
    age: int = Field(description="Person's age in years")


# Ask the API to check the schema before spending credits on a job.
validation = client.extract.validate_schema(
    data_schema=MySchema.model_json_schema(),
)
if validation.valid:
    print("Schema is valid")
else:
    print(f"Schema errors: {validation.errors}")

Here’s a complete production script that combines batch upload, parse-then-extract, timeout handling, and result collection.

import asyncio
import time
from pathlib import Path
from pydantic import BaseModel, Field
from typing import Optional
from llama_cloud import AsyncLlamaCloud
# NOTE(review): constructed without api_key — presumably reads
# LLAMA_CLOUD_API_KEY from the environment like the sync client; confirm.
async_client = AsyncLlamaCloud()
class ContractSummary(BaseModel):
    """High-level contract facts to extract from each agreement."""

    parties: list[str] = Field(description="Names of all contracting parties")
    effective_date: str = Field(description="Contract effective date")
    contract_type: str = Field(description="Type of contract (NDA, MSA, SOW, etc.)")
    total_value: Optional[float] = Field(None, description="Total contract value if specified")
# Shared config: agentic tier for accuracy, with per-field source citations.
EXTRACT_CONFIG = {
    "extract_options": {
        "data_schema": ContractSummary.model_json_schema(),
        "extraction_target": "per_doc",
        "tier": "agentic",
        "cite_sources": True,
    },
}
async def process_contract(file_path: Path, timeout: float = 300) -> dict:
    """Parse, then extract from a single contract with timeout.

    Returns {"file", "data"} on success or {"file", "error"} on timeout or
    job failure.
    """
    # Upload the raw file; extraction references it via the parse job below.
    file_obj = await async_client.files.create(
        file=str(file_path), purpose="extract"
    )
    # Parse first for higher quality
    parse_job = await async_client.parsing.create(
        tier="agentic",
        version="latest",
        file_id=file_obj.id,
    )
    parse_job = await async_client.parsing.wait_for_completion(
        parse_job.id, timeout=timeout
    )
    # Extract from the parse result, polling with a client-side deadline.
    job = await async_client.extract.create(
        document_input_value=parse_job.id,
        config=EXTRACT_CONFIG,
    )
    deadline = time.monotonic() + timeout
    while True:
        if job.status in ("COMPLETED", "FAILED", "CANCELLED"):
            break
        if time.monotonic() > deadline:
            return {"file": file_path.name, "error": f"Timeout after {timeout}s"}
        await asyncio.sleep(2)
        job = await async_client.extract.get(job.id)
    if job.status != "COMPLETED":
        return {"file": file_path.name, "error": job.error_message}
    return {
        "file": file_path.name,
        "data": job.extract_result,
        # Note: extract_metadata requires ?expand=extract_metadata on GET
    }
async def main():
    """Process every contract PDF under ./contracts with bounded concurrency."""
    contracts = list(Path("./contracts").glob("*.pdf"))
    # At most 5 contracts in flight at once.
    semaphore = asyncio.Semaphore(5)

    async def bounded(path: Path) -> dict:
        async with semaphore:
            try:
                return await process_contract(path)
            except Exception as exc:
                # Robustness fix: without this, a single raised exception
                # (e.g. from upload or parse wait) would propagate out of
                # asyncio.gather and abort the entire batch. Convert it to
                # the same per-file error-dict shape process_contract uses.
                return {"file": path.name, "error": str(exc)}

    results = await asyncio.gather(*[bounded(p) for p in contracts])
    for r in results:
        if "data" in r:
            summary = ContractSummary.model_validate(r["data"])
            print(f"{r['file']}: {summary.contract_type} between {', '.join(summary.parties)}")
        else:
            print(f"{r['file']}: ERROR - {r['error']}")

asyncio.run(main())