跳转至

插件开发

MCPStore 提供强大的插件化架构,支持多种类型的扩展开发,让您可以根据需求定制和扩展功能。

🔌 插件架构概览

MCPStore 的插件系统基于接口和事件驱动的设计:

┌─────────────────────────────────────────────────────────────┐
│                    MCPStore 核心                            │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │   配置插件   │  │   传输插件   │  │   监控插件   │         │
│  │ConfigPlugin │  │TransportPlug│  │MonitorPlug  │         │
│  └─────────────┘  └─────────────┘  └─────────────┘         │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │   适配器插件 │  │   存储插件   │  │   认证插件   │         │
│  │AdapterPlugin│  │StoragePlugin│  │ AuthPlugin  │         │
│  └─────────────┘  └─────────────┘  └─────────────┘         │
└─────────────────────────────────────────────────────────────┘

🎯 插件类型

1. 配置插件 (Configuration Plugins)

扩展配置文件格式支持,如 YAML、TOML 等。

基础接口

from abc import ABC, abstractmethod
from typing import Dict, Any

class ConfigPlugin(ABC):
    """配置插件基础接口"""

    @abstractmethod
    def load_config(self, file_path: str) -> Dict[str, Any]:
        """加载配置文件"""
        pass

    @abstractmethod
    def save_config(self, config: Dict[str, Any], file_path: str) -> bool:
        """保存配置文件"""
        pass

    @abstractmethod
    def validate_config(self, config: Dict[str, Any]) -> bool:
        """验证配置格式"""
        pass

    @property
    @abstractmethod
    def supported_extensions(self) -> List[str]:
        """支持的文件扩展名"""
        pass

YAML 配置插件示例

import yaml
from typing import Dict, Any, List
from mcpstore.plugins.base import ConfigPlugin

class YAMLConfigPlugin(ConfigPlugin):
    """YAML 配置插件"""

    def load_config(self, file_path: str) -> Dict[str, Any]:
        """加载 YAML 配置文件"""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                config = yaml.safe_load(f)
            return self._convert_to_mcp_format(config)
        except Exception as e:
            raise ConfigLoadError(f"Failed to load YAML config: {e}")

    def save_config(self, config: Dict[str, Any], file_path: str) -> bool:
        """保存为 YAML 格式"""
        try:
            yaml_config = self._convert_from_mcp_format(config)
            with open(file_path, 'w', encoding='utf-8') as f:
                yaml.dump(yaml_config, f, default_flow_style=False, 
                         allow_unicode=True, indent=2)
            return True
        except Exception as e:
            print(f"Failed to save YAML config: {e}")
            return False

    def validate_config(self, config: Dict[str, Any]) -> bool:
        """验证 YAML 配置"""
        required_fields = ['mcpServers']
        return all(field in config for field in required_fields)

    @property
    def supported_extensions(self) -> List[str]:
        return ['.yaml', '.yml']

    def _convert_to_mcp_format(self, yaml_config: Dict[str, Any]) -> Dict[str, Any]:
        """将 YAML 格式转换为 MCP 标准格式"""
        # 实现格式转换逻辑
        return {
            "mcpServers": yaml_config.get("services", {}),
            "version": yaml_config.get("version", "1.0.0")
        }

    def _convert_from_mcp_format(self, mcp_config: Dict[str, Any]) -> Dict[str, Any]:
        """将 MCP 格式转换为 YAML 格式"""
        return {
            "version": mcp_config.get("version", "1.0.0"),
            "services": mcp_config.get("mcpServers", {})
        }

# 注册插件
from mcpstore.plugins import register_config_plugin
register_config_plugin(YAMLConfigPlugin())

2. 传输插件 (Transport Plugins)

支持新的传输协议,如 WebSocket、gRPC 等。

基础接口

from abc import ABC, abstractmethod
from typing import Any, Dict, List

