跳转至

自定义适配器

MCPStore 提供强大的适配器系统,让您可以轻松集成各种 AI 框架和工具库,实现无缝的工具共享和调用。

🎯 适配器概述

适配器是 MCPStore 与其他 AI 框架之间的桥梁,负责:

  • 格式转换: 将 MCP 工具转换为目标框架的工具格式
  • 参数映射: 处理不同框架间的参数差异
  • 调用代理: 代理工具调用并处理结果
  • 错误处理: 统一错误处理和异常转换

🏗️ 适配器架构

┌─────────────────────────────────────────────────────────────┐
│                    MCPStore 核心                            │
│  ┌─────────────────────────────────────────────────────────┐ │
│  │              适配器层 (Adapter Layer)                   │ │
│  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐      │ │
│  │  │ LangChain   │ │   CrewAI    │ │  AutoGen    │      │ │
│  │  │  Adapter    │ │  Adapter    │ │  Adapter    │      │ │
│  │  └─────────────┘ └─────────────┘ └─────────────┘      │ │
│  │  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐      │ │
│  │  │   Haystack  │ │   Custom    │ │   Future    │      │ │
│  │  │   Adapter   │ │   Adapter   │ │   Adapter   │      │ │
│  │  └─────────────┘ └─────────────┘ └─────────────┘      │ │
│  └─────────────────────────────────────────────────────────┘ │
│  ┌─────────────────────────────────────────────────────────┐ │
│  │               MCPStore 工具层                           │ │
│  │        list_tools() / call_tool()                      │ │
│  └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘

🔧 基础适配器接口

AdapterBase 基类

from abc import ABC, abstractmethod
from typing import List, Any, Dict, Type, Optional
from mcpstore.core.models.tool import ToolInfo

class AdapterBase(ABC):
    """Common foundation for adapters that expose MCP tools to another framework."""

    def __init__(self, context):
        """
        Bind the adapter to a context and start with an empty tool cache.

        Args:
            context: MCPStoreContext instance used to list and call tools.
        """
        self._tool_cache = {}
        self.context = context

    @abstractmethod
    def convert_tools(self) -> List[Any]:
        """
        Convert the available tools into the target framework's format.

        Returns:
            A list of tool objects of the target framework.
        """

    @abstractmethod
    def create_tool_wrapper(self, tool_info: ToolInfo) -> Any:
        """
        Build the target-framework wrapper for a single tool.

        Args:
            tool_info: MCP tool information.

        Returns:
            A tool object of the target framework.
        """

    def get_tools(self) -> List[ToolInfo]:
        """Return whatever ``context.list_tools()`` reports."""
        available = self.context.list_tools()
        return available

    def call_mcp_tool(self, tool_name: str, args: Dict[str, Any]) -> Any:
        """Forward a call to ``context.call_tool`` and return its result."""
        outcome = self.context.call_tool(tool_name, args)
        return outcome

    def refresh_tools(self):
        """Empty the adapter's tool cache in place."""
        self._tool_cache.clear()

🚀 LangChain 适配器实现

MCPStore 内置的 LangChain 适配器是最完整的实现示例:

from typing import List, Type, Any, Dict
from pydantic import BaseModel, Field, create_model
from langchain_core.tools import BaseTool
from mcpstore.adapters.base import AdapterBase

