跳转至

batch_call() 批量调用方法

📋 概述

batch_call() 方法允许您一次性调用多个工具,提高效率并支持并行执行。这对于需要执行多个相关操作的场景非常有用。

🔧 方法签名

def batch_call(
    self, 
    calls: List[Dict[str, Any]], 
    parallel: bool = True,
    max_workers: Optional[int] = None,
    timeout: Optional[float] = None
) -> List[Dict[str, Any]]:
    """
    批量调用多个工具

    Args:
        calls: 工具调用列表,每个元素包含工具名称和参数
        parallel: 是否并行执行,默认为 True
        max_workers: 最大并行工作线程数
        timeout: 每个调用的超时时间(秒)

    Returns:
        List[Dict[str, Any]]: 调用结果列表
    """

📝 参数说明

calls (List[Dict[str, Any]])

  • 必需参数
  • 工具调用配置列表
  • 每个调用配置包含:
  • tool_name: 工具名称
  • arguments: 工具参数
  • service_name: 可选的服务名称

parallel (bool)

  • 可选参数,默认 True
  • 是否并行执行调用
  • True: 并行执行,提高效率
  • False: 顺序执行,保证执行顺序

max_workers (Optional[int])

  • 可选参数
  • 最大并行工作线程数
  • 默认为 None(自动选择)

timeout (Optional[float])

  • 可选参数
  • 每个工具调用的超时时间(秒)
  • 超时的调用将返回错误结果

📊 调用配置格式

calls = [
    {
        "tool_name": "read_file",
        "arguments": {
            "path": "/path/to/file1.txt"
        },
        "service_name": "filesystem"  # 可选
    },
    {
        "tool_name": "write_file", 
        "arguments": {
            "path": "/path/to/file2.txt",
            "content": "Hello World"
        }
    }
]

📊 返回值格式

[
    {
        "tool_name": "read_file",
        "service_name": "filesystem",
        "success": True,
        "result": {
            "content": "文件内容..."
        },
        "execution_time": 0.123,
        "error": None
    },
    {
        "tool_name": "write_file",
        "service_name": "filesystem", 
        "success": False,
        "result": None,
        "execution_time": 0.056,
        "error": "Permission denied"
    }
]

💡 使用示例

基础批量调用

from mcpstore import MCPStore

# 初始化 MCPStore
store = MCPStore()

# 添加文件系统服务
store.add_service({
    "mcpServers": {
        "filesystem": {
            "command": "npx",
            "args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]
        }
    }
})

# 定义批量调用
calls = [
    {
        "tool_name": "read_file",
        "arguments": {"path": "/tmp/file1.txt"}
    },
    {
        "tool_name": "read_file", 
        "arguments": {"path": "/tmp/file2.txt"}
    },
    {
        "tool_name": "list_directory",
        "arguments": {"path": "/tmp"}
    }
]

# 执行批量调用
results = store.batch_call(calls)

# 处理结果
for result in results:
    if result['success']:
        print(f"✅ {result['tool_name']}: 执行成功")
    else:
        print(f"❌ {result['tool_name']}: {result['error']}")

顺序执行

# 需要按顺序执行的操作
sequential_calls = [
    {
        "tool_name": "create_directory",
        "arguments": {"path": "/tmp/new_folder"}
    },
    {
        "tool_name": "write_file",
        "arguments": {
            "path": "/tmp/new_folder/config.txt",
            "content": "configuration data"
        }
    },
    {
        "tool_name": "read_file",
        "arguments": {"path": "/tmp/new_folder/config.txt"}
    }
]

# 顺序执行
results = store.batch_call(sequential_calls, parallel=False)

设置超时和并发限制

# 大量文件处理,限制并发数和超时
file_operations = []
for i in range(100):
    file_operations.append({
        "tool_name": "read_file",
        "arguments": {"path": f"/tmp/file_{i}.txt"}
    })

# 限制并发数为5,每个操作超时10秒
results = store.batch_call(
    file_operations,
    parallel=True,
    max_workers=5,
    timeout=10.0
)

# 统计结果
success_count = sum(1 for r in results if r['success'])
print(f"成功: {success_count}/{len(results)}")

🔍 高级用法

