Skip to main content
intermediate
Difficulty: 3/5
Published: 6/17/2025
By: UnlockMCP Team

Business Tool Integration with MCP

Connect AI to your business tools including CRM, project management, accounting software, and communication platforms using Model Context Protocol.

What You'll Learn

  • CRM integration patterns (Salesforce, HubSpot, Pipedrive)
  • Project management tool connections (Asana, Monday.com, Trello)
  • +3 more

Time & Difficulty

Time: 75-90 minutes

Level: Intermediate

What You'll Need

  • Completed: MCP Business API Integration
  • Access to business tools and API credentials
  • +1 more
MCP Business Tools CRM Project Management Integration Automation

Overview

Business tool integration with MCP enables AI to orchestrate workflows across your entire business technology stack. Instead of manually switching between applications and copying data, AI can automatically synchronize information, trigger actions, and maintain consistency across all your business tools.

Business Value: Eliminate context switching between tools, reduce data entry errors by 95%, and enable AI to manage complex multi-step business processes automatically.

Common Business Tool Integration Patterns

Customer Relationship Management

  • Lead lifecycle automation: Sync leads across marketing and sales tools
  • Customer data unification: Maintain consistent customer records
  • Sales pipeline automation: Update deals based on customer interactions

Project Management

  • Task synchronization: Keep tasks aligned across different tools
  • Resource allocation: Automatically assign team members based on availability
  • Progress reporting: Generate status updates from multiple project sources

Financial Management

  • Invoice automation: Generate invoices from project completion
  • Expense tracking: Categorize and approve expenses automatically
  • Financial reporting: Consolidate data from multiple financial sources

Communication & Collaboration

  • Notification automation: Send updates based on business events
  • Meeting scheduling: Coordinate calendars and resources
  • Document workflow: Route documents through approval processes

Setting Up Business Tool Integration

CRM Integration Framework

# crm_integration.py
import os
import re
import time
from collections import deque
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional

import requests
from mcp.server.fastmcp import FastMCP

@dataclass
class Contact:
    """Contact record exchanged with CRMIntegration (create_contact / get_contacts)."""
    # Identifier; create_crm_contact passes "" here because the CRM assigns the real id.
    id: str
    # Full name as one string; the Salesforce and HubSpot creators split it on whitespace.
    name: str
    email: str
    phone: Optional[str] = None
    company: Optional[str] = None
    # Defaults to "active"; no code visible in this file reads it.
    status: str = "active"

@dataclass
class Deal:
    """Deal/opportunity record accepted by CRMIntegration.create_deal."""
    id: str
    title: str
    value: float
    stage: str
    # Presumably the id of the related Contact — confirm against the CRM backend.
    contact_id: str
    # Defaults to 50; presumably a win likelihood in percent — TODO confirm.
    probability: int = 50
    # Optional close date kept as a string; the expected format is not established here.
    close_date: Optional[str] = None

class CRMIntegration:
    """Client that routes contact and deal operations to a single CRM backend.

    Supported ``crm_type`` values are ``"salesforce"``, ``"hubspot"`` and
    ``"pipedrive"``. Public methods dispatch on ``crm_type``; an unsupported
    type yields ``{"error": ...}`` (or ``[]`` for ``get_contacts``) instead of
    raising.

    NOTE: the backend-specific ``_*_get_contacts`` and ``_*_create_deal``
    helpers are called below but are not defined in this class as shown; they
    must be provided elsewhere (e.g. by a subclass) before those paths work.
    """

    def __init__(self, crm_type: str, api_key: str, base_url: str):
        """Store connection settings and prepare an authenticated session.

        Args:
            crm_type: Backend selector ("salesforce", "hubspot" or "pipedrive").
            api_key: Credential sent as a bearer token or Pipedrive api_token.
            base_url: API root; trailing slashes are removed.
        """
        self.crm_type = crm_type
        self.api_key = api_key
        # Strip trailing slashes so endpoint paths can be appended with one '/'.
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
        self._setup_authentication()

    def _setup_authentication(self):
        """Setup authentication headers based on CRM type"""
        if self.crm_type in ("salesforce", "hubspot"):
            # Both backends take the same bearer-token JSON headers.
            self.session.headers.update({
                'Authorization': f'Bearer {self.api_key}',
                'Content-Type': 'application/json'
            })
        elif self.crm_type == "pipedrive":
            # Pipedrive authenticates through a query parameter on every request.
            self.session.params = {'api_token': self.api_key}

    def create_contact(self, contact: "Contact") -> Dict[str, Any]:
        """Create contact in CRM system.

        Returns:
            The backend's parsed JSON response, or ``{"error": ...}`` when the
            CRM type is unsupported or the contact name is empty/whitespace.
        """
        creators = {
            "salesforce": self._salesforce_create_contact,
            "hubspot": self._hubspot_create_contact,
            "pipedrive": self._pipedrive_create_contact,
        }
        creator = creators.get(self.crm_type)
        if creator is None:
            return {"error": f"Unsupported CRM type: {self.crm_type}"}
        # Reject blank names up front: the name is split into first/last parts
        # below, and a blank name used to crash with IndexError.
        if not (contact.name or "").strip():
            return {"error": "Contact name must not be empty"}
        return creator(contact)

    def get_contacts(self, limit: int = 10, filters: Optional[Dict] = None) -> List["Contact"]:
        """Retrieve contacts from CRM; returns [] for an unsupported CRM type."""
        # if/elif (not a dict of bound methods) so a missing backend helper is
        # only looked up when that backend is actually selected.
        if self.crm_type == "salesforce":
            return self._salesforce_get_contacts(limit, filters)
        elif self.crm_type == "hubspot":
            return self._hubspot_get_contacts(limit, filters)
        elif self.crm_type == "pipedrive":
            return self._pipedrive_get_contacts(limit, filters)
        else:
            return []

    def create_deal(self, deal: "Deal") -> Dict[str, Any]:
        """Create deal/opportunity in CRM; returns an error dict for an unsupported type."""
        if self.crm_type == "salesforce":
            return self._salesforce_create_deal(deal)
        elif self.crm_type == "hubspot":
            return self._hubspot_create_deal(deal)
        elif self.crm_type == "pipedrive":
            return self._pipedrive_create_deal(deal)
        else:
            return {"error": f"Unsupported CRM type: {self.crm_type}"}

    def _salesforce_create_contact(self, contact: "Contact") -> Dict[str, Any]:
        """Create contact in Salesforce.

        The last whitespace-separated token becomes LastName; everything
        before it becomes FirstName.
        """
        # `or [""]` keeps a direct call with a blank name from raising IndexError.
        parts = contact.name.split() or [""]
        data = {
            "LastName": parts[-1],
            "FirstName": " ".join(parts[:-1]),
            "Email": contact.email,
            "Phone": contact.phone,
            "Account": {"Name": contact.company} if contact.company else None
        }

        response = self.session.post(f"{self.base_url}/services/data/v58.0/sobjects/Contact", json=data)
        return response.json()

    def _hubspot_create_contact(self, contact: "Contact") -> Dict[str, Any]:
        """Create contact in HubSpot.

        The first whitespace-separated token becomes firstname; the remainder
        becomes lastname (note: the opposite split from the Salesforce path).
        """
        parts = contact.name.split() or [""]
        properties = {
            "email": contact.email,
            "firstname": parts[0],
            "lastname": " ".join(parts[1:]),
            "phone": contact.phone,
            "company": contact.company
        }

        data = {"properties": properties}
        response = self.session.post(f"{self.base_url}/crm/v3/objects/contacts", json=data)
        return response.json()

    def _pipedrive_create_contact(self, contact: "Contact") -> Dict[str, Any]:
        """Create person in Pipedrive (name is sent unsplit)."""
        data = {
            "name": contact.name,
            "email": [contact.email],
            "phone": [contact.phone] if contact.phone else [],
            "org_name": contact.company
        }

        response = self.session.post(f"{self.base_url}/v1/persons", json=data)
        return response.json()