class LangChainAdapter(AdapterBase):
    """Adapter that exposes MCP tools as LangChain ``BaseTool`` objects."""

    # JSON Schema primitive type name -> Python type used for the Pydantic field.
    _JSON_TYPE_MAP = {
        'string': str,
        'integer': int,
        'number': float,
        'boolean': bool,
        'array': list,
        'object': dict
    }

    def convert_tools(self) -> List[BaseTool]:
        """
        Convert every MCP tool into a LangChain tool.

        A tool whose conversion raises is reported on stdout and skipped, so
        one bad tool does not prevent the others from being returned.

        Returns:
            The LangChain tools that were converted successfully.
        """
        langchain_tools = []

        for tool_info in self.get_tools():
            try:
                langchain_tools.append(self.create_tool_wrapper(tool_info))
            except Exception as e:
                print(f"⚠️ Failed to convert tool {tool_info.name}: {e}")

        return langchain_tools

    def create_tool_wrapper(self, tool_info) -> BaseTool:
        """
        Build a LangChain tool that forwards calls to one MCP tool.

        Args:
            tool_info: MCP tool information; ``name``, ``description`` and
                ``inputSchema`` are read.

        Returns:
            An instance of a dynamically created ``BaseTool`` subclass.
        """
        enhanced_description = self._enhance_description(tool_info)

        # Deliberately NOT named ``args_schema``: a class body that assigns a
        # name cannot read a same-named variable of the enclosing function,
        # so ``args_schema = args_schema`` inside MCPTool raises NameError.
        schema_model = self._convert_schema(tool_info.inputSchema)

        # Inside the tool's methods ``self`` is the tool, not this adapter,
        # so the adapter is captured explicitly for the MCP calls.
        adapter = self

        class MCPTool(BaseTool):
            name: str = tool_info.name
            description: str = enhanced_description
            args_schema: Type[BaseModel] = schema_model

            def _run(self, **kwargs) -> str:
                """Call the MCP tool synchronously; failures are returned as text."""
                try:
                    result = adapter.call_mcp_tool(tool_info.name, kwargs)
                    return self._format_result(result)
                except Exception as e:
                    return f"Error calling tool {tool_info.name}: {str(e)}"

            async def _arun(self, **kwargs) -> str:
                """Call the MCP tool asynchronously; failures are returned as text."""
                try:
                    result = await adapter.context.call_tool_async(tool_info.name, kwargs)
                    return self._format_result(result)
                except Exception as e:
                    return f"Error calling tool {tool_info.name}: {str(e)}"

            def _format_result(self, result: Any) -> str:
                """Render a tool result as a string (JSON for dicts and lists)."""
                if isinstance(result, str):
                    return result
                if isinstance(result, (dict, list)):
                    import json
                    return json.dumps(result, ensure_ascii=False, indent=2)
                return str(result)

        return MCPTool()

    def _enhance_description(self, tool_info) -> str:
        """
        Append a per-parameter summary to the tool description.

        Args:
            tool_info: MCP tool information.

        Returns:
            The description, followed by one line per schema property when the
            input schema has a ``properties`` entry.
        """
        # A missing description must not break the string concatenation below.
        description = tool_info.description or ""

        if tool_info.inputSchema and 'properties' in tool_info.inputSchema:
            description += "\n\n参数说明:"
            for param_name, param_info in tool_info.inputSchema['properties'].items():
                param_desc = param_info.get('description', '无描述')
                param_type = param_info.get('type', 'string')
                description += f"\n- {param_name} ({param_type}): {param_desc}"

        return description

    def _convert_schema(self, input_schema: Optional[Dict]) -> Type[BaseModel]:
        """
        Turn a JSON Schema object into a Pydantic model class.

        Args:
            input_schema: The tool's input schema, or None.

        Returns:
            ``BaseModel`` itself when there are no properties; otherwise a
            model named ``ToolArgs`` with one field per property. Properties
            not listed under ``required`` become optional with default None.
        """
        if not input_schema or 'properties' not in input_schema:
            return BaseModel

        fields = {}
        required = input_schema.get('required', [])

        for field_name, field_schema in input_schema['properties'].items():
            field_type = self._json_type_to_python(field_schema.get('type', 'string'))
            field_description = field_schema.get('description', '')

            if field_name in required:
                fields[field_name] = (field_type, Field(description=field_description))
            else:
                fields[field_name] = (Optional[field_type], Field(None, description=field_description))

        return create_model('ToolArgs', **fields)

    def _json_type_to_python(self, json_type: str) -> Type:
        """
        Map a JSON Schema type name to a Python type.

        JSON Schema also allows a list of type names (for example
        ``["string", "null"]``); the first non-null entry is used then.
        Unknown names fall back to ``str``.
        """
        if isinstance(json_type, (list, tuple)):
            json_type = next((t for t in json_type if t != 'null'), 'string')
        return self._JSON_TYPE_MAP.get(json_type, str)

# 使用示例
def get_langchain_tools(store):
    """Return the store-level MCP tools converted to LangChain tools."""
    return LangChainAdapter(store.for_store()).convert_tools()

