跳转至

FastMCP 深度集成

📋 概述

MCPStore 基于 FastMCP 构建,提供了与 FastMCP 的深度集成。本文档详细介绍如何充分利用 FastMCP 的高级特性,以及如何在 MCPStore 中扩展和自定义 FastMCP 功能。

🏗️ FastMCP 集成架构

graph TB
    subgraph "MCPStore 层"
        A[MCPStore API]
        B[Service Manager]
        C[Tool Manager]
        D[Connection Manager]
    end

    subgraph "FastMCP 适配层"
        E[FastMCP Adapter]
        F[Protocol Handler]
        G[Message Router]
        H[Session Manager]
    end

    subgraph "FastMCP 核心"
        I[FastMCP Client]
        J[FastMCP Server]
        K[Transport Layer]
        L[Protocol Engine]
    end

    subgraph "MCP 协议"
        M[JSON-RPC 2.0]
        N[WebSocket]
        O[HTTP/SSE]
        P[Stdio]
    end

    A --> E
    B --> F
    C --> G
    D --> H

    E --> I
    F --> J
    G --> K
    H --> L

    I --> M
    J --> N
    K --> O
    L --> P

🔧 FastMCP 客户端集成

高级客户端配置

from fastmcp import FastMCPClient
from fastmcp.transport import StdioTransport, WebSocketTransport
from fastmcp.protocol import MCPProtocol
import asyncio

class AdvancedFastMCPClient:
    """高级 FastMCP 客户端"""

    def __init__(self, service_config):
        self.service_config = service_config
        self.client = None
        self.transport = None
        self.protocol = None
        self.session_id = None

        # 高级配置
        self.retry_config = {
            'max_retries': 3,
            'retry_delay': 1.0,
            'exponential_backoff': True
        }

        self.timeout_config = {
            'connect_timeout': 30.0,
            'request_timeout': 60.0,
            'keepalive_timeout': 300.0
        }

        # 事件处理器
        self.event_handlers = {}

    async def initialize(self):
        """初始化客户端"""
        # 创建传输层
        self.transport = await self._create_transport()

        # 创建协议层
        self.protocol = MCPProtocol(
            transport=self.transport,
            timeout=self.timeout_config['request_timeout']
        )

        # 创建客户端
        self.client = FastMCPClient(
            protocol=self.protocol,
            retry_config=self.retry_config
        )

        # 设置事件处理器
        self._setup_event_handlers()

        # 建立连接
        await self.client.connect()

        # 初始化会话
        self.session_id = await self._initialize_session()

        print(f"✅ FastMCP 客户端初始化完成,会话ID: {self.session_id}")

    async def _create_transport(self):
        """创建传输层"""
        transport_type = self.service_config.get('transport', 'stdio')

        if transport_type == 'stdio':
            return StdioTransport(
                command=self.service_config['command'],
                args=self.service_config.get('args', []),
                env=self.service_config.get('env', {}),
                timeout=self.timeout_config['connect_timeout']
            )

        elif transport_type == 'websocket':
            return WebSocketTransport(
                url=self.service_config['url'],
                headers=self.service_config.get('headers', {}),
                timeout=self.timeout_config['connect_timeout']
            )

        else:
            raise ValueError(f"Unsupported transport type: {transport_type}")

    def _setup_event_handlers(self):
        """设置事件处理器"""
        # 连接事件
        self.client.on('connected', self._on_connected)
        self.client.on('disconnected', self._on_disconnected)
        self.client.on('error', self._on_error)

        # 协议事件
        self.client.on('notification', self._on_notification)
        self.client.on('request', self._on_request)

        # 工具事件
        self.client.on('tool_list_changed', self._on_tool_list_changed)
        self.client.on('resource_updated', self._on_resource_updated)

    async def _initialize_session(self):
        """初始化会话"""
        # 发送初始化请求
        init_result = await self.client.initialize({
            'protocolVersion': '2024-11-05',
            'capabilities': {
                'tools': {},
                'resources': {},
                'prompts': {},
                'logging': {}
            },
            'clientInfo': {
                'name': 'MCPStore',
                'version': '1.0.0'
            }
        })

        return init_result.get('sessionId')

    async def call_tool_advanced(self, tool_name, arguments, **options):
        """高级工具调用"""
        # 构造调用请求
        request = {
            'method': 'tools/call',
            'params': {
                'name': tool_name,
                'arguments': arguments
            }
        }

        # 添加高级选项
        if 'timeout' in options:
            request['timeout'] = options['timeout']

        if 'priority' in options:
            request['priority'] = options['priority']

        if 'trace_id' in options:
            request['trace_id'] = options['trace_id']

        # 执行调用
        try:
            result = await self.client.request(request)

            # 处理结果
            return self._process_tool_result(result)

        except Exception as e:
            # 错误处理
            return self._handle_tool_error(tool_name, arguments, e)

    def _process_tool_result(self, result):
        """处理工具结果"""
        if result.get('isError'):
            return {
                'success': False,
                'error': result.get('content', [{}])[0].get('text', 'Unknown error'),
                'error_code': result.get('errorCode')
            }
        else:
            content = result.get('content', [])
            if content:
                return {
                    'success': True,
                    'result': content[0].get('text', ''),
                    'metadata': result.get('metadata', {})
                }
            else:
                return {
                    'success': True,
                    'result': None
                }

    def _handle_tool_error(self, tool_name, arguments, error):
        """处理工具错误"""
        error_info = {
            'success': False,
            'tool_name': tool_name,
            'arguments': arguments,
            'error': str(error),
            'error_type': type(error).__name__
        }

        # 触发错误事件
        self._trigger_event('tool_error', error_info)

        return error_info

    # 事件处理器
    async def _on_connected(self, event):
        """连接建立事件"""
        print(f"🔗 FastMCP 客户端已连接")
        self._trigger_event('connected', event)

    async def _on_disconnected(self, event):
        """连接断开事件"""
        print(f"🔌 FastMCP 客户端已断开")
        self._trigger_event('disconnected', event)

    async def _on_error(self, event):
        """错误事件"""
        print(f"❌ FastMCP 客户端错误: {event}")
        self._trigger_event('error', event)

    async def _on_notification(self, notification):
        """通知事件"""
        print(f"📢 收到通知: {notification}")
        self._trigger_event('notification', notification)

    async def _on_request(self, request):
        """请求事件"""
        print(f"📨 收到请求: {request}")
        self._trigger_event('request', request)

    async def _on_tool_list_changed(self, event):
        """工具列表变更事件"""
        print(f"🛠️ 工具列表已更新")
        self._trigger_event('tool_list_changed', event)

    async def _on_resource_updated(self, event):
        """资源更新事件"""
        print(f"📦 资源已更新: {event}")
        self._trigger_event('resource_updated', event)

    def on(self, event_name, handler):
        """注册事件处理器"""
        if event_name not in self.event_handlers:
            self.event_handlers[event_name] = []
        self.event_handlers[event_name].append(handler)

    def _trigger_event(self, event_name, event_data):
        """触发事件"""
        handlers = self.event_handlers.get(event_name, [])
        for handler in handlers:
            try:
                if asyncio.iscoroutinefunction(handler):
                    asyncio.create_task(handler(event_data))
                else:
                    handler(event_data)
            except Exception as e:
                print(f"⚠️ 事件处理器错误: {e}")

    async def close(self):
        """关闭客户端"""
        if self.client:
            await self.client.close()
        if self.transport:
            await self.transport.close()