class TransportPlugin(ABC):
    """传输插件基础接口"""

    @abstractmethod
    def create_client(self, config: Dict[str, Any]) -> Any:
        """创建客户端连接"""
        pass

    @abstractmethod
    async def call_tool(self, client: Any, tool_name: str, args: Dict[str, Any]) -> Any:
        """调用工具"""
        pass

    @abstractmethod
    async def list_tools(self, client: Any) -> List[Dict[str, Any]]:
        """获取工具列表"""
        pass

    @abstractmethod
    async def close_client(self, client: Any) -> None:
        """关闭客户端连接"""
        pass

    @property
    @abstractmethod
    def transport_type(self) -> str:
        """传输类型标识"""
        pass

WebSocket 传输插件示例

import asyncio
import websockets
import json
from typing import Any, Dict, List
from mcpstore.plugins.base import TransportPlugin

class WebSocketTransportPlugin(TransportPlugin):
    """WebSocket 传输插件"""

    def create_client(self, config: Dict[str, Any]) -> Any:
        """创建 WebSocket 客户端"""
        return {
            'url': config['url'],
            'headers': config.get('headers', {}),
            'connection': None
        }

    async def call_tool(self, client: Any, tool_name: str, args: Dict[str, Any]) -> Any:
        """通过 WebSocket 调用工具"""
        if not client['connection']:
            client['connection'] = await websockets.connect(
                client['url'], 
                extra_headers=client['headers']
            )

        # 构造 MCP 请求
        request = {
            "jsonrpc": "2.0",
            "id": f"call_{tool_name}_{asyncio.get_event_loop().time()}",
            "method": "tools/call",
            "params": {
                "name": tool_name,
                "arguments": args
            }
        }

        # 发送请求
        await client['connection'].send(json.dumps(request))

        # 接收响应
        response = await client['connection'].recv()
        result = json.loads(response)

        if 'error' in result:
            raise ToolCallError(result['error']['message'])

        return result.get('result')

    async def list_tools(self, client: Any) -> List[Dict[str, Any]]:
        """获取工具列表"""
        if not client['connection']:
            client['connection'] = await websockets.connect(
                client['url'],
                extra_headers=client['headers']
            )

        request = {
            "jsonrpc": "2.0",
            "id": "list_tools",
            "method": "tools/list"
        }

        await client['connection'].send(json.dumps(request))
        response = await client['connection'].recv()
        result = json.loads(response)

        return result.get('result', {}).get('tools', [])

    async def close_client(self, client: Any) -> None:
        """关闭 WebSocket 连接"""
        if client['connection']:
            await client['connection'].close()
            client['connection'] = None

    @property
    def transport_type(self) -> str:
        return "websocket"

# 注册插件
from mcpstore.plugins import register_transport_plugin
register_transport_plugin(WebSocketTransportPlugin())

3. 监控插件 (Monitoring Plugins)

扩展监控和告警功能。

基础接口

from abc import ABC, abstractmethod
from typing import Dict, Any
from datetime import datetime

class MonitoringPlugin(ABC):
    """监控插件基础接口"""

    @abstractmethod
    def on_service_status_change(self, service_name: str, old_status: str, new_status: str):
        """服务状态变更事件"""
        pass

    @abstractmethod
    def on_tool_call(self, tool_name: str, args: Dict[str, Any], result: Any, duration: float):
        """工具调用事件"""
        pass

    @abstractmethod
    def on_error(self, error_type: str, error_message: str, context: Dict[str, Any]):
        """错误事件"""
        pass

    @abstractmethod
    def get_metrics(self) -> Dict[str, Any]:
        """获取监控指标"""
        pass

Prometheus 监控插件示例

from prometheus_client import Counter, Histogram, Gauge, start_http_server
from typing import Dict, Any
from mcpstore.plugins.base import MonitoringPlugin