🤖 CrewAI 适配器实现

from typing import List, Any, Dict, Optional
from mcpstore.adapters.base import AdapterBase

class CrewAIAdapter(AdapterBase):
    """Adapter that exposes MCP tools as CrewAI ``BaseTool`` objects."""

    # JSON Schema primitive type name -> Python type used for the Pydantic field.
    _JSON_TYPE_MAP = {
        'string': str,
        'integer': int,
        'number': float,
        'boolean': bool,
        'array': list,
        'object': dict
    }

    def convert_tools(self) -> List[Any]:
        """
        Convert every MCP tool into a CrewAI tool.

        A tool whose conversion raises is reported on stdout and skipped.

        Returns:
            The CrewAI tools that were converted successfully.

        Raises:
            ImportError: If ``crewai_tools`` cannot be imported.
        """
        try:
            # Imported only to fail fast, once, when the dependency is missing.
            from crewai_tools import BaseTool  # noqa: F401
        except ImportError as exc:
            raise ImportError("CrewAI not installed. Run: pip install crewai") from exc

        crewai_tools = []

        for tool_info in self.get_tools():
            try:
                crewai_tools.append(self.create_tool_wrapper(tool_info))
            except Exception as e:
                print(f"⚠️ Failed to convert tool {tool_info.name}: {e}")

        return crewai_tools

    def create_tool_wrapper(self, tool_info) -> Any:
        """
        Build a CrewAI tool that forwards calls to one MCP tool.

        Args:
            tool_info: MCP tool information; ``name``, ``description`` and
                ``inputSchema`` are read.

        Returns:
            An instance of a dynamically created ``BaseTool`` subclass.
        """
        from crewai_tools import BaseTool

        enhanced_description = self._enhance_description(tool_info)

        # Deliberately NOT named ``args_schema``: a class body that assigns a
        # name cannot read a same-named variable of the enclosing function,
        # so ``args_schema = args_schema`` inside MCPCrewTool raises NameError.
        schema_model = self._create_args_schema(tool_info.inputSchema)

        # Inside the tool's methods ``self`` is the tool, not this adapter,
        # so the adapter is captured explicitly for the MCP call.
        adapter = self

        class MCPCrewTool(BaseTool):
            name: str = tool_info.name
            description: str = enhanced_description
            args_schema: type = schema_model

            def _run(self, **kwargs) -> str:
                """Call the MCP tool; failures are returned as text."""
                try:
                    result = adapter.call_mcp_tool(tool_info.name, kwargs)
                    return self._format_result(result)
                except Exception as e:
                    return f"Error: {str(e)}"

            def _format_result(self, result: Any) -> str:
                """Render a tool result as a string (JSON for dicts and lists)."""
                if isinstance(result, str):
                    return result
                if isinstance(result, (dict, list)):
                    import json
                    return json.dumps(result, ensure_ascii=False, indent=2)
                return str(result)

        return MCPCrewTool()

    def _create_args_schema(self, input_schema: Optional[Dict]) -> type:
        """
        Turn a JSON Schema object into a Pydantic model class.

        Args:
            input_schema: The tool's input schema, or None.

        Returns:
            ``BaseModel`` itself when there are no properties; otherwise a
            model named ``CrewToolArgs`` with one field per property.
            Properties not listed under ``required`` become optional with
            default None.
        """
        from pydantic import BaseModel, Field, create_model

        if not input_schema or 'properties' not in input_schema:
            return BaseModel

        fields = {}
        required = input_schema.get('required', [])

        for field_name, field_schema in input_schema['properties'].items():
            field_type = self._json_type_to_python(field_schema.get('type', 'string'))
            field_description = field_schema.get('description', '')

            if field_name in required:
                fields[field_name] = (field_type, Field(description=field_description))
            else:
                fields[field_name] = (Optional[field_type], Field(None, description=field_description))

        return create_model('CrewToolArgs', **fields)

    def _enhance_description(self, tool_info) -> str:
        """
        Append a per-parameter summary to the tool description.

        Args:
            tool_info: MCP tool information.

        Returns:
            The description, followed by one line per schema property when the
            input schema has a ``properties`` entry.
        """
        # A missing description must not break the string concatenation below.
        description = tool_info.description or ""

        if tool_info.inputSchema and 'properties' in tool_info.inputSchema:
            description += "\n\nParameters:"
            for param_name, param_info in tool_info.inputSchema['properties'].items():
                param_desc = param_info.get('description', 'No description')
                param_type = param_info.get('type', 'string')
                description += f"\n- {param_name} ({param_type}): {param_desc}"

        return description

    def _json_type_to_python(self, json_type: str) -> type:
        """
        Map a JSON Schema type name to a Python type.

        JSON Schema also allows a list of type names (for example
        ``["string", "null"]``); the first non-null entry is used then.
        Unknown names fall back to ``str``.
        """
        if isinstance(json_type, (list, tuple)):
            json_type = next((t for t in json_type if t != 'null'), 'string')
        return self._JSON_TYPE_MAP.get(json_type, str)