# 使用高级 FastMCP 客户端
async def test_advanced_client():
    """测试高级客户端"""

    service_config = {
        'transport': 'stdio',
        'command': 'npx',
        'args': ['-y', '@modelcontextprotocol/server-filesystem', '/tmp']
    }

    client = AdvancedFastMCPClient(service_config)

    # 注册事件处理器
    client.on('connected', lambda event: print("🎉 客户端连接成功"))
    client.on('tool_error', lambda error: print(f"🚨 工具错误: {error}"))

    try:
        # 初始化客户端
        await client.initialize()

        # 高级工具调用
        result = await client.call_tool_advanced(
            'read_file',
            {'path': '/tmp/test.txt'},
            timeout=30.0,
            priority='high',
            trace_id='test-trace-001'
        )

        print(f"📄 工具调用结果: {result}")

    finally:
        await client.close()

# 运行测试
# asyncio.run(test_advanced_client())

FastMCP 服务器集成

from fastmcp import FastMCPServer
from fastmcp.tools import Tool
from fastmcp.resources import Resource
import asyncio

class MCPStoreServer:
    """MCPStore 服务器"""

    def __init__(self, mcpstore):
        self.mcpstore = mcpstore
        self.server = FastMCPServer(
            name="MCPStore Server",
            version="1.0.0"
        )

        # 注册工具和资源
        self._register_tools()
        self._register_resources()
        self._setup_handlers()

    def _register_tools(self):
        """注册工具"""

        # 获取服务列表工具
        @self.server.tool("list_services")
        async def list_services() -> str:
            """列出所有可用的服务"""
            services = self.mcpstore.list_services()
            return f"可用服务: {[s['name'] for s in services]}"

        # 获取工具列表工具
        @self.server.tool("list_tools")
        async def list_tools(service_name: str = None) -> str:
            """列出工具"""
            tools = self.mcpstore.list_tools(service_name=service_name)
            return f"可用工具: {[t['name'] for t in tools]}"

        # 调用工具
        @self.server.tool("call_tool")
        async def call_tool(tool_name: str, arguments: dict) -> str:
            """调用指定工具"""
            try:
                result = self.mcpstore.call_tool(tool_name, arguments)
                return f"工具调用成功: {result}"
            except Exception as e:
                return f"工具调用失败: {str(e)}"

        # 批量调用工具
        @self.server.tool("batch_call")
        async def batch_call(calls: list) -> str:
            """批量调用工具"""
            try:
                results = self.mcpstore.batch_call(calls)
                successful = sum(1 for r in results if r.get('success'))
                return f"批量调用完成: {successful}/{len(results)} 成功"
            except Exception as e:
                return f"批量调用失败: {str(e)}"

    def _register_resources(self):
        """注册资源"""

        # 服务状态资源
        @self.server.resource("services/status")
        async def services_status() -> dict:
            """获取服务状态"""
            services = self.mcpstore.list_services()
            status_info = {}

            for service in services:
                try:
                    status = self.mcpstore.get_service_status(service['name'])
                    status_info[service['name']] = status
                except Exception as e:
                    status_info[service['name']] = f"error: {str(e)}"

            return status_info

        # 工具统计资源
        @self.server.resource("tools/statistics")
        async def tools_statistics() -> dict:
            """获取工具统计信息"""
            tools = self.mcpstore.list_tools()

            stats = {
                'total_tools': len(tools),
                'tools_by_service': {},
                'tools_by_category': {}
            }

            for tool in tools:
                service_name = tool.get('service_name', 'unknown')
                category = tool.get('category', 'uncategorized')

                stats['tools_by_service'][service_name] = stats['tools_by_service'].get(service_name, 0) + 1
                stats['tools_by_category'][category] = stats['tools_by_category'].get(category, 0) + 1

            return stats

    def _setup_handlers(self):
        """设置处理器"""

        @self.server.request_handler("custom/health_check")
        async def health_check(request):
            """健康检查处理器"""
            return {
                'status': 'healthy',
                'timestamp': time.time(),
                'services_count': len(self.mcpstore.list_services()),
                'tools_count': len(self.mcpstore.list_tools())
            }

        @self.server.notification_handler("custom/service_update")
        async def service_update(notification):
            """服务更新通知处理器"""
            service_name = notification.get('service_name')
            action = notification.get('action')

            print(f"📢 服务更新通知: {service_name} - {action}")

            # 可以在这里触发相应的操作
            if action == 'restart':
                try:
                    self.mcpstore.restart_service(service_name)
                    print(f"✅ 服务 {service_name} 重启成功")
                except Exception as e:
                    print(f"❌ 服务 {service_name} 重启失败: {e}")

    async def start(self, transport_config):
        """启动服务器"""
        await self.server.start(transport_config)
        print(f"🚀 MCPStore 服务器已启动")

    async def stop(self):
        """停止服务器"""
        await self.server.stop()
        print(f"🛑 MCPStore 服务器已停止")

