系统架构¶
深入了解 MCPStore 的系统架构设计,掌握企业级 MCP 工具管理平台的技术实现。
🏗️ 整体架构概览¶
MCPStore 采用现代化的分层架构设计,确保可扩展性、可维护性和高性能:
graph TB
subgraph "用户接口层 (Interface Layer)"
SDK[Python SDK<br/>同步/异步]
API[REST API<br/>56个端点]
CLI[CLI Tool<br/>Typer CLI]
end
subgraph "上下文层 (Context Layer)"
StoreCtx[Store Context<br/>全局服务管理]
AgentCtx[Agent Context<br/>独立服务空间]
end
subgraph "业务逻辑层 (Business Layer)"
ServiceOps[Service Operations<br/>服务操作]
ToolOps[Tool Operations<br/>工具操作]
Monitoring[Monitoring<br/>监控系统]
end
subgraph "编排层 (Orchestration Layer)"
Orchestrator[MCPOrchestrator<br/>连接和调用管理]
Registry[ServiceRegistry<br/>服务注册和状态]
Lifecycle[LifecycleManager<br/>生命周期管理]
end
subgraph "协议层 (Protocol Layer)"
FastMCP[FastMCP<br/>高性能MCP实现]
MCPProtocol[MCP Protocol<br/>标准协议支持]
end
subgraph "存储层 (Storage Layer)"
Config[MCPConfig<br/>配置管理]
Cache[CacheManager<br/>缓存管理]
Models[Data Models<br/>数据模型]
end
%% 连接关系
SDK --> StoreCtx
SDK --> AgentCtx
API --> StoreCtx
API --> AgentCtx
CLI --> StoreCtx
StoreCtx --> ServiceOps
StoreCtx --> ToolOps
AgentCtx --> ServiceOps
AgentCtx --> ToolOps
ServiceOps --> Orchestrator
ToolOps --> Orchestrator
Monitoring --> Registry
Orchestrator --> Registry
Orchestrator --> Lifecycle
Registry --> Cache
Orchestrator --> FastMCP
FastMCP --> MCPProtocol
Registry --> Config
Lifecycle --> Config
Cache --> Models
%% 样式
classDef interface fill:#e1f5fe
classDef context fill:#f3e5f5
classDef business fill:#e8f5e8
classDef orchestration fill:#fff3e0
classDef protocol fill:#fce4ec
classDef storage fill:#f1f8e9
class SDK,API,CLI interface
class StoreCtx,AgentCtx context
class ServiceOps,ToolOps,Monitoring business
class Orchestrator,Registry,Lifecycle orchestration
class FastMCP,MCPProtocol protocol
class Config,Cache,Models storage
🎯 核心组件详解¶
1. MCPStore 主类 (Entry Point)¶
MCPStore 是系统的主入口,采用静态工厂模式:
class MCPStore:
"""智能体工具服务存储主类"""
@staticmethod
def setup_store(mcp_config_file=None, debug=False, monitoring=None) -> MCPStore:
"""静态工厂方法,推荐的初始化方式"""
def for_store(self) -> MCPStoreContext:
"""获取 Store 级别上下文"""
def for_agent(self, agent_id: str) -> MCPStoreContext:
"""获取 Agent 级别上下文"""
def start_api_server(self, host="0.0.0.0", port=18200):
"""启动内置 HTTP API 服务器"""
设计特点: - 单例模式: 每个配置文件对应一个实例 - 延迟初始化: 组件按需创建和缓存 - 数据空间隔离: 支持多项目独立配置
2. 上下文层 (Context Layer)¶
MCPStoreContext 类¶
上下文层是 MCPStore 的核心创新,提供统一的操作接口:
class MCPStoreContext:
"""MCPStore 操作上下文"""
def __init__(self, store: MCPStore, context_type: ContextType, agent_id: str = None):
self.context_type = context_type # STORE 或 AGENT
self.agent_id = agent_id
self._service_mapper = ServiceNameMapper() # 服务名称映射
# 服务操作
def add_service(self, config) -> 'MCPStoreContext'
def list_services(self) -> List[ServiceInfo]
def restart_service(self, name: str) -> bool
# 工具操作
def list_tools(self) -> List[ToolInfo]
def call_tool(self, tool_name: str, args: dict) -> Any
# LangChain 集成
def for_langchain(self) -> 'LangChainAdapter'
上下文切换机制¶
# Store 模式:全局服务管理
store_context = store.for_store()
store_context.add_service({"name": "global-service", "url": "https://api.com/mcp"})
# Agent 模式:独立服务空间
agent_context = store.for_agent("agent1")
agent_context.add_service({"name": "agent-service", "url": "https://agent.com/mcp"})
# 服务名称自动映射
# Store 看到: ["global-service", "agent-servicebyagent1"]
# Agent 看到: ["agent-service"] # 隐藏后缀
3. 业务逻辑层 (Business Layer)¶
业务逻辑层采用模块化设计,每个模块负责特定功能:
服务操作模块 (service_operations.py)¶
class ServiceOperations:
"""服务操作业务逻辑"""
def add_service(self, config, json_file=None):
"""添加服务,支持多种配置格式"""
def list_services(self) -> List[ServiceInfo]:
"""获取服务列表(缓存查询)"""
def get_service_info(self, name: str):
"""获取服务详细信息"""
def batch_add_services(self, services: List):
"""批量添加服务"""
工具操作模块 (tool_operations.py)¶
class ToolOperations:
"""工具操作业务逻辑"""
def list_tools(self) -> List[ToolInfo]:
"""获取工具列表(缓存查询)"""
def call_tool(self, tool_name: str, args: dict):
"""调用工具(统一接口)"""
def get_tools_with_stats(self) -> Dict[str, Any]:
"""获取工具列表和统计信息"""
监控操作模块 (monitoring_operations.py)¶
class MonitoringOperations:
"""监控系统业务逻辑"""
def check_services(self) -> Dict[str, Any]:
"""执行服务健康检查"""
def get_system_stats(self) -> Dict[str, Any]:
"""获取系统统计信息"""
4. 编排层 (Orchestration Layer)¶
MCPOrchestrator 编排器¶
编排器负责管理 MCP 连接和调用:
class MCPOrchestrator:
"""MCP 编排器 - 连接和调用管理"""
def __init__(self):
self.clients: Dict[str, Any] = {} # 客户端连接池
self.connection_manager = ConnectionManager()
async def call_tool(self, client_id: str, tool_name: str, args: dict):
"""调用工具(异步)"""
def restart_service(self, service_name: str, agent_id: str = None) -> bool:
"""重启服务"""
def get_client_tools(self, client_id: str) -> List[ToolInfo]:
"""获取客户端工具列表"""
ServiceRegistry 服务注册表¶
服务注册表管理服务状态和元数据:
class ServiceRegistry:
"""服务注册表 - 服务状态管理"""
def __init__(self):
self.services: Dict[str, ServiceInfo] = {}
self.tools: Dict[str, List[ToolInfo]] = {}
def register_service(self, service_info: ServiceInfo):
"""注册服务"""
def update_service_status(self, service_name: str, status: ServiceConnectionState):
"""更新服务状态"""
def get_all_services(self) -> List[ServiceInfo]:
"""获取所有服务(缓存查询)"""
5. 数据管理层 (Data Management)¶
配置管理¶
class MCPConfig:
"""MCP 配置管理器"""
def __init__(self, config_file: str):
self.config_file = config_file
self.data_dir = Path(config_file).parent # 数据空间目录
def load_config(self) -> Dict[str, Any]:
"""加载配置文件"""
def save_config(self, config: Dict[str, Any]):
"""保存配置文件"""
def validate_config(self, config: Dict[str, Any]) -> bool:
"""验证配置格式"""
客户端管理¶
class ClientManager:
"""客户端管理器 - Agent-Client 映射"""
def __init__(self, data_dir: Path):
self.agent_clients_file = data_dir / "agent_clients.json"
self.client_services_file = data_dir / "client_services.json"
def get_agent_clients(self, agent_id: str) -> List[str]:
"""获取 Agent 的客户端列表"""
def map_agent_to_client(self, agent_id: str, client_id: str):
"""建立 Agent-Client 映射"""
🔄 数据流架构¶
服务注册流程¶
sequenceDiagram
participant User as 用户
participant Context as MCPStoreContext
participant ServiceOps as ServiceOperations
participant Config as MCPConfig
participant Orchestrator as MCPOrchestrator
participant Registry as ServiceRegistry
participant Lifecycle as LifecycleManager
participant FastMCP as FastMCP Client
User->>Context: add_service(config)
Context->>ServiceOps: add_service(config)
ServiceOps->>Config: validate_config(config)
Config-->>ServiceOps: validation_result
ServiceOps->>Config: save_config(config)
Config-->>ServiceOps: config_saved
ServiceOps->>Orchestrator: create_client(config)
Orchestrator->>FastMCP: create_mcp_client(config)
FastMCP-->>Orchestrator: client_instance
Orchestrator->>Registry: register_service(service_info)
Registry-->>Orchestrator: service_registered
Orchestrator->>Lifecycle: initialize_service(service_name)
Lifecycle->>Registry: set_service_state(INITIALIZING)
Lifecycle->>FastMCP: connect_and_list_tools()
FastMCP-->>Lifecycle: tools_list
Lifecycle->>Registry: update_tools_cache(tools)
Lifecycle->>Registry: set_service_state(HEALTHY)
Registry-->>Context: service_ready
Context-->>User: MCPStoreContext (链式调用)
工具调用流程¶
sequenceDiagram
participant User as 用户
participant Context as MCPStoreContext
participant ToolOps as ToolOperations
participant Mapper as ServiceNameMapper
participant Registry as ServiceRegistry
participant Orchestrator as MCPOrchestrator
participant FastMCP as FastMCP Client
participant Service as MCP Service
User->>Context: call_tool(tool_name, args)
Context->>ToolOps: call_tool(tool_name, args)
alt Agent 模式
ToolOps->>Mapper: map_to_global_name(tool_name)
Mapper-->>ToolOps: global_tool_name
end
ToolOps->>Registry: resolve_tool(tool_name)
Registry-->>ToolOps: service_info, client_id
ToolOps->>Registry: check_service_health(service_name)
Registry-->>ToolOps: health_status
alt 服务不健康
ToolOps->>Registry: trigger_reconnection(service_name)
end
ToolOps->>Orchestrator: call_tool(client_id, tool_name, args)
Orchestrator->>FastMCP: call_tool(tool_name, args)
FastMCP->>Service: MCP Request
Service-->>FastMCP: MCP Response
FastMCP-->>Orchestrator: tool_result
Orchestrator-->>ToolOps: tool_result
ToolOps->>Registry: record_tool_call(tool_name, success, duration)
alt Agent 模式
ToolOps->>Mapper: format_result_for_agent(result)
Mapper-->>ToolOps: formatted_result
end
ToolOps-->>Context: final_result
Context-->>User: final_result
🚀 性能优化架构¶
1. 缓存优先设计¶
MCPStore 采用缓存优先的架构:
# 查询操作:直接从缓存返回
services = store.for_store().list_services() # < 100ms
tools = store.for_store().list_tools() # < 100ms
# 管理操作:触发缓存更新
store.for_store().add_service(config) # 更新缓存
store.for_store().restart_service(name) # 更新状态
2. 异步优先架构¶
所有 I/O 操作都提供异步版本:
# 同步版本(内部调用异步)
result = store.for_store().call_tool(name, args)
# 异步版本(直接异步调用)
result = await store.for_store().call_tool_async(name, args)
3. 连接池管理¶
class ConnectionManager:
"""连接池管理器"""
def __init__(self):
self.http_pool = HTTPConnectionPool()
self.stdio_pool = StdioConnectionPool()
def get_connection(self, service_config):
"""获取连接(复用现有连接)"""
def cleanup_idle_connections(self):
"""清理空闲连接"""
🔐 安全架构¶
1. 多层隔离机制¶
# 数据空间隔离
project_a = MCPStore.setup_store("project_a/mcp.json")
project_b = MCPStore.setup_store("project_b/mcp.json")
# Agent 级别隔离
agent1 = store.for_agent("agent1") # 独立服务空间
agent2 = store.for_agent("agent2") # 独立服务空间
# 配置文件隔离
# project_a/ 和 project_b/ 完全独立
2. 权限控制¶
class ServiceNameMapper:
"""服务名称映射器 - 实现访问控制"""
def map_to_global_name(self, local_name: str, agent_id: str) -> str:
"""本地名称 → 全局名称"""
def map_to_local_name(self, global_name: str, agent_id: str) -> str:
"""全局名称 → 本地名称"""
def filter_agent_services(self, services: List[ServiceInfo], agent_id: str):
"""过滤 Agent 可访问的服务"""
📊 监控架构¶
分层监控策略¶
class MonitoringSystem:
"""分层监控系统"""
def __init__(self, config: dict):
self.health_monitor = HealthMonitor(config["health_check_seconds"])
self.tools_monitor = ToolsUpdateMonitor(config["tools_update_hours"])
def start_monitoring(self):
"""启动监控系统"""
self.health_monitor.start() # 30秒间隔健康检查
self.tools_monitor.start() # 2小时间隔工具更新
监控数据流¶
graph TD
A[服务状态变更] --> B[事件触发]
B --> C[更新注册表]
C --> D[更新缓存]
D --> E[通知监控系统]
E --> F[记录日志]
E --> G[更新统计]
E --> H[触发告警]
🔌 扩展架构¶
插件化设计¶
MCPStore 支持多种插件扩展:
# 配置插件
class ConfigPlugin:
def load_config(self, path: str) -> dict
def validate_config(self, config: dict) -> bool
# 传输插件
class TransportPlugin:
def create_client(self, config: dict) -> Any
def call_tool(self, client: Any, name: str, args: dict) -> Any
# 监控插件
class MonitoringPlugin:
def on_service_status_change(self, service: str, status: str)
def on_tool_call(self, tool: str, args: dict, result: Any)
适配器架构¶
class LangChainAdapter:
"""LangChain 适配器"""
def list_tools(self) -> List[Tool]:
"""转换为 LangChain Tool 对象"""
def _enhance_description(self, tool_info: ToolInfo) -> str:
"""增强工具描述"""
def _convert_schema(self, input_schema: dict) -> Type[BaseModel]:
"""转换参数 Schema"""
🚀 部署架构¶
单机部署¶
# 开发环境
store = MCPStore.setup_store(debug=True)
store.start_api_server(host="127.0.0.1", port=8080, reload=True)
# 生产环境
store = MCPStore.setup_store(
mcp_config_file="production/mcp.json",
monitoring={
"health_check_seconds": 60,
"tools_update_hours": 4
}
)
store.start_api_server(host="0.0.0.0", port=18200)
容器化部署¶
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 18200
CMD ["python", "-m", "mcpstore.cli", "run", "api", "--host", "0.0.0.0"]
微服务架构¶
graph TB
subgraph "负载均衡层"
LB[Nginx<br/>负载均衡器]
SSL[SSL 终端]
end
subgraph "应用层"
API1[MCPStore API<br/>实例1:18200]
API2[MCPStore API<br/>实例2:18201]
API3[MCPStore API<br/>实例3:18202]
end
subgraph "配置层"
Config[配置文件<br/>mcp.json]
Secrets[密钥管理<br/>环境变量]
end
subgraph "监控层"
Prometheus[Prometheus<br/>指标收集]
Grafana[Grafana<br/>监控面板]
Logs[日志聚合<br/>ELK Stack]
end
subgraph "外部服务"
MCP1[MCP Service 1<br/>天气API]
MCP2[MCP Service 2<br/>数据库API]
MCP3[MCP Service 3<br/>文件系统]
end
%% 连接关系
SSL --> LB
LB --> API1
LB --> API2
LB --> API3
API1 --> Config
API2 --> Config
API3 --> Config
API1 --> Secrets
API2 --> Secrets
API3 --> Secrets
API1 --> MCP1
API1 --> MCP2
API2 --> MCP2
API2 --> MCP3
API3 --> MCP1
API3 --> MCP3
API1 --> Prometheus
API2 --> Prometheus
API3 --> Prometheus
Prometheus --> Grafana
API1 --> Logs
API2 --> Logs
API3 --> Logs
%% 样式
classDef lb fill:#e3f2fd
classDef app fill:#e8f5e8
classDef config fill:#fff3e0
classDef monitor fill:#f3e5f5
classDef external fill:#fce4ec
class LB,SSL lb
class API1,API2,API3 app
class Config,Secrets config
class Prometheus,Grafana,Logs monitor
class MCP1,MCP2,MCP3 external
# docker-compose.yml
version: '3.8'
services:
mcpstore-api:
build: .
ports:
- "18200:18200"
volumes:
- ./config:/app/config
environment:
- MCPSTORE_CONFIG=/app/config/mcp.json
nginx:
image: nginx:alpine
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
📈 可扩展性考虑¶
水平扩展¶
- 无状态设计: API 服务器无状态,支持负载均衡
- 配置外部化: 配置文件和数据目录可外部挂载
- 连接池: 支持连接复用和池化管理
垂直扩展¶
- 异步处理: 高并发异步 I/O
- 缓存优化: 多层缓存减少延迟
- 资源管理: 智能资源分配和清理