# 使用示例
def setup_crewai_agent(store):
    """Create a CrewAI ``Agent`` equipped with the store's MCP tools."""
    from crewai import Agent, Task, Crew

    # Convert the store-level MCP tools first, then hand them to the agent.
    mcp_tools = CrewAIAdapter(store.for_store()).convert_tools()

    agent_options = {
        'role': 'Research Assistant',
        'goal': 'Help with research and analysis tasks',
        'backstory': 'An AI assistant with access to various MCP tools',
        'tools': mcp_tools,
        'verbose': True,
    }
    return Agent(**agent_options)

🔧 AutoGen 适配器实现

from typing import List, Any, Dict, Callable
from mcpstore.adapters.base import AdapterBase

class AutoGenAdapter(AdapterBase):
    """Adapter that exposes MCP tools as AutoGen function specs plus callables."""

    def convert_tools(self) -> List[Dict[str, Any]]:
        """
        Convert every MCP tool into an AutoGen tool entry.

        A tool whose conversion raises is reported on stdout and skipped.
        """
        converted = []

        for info in self.get_tools():
            try:
                entry = self.create_tool_wrapper(info)
                converted.append(entry)
            except Exception as e:
                print(f"⚠️ Failed to convert tool {info.name}: {e}")

        return converted

    def create_tool_wrapper(self, tool_info) -> Dict[str, Any]:
        """
        Build the ``{"spec": ..., "function": ...}`` entry for one tool.

        ``spec`` is a function description (name, description, parameters)
        and ``function`` is the callable that forwards to the MCP tool.
        """

        def tool_function(**kwargs) -> str:
            """Call the MCP tool; failures are returned as text."""
            try:
                raw = self.call_mcp_tool(tool_info.name, kwargs)
                return self._format_result(raw)
            except Exception as e:
                return f"Error calling {tool_info.name}: {str(e)}"

        # Assemble the function description piece by piece.
        function_spec = {
            "name": tool_info.name,
            "description": tool_info.description,
            "parameters": self._convert_schema_to_autogen(tool_info.inputSchema),
        }
        spec = {"type": "function", "function": function_spec}

        return {"spec": spec, "function": tool_function}

    def _convert_schema_to_autogen(self, input_schema: Optional[Dict]) -> Dict[str, Any]:
        """
        Reduce an input schema to ``type``/``properties``/``required``.

        A missing or empty schema yields an object schema with no properties.
        """
        # An empty mapping gives the same defaults as the "no schema" case.
        schema = input_schema or {}

        return {
            "type": "object",
            "properties": schema.get("properties", {}),
            "required": schema.get("required", []),
        }

    def _format_result(self, result: Any) -> str:
        """Render a tool result as a string (JSON for dicts and lists)."""
        if isinstance(result, str):
            return result
        if isinstance(result, (dict, list)):
            import json
            return json.dumps(result, ensure_ascii=False, indent=2)
        return str(result)

    def register_tools_with_agent(self, agent) -> None:
        """Register each converted tool's callable on ``agent`` by name."""
        for entry in self.convert_tools():
            function_name = entry["spec"]["function"]["name"]
            agent.register_function(
                function_map={function_name: entry["function"]}
            )