# 使用 MCPStore 服务器
async def run_mcpstore_server():
    """运行 MCPStore 服务器"""
    from mcpstore import MCPStore

    # 初始化 MCPStore
    store = MCPStore()
    store.add_service({
        "mcpServers": {
            "filesystem": {
                "command": "npx",
                "args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]
            }
        }
    })

    # 创建服务器
    server = MCPStoreServer(store)

    # 配置传输
    transport_config = {
        'type': 'stdio'  # 或 'websocket', 'http'
    }

    try:
        await server.start(transport_config)

        # 保持服务器运行
        await asyncio.Event().wait()

    except KeyboardInterrupt:
        print("🔌 收到中断信号")
    finally:
        await server.stop()

# 运行服务器
# asyncio.run(run_mcpstore_server())

🔄 FastMCP 协议扩展

自定义协议扩展

from fastmcp.protocol import MCPProtocol
from fastmcp.messages import Request, Response, Notification

class ExtendedMCPProtocol(MCPProtocol):
    """扩展的 MCP 协议"""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # 注册自定义方法
        self.register_method('mcpstore/batch_call', self._handle_batch_call)
        self.register_method('mcpstore/service_status', self._handle_service_status)
        self.register_method('mcpstore/tool_search', self._handle_tool_search)

        # 注册自定义通知
        self.register_notification('mcpstore/service_changed', self._handle_service_changed)
        self.register_notification('mcpstore/tool_updated', self._handle_tool_updated)

    async def _handle_batch_call(self, request: Request) -> Response:
        """处理批量调用请求"""
        calls = request.params.get('calls', [])

        results = []
        for call in calls:
            try:
                # 执行单个调用
                tool_result = await self._execute_tool_call(
                    call.get('tool_name'),
                    call.get('arguments', {})
                )
                results.append({
                    'success': True,
                    'result': tool_result
                })
            except Exception as e:
                results.append({
                    'success': False,
                    'error': str(e)
                })

        return Response(
            id=request.id,
            result={
                'results': results,
                'total': len(calls),
                'successful': sum(1 for r in results if r['success'])
            }
        )

    async def _handle_service_status(self, request: Request) -> Response:
        """处理服务状态请求"""
        service_name = request.params.get('service_name')

        try:
            # 获取服务状态
            status = await self._get_service_status(service_name)

            return Response(
                id=request.id,
                result={
                    'service_name': service_name,
                    'status': status,
                    'timestamp': time.time()
                }
            )
        except Exception as e:
            return Response(
                id=request.id,
                error={
                    'code': -32000,
                    'message': f"Failed to get service status: {str(e)}"
                }
            )

    async def _handle_tool_search(self, request: Request) -> Response:
        """处理工具搜索请求"""
        query = request.params.get('query', '')
        filters = request.params.get('filters', {})

        try:
            # 执行工具搜索
            tools = await self._search_tools(query, filters)

            return Response(
                id=request.id,
                result={
                    'query': query,
                    'filters': filters,
                    'tools': tools,
                    'count': len(tools)
                }
            )
        except Exception as e:
            return Response(
                id=request.id,
                error={
                    'code': -32000,
                    'message': f"Tool search failed: {str(e)}"
                }
            )

    async def _handle_service_changed(self, notification: Notification):
        """处理服务变更通知"""
        service_name = notification.params.get('service_name')
        change_type = notification.params.get('change_type')

        print(f"📢 服务变更通知: {service_name} - {change_type}")

        # 触发相应的处理逻辑
        if change_type == 'added':
            await self._on_service_added(service_name)
        elif change_type == 'removed':
            await self._on_service_removed(service_name)
        elif change_type == 'updated':
            await self._on_service_updated(service_name)

    async def _handle_tool_updated(self, notification: Notification):
        """处理工具更新通知"""
        tool_name = notification.params.get('tool_name')
        service_name = notification.params.get('service_name')

        print(f"🛠️ 工具更新通知: {tool_name} @ {service_name}")

        # 刷新工具缓存
        await self._refresh_tool_cache(service_name)

    # 辅助方法
    async def _execute_tool_call(self, tool_name, arguments):
        """执行工具调用"""
        # 这里应该调用实际的工具执行逻辑
        pass

    async def _get_service_status(self, service_name):
        """获取服务状态"""
        # 这里应该调用实际的状态获取逻辑
        pass

    async def _search_tools(self, query, filters):
        """搜索工具"""
        # 这里应该调用实际的工具搜索逻辑
        pass

    async def _on_service_added(self, service_name):
        """服务添加处理"""
        pass

    async def _on_service_removed(self, service_name):
        """服务移除处理"""
        pass

    async def _on_service_updated(self, service_name):
        """服务更新处理"""
        pass

    async def _refresh_tool_cache(self, service_name):
        """刷新工具缓存"""
        pass