class PrometheusMonitoringPlugin(MonitoringPlugin):
    """Prometheus 监控插件"""

    def __init__(self, port: int = 8000):
        self.port = port

        # 定义指标
        self.service_status_changes = Counter(
            'mcpstore_service_status_changes_total',
            'Total service status changes',
            ['service_name', 'old_status', 'new_status']
        )

        self.tool_calls = Counter(
            'mcpstore_tool_calls_total',
            'Total tool calls',
            ['tool_name', 'status']
        )

        self.tool_call_duration = Histogram(
            'mcpstore_tool_call_duration_seconds',
            'Tool call duration',
            ['tool_name']
        )

        self.active_services = Gauge(
            'mcpstore_active_services',
            'Number of active services'
        )

        self.errors = Counter(
            'mcpstore_errors_total',
            'Total errors',
            ['error_type']
        )

        # 启动 Prometheus HTTP 服务器
        start_http_server(self.port)

    def on_service_status_change(self, service_name: str, old_status: str, new_status: str):
        """记录服务状态变更"""
        self.service_status_changes.labels(
            service_name=service_name,
            old_status=old_status,
            new_status=new_status
        ).inc()

        # 更新活跃服务数量
        if new_status == 'healthy':
            self.active_services.inc()
        elif old_status == 'healthy':
            self.active_services.dec()

    def on_tool_call(self, tool_name: str, args: Dict[str, Any], result: Any, duration: float):
        """记录工具调用"""
        status = 'success' if result is not None else 'error'

        self.tool_calls.labels(
            tool_name=tool_name,
            status=status
        ).inc()

        self.tool_call_duration.labels(
            tool_name=tool_name
        ).observe(duration)

    def on_error(self, error_type: str, error_message: str, context: Dict[str, Any]):
        """记录错误"""
        self.errors.labels(error_type=error_type).inc()

    def get_metrics(self) -> Dict[str, Any]:
        """获取当前指标"""
        return {
            'prometheus_port': self.port,
            'metrics_endpoint': f'http://localhost:{self.port}/metrics'
        }

# 注册插件
from mcpstore.plugins import register_monitoring_plugin
register_monitoring_plugin(PrometheusMonitoringPlugin())

4. 适配器插件 (Adapter Plugins)

集成其他 AI 框架,如 CrewAI、AutoGen 等。

CrewAI 适配器示例

from typing import List, Any, Dict
from mcpstore.plugins.base import AdapterPlugin

class CrewAIAdapter(AdapterPlugin):
    """CrewAI 适配器插件"""

    def __init__(self, context):
        self.context = context

    def to_crewai_tools(self) -> List[Any]:
        """转换为 CrewAI Tool 对象"""
        from crewai_tools import BaseTool

        tools = self.context.list_tools()
        crewai_tools = []

        for tool in tools:
            crewai_tool = self._create_crewai_tool(tool)
            crewai_tools.append(crewai_tool)

        return crewai_tools

    def _create_crewai_tool(self, tool_info) -> Any:
        """创建 CrewAI Tool 对象"""
        from crewai_tools import BaseTool
        from pydantic import BaseModel, Field

        # 动态创建参数模型
        if tool_info.inputSchema:
            args_schema = self._create_pydantic_model(tool_info.inputSchema)
        else:
            args_schema = BaseModel

        class MCPTool(BaseTool):
            name: str = tool_info.name
            description: str = tool_info.description
            args_schema: type = args_schema

            def _run(self, **kwargs) -> str:
                # 调用 MCPStore 工具
                result = self.context.call_tool(tool_info.name, kwargs)
                return str(result)

        return MCPTool()

    def _create_pydantic_model(self, schema: Dict[str, Any]) -> type:
        """从 JSON Schema 创建 Pydantic 模型"""
        from pydantic import BaseModel, Field, create_model

        fields = {}
        properties = schema.get('properties', {})
        required = schema.get('required', [])

        for field_name, field_schema in properties.items():
            field_type = self._json_type_to_python(field_schema.get('type', 'string'))
            field_description = field_schema.get('description', '')
            field_required = field_name in required

            if field_required:
                fields[field_name] = (field_type, Field(description=field_description))
            else:
                fields[field_name] = (field_type, Field(None, description=field_description))

        return create_model('ToolArgs', **fields)

    def _json_type_to_python(self, json_type: str) -> type:
        """JSON 类型转 Python 类型"""
        type_mapping = {
            'string': str,
            'integer': int,
            'number': float,
            'boolean': bool,
            'array': list,
            'object': dict
        }
        return type_mapping.get(json_type, str)

