batch_call() Batch Call Method¶
📋 Overview¶
The batch_call() method lets you invoke multiple tools in a single call, improving efficiency and supporting parallel execution. This is useful whenever you need to perform several related operations at once.
🔧 Method Signature¶
def batch_call(
    self,
    calls: List[Dict[str, Any]],
    parallel: bool = True,
    max_workers: Optional[int] = None,
    timeout: Optional[float] = None
) -> List[Dict[str, Any]]:
    """
    Call multiple tools in a batch.

    Args:
        calls: List of tool calls; each element contains a tool name and its arguments
        parallel: Whether to execute the calls in parallel (default: True)
        max_workers: Maximum number of parallel worker threads
        timeout: Timeout for each individual call, in seconds

    Returns:
        List[Dict[str, Any]]: List of call results
    """
📝 Parameters¶
calls (List[Dict[str, Any]])¶
- Required
- A list of tool-call configurations
- Each call configuration contains:
  - tool_name: the tool name
  - arguments: the tool's arguments
  - service_name: an optional service name
parallel (bool)¶
- Optional, defaults to True
- Whether to execute the calls in parallel
  - True: execute in parallel for better throughput
  - False: execute sequentially, preserving call order
max_workers (Optional[int])¶
- Optional
- Maximum number of parallel worker threads
- Defaults to None (chosen automatically)
timeout (Optional[float])¶
- Optional
- Timeout for each individual tool call, in seconds
- Calls that time out return an error result (see the sketch below)
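As a quick illustration, here is a minimal sketch that sets every parameter explicitly; the file paths are placeholders and the store instance is assumed to be set up as in the examples below:
# Two placeholder read_file calls
calls = [
    {"tool_name": "read_file", "arguments": {"path": "/tmp/a.txt"}},
    {"tool_name": "read_file", "arguments": {"path": "/tmp/b.txt"}}
]

# Run in parallel with at most 4 worker threads, allowing each call
# up to 30 seconds before it fails with a timeout error
results = store.batch_call(calls, parallel=True, max_workers=4, timeout=30.0)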
📊 Call Configuration Format¶
calls = [
    {
        "tool_name": "read_file",
        "arguments": {
            "path": "/path/to/file1.txt"
        },
        "service_name": "filesystem"  # Optional
    },
    {
        "tool_name": "write_file",
        "arguments": {
            "path": "/path/to/file2.txt",
            "content": "Hello World"
        }
    }
]
📊 Return Value Format¶
[
    {
        "tool_name": "read_file",
        "service_name": "filesystem",
        "success": True,
        "result": {
            "content": "File contents..."
        },
        "execution_time": 0.123,
        "error": None
    },
    {
        "tool_name": "write_file",
        "service_name": "filesystem",
        "success": False,
        "result": None,
        "execution_time": 0.056,
        "error": "Permission denied"
    }
]
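Because every result carries an execution_time field, timings can be aggregated across a batch. A minimal sketch, assuming a results list shaped as above:
# Aggregate per-call timings from a batch result list
total = sum(r['execution_time'] for r in results)
slowest = max(results, key=lambda r: r['execution_time'])
print(f"Total tool time: {total:.3f}s; slowest: {slowest['tool_name']}")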
💡 Usage Examples¶
Basic Batch Call¶
from mcpstore import MCPStore

# Initialize MCPStore
store = MCPStore()

# Add a filesystem service
store.add_service({
    "mcpServers": {
        "filesystem": {
            "command": "npx",
            "args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]
        }
    }
})

# Define the batch calls
calls = [
    {
        "tool_name": "read_file",
        "arguments": {"path": "/tmp/file1.txt"}
    },
    {
        "tool_name": "read_file",
        "arguments": {"path": "/tmp/file2.txt"}
    },
    {
        "tool_name": "list_directory",
        "arguments": {"path": "/tmp"}
    }
]

# Execute the batch call
results = store.batch_call(calls)

# Process the results
for result in results:
    if result['success']:
        print(f"✅ {result['tool_name']}: succeeded")
    else:
        print(f"❌ {result['tool_name']}: {result['error']}")
Sequential Execution¶
# Operations that must run in order
sequential_calls = [
    {
        "tool_name": "create_directory",
        "arguments": {"path": "/tmp/new_folder"}
    },
    {
        "tool_name": "write_file",
        "arguments": {
            "path": "/tmp/new_folder/config.txt",
            "content": "configuration data"
        }
    },
    {
        "tool_name": "read_file",
        "arguments": {"path": "/tmp/new_folder/config.txt"}
    }
]

# Execute sequentially
results = store.batch_call(sequential_calls, parallel=False)
Setting Timeouts and Concurrency Limits¶
# Processing many files: cap concurrency and set a timeout
file_operations = []
for i in range(100):
    file_operations.append({
        "tool_name": "read_file",
        "arguments": {"path": f"/tmp/file_{i}.txt"}
    })

# Limit concurrency to 5 workers, with a 10-second timeout per call
results = store.batch_call(
    file_operations,
    parallel=True,
    max_workers=5,
    timeout=10.0
)

# Tally the results
success_count = sum(1 for r in results if r['success'])
print(f"Succeeded: {success_count}/{len(results)}")
🔍 Advanced Usage¶
Mixed-Service Calls¶
# Call tools from different services
mixed_calls = [
    {
        "tool_name": "read_file",
        "arguments": {"path": "/tmp/data.json"},
        "service_name": "filesystem"
    },
    {
        "tool_name": "search_web",
        "arguments": {"query": "MCPStore documentation"},
        "service_name": "web_search"
    },
    {
        "tool_name": "send_email",
        "arguments": {
            "to": "user@example.com",
            "subject": "Report",
            "body": "Daily report attached"
        },
        "service_name": "email"
    }
]

results = store.batch_call(mixed_calls)
Result Processing and Error Recovery¶
def process_batch_results(calls, results):
    """Process batch call results, pairing each result with its originating call."""
    successful_results = []
    failed_calls = []
    for call, result in zip(calls, results):
        if result['success']:
            successful_results.append(result)
        else:
            # Result dicts do not carry the original arguments (see the return
            # value format above), so recover them from the matching call config
            failed_calls.append({
                'tool_name': result['tool_name'],
                'error': result['error'],
                'arguments': call.get('arguments', {})
            })
    return successful_results, failed_calls

# Execute the batch call
results = store.batch_call(calls)

# Process the results
successful, failed = process_batch_results(calls, results)
print(f"Succeeded: {len(successful)}")
print(f"Failed: {len(failed)}")

# Retry the failed calls
if failed:
    print("Retrying failed calls...")
    retry_calls = [
        {
            "tool_name": call['tool_name'],
            "arguments": call['arguments']
        }
        for call in failed
    ]
    retry_results = store.batch_call(retry_calls)
Building Batch Calls Dynamically¶
class BatchCallBuilder:
    def __init__(self):
        self.calls = []

    def add_call(self, tool_name, arguments, service_name=None):
        """Add a single call."""
        call = {"tool_name": tool_name, "arguments": arguments}
        if service_name is not None:  # service_name is optional in the config format
            call["service_name"] = service_name
        self.calls.append(call)
        return self

    def add_file_read(self, path):
        """Add a file-read call."""
        return self.add_call("read_file", {"path": path})

    def add_file_write(self, path, content):
        """Add a file-write call."""
        return self.add_call("write_file", {"path": path, "content": content})

    def execute(self, store, **kwargs):
        """Execute the batch call."""
        return store.batch_call(self.calls, **kwargs)

# Use the builder
builder = BatchCallBuilder()
builder.add_file_read("/tmp/input.txt") \
       .add_file_write("/tmp/output.txt", "processed data") \
       .add_call("list_directory", {"path": "/tmp"})

results = builder.execute(store, parallel=True, timeout=30)
⚠️ Error Handling¶
Exception Handling¶
from mcpstore.exceptions import BatchCallError

try:
    results = store.batch_call(calls)
except BatchCallError as e:
    print(f"Batch call failed: {e}")
    # Check for partial results
    if hasattr(e, 'partial_results'):
        print(f"Partial results: {len(e.partial_results)}")
except Exception as e:
    print(f"Unexpected error: {e}")
Timeout Handling¶
# Use a short timeout
results = store.batch_call(calls, timeout=5.0)

# Find the calls that timed out
timeout_calls = [
    r for r in results
    if not r['success'] and 'timeout' in str(r['error']).lower()
]

if timeout_calls:
    print(f"{len(timeout_calls)} calls timed out")
📊 Performance Monitoring¶
import time

def monitor_batch_performance(store, calls):
    """Compare parallel vs. sequential batch call performance."""
    start_time = time.time()
    # Parallel execution
    parallel_results = store.batch_call(calls, parallel=True)
    parallel_time = time.time() - start_time

    start_time = time.time()
    # Sequential execution
    sequential_results = store.batch_call(calls, parallel=False)
    sequential_time = time.time() - start_time

    print(f"Parallel execution time: {parallel_time:.2f}s")
    print(f"Sequential execution time: {sequential_time:.2f}s")
    print(f"Speedup: {sequential_time / parallel_time:.2f}x")

    return parallel_results

# Performance test
results = monitor_batch_performance(store, calls)
🔗 Related Methods¶
- call_tool() - Call a single tool
- use_tool() - Convenient tool invocation
- list_tools() - List the available tools
📚 Best Practices¶
- Set a sensible concurrency limit: too many parallel workers causes resource contention
- Set appropriate timeouts: avoid waiting indefinitely on slow calls
- Handle errors: deal with failed calls gracefully, as shown in the sketch after this list
- Validate results: check the outcome of every call
- Monitor performance: keep an eye on how batch calls perform
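A minimal sketch that combines several of these practices: capped concurrency, per-call timeouts, result validation, and a bounded retry of failures. The batch_call_with_retry helper, its max_retries parameter, and the pairing of results to calls by index are illustrative assumptions, not part of the documented API:
def batch_call_with_retry(store, calls, max_retries=2):
    """Run a batch with capped concurrency, timeouts, and bounded retries.

    Assumes results come back in the same order as the input calls,
    so each failure can be paired with its originating configuration.
    """
    pending = list(calls)
    completed = []
    for attempt in range(max_retries + 1):
        if not pending:
            break
        results = store.batch_call(pending, parallel=True,
                                   max_workers=5, timeout=10.0)
        # Validate every result; keep successes, requeue failures
        completed.extend(r for r in results if r['success'])
        pending = [call for call, result in zip(pending, results)
                   if not result['success']]
    return completed, pending  # successes, plus calls that never succeeded

successes, unresolved = batch_call_with_retry(store, calls)
print(f"Succeeded: {len(successes)}, unresolved: {len(unresolved)}")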
Last updated: 2025-01-09
Version: 1.0.0