# 使用扩展协议
class MCPStoreWithExtendedProtocol:
    """使用扩展协议的 MCPStore"""

    def __init__(self, mcpstore):
        self.mcpstore = mcpstore
        self.protocol = None

    async def initialize_with_extended_protocol(self, transport):
        """使用扩展协议初始化"""
        self.protocol = ExtendedMCPProtocol(transport)

        # 设置协议处理器
        self._setup_protocol_handlers()

        await self.protocol.start()

    def _setup_protocol_handlers(self):
        """设置协议处理器"""
        # 将 MCPStore 方法绑定到协议处理器
        self.protocol._execute_tool_call = self._execute_tool_call
        self.protocol._get_service_status = self._get_service_status
        self.protocol._search_tools = self._search_tools

    async def _execute_tool_call(self, tool_name, arguments):
        """执行工具调用"""
        return self.mcpstore.call_tool(tool_name, arguments)

    async def _get_service_status(self, service_name):
        """获取服务状态"""
        return self.mcpstore.get_service_status(service_name)

    async def _search_tools(self, query, filters):
        """搜索工具"""
        # 实现工具搜索逻辑
        tools = self.mcpstore.list_tools()

        # 简单的查询过滤
        filtered_tools = []
        for tool in tools:
            if query.lower() in tool.get('name', '').lower() or \
               query.lower() in tool.get('description', '').lower():
                filtered_tools.append(tool)

        return filtered_tools

🔗 相关文档

📚 FastMCP 集成最佳实践

  1. 协议扩展:合理扩展 MCP 协议,添加自定义功能
  2. 事件处理:充分利用 FastMCP 的事件机制
  3. 错误处理:实现完善的协议级错误处理
  4. 性能优化:使用 FastMCP 的高级特性优化性能
  5. 兼容性:确保扩展功能与标准 MCP 协议兼容
  6. 监控日志:记录协议交互和性能指标

更新时间: 2025-01-09
版本: 1.0.0