# 使用示例
def setup_crewai_integration(store):
    """设置 CrewAI 集成"""
    from crewai import Agent, Task, Crew

    # 获取 MCPStore 工具
    context = store.for_store()
    adapter = CrewAIAdapter(context)
    tools = adapter.to_crewai_tools()

    # 创建 CrewAI Agent
    agent = Agent(
        role='Research Assistant',
        goal='Help with research tasks using MCP tools',
        backstory='An AI assistant with access to various tools',
        tools=tools,
        verbose=True
    )

    # 创建任务
    task = Task(
        description='Use the available tools to complete the research',
        agent=agent
    )

    # 创建团队
    crew = Crew(
        agents=[agent],
        tasks=[task],
        verbose=True
    )

    return crew

🔧 插件注册和管理

插件注册系统

class PluginManager:
    """插件管理器"""

    def __init__(self):
        self.config_plugins: Dict[str, ConfigPlugin] = {}
        self.transport_plugins: Dict[str, TransportPlugin] = {}
        self.monitoring_plugins: List[MonitoringPlugin] = []
        self.adapter_plugins: Dict[str, AdapterPlugin] = {}

    def register_config_plugin(self, plugin: ConfigPlugin):
        """注册配置插件"""
        for ext in plugin.supported_extensions:
            self.config_plugins[ext] = plugin

    def register_transport_plugin(self, plugin: TransportPlugin):
        """注册传输插件"""
        self.transport_plugins[plugin.transport_type] = plugin

    def register_monitoring_plugin(self, plugin: MonitoringPlugin):
        """注册监控插件"""
        self.monitoring_plugins.append(plugin)

    def get_config_plugin(self, file_extension: str) -> ConfigPlugin:
        """获取配置插件"""
        return self.config_plugins.get(file_extension)

    def get_transport_plugin(self, transport_type: str) -> TransportPlugin:
        """获取传输插件"""
        return self.transport_plugins.get(transport_type)

    def notify_monitoring_plugins(self, event_type: str, **kwargs):
        """通知监控插件"""
        for plugin in self.monitoring_plugins:
            if event_type == 'service_status_change':
                plugin.on_service_status_change(**kwargs)
            elif event_type == 'tool_call':
                plugin.on_tool_call(**kwargs)
            elif event_type == 'error':
                plugin.on_error(**kwargs)

# 全局插件管理器
plugin_manager = PluginManager()

# 便捷注册函数
def register_config_plugin(plugin: ConfigPlugin):
    plugin_manager.register_config_plugin(plugin)

def register_transport_plugin(plugin: TransportPlugin):
    plugin_manager.register_transport_plugin(plugin)

def register_monitoring_plugin(plugin: MonitoringPlugin):
    plugin_manager.register_monitoring_plugin(plugin)

插件发现和加载

import importlib
import pkgutil
from pathlib import Path

class PluginLoader:
    """插件加载器"""

    def __init__(self, plugin_dirs: List[str] = None):
        self.plugin_dirs = plugin_dirs or ['mcpstore_plugins', 'plugins']

    def load_plugins(self):
        """加载所有插件"""
        for plugin_dir in self.plugin_dirs:
            self._load_plugins_from_directory(plugin_dir)

    def _load_plugins_from_directory(self, plugin_dir: str):
        """从目录加载插件"""
        try:
            # 尝试作为包导入
            package = importlib.import_module(plugin_dir)

            # 遍历包中的模块
            for importer, modname, ispkg in pkgutil.iter_modules(package.__path__):
                full_name = f"{plugin_dir}.{modname}"
                try:
                    importlib.import_module(full_name)
                    print(f"✅ Loaded plugin: {full_name}")
                except Exception as e:
                    print(f"❌ Failed to load plugin {full_name}: {e}")

        except ImportError:
            # 尝试从文件系统路径加载
            plugin_path = Path(plugin_dir)
            if plugin_path.exists():
                self._load_plugins_from_path(plugin_path)

    def _load_plugins_from_path(self, plugin_path: Path):
        """从文件系统路径加载插件"""
        for plugin_file in plugin_path.glob("*.py"):
            if plugin_file.name.startswith("__"):
                continue

            spec = importlib.util.spec_from_file_location(
                plugin_file.stem, plugin_file
            )
            module = importlib.util.module_from_spec(spec)

            try:
                spec.loader.exec_module(module)
                print(f"✅ Loaded plugin: {plugin_file.name}")
            except Exception as e:
                print(f"❌ Failed to load plugin {plugin_file.name}: {e}")

