跳转至

最佳实践

基于 MCPStore 的生产环境经验,总结出的最佳实践指南,帮助您构建稳定、高效、可维护的智能体工具系统。

🎯 架构设计最佳实践

1. 上下文选择策略

Store 模式 vs Agent 模式

# ✅ 推荐:单一应用使用 Store 模式
store = MCPStore.setup_store()
context = store.for_store()
context.add_service({"name": "global-tool", "url": "https://api.com/mcp"})

# ✅ 推荐:多智能体系统使用 Agent 模式
agent1_context = store.for_agent("research_agent")
agent1_context.add_service({"name": "research-tools", "url": "https://research.com/mcp"})

agent2_context = store.for_agent("analysis_agent")
agent2_context.add_service({"name": "analysis-tools", "url": "https://analysis.com/mcp"})

# ❌ 避免:在单一应用中混用两种模式
# 这会导致服务管理混乱

数据空间隔离

# ✅ 推荐:不同项目使用独立数据空间
project_a_store = MCPStore.setup_store(mcp_config_file="projects/project_a/mcp.json")
project_b_store = MCPStore.setup_store(mcp_config_file="projects/project_b/mcp.json")

# ✅ 推荐:环境隔离
dev_store = MCPStore.setup_store(mcp_config_file="config/dev/mcp.json")
prod_store = MCPStore.setup_store(mcp_config_file="config/prod/mcp.json")

# ❌ 避免:在同一配置文件中混合不同环境的服务

2. 服务配置最佳实践

配置文件组织

{
  "mcpServers": {
    "weather-api": {
      "url": "https://weather.example.com/mcp",
      "transport": "streamable-http",
      "headers": {
        "Authorization": "Bearer ${WEATHER_API_TOKEN}",
        "User-Agent": "MCPStore/1.0"
      },
      "timeout": 30,
      "description": "天气查询服务 - 提供全球天气信息",
      "tags": ["weather", "external-api"],
      "contact": "weather-team@example.com"
    },
    "local-filesystem": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-filesystem", "/workspace"],
      "env": {
        "LOG_LEVEL": "info"
      },
      "working_dir": "/workspace",
      "description": "本地文件系统操作",
      "tags": ["filesystem", "local"]
    }
  },
  "version": "1.0.0",
  "description": "Production MCPStore configuration",
  "metadata": {
    "environment": "production",
    "team": "ai-platform",
    "last_updated": "2024-01-01T00:00:00Z"
  }
}

环境变量管理

# ✅ 推荐:使用环境变量管理敏感信息
export WEATHER_API_TOKEN="your-secret-token"
export DATABASE_URL="postgresql://user:pass@host:5432/db"
export LOG_LEVEL="info"

# ✅ 推荐:使用 .env 文件(不要提交到版本控制)
echo "WEATHER_API_TOKEN=your-secret-token" > .env
echo ".env" >> .gitignore

3. 监控配置最佳实践

# ✅ 推荐:生产环境监控配置
production_monitoring = {
    "health_check_seconds": 60,        # 生产环境较长间隔
    "tools_update_hours": 4,           # 4小时更新一次工具
    "reconnection_seconds": 120,       # 2分钟重连间隔
    "cleanup_hours": 24,               # 每天清理一次
    "enable_tools_update": True,
    "enable_reconnection": True,
    "update_tools_on_reconnection": True
}

# ✅ 推荐:开发环境监控配置
development_monitoring = {
    "health_check_seconds": 15,        # 开发环境快速检查
    "tools_update_hours": 1,           # 1小时更新一次
    "reconnection_seconds": 30,        # 30秒重连间隔
    "enable_tools_update": True,
    "enable_reconnection": True
}

store = MCPStore.setup_store(
    mcp_config_file="config/prod/mcp.json",
    monitoring=production_monitoring
)

🚀 性能优化最佳实践

1. 缓存策略

# ✅ 推荐:利用缓存优先架构
# 查询操作直接从缓存返回,速度极快
services = store.for_store().list_services()  # < 100ms
tools = store.for_store().list_tools()        # < 100ms

# ✅ 推荐:批量操作减少网络开销
services_config = [
    {"name": "service1", "url": "https://api1.com/mcp"},
    {"name": "service2", "url": "https://api2.com/mcp"},
    {"name": "service3", "url": "https://api3.com/mcp"}
]
result = store.for_store().batch_add_services(services_config)

# ❌ 避免:频繁的单个服务操作
# for config in services_config:
#     store.for_store().add_service(config)  # 多次网络请求

2. 异步操作

import asyncio