混合服务调用

# 调用不同服务的工具
mixed_calls = [
    {
        "tool_name": "read_file",
        "arguments": {"path": "/tmp/data.json"},
        "service_name": "filesystem"
    },
    {
        "tool_name": "search_web",
        "arguments": {"query": "MCPStore documentation"},
        "service_name": "web_search"
    },
    {
        "tool_name": "send_email",
        "arguments": {
            "to": "user@example.com",
            "subject": "Report",
            "body": "Daily report attached"
        },
        "service_name": "email"
    }
]

results = store.batch_call(mixed_calls)

结果处理和错误恢复

def process_batch_results(results):
    """处理批量调用结果"""
    successful_results = []
    failed_calls = []

    for result in results:
        if result['success']:
            successful_results.append(result)
        else:
            failed_calls.append({
                'tool_name': result['tool_name'],
                'error': result['error'],
                'arguments': result.get('arguments', {})
            })

    return successful_results, failed_calls

# 执行批量调用
results = store.batch_call(calls)

# 处理结果
successful, failed = process_batch_results(results)

print(f"成功: {len(successful)} 个")
print(f"失败: {len(failed)} 个")

# 重试失败的调用
if failed:
    print("重试失败的调用...")
    retry_calls = [
        {
            "tool_name": call['tool_name'],
            "arguments": call['arguments']
        }
        for call in failed
    ]
    retry_results = store.batch_call(retry_calls)

动态批量调用构建

class BatchCallBuilder:
    def __init__(self):
        self.calls = []

    def add_call(self, tool_name, arguments, service_name=None):
        """添加一个调用"""
        self.calls.append({
            "tool_name": tool_name,
            "arguments": arguments,
            "service_name": service_name
        })
        return self

    def add_file_read(self, path):
        """添加文件读取调用"""
        return self.add_call("read_file", {"path": path})

    def add_file_write(self, path, content):
        """添加文件写入调用"""
        return self.add_call("write_file", {"path": path, "content": content})

    def execute(self, store, **kwargs):
        """执行批量调用"""
        return store.batch_call(self.calls, **kwargs)

# 使用构建器
builder = BatchCallBuilder()
builder.add_file_read("/tmp/input.txt") \
       .add_file_write("/tmp/output.txt", "processed data") \
       .add_call("list_directory", {"path": "/tmp"})

results = builder.execute(store, parallel=True, timeout=30)

⚠️ 错误处理

异常处理

from mcpstore.exceptions import BatchCallError

try:
    results = store.batch_call(calls)
except BatchCallError as e:
    print(f"批量调用失败: {e}")
    # 检查部分结果
    if hasattr(e, 'partial_results'):
        print(f"部分结果: {len(e.partial_results)} 个")
except Exception as e:
    print(f"未知错误: {e}")

超时处理

# 设置较短的超时时间
results = store.batch_call(calls, timeout=5.0)

# 检查超时的调用
timeout_calls = [
    r for r in results 
    if not r['success'] and 'timeout' in str(r['error']).lower()
]

if timeout_calls:
    print(f"有 {len(timeout_calls)} 个调用超时")

📊 性能监控

import time

def monitor_batch_performance(store, calls):
    """监控批量调用性能"""
    start_time = time.time()

    # 并行执行
    parallel_results = store.batch_call(calls, parallel=True)
    parallel_time = time.time() - start_time

    start_time = time.time()

    # 顺序执行
    sequential_results = store.batch_call(calls, parallel=False)
    sequential_time = time.time() - start_time

    print(f"并行执行时间: {parallel_time:.2f}s")
    print(f"顺序执行时间: {sequential_time:.2f}s")
    print(f"性能提升: {sequential_time/parallel_time:.2f}x")

    return parallel_results

# 性能测试
results = monitor_batch_performance(store, calls)

🔗 相关方法

📚 最佳实践

  1. 合理设置并发数:避免过多并发导致资源竞争
  2. 设置适当超时:防止长时间等待
  3. 错误处理:妥善处理失败的调用
  4. 结果验证:检查每个调用的执行结果
  5. 性能监控:监控批量调用的性能表现

更新时间: 2025-01-09
版本: 1.0.0