# Initialize CRM integration
# Settings come from environment variables; CRM_TYPE falls back to 'hubspot'.
# NOTE: CRMIntegration.__init__ calls base_url.rstrip('/'), so an unset
# CRM_BASE_URL (os.getenv returns None) raises AttributeError here at import time.
crm = CRMIntegration(
    crm_type=os.getenv('CRM_TYPE', 'hubspot'),
    api_key=os.getenv('CRM_API_KEY'),
    base_url=os.getenv('CRM_BASE_URL')
)

# Shared FastMCP server instance; the @mcp.tool() decorators in this file attach to it.
mcp = FastMCP("BusinessToolIntegration")

@mcp.tool()
def create_crm_contact(name: str, email: str, phone: Optional[str] = None, company: Optional[str] = None) -> Dict[str, Any]:
    """Create a new contact in the configured CRM system.

    Args:
        name: Contact's full name.
        email: Contact's email address.
        phone: Optional phone number.
        company: Optional company name.

    Returns:
        Whatever ``crm.create_contact`` returns for the new contact.
    """
    # The id is left blank because the CRM assigns it on creation.
    new_contact = Contact("", name, email, phone=phone, company=company)
    return crm.create_contact(new_contact)

@mcp.tool()
def sync_contact_across_tools(contact_id: str, target_tools: List[str]) -> Dict[str, Any]:
    """Copy one CRM contact into each of the requested business tools.

    Args:
        contact_id: Id used to look the contact up in the primary CRM.
        target_tools: Tool names; "email_marketing", "support_system" and
            "billing_system" are recognised.

    Returns:
        A dict keyed by tool name holding each sync result (an error entry for
        unrecognised names), or ``{"error": "Contact not found"}`` when the
        CRM lookup returns nothing.
    """
    matches = crm.get_contacts(filters={"id": contact_id})
    if not matches:
        return {"error": "Contact not found"}
    source_contact = matches[0]

    # Lambdas defer the helper lookup, so a sync function is only resolved
    # when its tool is actually requested.
    handlers = {
        "email_marketing": lambda c: sync_to_email_marketing(c),
        "support_system": lambda c: sync_to_support_system(c),
        "billing_system": lambda c: sync_to_billing_system(c),
    }

    outcome = {}
    for tool_name in target_tools:
        handler = handlers.get(tool_name)
        if handler is None:
            outcome[tool_name] = {"error": f"Unknown tool: {tool_name}"}
        else:
            outcome[tool_name] = handler(source_contact)
    return outcome

Project Management Integration

# project_management_integration.py
from dataclasses import dataclass
from datetime import datetime, date
from enum import Enum

class TaskStatus(Enum):
    """Tool-independent task states used by the project-management integration."""
    TODO = "todo"
    IN_PROGRESS = "in_progress"
    REVIEW = "review"
    # DONE is the state the progress and overdue calculations treat as finished.
    DONE = "done"

class Priority(Enum):
    """Task priority levels; create_project_task builds one from its string value."""
    LOW = "low"
    MEDIUM = "medium"
    # get_team_workload counts only HIGH as "high_priority_tasks" (URGENT is not included).
    HIGH = "high"
    URGENT = "urgent"