# 使用示例
def setup_autogen_agent(store):
    """
    Create an AutoGen ``ConversableAgent`` with the store's MCP tools registered.

    Raises:
        ImportError: If ``autogen`` cannot be imported.
    """
    try:
        from autogen import ConversableAgent
    except ImportError:
        raise ImportError("AutoGen not installed. Run: pip install pyautogen")

    # Build the agent first; the tools are attached to it afterwards.
    llm_settings = {
        "config_list": [{"model": "gpt-4", "api_key": "your-api-key"}]
    }
    agent = ConversableAgent(
        name="assistant",
        system_message="You are a helpful assistant with access to various tools.",
        llm_config=llm_settings,
    )

    # Register the store-level MCP tools on the agent.
    AutoGenAdapter(store.for_store()).register_tools_with_agent(agent)

    return agent

🎨 自定义适配器开发

1. 简单函数式适配器

def create_simple_adapter(context):
    """
    Build a plain-dict adapter around ``context``.

    Returns:
        A dict with two entries: ``'functions'`` maps each tool name to a
        callable that forwards its keyword arguments to ``context.call_tool``,
        and ``'descriptions'`` maps each tool name to its description.
    """

    def bind(tool_name):
        # Each callable gets its own ``tool_name`` binding.
        def tool_func(**kwargs):
            return context.call_tool(tool_name, kwargs)
        return tool_func

    # The tool list is fetched once per mapping, functions first.
    functions = {info.name: bind(info.name) for info in context.list_tools()}
    descriptions = {info.name: info.description for info in context.list_tools()}

    return {
        'functions': functions,
        'descriptions': descriptions
    }

# Usage example: build the dict-based adapter from the store-level context.
adapter = create_simple_adapter(store.for_store())
functions = adapter['functions']
descriptions = adapter['descriptions']

# Call a tool by name; keyword arguments are forwarded to context.call_tool.
result = functions['weather_get_current'](city="北京")

2. 类型安全适配器

from typing import TypeVar, Generic, Protocol
from dataclasses import dataclass

T = TypeVar('T')

class ToolProtocol(Protocol):
    """Structural type for a tool: a name, a description and ``execute``."""
    name: str
    description: str

    def execute(self, **kwargs) -> Any:
        """Run the tool with the given keyword arguments."""

@dataclass
class TypedTool:
    """Tool wrapper that validates input and coerces output around an executor."""
    name: str
    description: str
    input_type: type
    output_type: type
    executor: Callable

    def execute(self, **kwargs) -> Any:
        """
        Validate the arguments, run the executor and coerce the result.

        When ``input_type`` is not ``Any`` it is instantiated with the keyword
        arguments and the dumped instance replaces them. When ``output_type``
        is not ``Any`` and the result is not already an instance of it, a
        conversion is attempted.

        Returns:
            The executor's result, converted to ``output_type`` if possible.

        Raises:
            ValueError: If building or dumping the input model fails; the
                original exception is attached as ``__cause__``.
        """
        if self.input_type is not Any:
            try:
                validated_input = self.input_type(**kwargs)
                kwargs = self._dump_model(validated_input)
            except Exception as e:
                raise ValueError(f"Input validation failed: {e}") from e

        result = self.executor(**kwargs)

        if self.output_type is not Any and not isinstance(result, self.output_type):
            try:
                result = self.output_type(result)
            except Exception:
                # Best effort by design: keep the raw result when it cannot
                # be converted to the declared output type.
                pass

        return result

    @staticmethod
    def _dump_model(model) -> dict:
        """Return the model's fields, preferring ``model_dump()`` over ``dict()``."""
        dump = getattr(model, "model_dump", None)
        if callable(dump):
            return dump()
        return model.dict()

