Using LlamaSheets with Coding Agents
LlamaSheets extracts messy spreadsheets into clean, structured parquet files with rich metadata.
This guide shows you how to set up a project to work with this data using a coding agent (Claude Code in this case), and then walks through common workflows.
It applies equally to any other coding agent (Codex, Gemini CLI, Cursor, etc.), since all of these agents support an AGENTS.md file and have file-system access.
You can find the complete setup for this tutorial in the llama-cloud-services GitHub examples.
Setup Guide
Follow these steps to set up your project for working with LlamaSheets and Claude.
Step 1: Create Your Project Directory
```bash
mkdir coding-agent-analysis
cd coding-agent-analysis

# Create directories
mkdir data     # For extracted parquet files
mkdir scripts  # For analysis scripts
mkdir reports  # For output reports
```
Step 2: Install Dependencies
Create a requirements.txt:
```
llama-cloud-services  # LlamaSheets SDK
pandas>=2.0.0
pyarrow>=12.0.0
openpyxl>=3.0.0       # For Excel file support
matplotlib>=3.7.0     # For visualizations (optional)
```
Install dependencies:
```bash
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
pip install -r requirements.txt
```
Step 3: Set Up Environment Variables
Create a .env file:
```
LLAMA_CLOUD_API_KEY=your_api_key_here
```
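The helper script in Step 5 reads this key with `os.getenv`, so make sure the variable is exported in your shell or loaded from `.env`. A minimal sketch using `python-dotenv` (an extra dependency, not listed in the requirements.txt above):

```python
# Optional: load .env automatically (requires `pip install python-dotenv`)
import os
from dotenv import load_dotenv

load_dotenv()  # reads .env from the current working directory
if not os.getenv("LLAMA_CLOUD_API_KEY"):
    raise RuntimeError("LLAMA_CLOUD_API_KEY is not set")
```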
Step 4: Create a CLAUDE.md File
Create a CLAUDE.md (or AGENTS.md if not using Claude) file in your project root to guide the agent's interactions:
This project uses LlamaSheets to extract data from spreadsheets for analysis.
## Current Project Structure
- `data/` - Contains extracted parquet files from LlamaSheets
  - `{name}_table_{N}.parquet` - Table data files
  - `{name}_metadata_{N}.parquet` - Cell metadata files
  - `{name}_job_metadata.json` - Extraction job information
- `scripts/` - Analysis and helper scripts
- `reports/` - Your generated reports and outputs
## Working with LlamaSheets Data
### Understanding the Files
When a spreadsheet is extracted, you'll find:
1. **Table parquet files** (`table_*.parquet`): The actual table data
   - Columns correspond to spreadsheet columns
   - Data types are preserved (dates, numbers, strings, booleans)

2. **Metadata parquet files** (`metadata_*.parquet`): Rich cell-level metadata
   - Formatting: `font_bold`, `font_italic`, `font_size`, `background_color_rgb`
   - Position: `row_number`, `column_number`, `coordinate` (e.g., "A1")
   - Type detection: `data_type`, `is_date_like`, `is_percentage`, `is_currency`
   - Layout: `is_in_first_row`, `is_merged_cell`, `horizontal_alignment`
   - Content: `cell_value`, `raw_cell_value`

3. **Job metadata JSON** (`job_metadata.json`): Overall extraction results
   - `tables[]`: List of extracted tables with IDs and locations
   - `worksheet_metadata[]`: Generated titles and descriptions
   - `status`: Success/failure status
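A quick way to orient yourself is to read the job metadata before touching the parquet files (hypothetical file name below; `status` and `tables` are the fields listed above):

```python
import json

with open("data/my_workbook_job_metadata.json") as f:  # hypothetical extraction output
    job_meta = json.load(f)

print(job_meta["status"])                   # overall extraction status
print(len(job_meta["tables"]), "tables")    # extracted tables with IDs and locations
```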
### Key Principles
1. **Use metadata to understand structure**: Bold cells often indicate headers, colors indicate groupings
2. **Validate before analysis**: Check data types, look for missing values
3. **Preserve formatting context**: The metadata tells you what the spreadsheet author emphasized
4. **Save intermediate results**: Store cleaned data as new parquet files
### Common Patterns
**Loading data:**
```python
import pandas as pd

df = pd.read_parquet("data/table_1_Sheet1.parquet")
meta_df = pd.read_parquet("data/metadata_1_Sheet1.parquet")
```

**Finding headers:**
```python
headers = meta_df[meta_df['font_bold'] == True]['cell_value'].tolist()
```

**Finding date columns:**
```python
date_cols = meta_df[meta_df['is_date_like'] == True]['column_number'].unique()
```
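**Checking data quality** (a quick sketch; column names depend on your sheet):
```python
print(df.dtypes)               # confirm types were preserved
print(df.isna().sum())         # missing values per column
print(df.duplicated().sum(), "duplicate rows")
```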
## Tools Available

- Python 3.11+: For data analysis
- pandas: DataFrame manipulation
- pyarrow: Parquet file reading
- matplotlib: Visualization (optional)
## Guidelines
- Always read the job_metadata.json first to understand what was extracted
- Check both table data and metadata before making assumptions
- Write reusable functions for common operations
- Document any data quality issues discovered
Step 5: Create an Extraction Helper Script
Create `scripts/extract.py`:
```python
"""Helper script to extract spreadsheets using LlamaSheets."""

import asyncio
import json
import os
from pathlib import Path

from llama_cloud_services.beta.sheets import LlamaSheets
from llama_cloud_services.beta.sheets.types import (
    SpreadsheetParsingConfig,
    SpreadsheetResultType,
)


async def extract_spreadsheet(
    file_path: str, output_dir: str = "data", generate_metadata: bool = True
) -> dict:
    """Extract a spreadsheet using LlamaSheets."""
    client = LlamaSheets(api_key=os.getenv("LLAMA_CLOUD_API_KEY"))

    print(f"Extracting {file_path}...")

    # Extract tables
    config = SpreadsheetParsingConfig(
        sheet_names=None,  # Extract all sheets
        generate_additional_metadata=generate_metadata,
    )

    job_result = await client.aextract_tables(file_path, config=config)

    print(f"Extracted {len(job_result.tables)} table(s)")

    # Create output directory
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Get base name for files
    base_name = Path(file_path).stem

    # Save job metadata
    job_metadata_path = output_path / f"{base_name}_job_metadata.json"
    with open(job_metadata_path, "w") as f:
        json.dump(job_result.model_dump(mode="json"), f, indent=2)
    print(f"Saved job metadata to {job_metadata_path}")

    # Download each table
    for idx, table in enumerate(job_result.tables, 1):
        sheet_name = table.sheet_name.replace(" ", "_")

        # Download table data
        table_bytes = await client.adownload_table_result(
            job_id=job_result.id,
            table_id=table.table_id,
            result_type=SpreadsheetResultType.TABLE,
        )

        table_path = output_path / f"{base_name}_table_{idx}_{sheet_name}.parquet"
        with open(table_path, "wb") as f:
            f.write(table_bytes)
        print(f"  Table {idx}: {table_path}")

        # Download metadata
        metadata_bytes = await client.adownload_table_result(
            job_id=job_result.id,
            table_id=table.table_id,
            result_type=SpreadsheetResultType.CELL_METADATA,
        )

        metadata_path = output_path / f"{base_name}_metadata_{idx}_{sheet_name}.parquet"
        with open(metadata_path, "wb") as f:
            f.write(metadata_bytes)
        print(f"  Metadata {idx}: {metadata_path}")

    print(f"\nAll files saved to {output_path}/")

    return job_result.model_dump(mode="json")


if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print("Usage: python scripts/extract.py <spreadsheet_file>")
        sys.exit(1)

    file_path = sys.argv[1]

    if not Path(file_path).exists():
        print(f"File not found: {file_path}")
        sys.exit(1)

    result = asyncio.run(extract_spreadsheet(file_path))
    print(f"\nExtraction complete! Job ID: {result['id']}")
```
Step 6: Generate Sample Data (Optional)
To follow along with the workflows below, you can generate sample spreadsheets using the provided generator script.
Download and save as generate_sample_data.py:
```bash
# Get the script from GitHub examples or create it from the documentation
curl -o generate_sample_data.py https://raw.githubusercontent.com/run-llama/llama-cloud-services/main/examples/sheets/coding-agent-analysis/generate_sample_data.py
```
Or copy the full script here:
"""Generate sample spreadsheets for LlamaSheets + Claude workflows.
This script creates example Excel files that demonstrate different use cases:1. Simple data table (for Workflow 1)2. Regional sales data (for Workflow 2)3. Complex budget with formatting (for Workflow 3)4. Weekly sales report (for Workflow 4)
Usage: python generate_sample_data.py"""
import randomfrom datetime import datetime, timedeltafrom pathlib import Path
import pandas as pdfrom openpyxl import Workbookfrom openpyxl.styles import Font, PatternFill, Alignment, Border, Side
def generate_workflow_1_data(output_dir: Path): """Generate simple financial report for Workflow 1.""" print("๐ Generating Workflow 1: financial_report_q1.xlsx")
# Create sample quarterly data months = ["January", "February", "March"] categories = ["Revenue", "Cost of Goods Sold", "Operating Expenses", "Net Income"]
data = [] for category in categories: row = {"Category": category} for month in months: if category == "Revenue": value = random.randint(80000, 120000) elif category == "Cost of Goods Sold": value = random.randint(30000, 50000) elif category == "Operating Expenses": value = random.randint(20000, 35000) else: # Net Income value = row.get("January", 0) + row.get("February", 0) + row.get("March", 0) value = random.randint(15000, 40000) row[month] = value data.append(row)
df = pd.DataFrame(data)
# Write to Excel output_file = output_dir / "financial_report_q1.xlsx" with pd.ExcelWriter(output_file, engine="openpyxl") as writer: df.to_excel(writer, sheet_name="Q1 Summary", index=False)
# Format it nicely worksheet = writer.sheets["Q1 Summary"] for cell in worksheet[1]: # Header row cell.font = Font(bold=True) cell.fill = PatternFill(start_color="4F81BD", end_color="4F81BD", fill_type="solid") cell.font = Font(color="FFFFFF", bold=True)
print(f" โ
Created {output_file}")
def generate_workflow_2_data(output_dir: Path): """Generate regional sales data for Workflow 2.""" print("\n๐ Generating Workflow 2: Regional sales data")
regions = ["northeast", "southeast", "west"] products = ["Widget A", "Widget B", "Widget C", "Gadget X", "Gadget Y"]
for region in regions: data = [] start_date = datetime(2024, 1, 1)
# Generate 90 days of sales data for day in range(90): date = start_date + timedelta(days=day) # Random number of sales per day (3-8) for _ in range(random.randint(3, 8)): product = random.choice(products) units_sold = random.randint(1, 20) price_per_unit = random.randint(50, 200) revenue = units_sold * price_per_unit
data.append({ "Date": date.strftime("%Y-%m-%d"), "Product": product, "Units_Sold": units_sold, "Revenue": revenue })
df = pd.DataFrame(data)
# Write to Excel output_file = output_dir / f"sales_{region}.xlsx" df.to_excel(output_file, sheet_name="Sales", index=False) print(f" โ
Created {output_file} ({len(df)} rows)")
def generate_workflow_3_data(output_dir: Path): """Generate complex budget spreadsheet with formatting for Workflow 3.""" print("\n๐ Generating Workflow 3: company_budget_2024.xlsx")
wb = Workbook() ws = wb.active ws.title = "Budget"
# Define departments with colors departments = { "Engineering": "C6E0B4", "Marketing": "FFD966", "Sales": "F4B084", "Operations": "B4C7E7" }
# Define categories categories = { "Personnel": ["Salaries", "Benefits", "Training"], "Infrastructure": ["Office Rent", "Equipment", "Software Licenses"], "Operations": ["Travel", "Supplies", "Miscellaneous"] }
# Styles header_font = Font(bold=True, size=12) category_font = Font(bold=True, size=11) dept_fonts = {dept: Font(size=10) for dept in departments}
row = 1
# Title ws.merge_cells(f"A{row}:E{row}") ws[f"A{row}"] = "2024 Annual Budget" ws[f"A{row}"].font = Font(bold=True, size=14) ws[f"A{row}"].alignment = Alignment(horizontal="center") row += 2
# Headers ws[f"A{row}"] = "Category" ws[f"B{row}"] = "Item" for i, dept in enumerate(departments.keys()): ws.cell(row, 3 + i, dept) ws.cell(row, 3 + i).font = header_font
for cell in ws[row]: cell.font = header_font row += 1
# Data for category, items in categories.items(): # Category header (bold) ws[f"A{row}"] = category ws[f"A{row}"].font = category_font row += 1
# Items with department budgets for item in items: ws[f"A{row}"] = "" ws[f"B{row}"] = item
# Add budget amounts for each department (with color) for i, (dept, color) in enumerate(departments.items()): amount = random.randint(5000, 50000) cell = ws.cell(row, 3 + i, amount) cell.fill = PatternFill(start_color=color, end_color=color, fill_type="solid") cell.number_format = "$#,##0"
row += 1
row += 1 # Blank row between categories
# Adjust column widths ws.column_dimensions["A"].width = 20 ws.column_dimensions["B"].width = 25 for i in range(len(departments)): ws.column_dimensions[chr(67 + i)].width = 15 # C, D, E, F
output_file = output_dir / "company_budget_2024.xlsx" wb.save(output_file) print(f" โ
Created {output_file}") print(f" โข Bold categories, colored departments, merged title cell")
def generate_workflow_4_data(output_dir: Path): """Generate weekly sales report for Workflow 4.""" print("\n๐ Generating Workflow 4: sales_weekly.xlsx")
products = [ "Product A", "Product B", "Product C", "Product D", "Product E", "Product F", "Product G", "Product H" ]
# Generate one week of data data = [] start_date = datetime(2024, 11, 4) # Monday
for day in range(7): date = start_date + timedelta(days=day) # Each product has 3-10 transactions per day for product in products: for _ in range(random.randint(3, 10)): units = random.randint(1, 15) price = random.randint(20, 150) revenue = units * price
data.append({ "Date": date.strftime("%Y-%m-%d"), "Product": product, "Units": units, "Revenue": revenue })
df = pd.DataFrame(data)
# Write to Excel with some formatting output_file = output_dir / "sales_weekly.xlsx" with pd.ExcelWriter(output_file, engine="openpyxl") as writer: df.to_excel(writer, sheet_name="Weekly Sales", index=False)
# Format header worksheet = writer.sheets["Weekly Sales"] for cell in worksheet[1]: cell.font = Font(bold=True)
print(f" โ
Created {output_file} ({len(df)} rows)")
def main(): """Generate all sample data files.""" print("=" * 60) print("Generating Sample Data for LlamaSheets + Coding Agent Workflows") print("=" * 60)
# Create output directory output_dir = Path("input_data") output_dir.mkdir(exist_ok=True)
# Generate data for each workflow generate_workflow_1_data(output_dir) generate_workflow_2_data(output_dir) generate_workflow_3_data(output_dir) generate_workflow_4_data(output_dir)
print("\n" + "=" * 60) print("โ
All sample data generated!") print("=" * 60) print(f"\nFiles created in {output_dir.absolute()}:") print("\nWorkflow 1 (Understanding a New Spreadsheet):") print(" โข financial_report_q1.xlsx") print("\nWorkflow 2 (Generating Analysis Scripts):") print(" โข sales_northeast.xlsx") print(" โข sales_southeast.xlsx") print(" โข sales_west.xlsx") print("\nWorkflow 3 (Using Cell Metadata):") print(" โข company_budget_2024.xlsx") print("\nWorkflow 4 (Complete Automation):") print(" โข sales_weekly.xlsx") print("\nYou can now use these files with the workflows in the documentation!")
if __name__ == "__main__": main()Generate all sample files:
```bash
python generate_sample_data.py
```
This creates the following files in input_data/:
- financial_report_q1.xlsx - Simple financial data (Workflow 1)
- sales_northeast.xlsx, sales_southeast.xlsx, sales_west.xlsx - Regional sales (Workflow 2)
- company_budget_2024.xlsx - Budget with formatting (Workflow 3)
- sales_weekly.xlsx - Weekly report data (Workflow 4)
Step 7: Extract Your First Spreadsheet
```bash
# Extract sample data or your own spreadsheet
python scripts/extract.py input_data/financial_report_q1.xlsx

# Or use your own:
# python scripts/extract.py your_spreadsheet.xlsx
```
This will create files in data/:
- financial_report_q1_table_1_Q1_Summary.parquet
- financial_report_q1_metadata_1_Q1_Summary.parquet
- financial_report_q1_job_metadata.json
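Before opening your coding agent, you can sanity-check the extraction yourself. A minimal look at the three files (names as listed above):

```python
import json
import pandas as pd

df = pd.read_parquet("data/financial_report_q1_table_1_Q1_Summary.parquet")
meta_df = pd.read_parquet("data/financial_report_q1_metadata_1_Q1_Summary.parquet")
with open("data/financial_report_q1_job_metadata.json") as f:
    job_meta = json.load(f)

print(job_meta["status"])            # extraction status
print(df.shape, list(df.columns))    # table dimensions and column names
print(meta_df.columns.tolist())      # available cell-metadata fields
```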
Step 8: Open Your Project with your Agent
Now you're ready to work with your Agent! Open Claude Code (or similar) and point it to your project directory. The agent will be able to:
- Read your CLAUDE.md/AGENTS.md to understand the project structure
- Access the extracted parquet files in data/
- Write analysis scripts to scripts/
- Generate reports to reports/
Common Workflows
Now that your project is set up, here are specific workflows you can follow with your agent.
Workflow 1: Understanding a New Spreadsheet
Goal: Figure out what data you have and how it's structured.
Steps:
- Extract your spreadsheet:
```bash
python scripts/extract.py input_data/financial_report_q1.xlsx
```
- Ask the Agent:
```
I just extracted financial_report_q1.xlsx. Can you help me understand what's in this data? Please:
1. Read the job metadata to see what tables were extracted
2. Look at the table data to understand the columns and data types
3. Check the cell metadata to identify any formatting patterns (bold headers, colored cells, etc.)
4. Summarize what you find
```
What the Agent will do:
The Agent will read the files, execute some Python commands, and output its findings. You might see something like this:
Summary of financial_report_q1.xlsx
1. Extraction Overview
- Status: SUCCESS
- Tables extracted: 1 table from sheet "Q1 Summary"
- Location: A1:D5 (4 columns × 5 rows)
- Description: Q1 Financial Summary with income statement metrics
2. Table Data Structure
Dimensions: 5 rows × 4 columns

Content:

| Category           | January | February | March   |
|--------------------|---------|----------|---------|
| Revenue            | 110,271 | 92,200   | 116,199 |
| Cost of Goods Sold | 35,327  | 35,331   | 47,825  |
| Operating Expenses | 25,127  | 21,747   | 33,692  |
| Net Income         | 28,453  | 16,475   | 23,312  |

Data Types:
- 12 numeric cells (monthly values)
- 8 text cells (headers and category labels)
3. Formatting Patterns
Bold Formatting:
- First row only: "Category", "January", "February", "March"
- Indicates header row

Background Colors:
- Header row (A1:D1): Light gray background (RGB: 0.474078)
- Data rows (A2:D5): No background color (white)

Borders:
- Header row has border styling (score: 4.0)
- Creates visual separation between headers and data

Alignment:
- All cells use standard alignment (horizontal: 1.0, vertical: 0.0)
4. Key Insights
- Well-structured data: Clear header row with proper formatting
- Financial metrics: Standard income statement structure (Revenue → COGS → OpEx → Net Income)
- Time series: Three months of Q1 data (Jan-Mar)
- Clean extraction: No merged cells, no special characters, consistent formatting
- Data quality: All numeric values successfully extracted as numbers (not strings)

The spreadsheet follows best practices with clear visual hierarchy (bold/colored headers) making it easy to identify data structure programmatically.

When to use this: Any time you receive a new spreadsheet and need to understand its structure before analysis.
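If you want to reproduce parts of this summary yourself, here is a minimal sketch using the files from Step 7 and the metadata fields described in CLAUDE.md:

```python
import pandas as pd

df = pd.read_parquet("data/financial_report_q1_table_1_Q1_Summary.parquet")
meta_df = pd.read_parquet("data/financial_report_q1_metadata_1_Q1_Summary.parquet")

print(df.to_string(index=False))  # the extracted table

# Which cells did the spreadsheet author emphasize?
bold_cells = meta_df[meta_df["font_bold"] == True][["coordinate", "cell_value"]]
print(bold_cells)

# Any background colors beyond the default?
print(meta_df["background_color_rgb"].value_counts())
```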
Workflow 2: Generating Analysis Scripts
Goal: Create a reusable Python script for recurring analysis tasks.
Steps:
- Extract your spreadsheets (can be multiple):
```bash
python scripts/extract.py input_data/sales_northeast.xlsx
python scripts/extract.py input_data/sales_southeast.xlsx
python scripts/extract.py input_data/sales_west.xlsx
```
- Ask the Agent:
```
I've extracted sales data from 3 regional spreadsheets. Can you create a script in scripts/analyze_sales.py that:
1. Loads all the regional data
2. Combines them and calculates total revenue by product and region
3. Finds the top 5 products overall
4. Saves a summary report to reports/sales_summary.csv
```
What the Agent will do:
The coding agent will likely create a script like scripts/analyze_sales.py and then run it for you:
"""Sales Analysis Script
Loads regional sales data extracted by LlamaSheets, combines them,calculates revenue by product and region, identifies top products,and generates a summary report."""
import pandas as pdfrom pathlib import Path
def load_regional_data(data_dir: Path) -> pd.DataFrame: """ Load sales data from all regional spreadsheets.
Args: data_dir: Path to the data directory containing extracted parquet files
Returns: Combined DataFrame with all regional sales data """ regions = { 'Northeast': 'sales_northeast_table_2_Sales.parquet', 'Southeast': 'sales_southeast_table_2_Sales.parquet', 'West': 'sales_west_table_2_Sales.parquet' }
regional_dataframes = []
for region, filename in regions.items(): filepath = data_dir / filename
if not filepath.exists(): print(f"Warning: {filename} not found, skipping {region}") continue
# Load the data df = pd.read_parquet(filepath)
# Rename columns to meaningful names (based on header row analysis) df.columns = ['Date', 'Product', 'Units_Sold', 'Revenue']
# Add region identifier df['Region'] = region
# Convert data types df['Date'] = pd.to_datetime(df['Date']) df['Units_Sold'] = pd.to_numeric(df['Units_Sold'], errors='coerce') df['Revenue'] = pd.to_numeric(df['Revenue'], errors='coerce')
regional_dataframes.append(df) print(f"Loaded {len(df)} records from {region}")
# Combine all regional data combined_df = pd.concat(regional_dataframes, ignore_index=True) print(f"\nTotal records loaded: {len(combined_df)}")
return combined_df
def calculate_revenue_summary(df: pd.DataFrame) -> pd.DataFrame: """ Calculate total revenue by product and region.
Args: df: Combined sales DataFrame
Returns: DataFrame with revenue aggregated by product and region """ revenue_by_product_region = df.groupby(['Product', 'Region']).agg({ 'Revenue': 'sum', 'Units_Sold': 'sum' }).reset_index()
revenue_by_product_region = revenue_by_product_region.sort_values( ['Product', 'Revenue'], ascending=[True, False] )
return revenue_by_product_region
def find_top_products(df: pd.DataFrame, top_n: int = 5) -> pd.DataFrame: """ Find the top N products by total revenue across all regions.
Args: df: Combined sales DataFrame top_n: Number of top products to return
Returns: DataFrame with top products and their total revenue """ top_products = df.groupby('Product').agg({ 'Revenue': 'sum', 'Units_Sold': 'sum' }).reset_index()
top_products = top_products.sort_values('Revenue', ascending=False).head(top_n) top_products['Rank'] = range(1, len(top_products) + 1)
return top_products[['Rank', 'Product', 'Units_Sold', 'Revenue']]
def generate_summary_report(df: pd.DataFrame, output_path: Path) -> None: """ Generate and save a comprehensive sales summary report.
Args: df: Combined sales DataFrame output_path: Path where the summary CSV should be saved """ # Calculate revenue by product and region print("\n=== Revenue by Product and Region ===") revenue_summary = calculate_revenue_summary(df) print(revenue_summary.to_string(index=False))
# Find top 5 products print("\n=== Top 5 Products Overall ===") top_products = find_top_products(df, top_n=5) print(top_products.to_string(index=False))
# Create a comprehensive summary with both views # Pivot the revenue by product/region for easier reading revenue_pivot = revenue_summary.pivot( index='Product', columns='Region', values='Revenue' ).reset_index()
# Add total revenue column revenue_pivot['Total_Revenue'] = revenue_pivot[['Northeast', 'Southeast', 'West']].sum(axis=1) revenue_pivot = revenue_pivot.sort_values('Total_Revenue', ascending=False)
# Save to CSV output_path.parent.mkdir(parents=True, exist_ok=True) revenue_pivot.to_csv(output_path, index=False, float_format='%.2f')
print(f"\nโ Summary report saved to {output_path}") print(f"\nSummary Statistics:") print(f" Total Revenue: ${df['Revenue'].sum():,.2f}") print(f" Total Units Sold: {df['Units_Sold'].sum():,.0f}") print(f" Number of Transactions: {len(df):,}") print(f" Number of Products: {df['Product'].nunique()}") print(f" Date Range: {df['Date'].min().date()} to {df['Date'].max().date()}")
def main(): """Main execution function.""" # Set up paths base_dir = Path(__file__).parent.parent data_dir = base_dir / 'data' reports_dir = base_dir / 'reports' output_file = reports_dir / 'sales_summary.csv'
print("Loading regional sales data...") combined_data = load_regional_data(data_dir)
print("\nGenerating summary report...") generate_summary_report(combined_data, output_file)
if __name__ == '__main__': main()And if the agent runs the script for you:
Perfect! I've successfully created the sales analysis script at scripts/analyze_sales.py. Here's what it does:
Script Features
1. Loads all regional data: Automatically loads data from all 3 regions (Northeast, Southeast, West)
2. Combines and calculates: Merges the data and computes total revenue by product and region
3. Top 5 products: Identifies the best-performing products overall
4. Saves summary: Exports a clean CSV report to reports/sales_summary.csv
Key Findings
Top 5 Products by Revenue:
1. Widget C: $403,778 (3,158 units)
2. Widget B: $401,950 (3,168 units)
3. Gadget Y: $395,928 (3,059 units)
4. Widget A: $385,192 (2,985 units)
5. Gadget X: $361,464 (2,930 units)

Overall Stats:
- Total Revenue: $1,948,312
- Total Units Sold: 15,300
- Total Transactions: 1,441
- Date Range: Q1 2024 (Jan 1 - Mar 30)
The summary report at reports/sales_summary.csv shows revenue broken down by product and region in an easy-to-read format. You can run the script anytime with:
```bash
python3 scripts/analyze_sales.py
```
- Run the script yourself to test!
```bash
python scripts/analyze_sales.py
```
When to use this: When you need reusable analysis that runs regularly (weekly reports, monthly summaries, etc.)
Workflow 3: Using Cell Metadata to Understand Structure
Goal: Extract structured data from a messy spreadsheet by using formatting cues.
Steps:
- Extract your complex spreadsheet:
```bash
python scripts/extract.py input_data/company_budget_2024.xlsx
```
- Ask the Agent:
```
I have a budget spreadsheet where:
- Bold cells in the first column are category headers
- Different background colors represent different departments
- Some cells are merged for section groupings

Can you create a script that uses the cell metadata to parse this into a clean, structured dataset with proper category and department labels?
```
What the Agent will do:
The Agent will likely create a script like scripts/parse_budget_with_metadata.py and then run it for you:
"""Parse budget spreadsheet using LlamaSheets metadata.
This script uses cell metadata (bold formatting, background colors, merged cells)to parse a budget spreadsheet into a clean, structured dataset with propercategory and department labels."""
import pandas as pdimport jsonfrom pathlib import Pathfrom typing import Dict, List, Tuple
def load_budget_data(data_dir: str = "data", prefix: str = "company_budget_2024") -> Tuple[pd.DataFrame, pd.DataFrame, dict]: """Load table data, metadata, and job metadata.""" data_path = Path(data_dir)
# Load job metadata to understand structure with open(data_path / f"{prefix}_job_metadata.json") as f: job_meta = json.load(f)
# Load first table (main budget data) table_df = pd.read_parquet(data_path / f"{prefix}_table_1_Budget.parquet") metadata_df = pd.read_parquet(data_path / f"{prefix}_metadata_1_Budget.parquet")
# Flatten multi-level column names if present if isinstance(table_df.columns, pd.MultiIndex): table_df.columns = [col[0] if col[1] == '' else f"{col[0]}_{col[1]}" for col in table_df.columns]
return table_df, metadata_df, job_meta
def identify_categories(metadata_df: pd.DataFrame) -> Dict[int, str]: """ Identify category headers using bold formatting in the first column.
Returns: Dict mapping row numbers to category names """ # Find bold cells in the first column (column_number == 1) bold_first_col = metadata_df[ (metadata_df['font_bold'] == True) & (metadata_df['column_number'] == 1) ]
categories = {} for _, row in bold_first_col.iterrows(): row_num = row['row_number'] category_name = row['cell_value'] categories[row_num] = category_name
print(f"Found {len(categories)} categories:") for row_num, cat in sorted(categories.items()): print(f" Row {row_num}: {cat}")
return categories
def map_colors_to_departments(metadata_df: pd.DataFrame, table_df: pd.DataFrame) -> Dict[float, str]: """ Map background colors to department names using the column headers.
Returns: Dict mapping background color values to department names """ # Get unique background colors (excluding 0.0 which is no color) colors = sorted([c for c in metadata_df['background_color_rgb'].unique() if c > 0])
# Get department names from table columns (skip first two columns: Category and Item) dept_columns = [col for col in table_df.columns if col not in ['Category', 'Item']]
# Map colors to departments # Assumption: colors are used in the same order as departments appear in columns color_map = {} for i, color in enumerate(colors): if i < len(dept_columns): color_map[color] = dept_columns[i]
print(f"\nColor to Department mapping:") for color, dept in color_map.items(): print(f" Color {color:.4f}: {dept}")
return color_map
def assign_category_to_rows(table_df: pd.DataFrame, categories: Dict[int, str]) -> List[str]: """ Assign category labels to each row based on bold headers. Rows belong to the most recent category header above them. """ category_labels = [] current_category = None category_values = set(categories.values())
# Categories dict uses absolute row numbers from metadata # Need to map to table dataframe row indices for idx in range(len(table_df)): # Check if this row is a category header row_value = table_df.iloc[idx]['Category_l_0'] if pd.notna(row_value) and row_value in category_values: current_category = row_value category_labels.append(current_category) else: category_labels.append(current_category)
return category_labels
def parse_budget_to_structured_data(table_df: pd.DataFrame, metadata_df: pd.DataFrame) -> pd.DataFrame: """ Parse the budget spreadsheet into a clean, structured dataset.
Returns: DataFrame with columns: Category, Item, Department, Amount """ # Step 1: Identify categories from bold cells categories = identify_categories(metadata_df)
# Step 2: Map colors to departments color_map = map_colors_to_departments(metadata_df, table_df)
# Step 3: Build mapping of (row, col) to department based on background color cell_to_dept = {} for idx, row in metadata_df.iterrows(): row_num = row['row_number'] col_num = row['column_number'] bg_color = row['background_color_rgb'] if bg_color in color_map: cell_to_dept[(row_num, col_num)] = color_map[bg_color]
# Step 4: Create structured dataset records = []
# Assign categories to rows category_labels = assign_category_to_rows(table_df, categories)
# Get department columns (skip Category and Item) dept_columns = [col for col in table_df.columns if not col.startswith('Category') and not col.startswith('Item')]
for idx, row in table_df.iterrows(): category = category_labels[idx] item = row['Item_l_1']
# Skip rows that are category headers or have no item if pd.isna(item) or item == 'None': continue
# Extract values for each department for dept in dept_columns: amount = row[dept] if pd.notna(amount): records.append({ 'Category': category, 'Item': item, 'Department': dept, 'Amount': amount })
structured_df = pd.DataFrame(records)
print(f"\nStructured dataset created:") print(f" Total records: {len(structured_df)}") print(f" Categories: {structured_df['Category'].nunique()}") print(f" Items: {structured_df['Item'].nunique()}") print(f" Departments: {structured_df['Department'].nunique()}")
return structured_df
def main(): """Main execution function.""" print("=" * 60) print("Budget Spreadsheet Parser - Using LlamaSheets Metadata") print("=" * 60) print()
# Load data print("Loading data...") table_df, metadata_df, job_meta = load_budget_data() print(f"Loaded table with shape: {table_df.shape}") print(f"Loaded metadata with shape: {metadata_df.shape}") print()
# Parse into structured format print("Parsing budget data...") print("-" * 60) structured_df = parse_budget_to_structured_data(table_df, metadata_df)
# Save results output_path = Path("data/budget_structured.parquet") structured_df.to_parquet(output_path, index=False) print() print(f"Saved structured data to: {output_path}")
# Display sample results print() print("=" * 60) print("Sample Results:") print("=" * 60) print(structured_df.head(15).to_string(index=False))
# Summary statistics print() print("=" * 60) print("Summary by Category:") print("=" * 60) summary = structured_df.groupby('Category')['Amount'].agg(['count', 'sum', 'mean']) print(summary)
print() print("=" * 60) print("Summary by Department:") print("=" * 60) dept_summary = structured_df.groupby('Department')['Amount'].agg(['count', 'sum', 'mean']) print(dept_summary)
return structured_df
if __name__ == "__main__": structured_df = main()And if the agent runs the script for you, it might say:
Summary
I've created two comprehensive scripts that parse your budget spreadsheet using LlamaSheets cell metadata:
Created Files
1. scripts/parse_budget_with_metadata.py - Basic parser demonstrating core concepts
   - Uses bold formatting to identify category headers (Personnel, Infrastructure, Operations)
   - Maps background colors to departments
   - Creates clean structured output
2. scripts/parse_budget_advanced.py - Enhanced parser with full analysis
   - Detailed metadata inspection (bold cells, colors, merged cells, data types)
   - Data validation (verifies totals match: $986,239)
   - Multiple output formats (structured + pivot view)
   - Comprehensive summary statistics
3. scripts/README.md - Complete documentation with examples and patterns
Output Files
- data/budget_structured.parquet - 36 records in clean format (Category, Item, Department, Amount)
- data/budget_structured_advanced.parquet - Same data with enhanced validation
- data/budget_pivot_view.parquet - Pivot table matching original layout
Key Features
The scripts demonstrate how to:
- Identify structure from formatting: Bold cells in column 1 = category headers
- Use background colors: 4 different colors mapped to 4 departments (Engineering, Marketing, Sales, Operations)
- Handle complex layouts: Navigate rows with different meanings (headers vs data)
- Validate results: Ensure parsed data totals match original ($986,239 ✓)
Usage
```bash
# Run basic parser
python3 scripts/parse_budget_with_metadata.py

# Run advanced parser with full analysis
python3 scripts/parse_budget_advanced.py
```

The scripts are fully reusable - just update the prefix parameter in load_budget_data() to parse other budget files with similar formatting patterns!
- Run the script:
```bash
python scripts/parse_budget_with_metadata.py
```
When to use this: When you have complex spreadsheets with visual formatting that conveys structure (headers, groupings, categories).
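The prompt above also mentions merged cells; the generated scripts lean mainly on bold cells and background colors, but you can inspect merged regions directly with the `is_merged_cell` field from the cell metadata (file name as produced by this workflow's extraction):

```python
import pandas as pd

meta_df = pd.read_parquet("data/company_budget_2024_metadata_1_Budget.parquet")

# Merged regions (such as the spreadsheet's title cell) show up here
merged = meta_df[meta_df["is_merged_cell"] == True][["coordinate", "cell_value"]]
print(merged)
```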
Workflow 4: Building Complete Automation
Goal: Create an end-to-end pipeline that runs regularly without manual intervention.
Steps:
- Ask the Agent:
```
I receive a weekly sales report as input_data/sales_weekly.xlsx. I need a complete automation that:
1. Extracts the data using LlamaSheets
2. Validates that all expected columns are present (Date, Product, Units, Revenue)
3. Calculates KPIs: total revenue, top 5 products, daily average
4. Generates a text report and saves the data as CSV
5. Can be run with a single command

Can you create this as scripts/weekly_report.py?
```
What the Agent will do:
The Agent will likely create a script like scripts/weekly_report.py and run it for you:
"""Weekly Sales Report Automation
This script automates the complete workflow for processing weekly sales reports:1. Extracts data from input_data/sales_weekly.xlsx using LlamaSheets2. Validates that all expected columns are present3. Calculates key performance indicators (KPIs)4. Generates a text report and saves data as CSV"""
import asyncioimport jsonimport osfrom datetime import datetimefrom pathlib import Pathfrom typing import Dict, Optional
import pandas as pdfrom llama_cloud_services.beta.sheets import LlamaSheetsfrom llama_cloud_services.beta.sheets.types import ( SpreadsheetParsingConfig, SpreadsheetResultType,)
class WeeklyReportProcessor: """Handles the complete weekly sales report processing workflow."""
REQUIRED_COLUMNS = ["Date", "Product", "Units", "Revenue"]
def __init__(self, input_file: str = "input_data/sales_weekly.xlsx"): """ Initialize the processor.
Args: input_file: Path to the input Excel file """ self.input_file = Path(input_file) self.base_dir = Path(__file__).parent.parent self.data_dir = self.base_dir / "data" self.reports_dir = self.base_dir / "reports" self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Ensure directories exist self.data_dir.mkdir(parents=True, exist_ok=True) self.reports_dir.mkdir(parents=True, exist_ok=True)
async def extract_data(self) -> Dict: """ Extract the spreadsheet using LlamaSheets.
Returns: Job result metadata
Raises: FileNotFoundError: If input file doesn't exist Exception: If extraction fails """ if not self.input_file.exists(): raise FileNotFoundError(f"Input file not found: {self.input_file}")
print(f"๐ Extracting data from {self.input_file}...")
        # Initialize LlamaSheets client
        client = LlamaSheets(api_key=os.getenv("LLAMA_CLOUD_API_KEY"))
        # Configure extraction
        config = SpreadsheetParsingConfig(
            sheet_names=None,  # Extract all sheets
            generate_additional_metadata=True
        )

        # Extract tables
        job_result = await client.aextract_tables(str(self.input_file), config=config)

        print(f"  Extracted {len(job_result.tables)} table(s)")

        # Save job metadata
        base_name = self.input_file.stem
        job_metadata_path = self.data_dir / f"{base_name}_job_metadata.json"
        with open(job_metadata_path, "w") as f:
            json.dump(job_result.model_dump(mode="json"), f, indent=2)

        # Identify the main data table and header table
        if not job_result.tables:
            raise Exception("No tables extracted from the spreadsheet")

        data_table = None
        header_table = None

        for tbl in job_result.tables:
            # Check if this looks like a header row (single row in A1)
            if tbl.table_location.startswith("A1:") and tbl.table_location.endswith("1"):
                header_table = tbl
            else:
                data_table = tbl if data_table is None else data_table

        # Use the data table (or first table if we can't determine)
        table = data_table if data_table else job_result.tables[0]
        sheet_name = table.sheet_name.replace(" ", "_")

        # Download the header table if available
        if header_table:
            header_bytes = await client.adownload_table_result(
                job_id=job_result.id,
                table_id=header_table.table_id,
                result_type=SpreadsheetResultType.TABLE
            )
            header_df = pd.read_parquet(pd.io.common.BytesIO(header_bytes))
            # Store header names for later use
            self.detected_headers = header_df.iloc[0].tolist() if len(header_df) > 0 else None
            if self.detected_headers:
                print(f"  Detected headers from table: {', '.join(self.detected_headers)}")
        else:
            self.detected_headers = None

        # Download table data
        table_bytes = await client.adownload_table_result(
            job_id=job_result.id,
            table_id=table.table_id,
            result_type=SpreadsheetResultType.TABLE
        )

        table_path = self.data_dir / f"{base_name}_table_1_{sheet_name}.parquet"
        with open(table_path, "wb") as f:
            f.write(table_bytes)

        print(f"  Table saved to {table_path}")

        # Download metadata
        metadata_bytes = await client.adownload_table_result(
            job_id=job_result.id,
            table_id=table.table_id,
            result_type=SpreadsheetResultType.CELL_METADATA
        )

        metadata_path = self.data_dir / f"{base_name}_metadata_1_{sheet_name}.parquet"
        with open(metadata_path, "wb") as f:
            f.write(metadata_bytes)

        print(f"  Metadata saved to {metadata_path}")

        # Store the paths for later use
        self.table_path = table_path
        self.metadata_path = metadata_path

        return job_result.model_dump(mode="json")

    def detect_and_rename_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Detect and rename columns using headers extracted during data extraction.

        Args:
            df: DataFrame with generic column names

        Returns:
            DataFrame with properly named columns
        """
        # Check if columns already have proper names
        if all(col in df.columns for col in self.REQUIRED_COLUMNS):
            return df

        print("\nRenaming columns...")

        # Use detected headers from extraction
        if hasattr(self, 'detected_headers') and self.detected_headers:
            if len(self.detected_headers) == len(df.columns):
                df.columns = self.detected_headers
                print(f"  Renamed columns: {', '.join(self.detected_headers)}")
            else:
                print(f"  Warning: Header count mismatch ({len(self.detected_headers)} headers, {len(df.columns)} columns)")
        else:
            print(f"  Warning: No headers detected, keeping generic column names")

        return df

    def validate_data(self, df: pd.DataFrame) -> None:
        """
        Validate that all required columns are present.

        Args:
            df: DataFrame to validate

        Raises:
            ValueError: If required columns are missing
        """
        print("\nValidating data...")

        missing_columns = [col for col in self.REQUIRED_COLUMNS if col not in df.columns]

        if missing_columns:
            raise ValueError(
                f"Missing required columns: {', '.join(missing_columns)}\n"
                f"Found columns: {', '.join(df.columns)}"
            )

        print(f"  All required columns present: {', '.join(self.REQUIRED_COLUMNS)}")

        # Check for missing values
        missing_counts = df[self.REQUIRED_COLUMNS].isnull().sum()
        if missing_counts.any():
            print(f"  Warning: Found missing values:")
            for col, count in missing_counts[missing_counts > 0].items():
                print(f"    - {col}: {count} missing values")

        print(f"  Dataset contains {len(df)} rows")

    def clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Clean and prepare the data for analysis.

        Args:
            df: Raw DataFrame

        Returns:
            Cleaned DataFrame
        """
        print("\nCleaning data...")

        df_clean = df.copy()

        # Convert data types
        df_clean['Date'] = pd.to_datetime(df_clean['Date'], errors='coerce')
        df_clean['Units'] = pd.to_numeric(df_clean['Units'], errors='coerce')
        df_clean['Revenue'] = pd.to_numeric(df_clean['Revenue'], errors='coerce')

        # Remove rows with invalid data
        initial_count = len(df_clean)
        df_clean = df_clean.dropna(subset=self.REQUIRED_COLUMNS)
        removed_count = initial_count - len(df_clean)

        if removed_count > 0:
            print(f"  Removed {removed_count} rows with invalid data")

        print(f"  Clean dataset: {len(df_clean)} rows")

        return df_clean

    def calculate_kpis(self, df: pd.DataFrame) -> Dict:
        """
        Calculate key performance indicators.

        Args:
            df: Cleaned DataFrame

        Returns:
            Dictionary containing KPI values
        """
        print("\nCalculating KPIs...")

        kpis = {}

        # Total revenue
        kpis['total_revenue'] = df['Revenue'].sum()
        print(f"  Total Revenue: ${kpis['total_revenue']:,.2f}")

        # Top 5 products by revenue
        top_products = df.groupby('Product')['Revenue'].sum().sort_values(ascending=False).head(5)
        kpis['top_5_products'] = top_products.to_dict()
        print(f"  Top 5 Products identified")

        # Daily average revenue
        daily_revenue = df.groupby('Date')['Revenue'].sum()
        kpis['daily_average'] = daily_revenue.mean()
        kpis['num_days'] = len(daily_revenue)
        print(f"  Daily Average Revenue: ${kpis['daily_average']:,.2f} (across {kpis['num_days']} days)")

        # Additional useful KPIs
        kpis['total_units'] = df['Units'].sum()
        kpis['num_products'] = df['Product'].nunique()
        kpis['num_transactions'] = len(df)
        kpis['date_range'] = {
            'start': df['Date'].min().strftime('%Y-%m-%d'),
            'end': df['Date'].max().strftime('%Y-%m-%d')
        }

        return kpis

    def generate_text_report(self, df: pd.DataFrame, kpis: Dict) -> str:
        """
        Generate a formatted text report.

        Args:
            df: Cleaned DataFrame
            kpis: Calculated KPIs

        Returns:
            Formatted text report
        """
        report_lines = [
            "=" * 70,
            "WEEKLY SALES REPORT",
            "=" * 70,
            f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            f"Report Period: {kpis['date_range']['start']} to {kpis['date_range']['end']}",
            "",
            "-" * 70,
            "KEY PERFORMANCE INDICATORS",
            "-" * 70,
            f"Total Revenue: ${kpis['total_revenue']:>15,.2f}",
            f"Total Units Sold: {kpis['total_units']:>15,.0f}",
            f"Number of Transactions: {kpis['num_transactions']:>15,}",
            f"Number of Products: {kpis['num_products']:>15}",
            f"Number of Days: {kpis['num_days']:>15}",
            f"Daily Average Revenue: ${kpis['daily_average']:>15,.2f}",
            "",
            "-" * 70,
            "TOP 5 PRODUCTS BY REVENUE",
            "-" * 70,
        ]

        # Add top products
        for rank, (product, revenue) in enumerate(kpis['top_5_products'].items(), 1):
            pct = (revenue / kpis['total_revenue']) * 100
            report_lines.append(f"{rank}. {product:<40} ${revenue:>12,.2f} ({pct:>5.1f}%)")

        report_lines.extend([
            "",
            "-" * 70,
            "DAILY REVENUE BREAKDOWN",
            "-" * 70,
        ])

        # Add daily breakdown
        daily_revenue = df.groupby('Date')['Revenue'].sum().sort_index()
        for date, revenue in daily_revenue.items():
            report_lines.append(f"{date.strftime('%Y-%m-%d')} ${revenue:>15,.2f}")

        report_lines.extend([
            "",
            "=" * 70,
            "END OF REPORT",
            "=" * 70,
        ])

        return "\n".join(report_lines)

    def save_csv(self, df: pd.DataFrame) -> Path:
        """
        Save the cleaned data as CSV.

        Args:
            df: Cleaned DataFrame

        Returns:
            Path to saved CSV file
        """
        csv_path = self.reports_dir / f"sales_weekly_{self.timestamp}.csv"
        df.to_csv(csv_path, index=False)
        return csv_path

    def save_report(self, report_text: str) -> Path:
        """
        Save the text report to a file.

        Args:
            report_text: Formatted report text

        Returns:
            Path to saved report file
        """
        report_path = self.reports_dir / f"weekly_report_{self.timestamp}.txt"
        with open(report_path, "w") as f:
            f.write(report_text)
        return report_path

    async def run(self) -> None:
        """Execute the complete workflow."""
        try:
            print("\n" + "=" * 70)
            print("WEEKLY SALES REPORT AUTOMATION")
            print("=" * 70)

            # Step 1: Extract data
            await self.extract_data()

            # Step 2: Load data
            print("\nLoading extracted data...")
            df = pd.read_parquet(self.table_path)
            print(f"  Loaded {len(df)} rows, {len(df.columns)} columns")

            # Step 3: Detect and rename columns using metadata
            df = self.detect_and_rename_columns(df)

            # Step 4: Validate data
            self.validate_data(df)

            # Step 5: Clean data
            df_clean = self.clean_data(df)

            # Step 6: Calculate KPIs
            kpis = self.calculate_kpis(df_clean)

            # Step 7: Generate text report
            print("\nGenerating report...")
            report_text = self.generate_text_report(df_clean, kpis)
            report_path = self.save_report(report_text)
            print(f"  Report saved to {report_path}")

            # Step 8: Save CSV
            print("\nSaving CSV export...")
            csv_path = self.save_csv(df_clean)
            print(f"  CSV saved to {csv_path}")

            # Display the report
            print("\n" + report_text)

            print("\n" + "=" * 70)
            print("AUTOMATION COMPLETE")
            print("=" * 70)
            print(f"\nOutputs:")
            print(f"  - Text Report: {report_path}")
            print(f"  - CSV Export: {csv_path}")
            print(f"  - Table Data: {self.table_path}")

        except Exception as e:
            print(f"\nError: {e}")
            raise


def main():
    """Main entry point."""
    import sys

    # Allow custom input file path
    input_file = "input_data/sales_weekly.xlsx"
    if len(sys.argv) > 1:
        input_file = sys.argv[1]

    processor = WeeklyReportProcessor(input_file)
    asyncio.run(processor.run())


if __name__ == "__main__":
    main()
```
And if the agent summarizes the script for you:
Excellent! I've successfully created scripts/weekly_report.py - a complete automation script for your weekly sales reports. Here's what it does:
Features
1. Data Extraction
   - Automatically extracts data from input_data/sales_weekly.xlsx using LlamaSheets
   - Intelligently detects header row and data table
   - Downloads both table data and metadata
2. Column Detection & Validation
   - Automatically detects column names (Date, Product, Units, Revenue)
   - Validates that all required columns are present
   - Reports any missing values found
3. Data Cleaning
   - Converts data types (dates, numbers)
   - Removes invalid rows
   - Reports data quality issues
4. KPI Calculation
   - Total revenue
   - Top 5 products by revenue (with percentages)
   - Daily average revenue
   - Additional metrics: total units, number of transactions, products, date range
5. Report Generation
   - Formatted text report with all KPIs and daily breakdown
   - CSV export of the cleaned data
   - Timestamped filenames to preserve history
Usage
Run with a single command:
```bash
python3 scripts/weekly_report.py
```
Or specify a custom input file:
```bash
python3 scripts/weekly_report.py path/to/your/file.xlsx
```
Test Results
The script successfully processed your sample data:
- 368 transactions across 7 days
- $242,943 total revenue
- $34,706 daily average
- All outputs saved to the reports/ directory

The automation is ready to use!
- Run it weekly:
```bash
python scripts/weekly_report.py
```
- (Optional) Automate with cron:
```bash
# Add to crontab to run every Monday at 9 AM
0 9 * * 1 cd /path/to/project && /path/to/venv/bin/python scripts/weekly_report.py
```
When to use this: When you need a complete end-to-end pipeline that runs regularly without manual intervention.