@dataclass
class Task:
    """Task record passed to ProjectManagementIntegration.create_task."""
    # Identifier; callers in this file pass "" and leave assignment to the PM tool.
    id: str
    title: str
    description: str
    assignee: Optional[str] = None
    # Compared against date.today() when counting overdue tasks.
    due_date: Optional[date] = None
    status: TaskStatus = TaskStatus.TODO
    priority: Priority = Priority.MEDIUM
    project_id: Optional[str] = None
    # None (not an empty list) when no tags were supplied.
    tags: Optional[List[str]] = None

@dataclass
class Project:
    """Project record; declared here but not instantiated by the code visible in this file."""
    id: str
    name: str
    description: str
    owner: str
    # Defaults to "active".
    status: str = "active"
    start_date: Optional[date] = None
    end_date: Optional[date] = None

class ProjectManagementIntegration:
    """Client that routes task operations to one project-management backend.

    Supported ``tool_type`` values are ``"asana"``, ``"monday"`` and
    ``"trello"``. Unsupported types produce ``{"error": ...}`` from the public
    task methods.

    NOTE: ``get_tasks`` and the backend-specific ``_asana_*``, ``_monday_*``
    and ``_trello_*`` helpers are called below but are not defined in this
    class as shown; they must be provided elsewhere.
    """

    def __init__(self, tool_type: str, api_key: str, base_url: str):
        """Store connection settings and configure session authentication.

        Args:
            tool_type: Backend selector ("asana", "monday" or "trello").
            api_key: Credential; for Trello it must be "<key>:<token>".
            base_url: API root, stored unchanged.

        Raises:
            ValueError: If ``tool_type`` is "trello" and ``api_key`` is not in
                "<key>:<token>" form.
        """
        self.tool_type = tool_type
        self.api_key = api_key
        self.base_url = base_url
        self.session = requests.Session()
        self._setup_authentication()

    def _setup_authentication(self):
        """Setup authentication for different PM tools"""
        if self.tool_type == "asana":
            self.session.headers.update({
                'Authorization': f'Bearer {self.api_key}',
                'Content-Type': 'application/json'
            })
        elif self.tool_type == "monday":
            # Monday takes the raw token, without a "Bearer " prefix.
            self.session.headers.update({
                'Authorization': self.api_key,
                'Content-Type': 'application/json'
            })
        elif self.tool_type == "trello":
            # Trello needs two values; they are packed into api_key as
            # "<key>:<token>". Split once and fail with a clear message rather
            # than an IndexError/AttributeError on a malformed or unset value.
            parts = (self.api_key or "").split(':')
            if len(parts) < 2 or not parts[0] or not parts[1]:
                raise ValueError("Trello credentials must be provided as '<key>:<token>'")
            self.session.params = {'key': parts[0], 'token': parts[1]}

    def create_task(self, task: "Task") -> Dict[str, Any]:
        """Create task in project management tool"""
        if self.tool_type == "asana":
            return self._asana_create_task(task)
        elif self.tool_type == "monday":
            return self._monday_create_task(task)
        elif self.tool_type == "trello":
            return self._trello_create_task(task)
        else:
            return {"error": f"Unsupported PM tool: {self.tool_type}"}

    def update_task_status(self, task_id: str, status: "TaskStatus") -> Dict[str, Any]:
        """Update task status across PM tools"""
        if self.tool_type == "asana":
            # Asana receives only a completed flag, true when the status is DONE.
            return self._asana_update_task(task_id, {"completed": status == TaskStatus.DONE})
        elif self.tool_type == "monday":
            return self._monday_update_task(task_id, {"status": status.value})
        elif self.tool_type == "trello":
            return self._trello_update_task(task_id, status)
        else:
            return {"error": f"Unsupported PM tool: {self.tool_type}"}

    def get_project_progress(self, project_id: str) -> Dict[str, Any]:
        """Get project progress and analytics.

        Returns:
            Dict with total/completed/in-progress task counts, the completed
            percentage (0 when the project has no tasks) and the overdue count.
        """
        tasks = self.get_tasks(project_id=project_id)

        total_tasks = len(tasks)
        completed_tasks = sum(1 for task in tasks if task.status == TaskStatus.DONE)
        in_progress_tasks = sum(1 for task in tasks if task.status == TaskStatus.IN_PROGRESS)

        # Guard the division so an empty project reports 0 instead of raising.
        progress_percentage = (completed_tasks / total_tasks * 100) if total_tasks > 0 else 0

        return {
            "project_id": project_id,
            "total_tasks": total_tasks,
            "completed_tasks": completed_tasks,
            "in_progress_tasks": in_progress_tasks,
            "progress_percentage": progress_percentage,
            "overdue_tasks": self._count_overdue_tasks(tasks)
        }

    def _count_overdue_tasks(self, tasks: List["Task"]) -> int:
        """Count tasks whose due date is before today and that are not DONE."""
        today = date.today()
        return sum(1 for task in tasks
                   if task.due_date and task.due_date < today and task.status != TaskStatus.DONE)

# Initialize project management integration
# Settings come from environment variables; PM_TOOL_TYPE falls back to 'asana'.
# NOTE: when PM_TOOL_TYPE is 'trello', _setup_authentication splits the API key
# on ':', so PM_API_KEY must be set and hold the key and token as "key:token".
pm = ProjectManagementIntegration(
    tool_type=os.getenv('PM_TOOL_TYPE', 'asana'),
    api_key=os.getenv('PM_API_KEY'),
    base_url=os.getenv('PM_BASE_URL')
)

