Build an AI Document Processing Pipeline: PDFs, CSVs, and Emails at Scale
Organizations accumulate documents faster than humans can process them. Invoices, contracts, support emails, survey responses, financial reports — all sitting in inboxes and file systems, waiting for someone to read them and extract the relevant information.
Before LLMs, automating this required custom ML models trained on domain-specific data, complex regex patterns, or expensive human-in-the-loop services. Now, a well-prompted general-purpose LLM can do it reliably for most document types out of the box.
This tutorial builds a complete document processing pipeline: ingest PDFs, CSVs, and emails; extract structured data; and output to a database or downstream system. The result is production-ready: it runs unattended and handles errors gracefully.
Architecture Overview
Input Sources        Processing Layer          Output
─────────────        ─────────────────         ──────
PDF files    ──▶     Text extraction     ──▶
CSV files    ──▶     LLM analysis        ──▶   Structured JSON
Email inbox  ──▶     Schema validation   ──▶   Database / API
                     Error handling
The pipeline follows a simple contract: every document becomes a JSON object with defined fields. What happens downstream — storing in Postgres, calling a webhook, updating a CRM — is decoupled from the extraction logic.
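That contract can be made concrete with a small sink abstraction: processors emit plain dicts, and registered sinks decide what happens to them. A minimal sketch (the `Dispatcher` class and example sinks are illustrative, not part of the pipeline modules below):

```python
import json
from typing import Callable

# A sink is any callable that accepts one extracted record (a plain dict).
Sink = Callable[[dict], None]

class Dispatcher:
    """Fans each extracted record out to every registered sink."""

    def __init__(self) -> None:
        self.sinks: list[Sink] = []

    def register(self, sink: Sink) -> None:
        self.sinks.append(sink)

    def emit(self, record: dict) -> None:
        for sink in self.sinks:
            sink(record)

# Example sinks: collect in memory, or print JSON lines to stdout.
collected: list[dict] = []
dispatcher = Dispatcher()
dispatcher.register(collected.append)
dispatcher.register(lambda r: print(json.dumps(r)))

dispatcher.emit({"invoice_number": "INV-001", "total_amount": 1250.0})
```

Swapping Postgres for a webhook then means registering a different sink; the extraction code never changes.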
Prerequisites
pip install anthropic pymupdf python-dotenv pydantic imapclient schedule
Create a .env file:
ANTHROPIC_API_KEY=sk-ant-...
Core Extraction Engine
The heart of the pipeline is a reusable extraction function that takes text and a target schema, and returns validated structured data:
# pipeline/extractor.py
import json
import anthropic
from pydantic import BaseModel, ValidationError
from typing import Type, TypeVar
T = TypeVar("T", bound=BaseModel)
client = anthropic.Anthropic()
def extract(
text: str,
schema: Type[T],
context: str = "",
max_retries: int = 2
) -> T:
"""
Extract structured data from text using LLM.
Returns a validated Pydantic model instance.
"""
schema_json = json.dumps(schema.model_json_schema(), indent=2)
prompt = f"""{context}
Extract information from the following document and return ONLY valid JSON
matching this schema exactly:
{schema_json}
Rules:
- Return null for fields that are missing or ambiguous
- Dates must be ISO 8601 format (YYYY-MM-DD)
- Monetary amounts as numbers, not strings (e.g. 1250.00 not "$1,250")
- If a field appears multiple times, use the most recent/authoritative value
<document>
{text[:12000]}
</document>
Return only the JSON object, no explanation."""
for attempt in range(max_retries + 1):
message = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": prompt}]
)
raw = message.content[0].text.strip()
# Strip markdown code blocks if present
if raw.startswith("```"):
raw = raw.split("```")[1]
if raw.startswith("json"):
raw = raw[4:]
raw = raw.rsplit("```", 1)[0].strip()
try:
data = json.loads(raw)
return schema(**data)
except (json.JSONDecodeError, ValidationError) as e:
if attempt == max_retries:
raise RuntimeError(
f"Extraction failed after {max_retries + 1} attempts: {e}\nRaw output: {raw}"
) from e
# Retry with error context
prompt += f"\n\nPrevious attempt failed with error: {e}\nPlease fix and try again."
This function handles the messy parts: JSON parsing failures, schema validation, and retry logic. The caller just provides a Pydantic schema and gets back a typed object.
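The fence-stripping step deserves its own unit test, since models occasionally wrap JSON in markdown despite instructions. Here is the same logic pulled into a standalone helper:

```python
import json

def strip_code_fence(raw: str) -> str:
    """Remove a surrounding markdown code fence (``` or ```json), if any."""
    raw = raw.strip()
    if raw.startswith("```"):
        raw = raw.split("```")[1]   # keep the content between the first fences
        if raw.startswith("json"):
            raw = raw[4:]           # drop the language tag
        raw = raw.rsplit("```", 1)[0]
    return raw.strip()

# Fenced, language-tagged, and plain outputs all parse identically.
for sample in ('```json\n{"a": 1}\n```', '```\n{"a": 1}\n```', '{"a": 1}'):
    assert json.loads(strip_code_fence(sample)) == {"a": 1}
```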
Module 1: PDF Processing
Extracting Text from PDFs
# pipeline/pdf_processor.py
import fitz # PyMuPDF
from pathlib import Path
from pydantic import BaseModel
from typing import Optional
from .extractor import extract
def pdf_to_text(path: str) -> str:
"""Extract all text from a PDF, preserving some structure."""
doc = fitz.open(path)
pages = []
for page_num, page in enumerate(doc, 1):
text = page.get_text("text")
if text.strip():
pages.append(f"--- Page {page_num} ---\n{text}")
return "\n\n".join(pages)
Invoice Extraction
Define what you want to extract as a Pydantic model:
class LineItem(BaseModel):
description: str
quantity: Optional[float] = None
unit_price: Optional[float] = None
total: float
class Invoice(BaseModel):
invoice_number: str
invoice_date: Optional[str] = None # ISO 8601
due_date: Optional[str] = None
vendor_name: str
vendor_address: Optional[str] = None
customer_name: Optional[str] = None
line_items: list[LineItem]
subtotal: Optional[float] = None
tax_amount: Optional[float] = None
total_amount: float
currency: str = "USD"
payment_terms: Optional[str] = None
def process_invoice(pdf_path: str) -> Invoice:
text = pdf_to_text(pdf_path)
return extract(
text,
Invoice,
context="This is an invoice or bill document."
)
Contract Key Terms Extraction
from typing import List
class ContractTerms(BaseModel):
parties: List[str] # list of party names
effective_date: Optional[str] = None
expiration_date: Optional[str] = None
auto_renewal: Optional[bool] = None
notice_period_days: Optional[int] = None # for termination
governing_law: Optional[str] = None # jurisdiction
payment_amount: Optional[float] = None
payment_frequency: Optional[str] = None # monthly, annual, etc.
key_obligations: List[str] = [] # top 3-5 obligations per party
termination_conditions: List[str] = []
confidentiality_clause: bool = False
non_compete_clause: bool = False
def process_contract(pdf_path: str) -> ContractTerms:
text = pdf_to_text(pdf_path)
return extract(
text,
ContractTerms,
context="This is a legal contract or agreement."
)
Batch PDF Processing
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
def batch_process_invoices(input_dir: str, output_file: str, workers: int = 4):
pdf_files = list(Path(input_dir).glob("*.pdf"))
results = []
errors = []
with ThreadPoolExecutor(max_workers=workers) as executor:
future_to_path = {
executor.submit(process_invoice, str(pdf)): pdf
for pdf in pdf_files
}
for future in as_completed(future_to_path):
path = future_to_path[future]
try:
invoice = future.result()
results.append({
"file": path.name,
"data": invoice.model_dump()
})
print(f"✓ {path.name}: ${invoice.total_amount}")
except Exception as e:
errors.append({"file": path.name, "error": str(e)})
print(f"✗ {path.name}: {e}")
output = {"processed": results, "errors": errors}
with open(output_file, "w") as f:
json.dump(output, f, indent=2)
print(f"\n{len(results)} succeeded, {len(errors)} failed → {output_file}")
Module 2: CSV Analysis with Natural Language
Traditional CSV analysis requires writing pandas code for each question. With an LLM, you describe what you want in plain English:
# pipeline/csv_analyzer.py
import pandas as pd
import anthropic
from io import StringIO
client = anthropic.Anthropic()
def analyze_csv(csv_path: str, questions: list[str]) -> dict[str, str]:
"""
Answer natural language questions about a CSV file.
Returns a dict mapping each question to its answer.
"""
df = pd.read_csv(csv_path)
# Build a compact representation of the data
summary = f"""Columns: {list(df.columns)}
Shape: {df.shape[0]} rows × {df.shape[1]} columns
Data types: {df.dtypes.to_dict()}
Sample (first 5 rows):
{df.head(5).to_string()}
Statistical summary:
{df.describe().to_string()}"""
answers = {}
for question in questions:
message = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=512,
messages=[{
"role": "user",
"content": f"""You are analyzing a CSV dataset.
Dataset summary:
{summary}
Question: {question}
Answer concisely and precisely. Include specific numbers where relevant.
If you need to calculate something, show your reasoning briefly."""
}]
)
answers[question] = message.content[0].text.strip()
return answers
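The compact-summary trick matters because sending a whole CSV wastes tokens. The same idea, sketched with the stdlib csv module for clarity (the pipeline code above uses pandas; this is a dependency-free equivalent):

```python
import csv
import io

def summarize_csv(text: str, sample_rows: int = 5) -> str:
    """Build a compact, prompt-friendly summary: columns, row count, sample."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader)
    rows = list(reader)
    sample = "\n".join(",".join(r) for r in rows[:sample_rows])
    return (
        f"Columns: {header}\n"
        f"Shape: {len(rows)} rows x {len(header)} columns\n"
        f"Sample (first {min(sample_rows, len(rows))} rows):\n{sample}"
    )

data = "region,sales\nEast,100\nWest,250\nSouth,175\n"
print(summarize_csv(data))
```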
Automated Report Generation
class ReportSection(BaseModel):
title: str
finding: str
supporting_data: str
recommendation: Optional[str] = None
class DataReport(BaseModel):
executive_summary: str
key_metrics: dict[str, str]
sections: list[ReportSection]
data_quality_notes: list[str]
def generate_csv_report(csv_path: str) -> DataReport:
df = pd.read_csv(csv_path)
# Include a meaningful sample in the prompt
sample_size = min(50, len(df))
csv_sample = df.sample(sample_size).to_csv(index=False)
text = f"""CSV file with {len(df)} rows and {len(df.columns)} columns.
Full column statistics:
{df.describe(include='all').to_string()}
Representative sample ({sample_size} rows):
{csv_sample}"""
from .extractor import extract
return extract(
text,
DataReport,
context="Generate a business intelligence report from this dataset."
)
Module 3: Email Processing
IMAP Inbox Integration
# pipeline/email_processor.py
import imapclient
import email
from email.header import decode_header
from pydantic import BaseModel
from typing import Optional, Literal
from .extractor import extract
def fetch_unread_emails(
host: str,
username: str,
password: str,
folder: str = "INBOX",
limit: int = 50
) -> list[dict]:
with imapclient.IMAPClient(host, ssl=True) as client:
client.login(username, password)
client.select_folder(folder, readonly=False)
uids = client.search(["UNSEEN"])
if not uids:
return []
# Process most recent first
uids = uids[-limit:]
messages = client.fetch(uids, ["RFC822"])
emails = []
for uid, data in messages.items():
raw = data[b"RFC822"]
msg = email.message_from_bytes(raw)
# Decode subject
subject_parts = decode_header(msg.get("Subject", ""))
subject = ""
for part, charset in subject_parts:
if isinstance(part, bytes):
subject += part.decode(charset or "utf-8", errors="replace")
else:
subject += part
# Extract body
body = ""
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == "text/plain":
payload = part.get_payload(decode=True)
if payload:
body += payload.decode(
part.get_content_charset() or "utf-8",
errors="replace"
)
else:
payload = msg.get_payload(decode=True)
if payload:
body = payload.decode(
msg.get_content_charset() or "utf-8",
errors="replace"
)
emails.append({
"uid": uid,
"from": msg.get("From", ""),
"subject": subject,
"date": msg.get("Date", ""),
"body": body[:4000] # cap at 4k chars
})
return emails
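The decoding logic in fetch_unread_emails can be exercised offline against a synthetic RFC 822 message, which is useful for testing without a live mailbox:

```python
import email
from email.header import decode_header

raw = (
    b"From: customer@example.com\r\n"
    b"Subject: =?utf-8?b?UmVmdW5kIHJlcXVlc3Q=?=\r\n"  # base64 "Refund request"
    b"Content-Type: text/plain; charset=utf-8\r\n"
    b"\r\n"
    b"Please refund order #1234.\r\n"
)
msg = email.message_from_bytes(raw)

# Same subject-decoding approach as the pipeline code above.
subject = ""
for part, charset in decode_header(msg.get("Subject", "")):
    subject += part.decode(charset or "utf-8") if isinstance(part, bytes) else part

body = msg.get_payload(decode=True).decode(msg.get_content_charset() or "utf-8")
print(subject)  # Refund request
```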
Email Classification and Routing
class EmailAnalysis(BaseModel):
category: Literal[
"support_request",
"billing_inquiry",
"sales_lead",
"partnership",
"spam",
"internal",
"other"
]
priority: Literal["urgent", "high", "normal", "low"]
sentiment: Literal["positive", "neutral", "negative", "frustrated"]
summary: str # 1-2 sentences
action_required: bool
suggested_action: Optional[str] = None # what to do next
customer_name: Optional[str] = None
issue_type: Optional[str] = None # for support requests
escalate: bool = False
def analyze_email(email_data: dict) -> EmailAnalysis:
text = f"""From: {email_data['from']}
Subject: {email_data['subject']}
Date: {email_data['date']}
{email_data['body']}"""
return extract(
text,
EmailAnalysis,
context="This is a business email. Classify and analyze it."
)
Full Processing Pipeline
def process_inbox(imap_config: dict, output_callback):
"""
Continuously process new emails.
output_callback receives (email_data, analysis) for each message.
"""
emails = fetch_unread_emails(**imap_config)
print(f"Processing {len(emails)} unread emails...")
for email_data in emails:
analysis = analyze_email(email_data)
output_callback(email_data, analysis)
# Log the result
priority_emoji = {"urgent": "🔴", "high": "🟡", "normal": "🟢", "low": "⚪"}
print(
f"{priority_emoji[analysis.priority]} "
f"[{analysis.category}] "
f"{email_data['subject'][:60]}"
)
if analysis.escalate:
print(f" ⚠️ ESCALATE: {analysis.suggested_action}")
# Example: route to different handlers based on category
def route_email(email_data: dict, analysis: EmailAnalysis):
if analysis.category == "support_request":
create_support_ticket(email_data, analysis)
elif analysis.category == "sales_lead":
add_to_crm(email_data, analysis)
elif analysis.category == "billing_inquiry":
forward_to_billing(email_data, analysis)
elif analysis.escalate:
send_slack_alert(email_data, analysis)
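As categories grow, the if/elif chain can be replaced by a dispatch table keyed on category. A sketch with stub handlers standing in for create_support_ticket and friends:

```python
from typing import Callable

handled: list[str] = []  # records which handler ran, for demonstration

# Stubs; the real pipeline would call create_support_ticket, add_to_crm, etc.
HANDLERS: dict[str, Callable[[dict], None]] = {
    "support_request": lambda e: handled.append("ticket:" + e["subject"]),
    "sales_lead":      lambda e: handled.append("crm:" + e["subject"]),
    "billing_inquiry": lambda e: handled.append("billing:" + e["subject"]),
}

def route(email_data: dict, category: str) -> None:
    """Look up the handler for the category; unknown categories hit a default."""
    HANDLERS.get(category, lambda e: handled.append("default"))(email_data)

route({"subject": "Login help"}, "support_request")
route({"subject": "Pricing question"}, "sales_lead")
route({"subject": "FYI"}, "other")
print(handled)
```

Adding a new category then touches only the table, not the control flow.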
Running the Pipeline as a Service
Wire the processors together and run them on a schedule:
# pipeline/runner.py
import schedule
import time
import os
from dotenv import load_dotenv
from .pdf_processor import batch_process_invoices
from .csv_analyzer import generate_csv_report
from .email_processor import process_inbox, route_email
load_dotenv()
IMAP_CONFIG = {
"host": os.getenv("IMAP_HOST"),
"username": os.getenv("IMAP_USER"),
"password": os.getenv("IMAP_PASS"),
}
def run_email_processor():
print("--- Email processing run ---")
process_inbox(IMAP_CONFIG, route_email)
def run_invoice_processor():
print("--- Invoice processing run ---")
batch_process_invoices(
input_dir="/data/incoming-invoices",
output_file="/data/processed-invoices.json"
)
# Check email every 5 minutes
schedule.every(5).minutes.do(run_email_processor)
# Process invoices every hour
schedule.every().hour.do(run_invoice_processor)
print("Pipeline running. Press Ctrl+C to stop.")
while True:
schedule.run_pending()
time.sleep(30)
Run as a systemd service or Docker container for 24/7 operation.
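A minimal Dockerfile sketch for the container route (the module path and dependency list assume the layout from this tutorial):

```dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY pipeline/ ./pipeline/
RUN pip install --no-cache-dir anthropic pymupdf python-dotenv pydantic imapclient schedule
# API keys and IMAP credentials are injected at runtime, e.g. docker run --env-file .env
CMD ["python", "-m", "pipeline.runner"]
```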
Measuring Accuracy
Before deploying, validate extraction quality on a labeled sample:
def evaluate_extraction(
samples: list[tuple[str, dict]], # (text, expected_output)
schema,
field_weights: dict[str, float] | None = None
) -> float:
"""
Returns weighted field-level accuracy across samples.
"""
scores = []
for text, expected in samples:
result = extract(text, schema)
result_dict = result.model_dump()
field_scores = []
for field, expected_value in expected.items():
actual_value = result_dict.get(field)
match = str(actual_value).lower().strip() == str(expected_value).lower().strip()
weight = (field_weights or {}).get(field, 1.0)
field_scores.append((int(match), weight))
if field_scores:
total_weight = sum(w for _, w in field_scores)
weighted_score = sum(s * w for s, w in field_scores) / total_weight
scores.append(weighted_score)
return sum(scores) / len(scores) if scores else 0.0
For invoice extraction on typical business invoices, Claude achieves 92-96% field-level accuracy without any fine-tuning. For emails, classification accuracy exceeds 95% on well-defined categories.
Cost at Scale
Processing 1,000 documents per day with Claude Sonnet 4.6:
| Document Type   | Avg Tokens In | Avg Tokens Out | Daily Cost (1k docs) |
|-----------------|---------------|----------------|----------------------|
| Invoice PDF     | 1,200         | 400            | ~$2.00               |
| Contract (20pp) | 8,000         | 600            | ~$12.00              |
| Email           | 600           | 300            | ~$1.00               |
| CSV report      | 3,000         | 800            | ~$5.00               |
For most organizations, this is an order of magnitude cheaper than manual processing or custom ML infrastructure.
Next Steps
- Add a confidence score field to your schemas — prompt the LLM to rate its own certainty on ambiguous extractions
- Build a correction feedback loop: flag low-confidence extractions for human review, feed corrections back as few-shot examples
- Add OCR preprocessing for scanned PDFs (pytesseract + image preprocessing)
- Cache extraction results by document hash to avoid re-processing unchanged files
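The caching idea in the last bullet can be sketched with hashlib: hash the document bytes, and skip extraction when that hash is already in the cache (file names and cache layout are illustrative; results must be JSON-serializable):

```python
import hashlib
import json
from pathlib import Path
from typing import Callable

def cached_process(path: Path, cache_file: Path, process: Callable) -> dict:
    """Run process(path) only when the file's content hash is new."""
    digest = hashlib.sha256(path.read_bytes()).hexdigest()
    cache = json.loads(cache_file.read_text()) if cache_file.exists() else {}
    if digest in cache:
        return cache[digest]        # unchanged file: reuse the stored result
    result = process(path)          # e.g. process_invoice(str(path)) in the pipeline
    cache[digest] = result
    cache_file.write_text(json.dumps(cache))
    return result
```

Editing a document changes its hash, so modified files are re-extracted automatically.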