async def efficient_tool_calls():
    """高效的异步工具调用"""
    store = MCPStore.setup_store()
    context = store.for_store()

    # ✅ 推荐:并发执行多个工具调用
    tasks = [
        context.call_tool_async("weather_get_current", {"city": "北京"}),
        context.call_tool_async("weather_get_current", {"city": "上海"}),
        context.call_tool_async("weather_get_current", {"city": "广州"})
    ]

    results = await asyncio.gather(*tasks)
    return results

# ❌ 避免:串行执行异步操作
async def inefficient_tool_calls():
    context = store.for_store()
    results = []
    for city in ["北京", "上海", "广州"]:
        result = await context.call_tool_async("weather_get_current", {"city": city})
        results.append(result)
    return results

3. 连接管理

# ✅ 推荐:合理配置连接超时
service_config = {
    "name": "external-api",
    "url": "https://api.example.com/mcp",
    "timeout": 30,  # 30秒超时
    "headers": {
        "Connection": "keep-alive",  # 保持连接
        "Keep-Alive": "timeout=60"
    }
}

# ✅ 推荐:使用连接池
# MCPStore 内部自动管理连接池,无需手动配置

🔒 安全最佳实践

1. 敏感信息管理

# ✅ 推荐:使用环境变量
import os

api_token = os.getenv("API_TOKEN")
if not api_token:
    raise ValueError("API_TOKEN environment variable is required")

service_config = {
    "name": "secure-api",
    "url": "https://secure-api.com/mcp",
    "headers": {
        "Authorization": f"Bearer {api_token}"
    }
}

# ❌ 避免:硬编码敏感信息
# service_config = {
#     "name": "secure-api",
#     "url": "https://secure-api.com/mcp",
#     "headers": {
#         "Authorization": "Bearer hardcoded-token"  # 危险!
#     }
# }

2. 访问控制

# ✅ 推荐:使用 Agent 模式实现访问隔离
def create_restricted_agent(store, agent_id: str, allowed_services: List[str]):
    """创建受限制的 Agent"""
    agent_context = store.for_agent(agent_id)

    # 只添加允许的服务
    for service_name in allowed_services:
        service_config = get_service_config(service_name)
        agent_context.add_service(service_config)

    return agent_context

# 使用示例
research_agent = create_restricted_agent(
    store, 
    "research_agent", 
    ["search-api", "wikipedia", "arxiv"]
)

analysis_agent = create_restricted_agent(
    store,
    "analysis_agent", 
    ["database", "calculator", "chart-generator"]
)

3. 输入验证

def safe_tool_call(context, tool_name: str, args: dict):
    """安全的工具调用"""
    # ✅ 推荐:验证工具名称
    available_tools = {tool.name for tool in context.list_tools()}
    if tool_name not in available_tools:
        raise ValueError(f"Tool {tool_name} not available")

    # ✅ 推荐:验证参数
    if not isinstance(args, dict):
        raise TypeError("Arguments must be a dictionary")

    # ✅ 推荐:参数清理
    cleaned_args = {k: v for k, v in args.items() if not k.startswith('_')}

    try:
        return context.call_tool(tool_name, cleaned_args)
    except Exception as e:
        # ✅ 推荐:记录错误但不暴露敏感信息
        logger.error(f"Tool call failed: {tool_name}")
        raise RuntimeError("Tool execution failed") from e

🔧 错误处理最佳实践

1. 分层错误处理

import logging
from typing import Optional, Any

logger = logging.getLogger(__name__)

class MCPStoreManager:
    """MCPStore 管理器,实现分层错误处理"""

    def __init__(self, config_file: str):
        try:
            self.store = MCPStore.setup_store(mcp_config_file=config_file)
            self.context = self.store.for_store()
        except Exception as e:
            logger.critical(f"Failed to initialize MCPStore: {e}")
            raise

    def safe_add_service(self, config: dict) -> bool:
        """安全添加服务"""
        try:
            self.context.add_service(config)
            logger.info(f"Service {config.get('name')} added successfully")
            return True
        except ValidationError as e:
            logger.error(f"Service configuration invalid: {e}")
            return False
        except ConnectionError as e:
            logger.warning(f"Service connection failed: {e}")
            return False
        except Exception as e:
            logger.error(f"Unexpected error adding service: {e}")
            return False

    def safe_call_tool(self, tool_name: str, args: dict) -> Optional[Any]:
        """安全调用工具"""
        try:
            return self.context.call_tool(tool_name, args)
        except ToolNotFoundError:
            logger.warning(f"Tool {tool_name} not found")
            return None
        except ToolExecutionError as e:
            logger.error(f"Tool {tool_name} execution failed: {e}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error calling tool {tool_name}: {e}")
            return None

2. 重试机制

import time
from functools import wraps