@mcp.tool()
def create_project_task(title: str, description: str, assignee: Optional[str] = None,
                        due_date: Optional[str] = None, priority: str = "medium") -> Dict[str, Any]:
    """Create a new task in the project management system.

    Args:
        title: Task title.
        description: Task description.
        assignee: Optional assignee identifier.
        due_date: Optional due date in ``YYYY-MM-DD`` form.
        priority: One of the ``Priority`` values ("low", "medium", "high", "urgent").

    Returns:
        Whatever ``pm.create_task`` returns.

    Raises:
        ValueError: If ``due_date`` does not parse or ``priority`` is not a
            valid ``Priority`` value.
    """
    parsed_due = None
    if due_date:
        parsed_due = datetime.strptime(due_date, "%Y-%m-%d").date()
    level = Priority(priority)

    # The id stays blank; the PM tool assigns it.
    new_task = Task(
        id="",
        title=title,
        description=description,
        assignee=assignee,
        due_date=parsed_due,
        priority=level,
    )
    return pm.create_task(new_task)

@mcp.tool()
def get_team_workload(team_members: List[str]) -> Dict[str, Any]:
    """Summarise each team member's task load in the project management tool.

    Args:
        team_members: Assignee identifiers to look up.

    Returns:
        Dict keyed by member with total, active (TODO or IN_PROGRESS), overdue
        and HIGH-priority task counts, plus a "capacity_status" of
        "overloaded" when more than 10 tasks are not DONE, else "normal".
    """
    report = {}
    for person in team_members:
        assigned = pm.get_tasks(assignee=person)
        open_states = (TaskStatus.TODO, TaskStatus.IN_PROGRESS)
        active_count = len([t for t in assigned if t.status in open_states])
        overdue_count = pm._count_overdue_tasks(assigned)
        high_count = len([t for t in assigned if t.priority == Priority.HIGH])
        unfinished_count = sum(1 for t in assigned if t.status != TaskStatus.DONE)

        if unfinished_count > 10:
            capacity = "overloaded"
        else:
            capacity = "normal"

        report[person] = {
            "total_tasks": len(assigned),
            "active_tasks": active_count,
            "overdue_tasks": overdue_count,
            "high_priority_tasks": high_count,
            "capacity_status": capacity,
        }
    return report

@mcp.tool()
def automate_task_assignment(project_id: str, task_title: str, required_skills: List[str]) -> Dict[str, Any]:
    """Automatically assign task to best available team member.

    The member with the fewest active tasks is chosen; ``required_skills`` is
    only recorded in the task description (no skills matching is performed).

    Args:
        project_id: Project whose members are considered.
        task_title: Title for the new task.
        required_skills: Skill names listed in the task description.

    Returns:
        The ``pm.create_task`` result with an added "assignment_reason" entry,
        or ``{"error": ...}`` when the project has no team members.
    """
    # Get team members and their current workload
    team_workload = get_team_workload(pm.get_project_members(project_id))

    # min() raises ValueError on an empty mapping, so report the empty team
    # in the same error-dict style used elsewhere in this file.
    if not team_workload:
        return {"error": f"No team members found for project: {project_id}"}

    # Simple assignment logic (enhance with skills matching)
    best_assignee = min(team_workload,
                        key=lambda member: team_workload[member]["active_tasks"])

    # Create and assign task
    task = Task(
        id="",
        title=task_title,
        description=f"Auto-assigned task requiring skills: {', '.join(required_skills)}",
        assignee=best_assignee,
        project_id=project_id
    )

    result = pm.create_task(task)
    result["assignment_reason"] = f"Assigned to {best_assignee} (lowest current workload)"

    return result

Accounting Software Integration

# accounting_integration.py
from decimal import Decimal
from dataclasses import dataclass

@dataclass
class Invoice:
    """Invoice record; declared here but not instantiated by the code visible in this file."""
    id: str
    customer_id: str
    amount: Decimal
    due_date: date
    # Defaults to "draft".
    status: str = "draft"
    # None (not an empty list) when no line items were supplied.
    line_items: Optional[List[Dict]] = None

@dataclass
class Expense:
    """Expense record built by process_expense_receipt and passed to accounting.create_expense."""
    id: str
    amount: Decimal
    # Category label such as the ones returned by AccountingIntegration.categorize_expense.
    category: str
    # Field shares its name with the datetime.date type used to annotate it.
    date: date
    description: str
    # process_expense_receipt stores the receipt's file path here.
    receipt_url: Optional[str] = None

