链式调用机制¶
📋 概述¶
MCPStore 的链式调用机制允许您将多个工具调用串联起来,形成复杂的工作流。通过链式调用,可以实现数据在工具间的流转,构建强大的自动化流程。
🏗️ 链式调用架构¶
graph LR
A[输入数据] --> B[工具1]
B --> C[中间结果]
C --> D[工具2]
D --> E[中间结果]
E --> F[工具3]
F --> G[最终结果]
H[错误处理] --> B
H --> D
H --> F
I[上下文管理] --> B
I --> D
I --> F
🔧 基础链式调用¶
简单链式调用¶
from mcpstore import MCPStore
# 初始化 MCPStore
store = MCPStore()
# 添加服务
store.add_service({
"mcpServers": {
"filesystem": {
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]
}
}
})
# 基础链式调用示例
def simple_file_chain(store, content, filename):
"""简单的文件处理链"""
# 步骤1: 写入文件
write_result = store.call_tool("write_file", {
"path": f"/tmp/{filename}",
"content": content
})
if not write_result.get("success"):
raise Exception(f"写入文件失败: {write_result}")
# 步骤2: 读取文件验证
read_result = store.call_tool("read_file", {
"path": f"/tmp/{filename}"
})
if not read_result.get("success"):
raise Exception(f"读取文件失败: {read_result}")
# 步骤3: 获取文件信息
stat_result = store.call_tool("get_file_info", {
"path": f"/tmp/{filename}"
})
return {
"write_result": write_result,
"read_result": read_result,
"stat_result": stat_result,
"content_verified": read_result.get("content") == content
}
# 使用简单链式调用
try:
result = simple_file_chain(store, "Hello, World!", "test.txt")
print(f"✅ 链式调用成功: {result['content_verified']}")
except Exception as e:
print(f"❌ 链式调用失败: {e}")
链式调用类¶
class ToolChain:
"""工具链类"""
def __init__(self, store):
self.store = store
self.steps = []
self.context = {}
self.results = []
def add_step(self, tool_name, arguments=None, transform=None, condition=None):
"""添加链式步骤
Args:
tool_name: 工具名称
arguments: 工具参数(可以是函数,用于动态生成)
transform: 结果转换函数
condition: 执行条件函数
"""
step = {
"tool_name": tool_name,
"arguments": arguments or {},
"transform": transform,
"condition": condition
}
self.steps.append(step)
return self
def execute(self, initial_context=None):
"""执行工具链"""
if initial_context:
self.context.update(initial_context)
self.results = []
for i, step in enumerate(self.steps):
try:
# 检查执行条件
if step["condition"] and not step["condition"](self.context):
print(f"⏭️ 跳过步骤 {i+1}: 条件不满足")
continue
# 准备参数
if callable(step["arguments"]):
arguments = step["arguments"](self.context)
else:
arguments = step["arguments"]
print(f"🔧 执行步骤 {i+1}: {step['tool_name']}")
# 调用工具
result = self.store.call_tool(step["tool_name"], arguments)
# 转换结果
if step["transform"]:
result = step["transform"](result, self.context)
# 保存结果
self.results.append(result)
# 更新上下文
self.context[f"step_{i+1}_result"] = result
self.context["last_result"] = result
print(f"✅ 步骤 {i+1} 完成")
except Exception as e:
print(f"❌ 步骤 {i+1} 失败: {e}")
self.results.append({"error": str(e)})
# 可以选择继续或停止
if self._should_stop_on_error(step, e):
raise e
return self.results
def _should_stop_on_error(self, step, error):
"""判断是否应该在错误时停止"""
# 可以根据步骤配置或错误类型决定
return True # 默认停止
# 使用工具链
chain = ToolChain(store)
# 构建文件处理链
chain.add_step(
"write_file",
arguments=lambda ctx: {
"path": f"/tmp/{ctx['filename']}",
"content": ctx["content"]
}
).add_step(
"read_file",
arguments=lambda ctx: {"path": f"/tmp/{ctx['filename']}"},
transform=lambda result, ctx: {
**result,
"content_match": result.get("content") == ctx["content"]
}
).add_step(
"list_directory",
arguments={"path": "/tmp"},
condition=lambda ctx: ctx["last_result"].get("content_match", False)
)
# 执行链
try:
results = chain.execute({
"filename": "chain_test.txt",
"content": "This is a chain test!"
})
print(f"🎯 链式调用完成,共 {len(results)} 个步骤")
except Exception as e:
print(f"💥 链式调用失败: {e}")
🔄 高级链式调用¶
并行链式调用¶
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
class ParallelToolChain:
"""并行工具链"""
def __init__(self, store, max_workers=3):
self.store = store
self.max_workers = max_workers
self.parallel_groups = []
self.sequential_steps = []
def add_parallel_group(self, steps):
"""添加并行执行组"""
self.parallel_groups.append(steps)
return self
def add_sequential_step(self, tool_name, arguments=None):
"""添加顺序执行步骤"""
self.sequential_steps.append({
"tool_name": tool_name,
"arguments": arguments or {}
})
return self
def execute(self, context=None):
"""执行并行链"""
context = context or {}
all_results = []
# 执行并行组
for group_index, group in enumerate(self.parallel_groups):
print(f"🔀 执行并行组 {group_index + 1}")
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# 提交所有并行任务
future_to_step = {}
for step in group:
future = executor.submit(
self._execute_step,
step,
context.copy()
)
future_to_step[future] = step
# 收集结果
group_results = []
for future in as_completed(future_to_step):
step = future_to_step[future]
try:
result = future.result()
group_results.append(result)
print(f"✅ 并行步骤完成: {step['tool_name']}")
except Exception as e:
print(f"❌ 并行步骤失败: {step['tool_name']} - {e}")
group_results.append({"error": str(e)})
all_results.append(group_results)
# 更新上下文
context[f"parallel_group_{group_index + 1}"] = group_results
# 执行顺序步骤
for step_index, step in enumerate(self.sequential_steps):
print(f"➡️ 执行顺序步骤 {step_index + 1}: {step['tool_name']}")
try:
result = self._execute_step(step, context)
all_results.append(result)
context[f"sequential_step_{step_index + 1}"] = result
print(f"✅ 顺序步骤完成: {step['tool_name']}")
except Exception as e:
print(f"❌ 顺序步骤失败: {step['tool_name']} - {e}")
all_results.append({"error": str(e)})
break
return all_results
def _execute_step(self, step, context):
"""执行单个步骤"""
arguments = step["arguments"]
if callable(arguments):
arguments = arguments(context)
return self.store.call_tool(step["tool_name"], arguments)
# 使用并行链
parallel_chain = ParallelToolChain(store, max_workers=3)
# 添加并行文件操作组
parallel_chain.add_parallel_group([
{
"tool_name": "write_file",
"arguments": {"path": "/tmp/file1.txt", "content": "Content 1"}
},
{
"tool_name": "write_file",
"arguments": {"path": "/tmp/file2.txt", "content": "Content 2"}
},
{
"tool_name": "write_file",
"arguments": {"path": "/tmp/file3.txt", "content": "Content 3"}
}
])
# 添加顺序验证步骤
parallel_chain.add_sequential_step(
"list_directory",
{"path": "/tmp"}
)
# 执行并行链
results = parallel_chain.execute()
print(f"🎯 并行链完成,结果: {len(results)} 组")
条件分支链¶
class ConditionalChain:
"""条件分支链"""
def __init__(self, store):
self.store = store
self.branches = {}
self.default_branch = None
def add_branch(self, condition, steps, name=None):
"""添加条件分支
Args:
condition: 条件函数,接收上下文,返回布尔值
steps: 该分支的步骤列表
name: 分支名称
"""
branch_name = name or f"branch_{len(self.branches) + 1}"
self.branches[branch_name] = {
"condition": condition,
"steps": steps
}
return self
def set_default_branch(self, steps):
"""设置默认分支"""
self.default_branch = steps
return self
def execute(self, context=None):
"""执行条件链"""
context = context or {}
# 查找匹配的分支
selected_branch = None
selected_name = None
for branch_name, branch in self.branches.items():
if branch["condition"](context):
selected_branch = branch["steps"]
selected_name = branch_name
break
# 如果没有匹配的分支,使用默认分支
if selected_branch is None:
if self.default_branch:
selected_branch = self.default_branch
selected_name = "default"
else:
raise Exception("没有匹配的分支且未设置默认分支")
print(f"🎯 选择分支: {selected_name}")
# 执行选中的分支
results = []
for i, step in enumerate(selected_branch):
try:
print(f"🔧 执行分支步骤 {i+1}: {step['tool_name']}")
arguments = step["arguments"]
if callable(arguments):
arguments = arguments(context)
result = self.store.call_tool(step["tool_name"], arguments)
results.append(result)
# 更新上下文
context[f"branch_step_{i+1}"] = result
context["last_result"] = result
print(f"✅ 分支步骤 {i+1} 完成")
except Exception as e:
print(f"❌ 分支步骤 {i+1} 失败: {e}")
results.append({"error": str(e)})
break
return {
"selected_branch": selected_name,
"results": results
}
# 使用条件分支链
conditional_chain = ConditionalChain(store)
# 添加文件大小检查分支
conditional_chain.add_branch(
condition=lambda ctx: ctx.get("file_size", 0) > 1000,
steps=[
{
"tool_name": "write_file",
"arguments": lambda ctx: {
"path": f"/tmp/large_{ctx['filename']}",
"content": ctx["content"]
}
}
],
name="large_file"
).add_branch(
condition=lambda ctx: ctx.get("file_size", 0) <= 1000,
steps=[
{
"tool_name": "write_file",
"arguments": lambda ctx: {
"path": f"/tmp/small_{ctx['filename']}",
"content": ctx["content"]
}
}
],
name="small_file"
).set_default_branch([
{
"tool_name": "write_file",
"arguments": lambda ctx: {
"path": f"/tmp/default_{ctx['filename']}",
"content": ctx["content"]
}
}
])
# 执行条件链
test_context = {
"filename": "test.txt",
"content": "A" * 500, # 500字符
"file_size": 500
}
result = conditional_chain.execute(test_context)
print(f"🎯 条件链完成,选择分支: {result['selected_branch']}")
🔄 链式调用模式¶
管道模式¶
class Pipeline:
"""管道模式链式调用"""
def __init__(self, store):
self.store = store
self.processors = []
def add_processor(self, processor):
"""添加处理器"""
self.processors.append(processor)
return self
def process(self, initial_data):
"""处理数据"""
data = initial_data
for i, processor in enumerate(self.processors):
try:
print(f"🔄 管道步骤 {i+1}: {processor.__name__}")
data = processor(self.store, data)
print(f"✅ 管道步骤 {i+1} 完成")
except Exception as e:
print(f"❌ 管道步骤 {i+1} 失败: {e}")
raise e
return data
# 定义处理器函数
def write_to_file(store, data):
"""写入文件处理器"""
result = store.call_tool("write_file", {
"path": data["file_path"],
"content": data["content"]
})
return {
**data,
"write_result": result,
"file_written": True
}
def read_and_verify(store, data):
"""读取验证处理器"""
result = store.call_tool("read_file", {
"path": data["file_path"]
})
return {
**data,
"read_result": result,
"content_verified": result.get("content") == data["content"]
}
def get_file_stats(store, data):
"""获取文件统计处理器"""
result = store.call_tool("get_file_info", {
"path": data["file_path"]
})
return {
**data,
"stats_result": result,
"file_size": result.get("size", 0)
}
# 使用管道
pipeline = Pipeline(store)
pipeline.add_processor(write_to_file) \
.add_processor(read_and_verify) \
.add_processor(get_file_stats)
# 处理数据
initial_data = {
"file_path": "/tmp/pipeline_test.txt",
"content": "Pipeline test content"
}
try:
final_data = pipeline.process(initial_data)
print(f"🎯 管道处理完成: {final_data['content_verified']}")
except Exception as e:
print(f"💥 管道处理失败: {e}")
工作流模式¶
from enum import Enum
class WorkflowStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
class WorkflowStep:
"""工作流步骤"""
def __init__(self, name, tool_name, arguments=None, dependencies=None):
self.name = name
self.tool_name = tool_name
self.arguments = arguments or {}
self.dependencies = dependencies or []
self.status = WorkflowStatus.PENDING
self.result = None
self.error = None
class Workflow:
"""工作流引擎"""
def __init__(self, store):
self.store = store
self.steps = {}
self.execution_order = []
def add_step(self, step):
"""添加工作流步骤"""
self.steps[step.name] = step
return self
def execute(self, context=None):
"""执行工作流"""
context = context or {}
# 计算执行顺序
self._calculate_execution_order()
print(f"🚀 开始执行工作流,共 {len(self.execution_order)} 个步骤")
for step_name in self.execution_order:
step = self.steps[step_name]
try:
# 检查依赖
if not self._check_dependencies(step):
step.status = WorkflowStatus.FAILED
step.error = "依赖步骤未完成"
print(f"❌ 步骤 {step_name} 依赖检查失败")
continue
# 执行步骤
print(f"🔧 执行步骤: {step_name}")
step.status = WorkflowStatus.RUNNING
# 准备参数
arguments = step.arguments
if callable(arguments):
arguments = arguments(context)
# 调用工具
result = self.store.call_tool(step.tool_name, arguments)
step.result = result
step.status = WorkflowStatus.COMPLETED
# 更新上下文
context[step_name] = result
print(f"✅ 步骤 {step_name} 完成")
except Exception as e:
step.status = WorkflowStatus.FAILED
step.error = str(e)
print(f"❌ 步骤 {step_name} 失败: {e}")
# 可以选择继续或停止
if self._should_stop_on_failure(step):
break
return self._get_workflow_result()
def _calculate_execution_order(self):
"""计算执行顺序(拓扑排序)"""
visited = set()
order = []
def visit(step_name):
if step_name in visited:
return
visited.add(step_name)
step = self.steps[step_name]
# 先访问依赖
for dep in step.dependencies:
if dep in self.steps:
visit(dep)
order.append(step_name)
for step_name in self.steps:
visit(step_name)
self.execution_order = order
def _check_dependencies(self, step):
"""检查步骤依赖"""
for dep_name in step.dependencies:
if dep_name not in self.steps:
return False
dep_step = self.steps[dep_name]
if dep_step.status != WorkflowStatus.COMPLETED:
return False
return True
def _should_stop_on_failure(self, step):
"""判断是否应该在失败时停止"""
# 可以根据步骤配置决定
return True # 默认停止
def _get_workflow_result(self):
"""获取工作流结果"""
completed = sum(1 for step in self.steps.values() if step.status == WorkflowStatus.COMPLETED)
failed = sum(1 for step in self.steps.values() if step.status == WorkflowStatus.FAILED)
return {
"total_steps": len(self.steps),
"completed": completed,
"failed": failed,
"success_rate": completed / len(self.steps) * 100,
"steps": {name: {
"status": step.status.value,
"result": step.result,
"error": step.error
} for name, step in self.steps.items()}
}
# 使用工作流
workflow = Workflow(store)
# 添加工作流步骤
workflow.add_step(WorkflowStep(
name="create_directory",
tool_name="create_directory",
arguments={"path": "/tmp/workflow_test"}
))
workflow.add_step(WorkflowStep(
name="write_config",
tool_name="write_file",
arguments={
"path": "/tmp/workflow_test/config.txt",
"content": "workflow configuration"
},
dependencies=["create_directory"]
))
workflow.add_step(WorkflowStep(
name="write_data",
tool_name="write_file",
arguments={
"path": "/tmp/workflow_test/data.txt",
"content": "workflow data"
},
dependencies=["create_directory"]
))
workflow.add_step(WorkflowStep(
name="list_files",
tool_name="list_directory",
arguments={"path": "/tmp/workflow_test"},
dependencies=["write_config", "write_data"]
))
# 执行工作流
result = workflow.execute()
print(f"🎯 工作流完成,成功率: {result['success_rate']:.1f}%")
🔗 相关文档¶
📚 最佳实践¶
- 模块化设计:将复杂流程分解为独立的步骤
- 错误处理:为每个步骤提供适当的错误处理
- 上下文管理:合理管理步骤间的数据传递
- 依赖管理:明确定义步骤间的依赖关系
- 并行优化:识别可以并行执行的步骤
- 监控日志:记录链式调用的执行过程和结果
更新时间: 2025-01-09
版本: 1.0.0