# 使用示例
loader = PluginLoader()
loader.load_plugins()

📦 插件打包和分发

插件包结构

my_mcpstore_plugin/
├── setup.py
├── README.md
├── my_plugin/
│   ├── __init__.py
│   ├── config_plugin.py
│   ├── transport_plugin.py
│   └── monitoring_plugin.py
└── tests/
    ├── test_config_plugin.py
    └── test_transport_plugin.py

setup.py 示例

from setuptools import setup, find_packages

setup(
    name="my-mcpstore-plugin",
    version="1.0.0",
    description="Custom MCPStore plugin",
    author="Your Name",
    author_email="your.email@example.com",
    packages=find_packages(),
    install_requires=[
        "mcpstore>=0.5.0",
        # 其他依赖
    ],
    entry_points={
        'mcpstore.plugins': [
            'my_config = my_plugin.config_plugin:MyConfigPlugin',
            'my_transport = my_plugin.transport_plugin:MyTransportPlugin',
            'my_monitoring = my_plugin.monitoring_plugin:MyMonitoringPlugin',
        ]
    },
    classifiers=[
        "Development Status :: 4 - Beta",
        "Intended Audience :: Developers",
        "License :: OSI Approved :: MIT License",
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
        "Programming Language :: Python :: 3.11",
    ],
)

🧪 插件测试

测试框架

import pytest
from mcpstore.plugins.base import ConfigPlugin
from my_plugin.config_plugin import YAMLConfigPlugin

class TestYAMLConfigPlugin:
    """YAML 配置插件测试"""

    def setup_method(self):
        self.plugin = YAMLConfigPlugin()

    def test_supported_extensions(self):
        """测试支持的扩展名"""
        assert '.yaml' in self.plugin.supported_extensions
        assert '.yml' in self.plugin.supported_extensions

    def test_load_config(self, tmp_path):
        """测试配置加载"""
        # 创建测试配置文件
        config_file = tmp_path / "test.yaml"
        config_file.write_text("""
version: "1.0.0"
services:
  test-service:
    url: "https://test.com/mcp"
""")

        # 加载配置
        config = self.plugin.load_config(str(config_file))

        # 验证结果
        assert config['version'] == '1.0.0'
        assert 'mcpServers' in config
        assert 'test-service' in config['mcpServers']

    def test_save_config(self, tmp_path):
        """测试配置保存"""
        config = {
            'version': '1.0.0',
            'mcpServers': {
                'test-service': {
                    'url': 'https://test.com/mcp'
                }
            }
        }

        config_file = tmp_path / "output.yaml"
        success = self.plugin.save_config(config, str(config_file))

        assert success
        assert config_file.exists()

        # 验证保存的内容
        loaded_config = self.plugin.load_config(str(config_file))
        assert loaded_config['version'] == config['version']

📚 插件开发最佳实践

1. 接口设计原则

  • 单一职责: 每个插件专注于一个特定功能
  • 松耦合: 插件之间不应有直接依赖
  • 可测试: 提供清晰的接口便于测试

2. 错误处理

class MyPlugin(ConfigPlugin):
    def load_config(self, file_path: str) -> Dict[str, Any]:
        try:
            # 插件逻辑
            return config
        except Exception as e:
            # 记录详细错误信息
            logger.error(f"Plugin {self.__class__.__name__} failed: {e}")
            # 抛出标准化异常
            raise PluginError(f"Failed to load config: {e}") from e

3. 配置管理

class MyPlugin(TransportPlugin):
    def __init__(self, config: Dict[str, Any] = None):
        self.config = config or {}
        self.timeout = self.config.get('timeout', 30)
        self.retries = self.config.get('retries', 3)

4. 资源管理

class MyPlugin(TransportPlugin):
    def __init__(self):
        self.connections = {}

    async def close_client(self, client: Any) -> None:
        """确保资源正确释放"""
        try:
            if client['connection']:
                await client['connection'].close()
        finally:
            # 清理资源
            client['connection'] = None

相关文档

下一步