class AccountingIntegration:
    """Client for invoice creation and expense categorisation.

    ``accounting_system`` selects the backend; "quickbooks" and "xero" are
    recognised by ``create_invoice_from_project``.

    NOTE: ``_setup_authentication``, ``_quickbooks_create_invoice`` and
    ``_xero_create_invoice`` are called below but are not defined in this
    class as shown; they must be provided elsewhere.
    """

    def __init__(self, accounting_system: str, api_key: str, base_url: str):
        """Store connection settings and configure session authentication."""
        self.accounting_system = accounting_system
        self.api_key = api_key
        self.base_url = base_url
        self.session = requests.Session()
        self._setup_authentication()

    def create_invoice_from_project(self, project_id: str, hourly_rate: Decimal) -> Dict[str, Any]:
        """Create invoice based on project time tracking.

        Sums the "hours" of the project's time entries, multiplies by
        ``hourly_rate`` and submits a single-line invoice due in 30 days.

        Returns:
            The backend's invoice result, or ``{"error": ...}`` for an
            unsupported accounting system.
        """
        # Get project details and time entries
        project_data = pm.get_project(project_id)
        time_entries = pm.get_time_entries(project_id)

        # Convert each entry through str() to Decimal: multiplying a float sum
        # by a Decimal rate raises TypeError, and str() avoids binary-float
        # noise. Missing or None hours count as zero.
        total_hours = sum(
            (Decimal(str(entry.get("hours") or 0)) for entry in time_entries),
            Decimal("0"),
        )
        total_amount = total_hours * hourly_rate

        invoice_data = {
            "customer_id": project_data.get("client_id"),
            "amount": total_amount,
            "due_date": (datetime.now() + timedelta(days=30)).date(),
            "line_items": [{
                "description": f"Project work: {project_data.get('name')}",
                "quantity": total_hours,
                "rate": hourly_rate,
                "amount": total_amount
            }]
        }

        if self.accounting_system == "quickbooks":
            return self._quickbooks_create_invoice(invoice_data)
        elif self.accounting_system == "xero":
            return self._xero_create_invoice(invoice_data)
        else:
            return {"error": f"Unsupported accounting system: {self.accounting_system}"}

    def categorize_expense(self, expense_description: str, amount: Decimal) -> str:
        """Auto-categorize expenses based on description.

        A keyword matches only at the start of a word (case-insensitive), so
        "subscriptions" matches "subscription" but "Kubernetes" no longer
        matches "uber" and "Vegas" no longer matches "gas". Categories are
        checked in declaration order and the first hit wins.

        Args:
            expense_description: Free text describing the expense.
            amount: Accepted for interface compatibility; not used.

        Returns:
            "office", "travel", "meals", "software", "marketing" or "general".
        """
        categories = {
            "office": ["office", "supplies", "equipment", "furniture"],
            "travel": ["travel", "hotel", "flight", "taxi", "uber", "gas"],
            "meals": ["restaurant", "food", "lunch", "dinner", "catering"],
            "software": ["software", "subscription", "saas", "license"],
            "marketing": ["advertising", "marketing", "promotion", "ads"]
        }

        description_lower = expense_description.lower()

        for category, keywords in categories.items():
            # One alternation per category, anchored on a word boundary.
            pattern = r"\b(?:" + "|".join(re.escape(keyword) for keyword in keywords) + ")"
            if re.search(pattern, description_lower):
                return category

        return "general"

# Initialize accounting integration
# Settings come from environment variables; ACCOUNTING_SYSTEM falls back to 'quickbooks'.
# NOTE(review): AccountingIntegration.__init__ calls self._setup_authentication(),
# which is not defined in the class as shown — confirm it exists before relying on this.
accounting = AccountingIntegration(
    accounting_system=os.getenv('ACCOUNTING_SYSTEM', 'quickbooks'),
    api_key=os.getenv('ACCOUNTING_API_KEY'),
    base_url=os.getenv('ACCOUNTING_BASE_URL')
)

@mcp.tool()
def generate_project_invoice(project_id: str, hourly_rate: float) -> Dict[str, Any]:
    """Generate an invoice for the work tracked on a project.

    Args:
        project_id: Project to invoice.
        hourly_rate: Billing rate per hour.

    Returns:
        Whatever ``accounting.create_invoice_from_project`` returns.
    """
    # Go through str() so the Decimal carries the printed value of the float.
    rate = Decimal(str(hourly_rate))
    return accounting.create_invoice_from_project(project_id, rate)

# Patterns tried in order: a dollar-prefixed amount is preferred over the first
# bare number so order numbers or dates do not win over "$45.00". Each capture
# must start with a digit, so a lone comma can never reach Decimal().
_RECEIPT_AMOUNT_PATTERNS = (
    re.compile(r'\$\s*(\d[\d,]*(?:\.\d+)?)'),
    re.compile(r'(\d[\d,]*(?:\.\d+)?)'),
)


def _extract_receipt_amount(text: str) -> Decimal:
    """Return the first plausible monetary amount in ``text``, or Decimal('0')."""
    for pattern in _RECEIPT_AMOUNT_PATTERNS:
        found = pattern.search(text)
        if found:
            return Decimal(found.group(1).replace(',', ''))
    return Decimal('0')


@mcp.tool()
def process_expense_receipt(receipt_image_path: str) -> Dict[str, Any]:
    """Process expense receipt and create expense entry.

    Runs the receipt through the document processor, extracts an amount from
    the recognised text, auto-categorises it and records the expense.

    Args:
        receipt_image_path: Path of the receipt image to process.

    Returns:
        The ``accounting.create_expense`` result, or the processor's own
        result unchanged when it contains an "error" key.
    """
    # Use document processing to extract receipt data
    from document_server import doc_processor

    receipt_data = doc_processor.process_document(receipt_image_path)

    if "error" in receipt_data:
        return receipt_data

    # Extract expense information; treat a missing or None text as empty.
    text = receipt_data.get("text_content") or ""

    # Basic extraction (enhance with better OCR and parsing)
    amount = _extract_receipt_amount(text)

    expense = Expense(
        id="",
        amount=amount,
        category=accounting.categorize_expense(text, amount),
        date=date.today(),
        description=f"Expense from receipt: {os.path.basename(receipt_image_path)}",
        receipt_url=receipt_image_path
    )

    return accounting.create_expense(expense)

Communication Platform Integration