class TypeSafeAdapter(AdapterBase):
    """Adapter that wraps each MCP tool in a ``TypedTool``."""

    # JSON Schema primitive type name -> Python type used for the model field.
    _JSON_TYPE_MAP = {
        'string': str,
        'integer': int,
        'number': float,
        'boolean': bool,
        'array': list,
        'object': dict
    }

    def convert_tools(self) -> List[TypedTool]:
        """Return one ``TypedTool`` per MCP tool; conversion errors propagate."""
        return [self.create_tool_wrapper(tool_info) for tool_info in self.get_tools()]

    def create_tool_wrapper(self, tool_info) -> TypedTool:
        """
        Build the ``TypedTool`` for one MCP tool.

        Args:
            tool_info: MCP tool information; ``name``, ``description`` and
                ``inputSchema`` are read.

        Returns:
            A ``TypedTool`` whose executor forwards to ``call_mcp_tool``.
        """
        input_type = self._create_input_type(tool_info.inputSchema)
        output_type = Any  # No output schema is consulted, so nothing is coerced.

        def executor(**kwargs):
            return self.call_mcp_tool(tool_info.name, kwargs)

        return TypedTool(
            name=tool_info.name,
            description=tool_info.description,
            input_type=input_type,
            output_type=output_type,
            executor=executor
        )

    def _create_input_type(self, input_schema: Optional[Dict]) -> type:
        """
        Create the input model for a schema.

        Args:
            input_schema: The tool's input schema, or None.

        Returns:
            ``Any`` when there is no schema; otherwise a Pydantic model named
            ``InputModel``. Properties listed under ``required`` have no
            default, the others are optional with default None.
        """
        if not input_schema:
            return Any

        from pydantic import create_model

        fields = {}
        required = input_schema.get('required', [])

        for field_name, field_schema in input_schema.get('properties', {}).items():
            field_type = self._json_type_to_python(field_schema.get('type', 'string'))

            if field_name in required:
                fields[field_name] = (field_type, ...)
            else:
                fields[field_name] = (Optional[field_type], None)

        return create_model('InputModel', **fields)

    def _json_type_to_python(self, json_type: str) -> type:
        """
        Map a JSON Schema type name to a Python type.

        This helper is needed by ``_create_input_type`` and is not provided by
        ``AdapterBase``. A list of type names (for example
        ``["string", "null"]``) resolves to its first non-null entry; unknown
        names fall back to ``str``.
        """
        if isinstance(json_type, (list, tuple)):
            json_type = next((t for t in json_type if t != 'null'), 'string')
        return self._JSON_TYPE_MAP.get(json_type, str)

🔄 适配器注册和使用

适配器注册系统

class AdapterRegistry:
    """Lookup table from adapter name to adapter class."""

    def __init__(self):
        self._adapters: Dict[str, type] = {}

    def register(self, name: str, adapter_class: type):
        """Store ``adapter_class`` under ``name``, replacing any previous entry."""
        self._adapters[name] = adapter_class

    def get(self, name: str) -> Optional[type]:
        """Return the class registered under ``name``, or None if there is none."""
        try:
            return self._adapters[name]
        except KeyError:
            return None

    def list_adapters(self) -> List[str]:
        """Return the registered names in registration order."""
        return [*self._adapters]

    def create_adapter(self, name: str, context) -> Optional[AdapterBase]:
        """Instantiate the adapter registered under ``name``; None if unknown."""
        adapter_class = self.get(name)
        if not adapter_class:
            return None
        return adapter_class(context)

# Global registry shared by the convenience functions below.
adapter_registry = AdapterRegistry()

# Register the built-in adapters under their framework names.
adapter_registry.register('langchain', LangChainAdapter)
adapter_registry.register('crewai', CrewAIAdapter)
adapter_registry.register('autogen', AutoGenAdapter)

# 便捷函数
def register_adapter(name: str, adapter_class: type):
    """Add ``adapter_class`` to the global registry under ``name``."""
    registry = adapter_registry
    registry.register(name, adapter_class)

def get_adapter(name: str, context) -> Optional[AdapterBase]:
    """Create an adapter from the global registry; None if ``name`` is unknown."""
    instance = adapter_registry.create_adapter(name, context)
    return instance

统一适配器接口

class MCPStoreContext:
    """Extension of MCPStoreContext with adapter accessors."""

    def for_framework(self, framework: str) -> Optional[AdapterBase]:
        """Look up the adapter for ``framework`` via ``get_adapter``."""
        adapter = get_adapter(framework, self)
        return adapter

    def for_langchain(self) -> LangChainAdapter:
        """Return a LangChain adapter bound to this context (backward compatible)."""
        adapter = LangChainAdapter(self)
        return adapter

    def for_crewai(self) -> CrewAIAdapter:
        """Return a CrewAI adapter bound to this context."""
        adapter = CrewAIAdapter(self)
        return adapter

    def for_autogen(self) -> AutoGenAdapter:
        """Return an AutoGen adapter bound to this context."""
        adapter = AutoGenAdapter(self)
        return adapter

