MCP for Business Document Processing
Enable AI to read, analyze, and extract insights from business documents including PDFs, contracts, reports, and office files using Model Context Protocol.
What You'll Learn
- PDF text extraction and analysis
- Office document processing (Word, Excel, PowerPoint)
- OCR for scanned documents and images
- Cloud storage integration (SharePoint, Google Drive)
- Document classification and workflow automation
Time & Difficulty
Time: 60-75 minutes
Level: Intermediate
What You'll Need
- Completed: Building Your First MCP Server
- Python libraries: PyPDF2, python-docx, openpyxl
- Tesseract OCR installed (for scanned documents and images)
Overview
Business document processing with MCP transforms how organizations handle contracts, reports, invoices, and other critical documents. Instead of manual review and data extraction, AI can automatically read, analyze, and extract insights from documents while maintaining accuracy and compliance.
Business Value: Eliminate hours of manual document review, reduce human error in data extraction, and enable real-time document analysis for faster business decisions.
Common Business Document Processing Use Cases
Contract Management
- Contract analysis: Extract key terms, dates, and obligations
- Compliance monitoring: Identify renewal dates and requirements
- Risk assessment: Flag unusual clauses or terms
Financial Documents
- Invoice processing: Extract vendor, amounts, and due dates
- Financial reports: Analyze trends and key metrics
- Expense reports: Categorize and validate expenses
HR Documents
- Resume screening: Extract qualifications and experience
- Policy compliance: Ensure document adherence to standards
- Employee records: Organize and analyze personnel files
Setting Up Document Processing with MCP
Installing Required Dependencies
# Add document processing libraries
uv add PyPDF2 python-docx openpyxl python-pptx pillow pytesseract
uv add pdfplumber # Alternative PDF library with better table extraction
uv add python-magic # File type detection
# For OCR capabilities (requires system installation of tesseract)
# macOS: brew install tesseract
# Ubuntu: sudo apt-get install tesseract-ocr
# Windows: download a prebuilt installer (see the Tesseract documentation for links)
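Before building the server, it's worth confirming that pytesseract can actually find the Tesseract binary; OCR failures are otherwise easy to misdiagnose. A minimal sanity-check script (the file name is arbitrary):
# check_ocr.py — verify that pytesseract can locate the Tesseract binary
import pytesseract

try:
    print("Tesseract version:", pytesseract.get_tesseract_version())
except Exception as exc:
    # Typically TesseractNotFoundError: the binary is not installed or not on PATH
    print(f"Tesseract not available: {exc}")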
Basic Document Processing Server
# document_server.py
import os
from pathlib import Path
from typing import Dict, Any, List, Optional
import PyPDF2
import docx
import openpyxl
import pytesseract
from PIL import Image
import pdfplumber
import magic
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("DocumentProcessingServer")
class DocumentProcessor:
    def __init__(self):
        self.supported_types = {
            'pdf': self.process_pdf,
            'docx': self.process_word,
            'xlsx': self.process_excel,
            'pptx': self.process_powerpoint,
            'png': self.process_image,
            'jpg': self.process_image,
            'jpeg': self.process_image
        }

    def detect_file_type(self, file_path: str) -> str:
        """Detect file type using python-magic, falling back to the file extension"""
        mime_map = {
            'application/pdf': 'pdf',
            'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'docx',
            'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet': 'xlsx',
            'application/vnd.openxmlformats-officedocument.presentationml.presentation': 'pptx',
            'image/png': 'png',
            'image/jpeg': 'jpg'
        }
        try:
            mime_type = magic.from_file(file_path, mime=True)
            if mime_type in mime_map:
                return mime_map[mime_type]
        except Exception:
            pass  # python-magic unavailable or file unreadable; fall back to the extension
        return Path(file_path).suffix.lower().lstrip('.')

    def process_document(self, file_path: str) -> Dict[str, Any]:
        """Process document based on file type"""
        if not os.path.exists(file_path):
            return {"error": f"File not found: {file_path}"}
        file_type = self.detect_file_type(file_path)
        if file_type not in self.supported_types:
            return {"error": f"Unsupported file type: {file_type}"}
        try:
            return self.supported_types[file_type](file_path)
        except Exception as e:
            return {"error": f"Processing failed: {str(e)}"}

    def process_pdf(self, file_path: str) -> Dict[str, Any]:
        """Extract text and metadata from PDF"""
        result = {
            "file_type": "pdf",
            "text_content": "",
            "metadata": {},
            "page_count": 0,
            "tables": []
        }
        # Use pdfplumber for better text and table extraction
        with pdfplumber.open(file_path) as pdf:
            result["page_count"] = len(pdf.pages)
            # Extract text from all pages
            text_content = []
            for page in pdf.pages:
                page_text = page.extract_text()
                if page_text:
                    text_content.append(page_text)
                # Extract tables
                tables = page.extract_tables()
                for table in tables:
                    result["tables"].append({
                        "page": page.page_number,
                        "data": table
                    })
            result["text_content"] = "\n\n".join(text_content)
        # Get metadata using PyPDF2
        try:
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                if pdf_reader.metadata:
                    result["metadata"] = {
                        "title": pdf_reader.metadata.get('/Title', ''),
                        "author": pdf_reader.metadata.get('/Author', ''),
                        "subject": pdf_reader.metadata.get('/Subject', ''),
                        "creator": pdf_reader.metadata.get('/Creator', ''),
                        "creation_date": str(pdf_reader.metadata.get('/CreationDate', ''))
                    }
        except Exception:
            pass  # Metadata is optional; ignore PyPDF2 read failures
        return result

    def process_word(self, file_path: str) -> Dict[str, Any]:
        """Extract text and metadata from Word documents"""
        doc = docx.Document(file_path)
        # Extract text content
        text_content = []
        for paragraph in doc.paragraphs:
            if paragraph.text.strip():
                text_content.append(paragraph.text)
        # Extract tables
        tables = []
        for table in doc.tables:
            table_data = []
            for row in table.rows:
                row_data = [cell.text.strip() for cell in row.cells]
                table_data.append(row_data)
            tables.append(table_data)
        return {
            "file_type": "docx",
            "text_content": "\n".join(text_content),
            "tables": tables,
            "paragraph_count": len(doc.paragraphs),
            "table_count": len(doc.tables)
        }

    def process_excel(self, file_path: str) -> Dict[str, Any]:
        """Extract data from Excel spreadsheets"""
        workbook = openpyxl.load_workbook(file_path, data_only=True)
        result = {
            "file_type": "xlsx",
            "sheets": {},
            "sheet_names": workbook.sheetnames
        }
        for sheet_name in workbook.sheetnames:
            sheet = workbook[sheet_name]
            sheet_data = []
            for row in sheet.iter_rows(values_only=True):
                # Skip empty rows
                if any(cell is not None for cell in row):
                    sheet_data.append([str(cell) if cell is not None else "" for cell in row])
            result["sheets"][sheet_name] = {
                "data": sheet_data,
                "row_count": len(sheet_data),
                "column_count": len(sheet_data[0]) if sheet_data else 0
            }
        return result

    def process_powerpoint(self, file_path: str) -> Dict[str, Any]:
        """Extract text from PowerPoint presentations"""
        from pptx import Presentation  # provided by the python-pptx package
        presentation = Presentation(file_path)
        slides_content = []
        for i, slide in enumerate(presentation.slides):
            slide_text = []
            for shape in slide.shapes:
                if hasattr(shape, "text") and shape.text.strip():
                    slide_text.append(shape.text)
            slides_content.append({
                "slide_number": i + 1,
                "text": "\n".join(slide_text)
            })
        return {
            "file_type": "pptx",
            "slides": slides_content,
            "slide_count": len(presentation.slides),
            "text_content": "\n\n".join([slide["text"] for slide in slides_content])
        }

    def process_image(self, file_path: str) -> Dict[str, Any]:
        """Extract text from images using OCR"""
        try:
            image = Image.open(file_path)
            text = pytesseract.image_to_string(image)
            return {
                "file_type": "image",
                "text_content": text,
                "image_size": image.size,
                "image_mode": image.mode
            }
        except Exception as e:
            return {
                "file_type": "image",
                "error": f"OCR failed: {str(e)}",
                "text_content": ""
            }
# Initialize document processor
doc_processor = DocumentProcessor()
@mcp.tool()
def process_document(file_path: str) -> Dict[str, Any]:
    """Process any supported business document and extract content"""
    return doc_processor.process_document(file_path)

@mcp.tool()
def analyze_contract(file_path: str) -> Dict[str, Any]:
    """Analyze contract document for key terms and dates"""
    doc_result = doc_processor.process_document(file_path)
    if "error" in doc_result:
        return doc_result
    text = doc_result.get("text_content", "")
    # Basic contract analysis (enhance with business-specific patterns)
    analysis = {
        "document_type": "contract",
        "key_terms": extract_contract_terms(text),
        "important_dates": extract_dates(text),
        "parties": extract_parties(text),
        "financial_terms": extract_financial_terms(text),
        "risk_indicators": identify_risk_indicators(text)
    }
    return {**doc_result, "analysis": analysis}

@mcp.tool()
def extract_invoice_data(file_path: str) -> Dict[str, Any]:
    """Extract structured data from invoice documents"""
    doc_result = doc_processor.process_document(file_path)
    if "error" in doc_result:
        return doc_result
    text = doc_result.get("text_content", "")
    # Invoice-specific extraction
    invoice_data = {
        "document_type": "invoice",
        "vendor_info": extract_vendor_info(text),
        "invoice_number": extract_invoice_number(text),
        "amount": extract_amount(text),
        "due_date": extract_due_date(text),
        "line_items": extract_line_items(doc_result.get("tables", []))
    }
    return {**doc_result, "invoice_data": invoice_data}

@mcp.tool()
def batch_process_documents(directory_path: str, file_pattern: str = "*") -> List[Dict[str, Any]]:
    """Process multiple documents in a directory"""
    directory = Path(directory_path)
    if not directory.exists():
        return [{"error": f"Directory not found: {directory_path}"}]
    results = []
    for file_path in directory.glob(file_pattern):
        if file_path.is_file():
            result = doc_processor.process_document(str(file_path))
            result["file_name"] = file_path.name
            result["file_path"] = str(file_path)
            results.append(result)
    return results
# Business-specific analysis functions
def extract_contract_terms(text: str) -> List[str]:
    """Extract key contract terms and clauses"""
    import re
    # Common contract term patterns
    patterns = [
        r'term of (\d+) (?:years?|months?|days?)',
        r'effective date[:\s]+([^\n]+)',
        r'termination[:\s]+([^\n]+)',
        r'payment terms?[:\s]+([^\n]+)',
        r'liability[:\s]+([^\n]+)'
    ]
    terms = []
    for pattern in patterns:
        matches = re.findall(pattern, text, re.IGNORECASE)
        terms.extend(matches)
    return terms

def extract_dates(text: str) -> List[str]:
    """Extract dates from document text"""
    import re
    date_patterns = [
        r'\b\d{1,2}/\d{1,2}/\d{4}\b',
        r'\b\d{1,2}-\d{1,2}-\d{4}\b',
        r'\b(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}\b'
    ]
    dates = []
    for pattern in date_patterns:
        dates.extend(re.findall(pattern, text, re.IGNORECASE))
    return dates

def extract_parties(text: str) -> List[str]:
    """Extract contracting parties"""
    import re
    # Look for company names and legal entities
    patterns = [
        r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)\s+(?:LLC|Inc|Corp|Ltd|Company)',
        r'between\s+([^,]+),?\s+and\s+([^,\n]+)',
    ]
    parties = []
    for pattern in patterns:
        for match in re.findall(pattern, text):
            # Multi-group patterns return tuples; flatten them so no party is dropped
            if isinstance(match, tuple):
                parties.extend(part.strip() for part in match if part.strip())
            else:
                parties.append(match.strip())
    return parties

def extract_financial_terms(text: str) -> List[str]:
    """Extract financial amounts and terms"""
    import re
    # Currency patterns
    patterns = [
        r'\$[\d,]+\.?\d*',
        r'amount of\s+([^,\n]+)',
        r'price[:\s]+([^,\n]+)',
        r'fee[:\s]+([^,\n]+)'
    ]
    financial_terms = []
    for pattern in patterns:
        matches = re.findall(pattern, text, re.IGNORECASE)
        financial_terms.extend(matches)
    return financial_terms

def identify_risk_indicators(text: str) -> List[str]:
    """Identify potential risk indicators in contracts"""
    risk_keywords = [
        'penalty', 'damages', 'breach', 'default', 'termination',
        'indemnification', 'liability', 'force majeure', 'arbitration'
    ]
    found_risks = []
    for keyword in risk_keywords:
        if keyword.lower() in text.lower():
            found_risks.append(keyword)
    return found_risks

def extract_vendor_info(text: str) -> Dict[str, str]:
    """Extract vendor information from invoice"""
    import re
    # Basic vendor extraction patterns
    vendor_info = {}
    # Look for common invoice headers
    patterns = {
        'company': r'(?:from|bill to|vendor)[:\s]*([^\n]+)',
        'address': r'(\d+[^,\n]+(?:street|st|avenue|ave|road|rd|drive|dr)[^,\n]*)',
        'phone': r'(\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4})',
        'email': r'([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})'
    }
    for field, pattern in patterns.items():
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            vendor_info[field] = match.group(1).strip()
    return vendor_info

def extract_invoice_number(text: str) -> str:
    """Extract invoice number"""
    import re
    patterns = [
        r'invoice[#\s]*:?\s*([A-Z0-9\-]+)',
        r'inv[#\s]*:?\s*([A-Z0-9\-]+)',
        r'#\s*([A-Z0-9\-]+)'
    ]
    for pattern in patterns:
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            return match.group(1)
    return ""

def extract_amount(text: str) -> str:
    """Extract total amount from invoice"""
    import re
    patterns = [
        r'total[:\s]*\$?([\d,]+\.?\d*)',
        r'amount due[:\s]*\$?([\d,]+\.?\d*)',
        r'balance[:\s]*\$?([\d,]+\.?\d*)'
    ]
    for pattern in patterns:
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            return match.group(1)
    return ""

def extract_due_date(text: str) -> str:
    """Extract due date from invoice"""
    import re
    patterns = [
        r'due date[:\s]*([^\n]+)',
        r'payment due[:\s]*([^\n]+)',
        r'due[:\s]*(\d{1,2}/\d{1,2}/\d{4})'
    ]
    for pattern in patterns:
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            return match.group(1).strip()
    return ""

def extract_line_items(tables: List[Dict]) -> List[Dict]:
    """Extract line items from invoice tables"""
    line_items = []
    for table in tables:
        # PDF tables are dicts with a "data" key; Word tables are plain lists of rows
        data = table.get("data", []) if isinstance(table, dict) else table
        if not data:
            continue
        # Assume first row is headers
        headers = [str(cell).lower() for cell in data[0]]
        for row in data[1:]:
            if len(row) >= len(headers):
                item = {}
                for i, header in enumerate(headers):
                    if i < len(row):
                        item[header] = str(row[i])
                line_items.append(item)
    return line_items

if __name__ == "__main__":
    mcp.run()
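With the server saved as document_server.py, you can exercise the tools from any MCP client. Here is a minimal sketch using the Python SDK's stdio client; the launch command and sample file path are assumptions to adapt to your setup:
# client_example.py — call the document server's tools over stdio (illustrative)
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def main():
    # Launch the server as a subprocess (adjust the command to your environment)
    server_params = StdioServerParameters(command="uv", args=["run", "python", "document_server.py"])
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool("process_document", {"file_path": "contracts/sample.pdf"})
            print(result.content)

if __name__ == "__main__":
    asyncio.run(main())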
Cloud Storage Integration
SharePoint Integration
# sharepoint_integration.py
# Requires: uv add Office365-REST-Python-Client
import os
from typing import Any, Dict

from office365.runtime.auth.authentication_context import AuthenticationContext
from office365.sharepoint.client_context import ClientContext
from office365.sharepoint.files.file import File

# Reuse the processor and MCP server defined in document_server.py
from document_server import doc_processor, mcp

class SharePointDocumentProcessor:
    def __init__(self, site_url: str, username: str, password: str):
        self.site_url = site_url
        self.username = username
        self.password = password
        self.ctx = None
        self._authenticate()

    def _authenticate(self):
        """Authenticate with SharePoint"""
        auth_ctx = AuthenticationContext(self.site_url)
        auth_ctx.acquire_token_for_user(self.username, self.password)
        self.ctx = ClientContext(self.site_url, auth_ctx)

    def download_and_process_document(self, file_url: str, local_path: str) -> Dict[str, Any]:
        """Download document from SharePoint and process it"""
        try:
            # Download file
            with open(local_path, "wb") as local_file:
                file = self.ctx.web.get_file_by_server_relative_url(file_url)
                file.download(local_file)
                self.ctx.execute_query()
            # Process downloaded file
            result = doc_processor.process_document(local_path)
            result["source"] = "sharepoint"
            result["sharepoint_url"] = file_url
            # Clean up local file
            os.remove(local_path)
            return result
        except Exception as e:
            return {"error": f"SharePoint processing failed: {str(e)}"}

@mcp.tool()
def process_sharepoint_document(site_url: str, file_url: str, username: str, password: str) -> Dict[str, Any]:
    """Process document directly from SharePoint"""
    processor = SharePointDocumentProcessor(site_url, username, password)
    local_path = f"/tmp/{os.path.basename(file_url)}"
    return processor.download_and_process_document(file_url, local_path)
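Passing SharePoint credentials as tool arguments means they flow through the model and may end up in logs. A safer pattern is to keep them on the server side; the sketch below reads them from environment variables (the variable names are illustrative, and a proper secrets manager is preferable in production):
@mcp.tool()
def process_sharepoint_document_secure(site_url: str, file_url: str) -> Dict[str, Any]:
    """Process a SharePoint document using credentials held by the server, not the model"""
    username = os.environ.get("SHAREPOINT_USERNAME")
    password = os.environ.get("SHAREPOINT_PASSWORD")
    if not username or not password:
        return {"error": "SHAREPOINT_USERNAME and SHAREPOINT_PASSWORD must be set on the server"}
    processor = SharePointDocumentProcessor(site_url, username, password)
    local_path = f"/tmp/{os.path.basename(file_url)}"
    return processor.download_and_process_document(file_url, local_path)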
Google Drive Integration
# google_drive_integration.py
# Requires: uv add google-api-python-client google-auth google-auth-oauthlib
import io
import os
from typing import Any, Dict, List

from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
from google.oauth2.credentials import Credentials

# Reuse the processor and MCP server defined in document_server.py
from document_server import doc_processor, mcp

class GoogleDriveDocumentProcessor:
    def __init__(self, credentials_path: str):
        self.credentials = Credentials.from_authorized_user_file(credentials_path)
        self.service = build('drive', 'v3', credentials=self.credentials)

    def download_and_process_document(self, file_id: str) -> Dict[str, Any]:
        """Download and process document from Google Drive"""
        try:
            # Get file metadata
            file_metadata = self.service.files().get(fileId=file_id).execute()
            # Download file content
            request = self.service.files().get_media(fileId=file_id)
            file_content = io.BytesIO()
            downloader = MediaIoBaseDownload(file_content, request)
            done = False
            while not done:
                status, done = downloader.next_chunk()
            # Save to temporary file
            temp_path = f"/tmp/{file_metadata['name']}"
            with open(temp_path, 'wb') as f:
                f.write(file_content.getvalue())
            # Process document
            result = doc_processor.process_document(temp_path)
            result["source"] = "google_drive"
            result["file_metadata"] = file_metadata
            # Clean up
            os.remove(temp_path)
            return result
        except Exception as e:
            return {"error": f"Google Drive processing failed: {str(e)}"}

@mcp.tool()
def process_google_drive_document(file_id: str) -> Dict[str, Any]:
    """Process document from Google Drive"""
    processor = GoogleDriveDocumentProcessor("path/to/credentials.json")
    return processor.download_and_process_document(file_id)
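To handle a whole Drive folder rather than one file at a time, list the folder first and feed each file through the same processor. A minimal sketch using the Drive v3 files().list call (the credentials path mirrors the example above and is an assumption):
@mcp.tool()
def process_google_drive_folder(folder_id: str) -> List[Dict[str, Any]]:
    """Process every file in a Google Drive folder"""
    processor = GoogleDriveDocumentProcessor("path/to/credentials.json")
    response = processor.service.files().list(
        q=f"'{folder_id}' in parents and trashed = false",
        fields="files(id, name)"
    ).execute()
    results = []
    for item in response.get("files", []):
        result = processor.download_and_process_document(item["id"])
        result["file_name"] = item["name"]
        results.append(result)
    return results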
Document Workflow Automation
Contract Review Workflow
@mcp.tool()
def automated_contract_review(file_path: str, review_checklist: List[str]) -> Dict[str, Any]:
    """Perform automated contract review against checklist"""
    # Process the contract
    analysis = analyze_contract(file_path)
    if "error" in analysis:
        return analysis
    # Check against business requirements
    review_results = {
        "contract_analysis": analysis,
        "checklist_results": {},
        "overall_score": 0,
        "recommendations": []
    }
    text = analysis.get("text_content", "").lower()
    for item in review_checklist:
        if item.lower() in text:
            review_results["checklist_results"][item] = "✅ Found"
            review_results["overall_score"] += 1
        else:
            review_results["checklist_results"][item] = "❌ Missing"
            review_results["recommendations"].append(f"Consider adding: {item}")
    # Calculate percentage
    review_results["completion_percentage"] = (
        review_results["overall_score"] / len(review_checklist) * 100
    )
    return review_results

@mcp.tool()
def generate_document_summary(file_path: str, summary_type: str = "executive") -> Dict[str, Any]:
    """Generate business summary of document content"""
    doc_result = process_document(file_path)
    if "error" in doc_result:
        return doc_result
    text = doc_result.get("text_content", "")
    if summary_type == "executive":
        summary = generate_executive_summary(text)
    elif summary_type == "technical":
        summary = generate_technical_summary(text)
    elif summary_type == "financial":
        summary = generate_financial_summary(text)
    else:
        summary = generate_general_summary(text)
    return {
        "original_document": doc_result,
        "summary_type": summary_type,
        "summary": summary,
        "key_points": extract_key_points(text),
        "action_items": extract_action_items(text)
    }

def generate_executive_summary(text: str) -> str:
    """Generate executive summary focusing on business impact"""
    # Implement business-focused summarization logic
    # This is a simplified version - in practice, you might use NLP libraries
    sentences = text.split('.')
    key_sentences = []
    keywords = ['revenue', 'profit', 'cost', 'budget', 'timeline', 'objective', 'goal', 'strategy']
    for sentence in sentences:
        if any(keyword in sentence.lower() for keyword in keywords):
            key_sentences.append(sentence.strip())
    return '. '.join(key_sentences[:5])  # Top 5 business-relevant sentences

def extract_key_points(text: str) -> List[str]:
    """Extract key business points from document"""
    import re
    # Look for numbered lists, bullet points, and important statements
    patterns = [
        r'(?:^|\n)\s*(?:\d+\.|\•|\*)\s*([^\n]+)',
        r'(?:important|key|critical|essential)[:\s]*([^\n]+)',
        r'(?:objective|goal|target)[:\s]*([^\n]+)'
    ]
    key_points = []
    for pattern in patterns:
        matches = re.findall(pattern, text, re.IGNORECASE | re.MULTILINE)
        key_points.extend([match.strip() for match in matches if len(match.strip()) > 10])
    return key_points[:10]  # Top 10 key points

def extract_action_items(text: str) -> List[str]:
    """Extract action items and next steps"""
    import re
    action_patterns = [
        r'(?:action|todo|next step|follow up)[:\s]*([^\n]+)',
        r'(?:must|should|need to|required to)\s+([^\n]+)',
        r'(?:^|\n)\s*(?:\d+\.|\•|\*)\s*(?:complete|finish|implement|execute)\s+([^\n]+)'
    ]
    actions = []
    for pattern in action_patterns:
        matches = re.findall(pattern, text, re.IGNORECASE | re.MULTILINE)
        actions.extend([match.strip() for match in matches if len(match.strip()) > 5])
    return actions[:5]  # Top 5 action items
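generate_document_summary also dispatches to technical, financial, and general summaries that are not shown above. One way to fill them in is to reuse the same keyword-filtering approach; the keyword lists below are illustrative placeholders, not a definitive taxonomy:
def generate_technical_summary(text: str) -> str:
    """Summary focused on technical content (illustrative keyword list)"""
    keywords = ['architecture', 'system', 'implementation', 'integration', 'requirement', 'specification']
    sentences = [s.strip() for s in text.split('.') if any(k in s.lower() for k in keywords)]
    return '. '.join(sentences[:5])

def generate_financial_summary(text: str) -> str:
    """Summary focused on financial content (illustrative keyword list)"""
    keywords = ['revenue', 'cost', 'invoice', 'payment', 'budget', 'expense', 'total']
    sentences = [s.strip() for s in text.split('.') if any(k in s.lower() for k in keywords)]
    return '. '.join(sentences[:5])

def generate_general_summary(text: str) -> str:
    """Fallback summary: the first few non-empty sentences of the document"""
    sentences = [s.strip() for s in text.split('.') if s.strip()]
    return '. '.join(sentences[:5])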
Business Document Templates
Document Classification
@mcp.tool()
def classify_document(file_path: str) -> Dict[str, Any]:
    """Automatically classify document type based on content"""
    doc_result = process_document(file_path)
    if "error" in doc_result:
        return doc_result
    text = doc_result.get("text_content", "").lower()
    # Classification rules
    classification_rules = {
        "contract": ["agreement", "terms and conditions", "hereby agrees", "party"],
        "invoice": ["invoice", "amount due", "payment terms", "bill to"],
        "report": ["executive summary", "analysis", "findings", "recommendations"],
        "policy": ["policy", "procedure", "guidelines", "compliance"],
        "proposal": ["proposal", "scope of work", "deliverables", "timeline"],
        "manual": ["instructions", "how to", "step by step", "procedure"]
    }
    classification_scores = {}
    for doc_type, keywords in classification_rules.items():
        score = sum(1 for keyword in keywords if keyword in text)
        classification_scores[doc_type] = score
    # Get the highest scoring classification
    best_match = max(classification_scores, key=classification_scores.get)
    confidence = classification_scores[best_match] / len(classification_rules[best_match])
    return {
        **doc_result,
        "classification": {
            "document_type": best_match,
            "confidence": confidence,
            "all_scores": classification_scores
        }
    }
Performance Optimization
Batch Processing
@mcp.tool()
def batch_process_with_analysis(directory_path: str, analysis_type: str = "classification") -> Dict[str, Any]:
    """Process multiple documents with specific analysis"""
    results = batch_process_documents(directory_path)
    analyzed_results = []
    summary_stats = {
        "total_documents": len(results),
        "successful_processes": 0,
        "failed_processes": 0,
        "document_types": {},
        "total_pages": 0
    }
    for result in results:
        if "error" not in result:
            summary_stats["successful_processes"] += 1
            summary_stats["total_pages"] += result.get("page_count", 0)
            # Add specific analysis
            if analysis_type == "classification":
                classification = classify_document(result["file_path"])
                result["classification"] = classification.get("classification", {})
                doc_type = result["classification"].get("document_type", "unknown")
                summary_stats["document_types"][doc_type] = summary_stats["document_types"].get(doc_type, 0) + 1
            elif analysis_type == "contract":
                if "contract" in result.get("text_content", "").lower():
                    result["contract_analysis"] = analyze_contract(result["file_path"])
        else:
            summary_stats["failed_processes"] += 1
        analyzed_results.append(result)
    return {
        "results": analyzed_results,
        "summary": summary_stats
    }
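Processing large directories one file at a time can be slow, since most of the work is file I/O and parsing. Below is a sketch of a concurrent variant using a thread pool; the worker count is an assumption you should tune for your hardware.
from concurrent.futures import ThreadPoolExecutor

@mcp.tool()
def parallel_process_documents(directory_path: str, max_workers: int = 4) -> List[Dict[str, Any]]:
    """Process all files in a directory concurrently"""
    files = [str(p) for p in Path(directory_path).glob("*") if p.is_file()]
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map preserves input order, so results line up with file paths
        for file_path, result in zip(files, executor.map(doc_processor.process_document, files)):
            result["file_path"] = file_path
            results.append(result)
    return results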
Testing Document Processing
# test_document_processing.py
import os

from document_server import process_document

def test_document_types():
    """Test processing of different document types"""
    test_files = {
        "sample.pdf": "pdf",
        "contract.docx": "docx",
        "data.xlsx": "xlsx",
        "presentation.pptx": "pptx",
        "scanned.png": "png"
    }
    for filename, expected_type in test_files.items():
        if os.path.exists(filename):
            result = process_document(filename)
            if "error" not in result:
                print(f"✅ {filename}: {result['file_type']}")
            else:
                print(f"❌ {filename}: {result['error']}")
        else:
            print(f"⚠️ {filename}: File not found")

if __name__ == "__main__":
    test_document_types()
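The extraction helpers can also be unit-tested without any sample files by feeding them inline text. A small pytest sketch, assuming the helpers are importable from document_server.py:
# test_extractors.py
from document_server import extract_amount, extract_dates, extract_invoice_number

def test_extract_dates():
    text = "This agreement is effective 01/15/2024 and expires March 31, 2025."
    assert "01/15/2024" in extract_dates(text)

def test_extract_amount():
    assert extract_amount("Total: $1,250.00") == "1,250.00"

def test_extract_invoice_number():
    assert extract_invoice_number("Invoice #: INV-2024-001") == "INV-2024-001"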
Next Steps
Now that you can process business documents, explore:
- Business Tool Integration - Connect document workflows to business systems
- Contract Analysis Automation - Advanced contract processing
- Business Intelligence Reports - Automated report generation
- MCP Security for Enterprise - Secure document handling
Ready to implement? Start by identifying your most time-consuming document processing tasks and automating them first for maximum impact.
Related Guides
A Developer's Guide to MCP Security: Beyond the Basics
Centralize your understanding of MCP security with this comprehensive guide. Learn practical steps for authenticating servers, preventing prompt injection, validating URIs, and managing secrets.
Building Your First MCP Server with Python
A step-by-step tutorial on how to create and run a basic Model Context Protocol (MCP) server using the Python SDK, FastMCP.
Connect Claude to Your Business Files with MCP
Step-by-step guide to setting up Claude AI to read, analyze, and work with your business documents and spreadsheets automatically.