# communication_integration.py
class CommunicationIntegration:
    """Fan-out helper for Slack, Microsoft Teams and Discord notifications.

    NOTE: the ``_send_slack_message``, ``_send_teams_message`` and
    ``_send_discord_message`` helpers are called below but are not defined in
    this class as shown; they must be provided elsewhere.
    """

    def __init__(self):
        """Read platform credentials from the environment (None when unset)."""
        self.slack_token = os.getenv('SLACK_BOT_TOKEN')
        self.teams_webhook = os.getenv('TEAMS_WEBHOOK_URL')
        self.discord_token = os.getenv('DISCORD_BOT_TOKEN')

    def send_project_update(self, project_id: str, message: str, channels: List[str]) -> Dict[str, Any]:
        """Send project update to multiple communication channels.

        Channel routing, checked in this order: a leading '#' is Slack, the
        substring 'teams' is Teams, a 'discord:' prefix is Discord.

        Args:
            project_id: Accepted for interface compatibility; not used.
            message: Text to send.
            channels: Channel identifiers to deliver to.

        Returns:
            Dict keyed by channel with each send result; unrecognised channel
            formats get ``{"error": ...}`` instead of being dropped silently.
        """
        results = {}

        for channel in channels:
            if channel.startswith('#'):  # Slack channel
                results[channel] = self._send_slack_message(channel, message)
            elif 'teams' in channel:  # Teams channel
                results[channel] = self._send_teams_message(message)
            elif channel.startswith('discord:'):  # Discord channel
                channel_id = channel.replace('discord:', '')
                results[channel] = self._send_discord_message(channel_id, message)
            else:
                # Matches the error-dict convention of the other dispatchers in this file.
                results[channel] = {"error": f"Unknown channel format: {channel}"}

        return results

    def create_automated_notifications(self, trigger_event: str, notification_config: Dict) -> Dict[str, Any]:
        """Set up automated notifications based on business events.

        Only builds and returns the rule for ``trigger_event``; nothing is
        stored or scheduled here.

        Returns:
            ``{"status": "configured", "rule": ...}`` for "task_completed",
            "deal_closed" or "invoice_paid"; otherwise ``{"error": ...}``.
        """
        target_channels = notification_config.get("channels", [])
        notification_rules = {
            "task_completed": {
                "message_template": "✅ Task '{task_title}' has been completed by {assignee}",
                "channels": target_channels
            },
            "deal_closed": {
                "message_template": "🎉 Deal '{deal_title}' closed for ${deal_value}",
                "channels": target_channels
            },
            "invoice_paid": {
                "message_template": "💰 Invoice #{invoice_number} has been paid (${amount})",
                "channels": target_channels
            }
        }

        if trigger_event in notification_rules:
            return {"status": "configured", "rule": notification_rules[trigger_event]}
        else:
            return {"error": f"Unknown trigger event: {trigger_event}"}

# Module-level communication client used by the notification tools in this file;
# its tokens and webhook URL are read from the environment in __init__.
comm = CommunicationIntegration()

@mcp.tool()
def send_cross_platform_notification(message: str, channels: List[str], priority: str = "normal") -> Dict[str, Any]:
    """Send a notification to several communication platforms at once.

    Args:
        message: Text to send.
        channels: Channel identifiers understood by ``comm.send_project_update``.
        priority: "urgent" and "high" prepend a marker to the message; any
            other value sends the message unchanged.

    Returns:
        The per-channel result dict from ``comm.send_project_update``.
    """
    decorated = {
        "urgent": f"🚨 URGENT: {message}",
        "high": f"⚠️ HIGH PRIORITY: {message}",
    }
    outgoing = decorated.get(priority, message)
    # No project is associated with a generic notification, hence the empty id.
    return comm.send_project_update("", outgoing, channels)

# In-memory registry of workflow notification settings, keyed by workflow name.
# It lasts only for the lifetime of the process; replace with a database in practice.
_notification_configs: Dict[str, Dict[str, Any]] = {}


@mcp.tool()
def setup_business_workflow_notifications(workflow_name: str, triggers: List[str], channels: List[str]) -> Dict[str, Any]:
    """Configure automated notifications for business workflows.

    Args:
        workflow_name: Name used as the registry key; re-using a name
            overwrites the earlier configuration.
        triggers: Trigger event names for the workflow.
        channels: Channels that should receive the notifications.

    Returns:
        ``{"status": "configured", "workflow": <stored configuration>}``.
    """
    workflow_config = {
        "workflow_name": workflow_name,
        "triggers": triggers,
        "channels": channels,
        "created_at": datetime.now().isoformat()
    }

    # Keep the configuration in the module-level registry; a function-local
    # dict would be discarded as soon as this call returns.
    _notification_configs[workflow_name] = workflow_config

    return {"status": "configured", "workflow": workflow_config}

Multi-Tool Workflow Orchestration

Customer Onboarding Workflow

@mcp.tool()
def automate_customer_onboarding(customer_name: str, customer_email: str, plan_type: str) -> Dict[str, Any]:
    """Run the full customer onboarding sequence across the integrated tools.

    Creates the CRM contact first; if that result contains an "error" key the
    remaining steps are skipped. Otherwise it creates an onboarding task,
    sets up the welcome email sequence and billing, notifies the team and
    schedules a follow-up.

    Returns:
        Dict with the workflow name, the customer details and a "steps" dict
        holding each step's result.
    """
    steps = {}
    results = {
        "workflow": "customer_onboarding",
        "customer": {"name": customer_name, "email": customer_email, "plan": plan_type},
        "steps": steps,
    }

    # Step 1: CRM contact — every later step depends on it succeeding.
    contact_response = create_crm_contact(customer_name, customer_email)
    steps["crm_contact"] = contact_response
    if "error" in contact_response:
        return results

    customer_id = contact_response.get("id")

    # Step 2: onboarding task, due one week from now.
    week_out = (datetime.now() + timedelta(days=7)).strftime("%Y-%m-%d")
    steps["onboarding_project"] = create_project_task(
        title=f"Onboard {customer_name}",
        description=f"Complete onboarding process for {customer_name} ({plan_type} plan)",
        due_date=week_out,
        priority="high",
    )

    # Step 3: welcome email sequence.
    steps["welcome_emails"] = setup_email_sequence(customer_email, "welcome_series")

    # Step 4: billing.
    steps["billing_setup"] = setup_customer_billing(customer_id, plan_type)

    # Step 5: team notification.
    announcement = f"New customer onboarded: {customer_name} ({plan_type} plan)"
    steps["team_notification"] = send_cross_platform_notification(
        announcement,
        ["#sales", "#customer-success"],
        priority="normal",
    )

    # Step 6: follow-up in three days.
    steps["followup_scheduled"] = schedule_followup_task(customer_id, days=3)

    return results