# Usage example
store = MCPStore.setup_store()

# Unified interface: pick the adapter by framework name.
langchain_tools = store.for_store().for_framework('langchain').convert_tools()
crewai_tools = store.for_store().for_framework('crewai').convert_tools()

# Framework-specific interface.
# NOTE(review): LangChainAdapter as defined in this document exposes
# convert_tools()/get_tools() but no list_tools() — confirm which API the
# object returned by for_langchain() really offers.
langchain_tools = store.for_store().for_langchain().list_tools()
crewai_agent = setup_crewai_agent(store)

📊 适配器性能优化

1. 工具缓存

class CachedAdapter(AdapterBase):
    """
    Adapter layer that caches the converted tool list for a limited time.

    NOTE(review): ``convert_tools`` delegates to ``super().convert_tools()``,
    which is abstract on ``AdapterBase``; this class is presumably meant to be
    combined with a concrete adapter further along the MRO — confirm.
    """

    def __init__(self, context, cache_ttl: float = 300.0):
        """
        Args:
            context: MCPStoreContext instance.
            cache_ttl: Seconds a converted tool list stays valid (default
                300, i.e. five minutes).
        """
        super().__init__(context)
        self._cache_ttl = cache_ttl
        self._converted_tools_cache = None
        self._cache_timestamp = None

    def convert_tools(self) -> List[Any]:
        """Return the cached conversion while it is fresh, else convert again."""
        # Local import keeps this snippet self-contained.
        import time

        # A monotonic clock is unaffected by system clock adjustments, so an
        # entry can neither expire early nor live forever after a clock jump.
        now = time.monotonic()

        if (self._converted_tools_cache is not None
                and self._cache_timestamp is not None
                and now - self._cache_timestamp < self._cache_ttl):
            return self._converted_tools_cache

        tools = super().convert_tools()
        self._converted_tools_cache = tools
        self._cache_timestamp = now

        return tools

    def refresh_cache(self):
        """Drop the cached conversion so the next call converts again."""
        self._converted_tools_cache = None
        self._cache_timestamp = None

    def refresh_tools(self):
        """Clear the base tool cache and the converted-tools cache together."""
        super().refresh_tools()
        self.refresh_cache()

2. 延迟加载

class LazyAdapter(AdapterBase):
    """Adapter that defers building each tool wrapper until it is first used."""

    def __init__(self, context):
        super().__init__(context)
        self._tool_wrappers = {}

    def convert_tools(self) -> List[Any]:
        """Return one lazy proxy per MCP tool; no wrapper is built yet."""
        return [self._create_lazy_wrapper(tool_info) for tool_info in self.get_tools()]

    def _create_lazy_wrapper(self, tool_info):
        """
        Create a proxy that builds the real wrapper on first attribute access.

        Args:
            tool_info: MCP tool information passed to ``create_tool_wrapper``
                when the proxy is first used.
        """
        class LazyTool:
            # Attributes stored on the proxy itself; they are never forwarded.
            _OWN_ATTRS = ('adapter', 'tool_info', '_wrapper')

            def __init__(self, adapter, tool_info):
                self.adapter = adapter
                self.tool_info = tool_info
                self._wrapper = None

            def __getattr__(self, name):
                # __getattr__ only runs when normal lookup fails. If one of
                # the proxy's own attributes is missing (the instance was
                # created without __init__, e.g. by copy or unpickling),
                # reading self._wrapper below would re-enter this method
                # without end, so fail with AttributeError instead.
                if name in LazyTool._OWN_ATTRS:
                    raise AttributeError(name)
                if self._wrapper is None:
                    self._wrapper = self.adapter.create_tool_wrapper(self.tool_info)
                return getattr(self._wrapper, name)

        return LazyTool(self, tool_info)

🧪 适配器测试

测试框架

