
Build an AI Document Processing Pipeline: PDFs, CSVs, and Emails at Scale

FastSox Team · 2026-04-07 · 10 min read

Organizations accumulate documents faster than humans can process them. Invoices, contracts, support emails, survey responses, financial reports — all sitting in inboxes and file systems, waiting for someone to read them and extract the relevant information.

Before LLMs, automating this required custom ML models trained on domain-specific data, complex regex patterns, or expensive human-in-the-loop services. Now, a well-prompted general-purpose LLM can do it reliably for most document types out of the box.

This tutorial builds a complete document processing pipeline: ingest PDFs, CSVs, and emails — extract structured data — output to a database or downstream system. Production-ready, runs unattended, handles errors gracefully.

Architecture Overview

Input Sources         Processing Layer          Output
─────────────         ─────────────────         ──────
PDF files      ──▶   Text extraction     ──▶
CSV files      ──▶   LLM analysis        ──▶   Structured JSON
Email inbox    ──▶   Schema validation   ──▶   Database / API
                     Error handling

The pipeline follows a simple contract: every document becomes a JSON object with defined fields. What happens downstream — storing in Postgres, calling a webhook, updating a CRM — is decoupled from the extraction logic.
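That contract can be sketched in a few lines. The extractor and the sink below are stand-ins to illustrate the decoupling, not the real implementations from this tutorial:

```python
from typing import Callable

Record = dict  # the contract: every document reduces to one field -> value mapping

def run_pipeline(documents: list[str],
                 extract_fn: Callable[[str], Record],
                 sink: Callable[[Record], None]) -> int:
    """Run each document through extraction, then hand the result to the sink."""
    count = 0
    for doc in documents:
        sink(extract_fn(doc))
        count += 1
    return count

# Swapping the sink changes the destination without touching extraction logic:
stored: list[Record] = []
n = run_pipeline(
    ["invoice #1001 total 250.00"],
    extract_fn=lambda text: {"raw": text, "total": 250.0},  # stand-in extractor
    sink=stored.append,  # could equally be a DB insert or a webhook call
)
```

Because the sink is just a callable, moving from "append to a list" to "insert into Postgres" is a one-line change at the call site.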

Prerequisites

pip install anthropic pymupdf python-dotenv pydantic imapclient chardet

Create a .env file:

ANTHROPIC_API_KEY=sk-ant-...

Core Extraction Engine

The heart of the pipeline is a reusable extraction function that takes text and a target schema, and returns validated structured data:

# pipeline/extractor.py
import json
import anthropic
from pydantic import BaseModel, ValidationError
from typing import Type, TypeVar

T = TypeVar("T", bound=BaseModel)
client = anthropic.Anthropic()

def extract(
    text: str,
    schema: Type[T],
    context: str = "",
    max_retries: int = 2
) -> T:
    """
    Extract structured data from text using LLM.
    Returns a validated Pydantic model instance.
    """
    schema_json = json.dumps(schema.model_json_schema(), indent=2)

    prompt = f"""{context}

Extract information from the following document and return ONLY valid JSON
matching this schema exactly:

{schema_json}

Rules:
- Return null for fields that are missing or ambiguous
- Dates must be ISO 8601 format (YYYY-MM-DD)
- Monetary amounts as numbers, not strings (e.g. 1250.00 not "$1,250")
- If a field appears multiple times, use the most recent/authoritative value

<document>
{text[:12000]}
</document>

Return only the JSON object, no explanation."""

    for attempt in range(max_retries + 1):
        message = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}]
        )

        raw = message.content[0].text.strip()

        # Strip markdown code blocks if present
        if raw.startswith("```"):
            raw = raw.split("```")[1]
            if raw.startswith("json"):
                raw = raw[4:]
            raw = raw.rsplit("```", 1)[0].strip()

        try:
            data = json.loads(raw)
            return schema(**data)
        except (json.JSONDecodeError, ValidationError) as e:
            if attempt == max_retries:
                raise RuntimeError(
                    f"Extraction failed after {max_retries + 1} attempts: {e}\nRaw output: {raw}"
                ) from e
            # Retry with error context
            prompt += f"\n\nPrevious attempt failed with error: {e}\nPlease fix and try again."

This function handles the messy parts: JSON parsing failures, schema validation, and retry logic. The caller just provides a Pydantic schema and gets back a typed object.
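The fence-stripping step deserves a closer look, since models often wrap JSON in Markdown fences. Here it is isolated as a standalone helper mirroring the logic above, which makes it easy to unit-test:

```python
def strip_code_fence(raw: str) -> str:
    """Remove a Markdown code fence (``` or ```json) wrapping a model reply."""
    raw = raw.strip()
    if raw.startswith("```"):
        # split("```") yields ["", "<inner text>", ""] for a fenced reply
        raw = raw.split("```")[1]
        if raw.startswith("json"):
            raw = raw[4:]  # drop the language tag
        raw = raw.strip()
    return raw
```

Plain JSON passes through untouched, so the helper is safe to apply unconditionally.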

Module 1: PDF Processing

Extracting Text from PDFs

# pipeline/pdf_processor.py
import fitz  # PyMuPDF
from pathlib import Path
from pydantic import BaseModel
from typing import Optional
from .extractor import extract

def pdf_to_text(path: str) -> str:
    """Extract all text from a PDF, preserving some structure."""
    pages = []

    # Use a context manager so the file handle is released after each document
    with fitz.open(path) as doc:
        for page_num, page in enumerate(doc, 1):
            text = page.get_text("text")
            if text.strip():
                pages.append(f"--- Page {page_num} ---\n{text}")

    return "\n\n".join(pages)

Invoice Extraction

Define what you want to extract as a Pydantic model:

class LineItem(BaseModel):
    description: str
    quantity: Optional[float] = None
    unit_price: Optional[float] = None
    total: float

class Invoice(BaseModel):
    invoice_number: str
    invoice_date: Optional[str] = None       # ISO 8601
    due_date: Optional[str] = None
    vendor_name: str
    vendor_address: Optional[str] = None
    customer_name: Optional[str] = None
    line_items: list[LineItem]
    subtotal: Optional[float] = None
    tax_amount: Optional[float] = None
    total_amount: float
    currency: str = "USD"
    payment_terms: Optional[str] = None

def process_invoice(pdf_path: str) -> Invoice:
    text = pdf_to_text(pdf_path)
    return extract(
        text,
        Invoice,
        context="This is an invoice or bill document."
    )

Contract Key Terms Extraction

from typing import List

class ContractTerms(BaseModel):
    parties: List[str]                        # list of party names
    effective_date: Optional[str] = None
    expiration_date: Optional[str] = None
    auto_renewal: Optional[bool] = None
    notice_period_days: Optional[int] = None  # for termination
    governing_law: Optional[str] = None       # jurisdiction
    payment_amount: Optional[float] = None
    payment_frequency: Optional[str] = None  # monthly, annual, etc.
    key_obligations: List[str] = []           # top 3-5 obligations per party
    termination_conditions: List[str] = []
    confidentiality_clause: bool = False
    non_compete_clause: bool = False

def process_contract(pdf_path: str) -> ContractTerms:
    text = pdf_to_text(pdf_path)
    return extract(
        text,
        ContractTerms,
        context="This is a legal contract or agreement."
    )

Batch PDF Processing

from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import json

def batch_process_invoices(input_dir: str, output_file: str, workers: int = 4):
    pdf_files = list(Path(input_dir).glob("*.pdf"))
    results = []
    errors = []

    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_path = {
            executor.submit(process_invoice, str(pdf)): pdf
            for pdf in pdf_files
        }

        for future in as_completed(future_to_path):
            path = future_to_path[future]
            try:
                invoice = future.result()
                results.append({
                    "file": path.name,
                    "data": invoice.model_dump()
                })
                print(f"✓ {path.name}: ${invoice.total_amount}")
            except Exception as e:
                errors.append({"file": path.name, "error": str(e)})
                print(f"✗ {path.name}: {e}")

    output = {"processed": results, "errors": errors}
    with open(output_file, "w") as f:
        json.dump(output, f, indent=2)

    print(f"\n{len(results)} succeeded, {len(errors)} failed → {output_file}")

Module 2: CSV Analysis with Natural Language

Traditional CSV analysis requires writing pandas code for each question. With an LLM, you describe what you want in plain English:

# pipeline/csv_analyzer.py
import pandas as pd
import anthropic
from io import StringIO

client = anthropic.Anthropic()

def analyze_csv(csv_path: str, questions: list[str]) -> dict[str, str]:
    """
    Answer natural language questions about a CSV file.
    Returns a dict mapping each question to its answer.
    """
    df = pd.read_csv(csv_path)

    # Build a compact representation of the data
    summary = f"""Columns: {list(df.columns)}
Shape: {df.shape[0]} rows × {df.shape[1]} columns
Data types: {df.dtypes.to_dict()}
Sample (first 5 rows):
{df.head(5).to_string()}

Statistical summary:
{df.describe().to_string()}"""

    answers = {}
    for question in questions:
        message = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=512,
            messages=[{
                "role": "user",
                "content": f"""You are analyzing a CSV dataset.

Dataset summary:
{summary}

Question: {question}

Answer concisely and precisely. Include specific numbers where relevant.
If you need to calculate something, show your reasoning briefly."""
            }]
        )
        answers[question] = message.content[0].text.strip()

    return answers
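The key move here is summarize-then-ask: send the model a compact description of the data rather than the whole file. A minimal sketch of that summarization step using only the standard library (no pandas) makes it concrete:

```python
import csv
import io

def summarize_csv(text: str, sample_rows: int = 5) -> str:
    """Build a compact text summary of a CSV: columns, shape, and a small sample."""
    rows = list(csv.reader(io.StringIO(text)))
    header, body = rows[0], rows[1:]
    sample = "\n".join(",".join(r) for r in body[:sample_rows])
    return (f"Columns: {header}\n"
            f"Shape: {len(body)} rows x {len(header)} columns\n"
            f"Sample (first {min(sample_rows, len(body))} rows):\n{sample}")

summary = summarize_csv("name,amount\nacme,100\nglobex,250\n")
```

The pandas version above adds dtypes and `describe()` statistics, but the principle is the same: the model sees a few hundred tokens of summary instead of the full dataset.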

Automated Report Generation

from pydantic import BaseModel
from typing import Optional

class ReportSection(BaseModel):
    title: str
    finding: str
    supporting_data: str
    recommendation: Optional[str] = None

class DataReport(BaseModel):
    executive_summary: str
    key_metrics: dict[str, str]
    sections: list[ReportSection]
    data_quality_notes: list[str]

def generate_csv_report(csv_path: str) -> DataReport:
    df = pd.read_csv(csv_path)

    # Include a meaningful sample in the prompt
    sample_size = min(50, len(df))
    csv_sample = df.sample(sample_size).to_csv(index=False)

    text = f"""CSV file with {len(df)} rows and {len(df.columns)} columns.

Full column statistics:
{df.describe(include='all').to_string()}

Representative sample ({sample_size} rows):
{csv_sample}"""

    from .extractor import extract
    return extract(
        text,
        DataReport,
        context="Generate a business intelligence report from this dataset."
    )

Module 3: Email Processing

IMAP Inbox Integration

# pipeline/email_processor.py
import imapclient
import email
from email.header import decode_header
from pydantic import BaseModel
from typing import Optional, Literal
from .extractor import extract

def fetch_unread_emails(
    host: str,
    username: str,
    password: str,
    folder: str = "INBOX",
    limit: int = 50
) -> list[dict]:
    with imapclient.IMAPClient(host, ssl=True) as client:
        client.login(username, password)
        client.select_folder(folder, readonly=False)

        uids = client.search(["UNSEEN"])
        if not uids:
            return []

        # Keep only the most recent `limit` messages
        uids = uids[-limit:]
        messages = client.fetch(uids, ["RFC822"])

        emails = []
        for uid, data in messages.items():
            raw = data[b"RFC822"]
            msg = email.message_from_bytes(raw)

            # Decode subject
            subject_parts = decode_header(msg.get("Subject", ""))
            subject = ""
            for part, charset in subject_parts:
                if isinstance(part, bytes):
                    subject += part.decode(charset or "utf-8", errors="replace")
                else:
                    subject += part

            # Extract body
            body = ""
            if msg.is_multipart():
                for part in msg.walk():
                    if part.get_content_type() == "text/plain":
                        payload = part.get_payload(decode=True)
                        if payload:
                            body += payload.decode(
                                part.get_content_charset() or "utf-8",
                                errors="replace"
                            )
            else:
                payload = msg.get_payload(decode=True)
                if payload:
                    body = payload.decode(
                        msg.get_content_charset() or "utf-8",
                        errors="replace"
                    )

            emails.append({
                "uid": uid,
                "from": msg.get("From", ""),
                "subject": subject,
                "date": msg.get("Date", ""),
                "body": body[:4000]  # cap at 4k chars
            })

        return emails
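The subject-decoding loop handles MIME encoded-words (headers like `=?utf-8?b?...?=`). Pulled out as a helper, the same logic is easy to verify in isolation:

```python
from email.header import decode_header

def decode_subject(value: str) -> str:
    """Decode a MIME encoded-word header into plain text."""
    out = ""
    for part, charset in decode_header(value):
        if isinstance(part, bytes):
            # Encoded-words carry their own charset; fall back to UTF-8
            out += part.decode(charset or "utf-8", errors="replace")
        else:
            out += part
    return out
```

Plain ASCII subjects pass through unchanged; base64- or quoted-printable-encoded ones are decoded with their declared charset.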

Email Classification and Routing

class EmailAnalysis(BaseModel):
    category: Literal[
        "support_request",
        "billing_inquiry",
        "sales_lead",
        "partnership",
        "spam",
        "internal",
        "other"
    ]
    priority: Literal["urgent", "high", "normal", "low"]
    sentiment: Literal["positive", "neutral", "negative", "frustrated"]
    summary: str                              # 1-2 sentences
    action_required: bool
    suggested_action: Optional[str] = None   # what to do next
    customer_name: Optional[str] = None
    issue_type: Optional[str] = None         # for support requests
    escalate: bool = False

def analyze_email(email_data: dict) -> EmailAnalysis:
    text = f"""From: {email_data['from']}
Subject: {email_data['subject']}
Date: {email_data['date']}

{email_data['body']}"""

    return extract(
        text,
        EmailAnalysis,
        context="This is a business email. Classify and analyze it."
    )

Full Processing Pipeline

def process_inbox(imap_config: dict, output_callback):
    """
    Process all currently unread emails in one pass.
    output_callback receives (email_data, analysis) for each message.
    """
    emails = fetch_unread_emails(**imap_config)
    print(f"Processing {len(emails)} unread emails...")

    for email_data in emails:
        analysis = analyze_email(email_data)

        output_callback(email_data, analysis)

        # Log the result
        priority_emoji = {"urgent": "🔴", "high": "🟡", "normal": "🟢", "low": "⚪"}
        print(
            f"{priority_emoji[analysis.priority]} "
            f"[{analysis.category}] "
            f"{email_data['subject'][:60]}"
        )
        if analysis.escalate:
            print(f"  ⚠️  ESCALATE: {analysis.suggested_action}")

# Example: route to different handlers based on category
def route_email(email_data: dict, analysis: EmailAnalysis):
    if analysis.category == "support_request":
        create_support_ticket(email_data, analysis)
    elif analysis.category == "sales_lead":
        add_to_crm(email_data, analysis)
    elif analysis.category == "billing_inquiry":
        forward_to_billing(email_data, analysis)
    elif analysis.escalate:
        send_slack_alert(email_data, analysis)

Running the Pipeline as a Service

Use a lightweight scheduler to run the processors unattended:

# pipeline/runner.py
import schedule
import time
import os
from dotenv import load_dotenv
from .pdf_processor import batch_process_invoices
from .csv_analyzer import generate_csv_report
from .email_processor import process_inbox, route_email

load_dotenv()

IMAP_CONFIG = {
    "host": os.getenv("IMAP_HOST"),
    "username": os.getenv("IMAP_USER"),
    "password": os.getenv("IMAP_PASS"),
}

def run_email_processor():
    print("--- Email processing run ---")
    process_inbox(IMAP_CONFIG, route_email)

def run_invoice_processor():
    print("--- Invoice processing run ---")
    batch_process_invoices(
        input_dir="/data/incoming-invoices",
        output_file="/data/processed-invoices.json"
    )

# Check email every 5 minutes
schedule.every(5).minutes.do(run_email_processor)

# Process invoices every hour
schedule.every().hour.do(run_invoice_processor)

print("Pipeline running. Press Ctrl+C to stop.")
while True:
    schedule.run_pending()
    time.sleep(30)

Run as a systemd service or Docker container for 24/7 operation.
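For the systemd route, a minimal unit file could look like the following. Every path and name here is a placeholder to adapt to your deployment:

```ini
# /etc/systemd/system/doc-pipeline.service  (illustrative paths)
[Unit]
Description=AI document processing pipeline
After=network-online.target

[Service]
WorkingDirectory=/opt/doc-pipeline
ExecStart=/opt/doc-pipeline/.venv/bin/python -m pipeline.runner
EnvironmentFile=/opt/doc-pipeline/.env
Restart=on-failure

[Install]
WantedBy=multi-user.target
```

`Restart=on-failure` gives you crash recovery for free, which matters for a loop that is meant to run 24/7.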

Measuring Accuracy

Before deploying, validate extraction quality on a labeled sample:

def evaluate_extraction(
    samples: list[tuple[str, dict]],  # (text, expected_output)
    schema,
    field_weights: dict[str, float] | None = None
) -> float:
    """
    Returns weighted field-level accuracy across samples.
    """
    scores = []

    for text, expected in samples:
        result = extract(text, schema)
        result_dict = result.model_dump()

        field_scores = []
        for field, expected_value in expected.items():
            actual_value = result_dict.get(field)
            match = str(actual_value).lower().strip() == str(expected_value).lower().strip()
            weight = (field_weights or {}).get(field, 1.0)
            field_scores.append((int(match), weight))

        if field_scores:
            total_weight = sum(w for _, w in field_scores)
            weighted_score = sum(s * w for s, w in field_scores) / total_weight
            scores.append(weighted_score)

    return sum(scores) / len(scores) if scores else 0.0

For invoice extraction on typical business invoices, Claude achieves 92-96% field-level accuracy without any fine-tuning. For emails, classification accuracy exceeds 95% on well-defined categories.

Cost at Scale

Processing 1,000 documents per day with Claude Sonnet 4.6:

| Document Type   | Avg Tokens In | Avg Tokens Out | Daily Cost (1k docs) |
|-----------------|---------------|----------------|----------------------|
| Invoice PDF     | 1,200         | 400            | ~$2.00               |
| Contract (20pp) | 8,000         | 600            | ~$12.00              |
| Email           | 600           | 300            | ~$1.00               |
| CSV report      | 3,000         | 800            | ~$5.00               |

For most organizations, this is an order of magnitude cheaper than manual processing or custom ML infrastructure.

Next Steps

  • Add a confidence score field to your schemas — prompt the LLM to rate its own certainty on ambiguous extractions
  • Build a correction feedback loop: flag low-confidence extractions for human review, feed corrections back as few-shot examples
  • Add OCR preprocessing for scanned PDFs (pytesseract + image preprocessing)
  • Cache extraction results by document hash to avoid re-processing unchanged files
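For the first two bullets, the human-review side can be as simple as a threshold check on the model's self-reported confidence. The field name `extraction_confidence` is illustrative, not part of the schemas above:

```python
def needs_review(record: dict, threshold: float = 0.8) -> bool:
    """Flag records whose self-reported confidence falls below the threshold.

    Records that report no confidence at all are also flagged, on the
    principle that an unrated extraction should not skip review.
    """
    return record.get("extraction_confidence", 0.0) < threshold

batch = [
    {"invoice_number": "INV-1", "extraction_confidence": 0.95},
    {"invoice_number": "INV-2", "extraction_confidence": 0.55},
]
review_queue = [r for r in batch if needs_review(r)]
```

Corrections made in review can then be saved as few-shot examples and prepended to the extraction prompt's `context`.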
#ai #llm #automation #document-processing #python #pdf #data-extraction