@mcp.tool()
def automate_deal_closure_workflow(deal_id: str) -> Dict[str, Any]:
    """Run the follow-on steps for a deal that has just been closed.

    Looks the deal up, marks it "closed_won" in the CRM, generates an invoice,
    creates a delivery task, notifies the team and updates the revenue forecast.

    Returns:
        Dict with the workflow name, the deal id and a "steps" dict of step
        results, or ``{"error": "Deal not found"}`` when the lookup is empty.
    """
    deal_info = crm.get_deal(deal_id)
    if not deal_info:
        return {"error": "Deal not found"}

    steps = {}

    # Step 1: move the deal to its closed stage.
    steps["crm_update"] = crm.update_deal_stage(deal_id, "closed_won")

    # Step 2: invoice the linked project (hourly rate defaults to 100).
    project_ref = deal_info.get("project_id")
    rate = deal_info.get("hourly_rate", 100)
    steps["invoice_generated"] = generate_project_invoice(project_ref, rate)

    # Step 3: delivery task for the account manager.
    steps["delivery_project"] = create_project_task(
        title=f"Delivery: {deal_info.get('title')}",
        description=f"Execute delivery for closed deal: {deal_info.get('title')}",
        assignee=deal_info.get("account_manager"),
        priority="high",
    )

    # Step 4: announce the win.
    headline = f"🎉 Deal closed: {deal_info.get('title')} - ${deal_info.get('value')}"
    steps["team_notification"] = send_cross_platform_notification(
        headline,
        ["#sales", "#delivery"],
        priority="high",
    )

    # Step 5: feed the closed amount into the forecast.
    steps["forecast_update"] = update_revenue_forecast(
        deal_info.get("value"),
        deal_info.get("close_date"),
    )

    return {
        "workflow": "deal_closure",
        "deal_id": deal_id,
        "steps": steps,
    }

def setup_email_sequence(email: str, sequence_type: str) -> Dict[str, Any]:
    """Placeholder that reports an email sequence as configured.

    No email tool is contacted; the arguments are echoed back. The real
    implementation depends on the email marketing tool in use.
    """
    confirmation = dict(status="configured", email=email, sequence=sequence_type)
    return confirmation

def setup_customer_billing(customer_id: str, plan_type: str) -> Dict[str, Any]:
    """Placeholder that reports billing as configured for a customer.

    No billing system is contacted; the arguments are echoed back. The real
    implementation depends on the billing system in use.
    """
    confirmation = dict(status="configured", customer_id=customer_id, plan=plan_type)
    return confirmation

def schedule_followup_task(customer_id: str, days: int) -> Dict[str, Any]:
    """Create a medium-priority check-in task due ``days`` days from now.

    Returns:
        Whatever ``create_project_task`` returns.
    """
    target_day = datetime.now() + timedelta(days=days)
    return create_project_task(
        title=f"Follow up with customer {customer_id}",
        description="Check in on customer onboarding progress",
        due_date=target_day.strftime("%Y-%m-%d"),
        priority="medium",
    )

def update_revenue_forecast(amount: float, close_date: str) -> Dict[str, Any]:
    """Placeholder that reports the revenue forecast as updated.

    No planning tool is contacted; the arguments are echoed back. The real
    implementation depends on the financial planning tool in use.
    """
    acknowledgement = dict(status="updated", amount=amount, date=close_date)
    return acknowledgement

Business Intelligence Dashboard

@mcp.tool()
def generate_business_dashboard() -> Dict[str, Any]:
    """Build a business dashboard from the CRM, project and accounting integrations.

    Returns:
        Dict with a generation timestamp plus "crm_metrics",
        "project_metrics" and "financial_metrics" sections; "team_metrics" is
        present but left empty.
    """
    generated_at = datetime.now().isoformat()

    # CRM section: this month's contacts and the open pipeline.
    new_leads = crm.get_contacts(limit=100, filters={"created_this_month": True})
    open_deals = crm.get_deals(filters={"stage": "open"})
    pipeline_total = sum(deal.get("value", 0) for deal in open_deals)
    if open_deals:
        average_deal = pipeline_total / len(open_deals)
    else:
        average_deal = 0
    crm_section = {
        "new_leads_this_month": len(new_leads),
        "open_deals": len(open_deals),
        "total_pipeline_value": pipeline_total,
        "average_deal_size": average_deal,
    }

    # Project section: active projects and their overdue tasks.
    running = [proj for proj in pm.get_projects() if proj.get("status") == "active"]
    overdue_total = 0
    for proj in running:
        overdue_total += pm._count_overdue_tasks(pm.get_tasks(project_id=proj["id"]))
    project_section = {
        "active_projects": len(running),
        "overdue_tasks": overdue_total,
        "team_utilization": calculate_team_utilization(),
    }

    # Financial section: this month's invoices split by status.
    month_invoices = accounting.get_invoices(filters={"created_this_month": True})
    paid_total = sum(inv.get("amount", 0) for inv in month_invoices if inv.get("status") == "paid")
    sent_total = sum(inv.get("amount", 0) for inv in month_invoices if inv.get("status") == "sent")
    financial_section = {
        "monthly_revenue": paid_total,
        "outstanding_invoices": sent_total,
        "monthly_expenses": accounting.get_monthly_expenses(),
    }

    return {
        "generated_at": generated_at,
        "crm_metrics": crm_section,
        "project_metrics": project_section,
        "financial_metrics": financial_section,
        "team_metrics": {},
    }