def retry_on_failure(max_retries: int = 3, delay: float = 1.0):
    """重试装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except (ConnectionError, TimeoutError) as e:
                    last_exception = e
                    if attempt < max_retries - 1:
                        logger.warning(f"Attempt {attempt + 1} failed, retrying in {delay}s: {e}")
                        time.sleep(delay * (2 ** attempt))  # 指数退避
                    else:
                        logger.error(f"All {max_retries} attempts failed")
                except Exception as e:
                    # 非网络错误不重试
                    logger.error(f"Non-retryable error: {e}")
                    raise

            raise last_exception
        return wrapper
    return decorator

class RobustMCPStore:
    """带重试机制的 MCPStore"""

    def __init__(self, config_file: str):
        self.store = MCPStore.setup_store(mcp_config_file=config_file)
        self.context = self.store.for_store()

    @retry_on_failure(max_retries=3, delay=1.0)
    def add_service(self, config: dict):
        """带重试的服务添加"""
        return self.context.add_service(config)

    @retry_on_failure(max_retries=2, delay=0.5)
    def call_tool(self, tool_name: str, args: dict):
        """带重试的工具调用"""
        return self.context.call_tool(tool_name, args)

📊 监控和日志最佳实践

1. 结构化日志

import json
import logging
from datetime import datetime

class StructuredLogger:
    """结构化日志记录器"""

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

        # 配置格式化器
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )

        handler = logging.StreamHandler()
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

    def log_service_event(self, event_type: str, service_name: str, **kwargs):
        """记录服务事件"""
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": event_type,
            "service_name": service_name,
            "details": kwargs
        }
        self.logger.info(json.dumps(log_data))

    def log_tool_call(self, tool_name: str, args: dict, result: Any, duration: float):
        """记录工具调用"""
        log_data = {
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": "tool_call",
            "tool_name": tool_name,
            "args_count": len(args),
            "success": result is not None,
            "duration_ms": round(duration * 1000, 2)
        }
        self.logger.info(json.dumps(log_data))

# 使用示例
logger = StructuredLogger("mcpstore")

def monitored_tool_call(context, tool_name: str, args: dict):
    """带监控的工具调用"""
    start_time = time.time()
    try:
        result = context.call_tool(tool_name, args)
        duration = time.time() - start_time
        logger.log_tool_call(tool_name, args, result, duration)
        return result
    except Exception as e:
        duration = time.time() - start_time
        logger.log_tool_call(tool_name, args, None, duration)
        raise

2. 健康检查端点

from fastapi import FastAPI, HTTPException
from typing import Dict, Any

def create_health_check_app(store: MCPStore) -> FastAPI:
    """创建健康检查应用"""
    app = FastAPI(title="MCPStore Health Check")

    @app.get("/health")
    async def health_check() -> Dict[str, Any]:
        """基础健康检查"""
        try:
            context = store.for_store()
            services = context.list_services()
            tools = context.list_tools()

            return {
                "status": "healthy",
                "timestamp": datetime.utcnow().isoformat(),
                "services_count": len(services),
                "tools_count": len(tools)
            }
        except Exception as e:
            raise HTTPException(status_code=503, detail=f"Health check failed: {e}")

    @app.get("/health/detailed")
    async def detailed_health_check() -> Dict[str, Any]:
        """详细健康检查"""
        try:
            context = store.for_store()
            health_result = context.check_services()

            return {
                "status": "healthy" if health_result["success"] else "unhealthy",
                "timestamp": datetime.utcnow().isoformat(),
                "details": health_result
            }
        except Exception as e:
            raise HTTPException(status_code=503, detail=f"Detailed health check failed: {e}")

    return app

🧪 测试最佳实践

1. 单元测试

import pytest
from unittest.mock import Mock, patch
from mcpstore import MCPStore

class TestMCPStoreIntegration:
    """MCPStore 集成测试"""

    @pytest.fixture
    def mock_store(self):
        """模拟 MCPStore"""
        with patch('mcpstore.MCPStore.setup_store') as mock_setup:
            mock_store = Mock()
            mock_context = Mock()
            mock_store.for_store.return_value = mock_context
            mock_setup.return_value = mock_store
            yield mock_store, mock_context

    def test_service_registration(self, mock_store):
        """测试服务注册"""
        store, context = mock_store

        # 配置模拟
        context.add_service.return_value = context
        context.list_services.return_value = [
            Mock(name="test-service", status="healthy")
        ]

        # 执行测试
        context.add_service({"name": "test-service", "url": "https://test.com/mcp"})
        services = context.list_services()

        # 验证结果
        assert len(services) == 1
        assert services[0].name == "test-service"
        context.add_service.assert_called_once()

    def test_tool_calling(self, mock_store):
        """测试工具调用"""
        store, context = mock_store

        # 配置模拟
        context.call_tool.return_value = {"result": "success"}

        # 执行测试
        result = context.call_tool("test_tool", {"param": "value"})

        # 验证结果
        assert result["result"] == "success"
        context.call_tool.assert_called_once_with("test_tool", {"param": "value"})

2. 集成测试

import pytest
import tempfile
import json
from pathlib import Path

class TestMCPStoreIntegration:
    """MCPStore 真实集成测试"""

    @pytest.fixture
    def temp_config(self):
        """创建临时配置文件"""
        config = {
            "mcpServers": {
                "test-service": {
                    "command": "echo",
                    "args": ["Hello, MCP!"]
                }
            }
        }

        with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
            json.dump(config, f)
            config_path = f.name

        yield config_path

        # 清理
        Path(config_path).unlink(missing_ok=True)

    def test_real_service_integration(self, temp_config):
        """测试真实服务集成"""
        store = MCPStore.setup_store(mcp_config_file=temp_config)
        context = store.for_store()

        # 测试服务列表
        services = context.list_services()
        assert len(services) >= 0  # 可能没有服务,但不应该出错

        # 测试工具列表
        tools = context.list_tools()
        assert isinstance(tools, list)

📦 部署最佳实践

1. 容器化部署

# Dockerfile
FROM python:3.11-slim

# 设置工作目录
WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    curl \
    && rm -rf /var/lib/apt/lists/*

# 复制依赖文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 创建非 root 用户
RUN useradd -m -u 1000 mcpstore && \
    chown -R mcpstore:mcpstore /app
USER mcpstore

# 健康检查
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:18200/health || exit 1

# 暴露端口
EXPOSE 18200

# 启动命令
CMD ["python", "-m", "mcpstore.cli", "run", "api", "--host", "0.0.0.0"]

2. 生产环境配置

# docker-compose.yml
version: '3.8'

services:
  mcpstore:
    build: .
    ports:
      - "18200:18200"
    volumes:
      - ./config:/app/config:ro
      - ./logs:/app/logs
    environment:
      - MCPSTORE_CONFIG=/app/config/mcp.json
      - LOG_LEVEL=info
      - PYTHONUNBUFFERED=1
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:18200/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - ./ssl:/etc/nginx/ssl:ro
    depends_on:
      - mcpstore
    restart: unless-stopped

3. 监控和告警

# monitoring.py
import psutil
import time
from prometheus_client import start_http_server, Gauge, Counter, Histogram

class MCPStoreMetrics:
    """MCPStore 指标收集器"""

    def __init__(self):
        # 定义指标
        self.active_services = Gauge('mcpstore_active_services', 'Number of active services')
        self.tool_calls = Counter('mcpstore_tool_calls_total', 'Total tool calls', ['tool_name', 'status'])
        self.response_time = Histogram('mcpstore_response_time_seconds', 'Response time', ['operation'])
        self.memory_usage = Gauge('mcpstore_memory_usage_bytes', 'Memory usage in bytes')
        self.cpu_usage = Gauge('mcpstore_cpu_usage_percent', 'CPU usage percentage')

        # 启动指标服务器
        start_http_server(8000)

    def update_system_metrics(self):
        """更新系统指标"""
        process = psutil.Process()
        self.memory_usage.set(process.memory_info().rss)
        self.cpu_usage.set(process.cpu_percent())

    def record_tool_call(self, tool_name: str, success: bool, duration: float):
        """记录工具调用指标"""
        status = 'success' if success else 'error'
        self.tool_calls.labels(tool_name=tool_name, status=status).inc()
        self.response_time.labels(operation='tool_call').observe(duration)

# 使用示例
metrics = MCPStoreMetrics()

def monitored_tool_call(context, tool_name: str, args: dict):
    """带指标收集的工具调用"""
    start_time = time.time()
    try:
        result = context.call_tool(tool_name, args)
        duration = time.time() - start_time
        metrics.record_tool_call(tool_name, True, duration)
        return result
    except Exception as e:
        duration = time.time() - start_time
        metrics.record_tool_call(tool_name, False, duration)
        raise

📋 检查清单

🚀 部署前检查

  • 配置文件格式正确且已验证
  • 环境变量已正确设置
  • 敏感信息未硬编码
  • 监控配置适合环境(开发/生产)
  • 日志级别配置正确
  • 健康检查端点正常工作
  • 错误处理机制完善
  • 资源限制已配置
  • 备份和恢复策略已制定

🔧 性能优化检查

  • 使用缓存优先架构
  • 异步操作替代同步操作
  • 批量操作减少网络开销
  • 连接池配置合理
  • 超时设置适当
  • 重试机制已实现
  • 资源清理机制完善

🔒 安全检查

  • 敏感信息使用环境变量
  • 访问控制机制已实现
  • 输入验证完善
  • 错误信息不暴露敏感数据
  • 日志记录不包含敏感信息
  • HTTPS 配置正确(生产环境)
  • 防火墙规则已配置

相关文档

下一步