import pytest
from unittest.mock import Mock, patch
from mcpstore.adapters.base import AdapterBase

class TestCustomAdapter:
    """Tests for a custom adapter.

    NOTE(review): ``CustomAdapter`` is not defined in this snippet; it stands
    for the adapter under test and must be imported by the reader.
    """

    def setup_method(self):
        """Create a mocked context exposing a single tool."""
        # ``Mock(name=...)`` only names the mock for its repr; the ``name``
        # attribute has to be assigned after construction to hold a string.
        tool = Mock(
            description="Test tool description",
            inputSchema={
                "type": "object",
                "properties": {
                    "param1": {"type": "string", "description": "Parameter 1"}
                },
                "required": ["param1"]
            }
        )
        tool.name = "test_tool"

        self.mock_context = Mock()
        self.mock_context.list_tools.return_value = [tool]
        self.mock_context.call_tool.return_value = "test result"

        self.adapter = CustomAdapter(self.mock_context)

    def test_convert_tools(self):
        """The single mocked tool is converted with its name and description."""
        tools = self.adapter.convert_tools()

        assert len(tools) == 1
        assert tools[0].name == "test_tool"
        assert "Test tool description" in tools[0].description

    def test_tool_execution(self):
        """Executing the tool forwards name and arguments to the context."""
        tools = self.adapter.convert_tools()
        tool = tools[0]

        result = tool.execute(param1="test value")

        assert result == "test result"
        self.mock_context.call_tool.assert_called_once_with(
            "test_tool", {"param1": "test value"}
        )

    def test_error_handling(self):
        """A failing tool call is reported in the result, not raised."""
        self.mock_context.call_tool.side_effect = Exception("Test error")

        tools = self.adapter.convert_tools()
        tool = tools[0]

        result = tool.execute(param1="test value")

        assert "Error" in result

📚 最佳实践

1. 错误处理

class RobustAdapter(AdapterBase):
    """Adapter whose tool wrappers return failure messages instead of raising."""

    def create_tool_wrapper(self, tool_info):
        """
        Build a callable that validates, calls and reports errors as text.

        Args:
            tool_info: MCP tool information for the wrapped tool.

        Returns:
            A function taking keyword arguments; it returns the validated
            result, or a message string when any step raises.
        """

        def safe_execute(**kwargs):
            try:
                # Validate arguments, call the tool, then validate the result.
                checked_args = self._validate_args(tool_info, kwargs)
                raw_result = self.call_mcp_tool(tool_info.name, checked_args)
                return self._validate_result(raw_result)
            except ValidationError as e:
                message = f"参数验证失败: {e}"
            except ToolCallError as e:
                message = f"工具调用失败: {e}"
            except Exception as e:
                message = f"未知错误: {e}"
            return message

        return safe_execute

2. 性能监控

import time
from functools import wraps

def monitor_performance(func):
    """
    Decorator that prints how long each call to ``func`` takes.

    A line is printed on success and on failure; an exception raised by
    ``func`` is re-raised unchanged after the failure line is printed.

    Args:
        func: The callable to time.

    Returns:
        A wrapper with the same signature and metadata as ``func``.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic and high resolution, so the measured
        # duration is not distorted by system clock adjustments.
        start_time = time.perf_counter()
        try:
            # Only the call itself is guarded, so a failure while printing the
            # success line is never reported as a failure of ``func``.
            result = func(*args, **kwargs)
        except Exception as e:
            duration = time.perf_counter() - start_time
            print(f"❌ {func.__name__} failed in {duration:.3f}s: {e}")
            raise
        duration = time.perf_counter() - start_time
        print(f"✅ {func.__name__} completed in {duration:.3f}s")
        return result
    return wrapper

class MonitoredAdapter(AdapterBase):
    """Adapter whose conversion methods are timed by ``monitor_performance``."""

    @monitor_performance
    def convert_tools(self):
        """Delegate to the parent implementation; the call is timed."""
        converted = super().convert_tools()
        return converted

    @monitor_performance
    def create_tool_wrapper(self, tool_info):
        """Delegate to the parent implementation; the call is timed."""
        wrapper = super().create_tool_wrapper(tool_info)
        return wrapper

相关文档

下一步