def calculate_team_utilization() -> Dict[str, float]:
    """Estimate each team member's utilisation from their active task count.

    Each active task counts as 10 percentage points, capped at 100.

    Returns:
        Dict mapping member name to the utilisation figure.
    """
    usage = {}
    for person in pm.get_team_members():
        open_count = len(pm.get_tasks(assignee=person["id"], status="active"))
        # Simple estimate (enhance with time tracking); never report above 100%.
        estimate = open_count * 10
        usage[person["name"]] = 100 if estimate > 100 else estimate
    return usage

Testing Business Tool Integration

# test_business_integration.py
def test_integration_workflow():
    """Test complete business integration workflow.

    Smoke-tests the CRM, project-management and notification tools against
    the configured backends and prints one pass/fail line per step.
    """
    print("Testing Business Tool Integration...")

    def report(label: str, ok: bool, detail: str) -> None:
        # Show the check mark only when the step actually succeeded.
        print(f"{'✅' if ok else '❌'} {label}: {detail}")

    # Test CRM integration
    test_contact = create_crm_contact(
        name="Test Customer",
        email="test.customer@example.com",
        company="Test Company"
    )
    contact_id = test_contact.get('id')
    report("CRM Contact", bool(contact_id), contact_id or 'Failed')

    # Test project management
    test_task = create_project_task(
        title="Test Integration Task",
        description="Testing PM integration",
        priority="medium"
    )
    task_id = test_task.get('id')
    report("PM Task", bool(task_id), task_id or 'Failed')

    # Test communication
    notification_result = send_cross_platform_notification(
        "Integration test successful",
        ["#testing"],
        priority="low"
    )
    # The result is keyed by channel (there is no top-level "status"), so the
    # step passes when something was sent and no channel reported an error.
    delivered = bool(notification_result) and not any(
        isinstance(outcome, dict) and "error" in outcome
        for outcome in notification_result.values()
    )
    report("Notification", delivered, "Sent" if delivered else "Failed")

    print("Integration testing complete!")

# Run the smoke test only when this file is executed directly, not when imported.
if __name__ == "__main__":
    test_integration_workflow()

Security and Best Practices

API Rate Limiting

class RateLimitedIntegration:
    """Sliding-window limiter that caps calls to ``requests_per_minute``.

    ``request_times`` holds the ``time.monotonic()`` timestamps of the calls
    made within the last 60 seconds (oldest first). Not synchronised for use
    from multiple threads.
    """

    def __init__(self, requests_per_minute: int = 60):
        """Create a limiter.

        Args:
            requests_per_minute: Maximum calls allowed in any 60-second window.

        Raises:
            ValueError: If ``requests_per_minute`` is less than 1.
        """
        if requests_per_minute < 1:
            raise ValueError("requests_per_minute must be at least 1")
        self.requests_per_minute = requests_per_minute
        self.request_times = deque(maxlen=requests_per_minute)

    def _drop_expired(self, now: float) -> None:
        """Discard timestamps that are 60 seconds old or older."""
        while self.request_times and self.request_times[0] <= now - 60:
            self.request_times.popleft()

    def make_request_with_limit(self, func, *args, **kwargs):
        """Make API request with rate limiting.

        Blocks (sleeps) until a slot is free, then calls
        ``func(*args, **kwargs)`` and returns its result. Exceptions from
        ``func`` propagate; the slot is still counted.
        """
        # Monotonic clock: wall-clock adjustments must not distort the window.
        now = time.monotonic()
        self._drop_expired(now)

        # Wait if we've hit the limit
        if len(self.request_times) >= self.requests_per_minute:
            sleep_time = 60 - (now - self.request_times[0])
            time.sleep(max(0, sleep_time))
            # Re-read the clock so the recorded timestamp is when the call is
            # really made, not when the wait started.
            now = time.monotonic()
            self._drop_expired(now)

        self.request_times.append(now)
        return func(*args, **kwargs)

Credential Management

class SecureCredentialManager:
    """Holds service credentials that were read from environment variables."""

    def __init__(self):
        """Start with an empty store and immediately fill it from the environment."""
        self.credentials = {}
        self.load_from_environment()

    def load_from_environment(self):
        """Copy each known credential variable into ``self.credentials``.

        Variables that are unset or empty are skipped and a warning line is
        printed for each one.
        """
        expected = (
            'CRM_API_KEY', 'PM_API_KEY', 'ACCOUNTING_API_KEY',
            'SLACK_BOT_TOKEN', 'TEAMS_WEBHOOK_URL',
        )
        for env_name in expected:
            found = os.getenv(env_name)
            if not found:
                print(f"Warning: {env_name} not found in environment")
                continue
            self.credentials[env_name] = found

    def get_credential(self, service: str) -> Optional[str]:
        """Return the stored credential for ``service``, or None if it was not loaded."""
        return self.credentials.get(service)

Next Steps

You’ve now learned to integrate AI with your core business tools. Next, explore:

Ready to transform your business? Start by identifying your most repetitive cross-tool workflows and automate them first for maximum impact.

if __name__ == "__main__": mcp.run()

Related Guides

Want More Step-by-Step Guides?

Get weekly implementation guides and practical MCP tutorials delivered to your inbox.

Subscribe for Weekly Guides