完整示例集合¶
📋 概述¶
本文档提供了 MCPStore 的完整使用示例,涵盖从基础操作到高级功能的各种场景。这些示例可以帮助您快速上手并掌握 MCPStore 的各种功能。
🚀 基础示例¶
示例1: 快速开始¶
from mcpstore import MCPStore
# 1. 初始化 MCPStore
store = MCPStore()
# 2. 添加文件系统服务
store.add_service({
"mcpServers": {
"filesystem": {
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]
}
}
})
# 3. 列出可用工具
tools = store.list_tools()
print(f"📋 可用工具: {len(tools)} 个")
for tool in tools[:3]: # 显示前3个
print(f" - {tool['name']}: {tool.get('description', '无描述')}")
# 4. 调用工具
result = store.call_tool("list_directory", {"path": "/tmp"})
print(f"📁 目录内容: {result}")
# 5. 使用便捷方法
content = store.use_tool("read_file", path="/tmp/test.txt")
print(f"📄 文件内容: {content}")
示例2: 多服务管理¶
from mcpstore import MCPStore
# 初始化 MCPStore
store = MCPStore()
# 添加多个服务
services_config = {
"mcpServers": {
"filesystem": {
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]
},
"web_search": {
"command": "python",
"args": ["-m", "web_search_server"]
},
"database": {
"command": "python",
"args": ["-m", "database_server", "--port", "5432"]
}
}
}
store.add_service(services_config)
# 检查所有服务状态
services = store.list_services()
print("🔍 服务状态检查:")
for service in services:
try:
status = store.get_service_status(service['name'])
tools_count = len(store.list_tools(service_name=service['name']))
print(f" ✅ {service['name']}: {status} ({tools_count} 个工具)")
except Exception as e:
print(f" ❌ {service['name']}: 错误 - {e}")
# 按服务调用工具
print("\n🛠️ 工具调用示例:")
# 文件操作
file_result = store.call_tool("filesystem_write_file", {
"path": "/tmp/example.txt",
"content": "Hello MCPStore!"
})
print(f"📝 文件写入: {file_result.get('success', False)}")
# Web搜索
search_result = store.call_tool("web_search_search", {
"query": "MCPStore documentation"
})
print(f"🔍 搜索结果: {len(search_result.get('results', []))} 条")
# 数据库查询
db_result = store.call_tool("database_query", {
"sql": "SELECT COUNT(*) FROM users"
})
print(f"💾 数据库查询: {db_result}")
🔄 批量操作示例¶
示例3: 批量文件处理¶
from mcpstore import MCPStore
import time
store = MCPStore()
store.add_service({
"mcpServers": {
"filesystem": {
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]
}
}
})
# 批量创建文件
def batch_file_creation_example():
"""批量文件创建示例"""
print("📁 批量文件创建示例")
# 准备批量调用
batch_calls = []
for i in range(10):
batch_calls.append({
"tool_name": "write_file",
"arguments": {
"path": f"/tmp/batch_file_{i}.txt",
"content": f"这是批量创建的文件 {i}"
}
})
# 执行批量调用
start_time = time.time()
results = store.batch_call(batch_calls)
execution_time = time.time() - start_time
# 统计结果
successful = sum(1 for r in results if r.get('success'))
print(f"✅ 批量创建完成: {successful}/{len(results)} 成功")
print(f"⏱️ 执行时间: {execution_time:.2f}s")
return results
# 批量读取和处理
def batch_file_processing_example():
"""批量文件处理示例"""
print("\n📖 批量文件处理示例")
# 首先列出所有文件
dir_result = store.call_tool("list_directory", {"path": "/tmp"})
files = [f for f in dir_result.get('files', []) if f.startswith('batch_file_')]
# 批量读取文件
read_calls = []
for filename in files[:5]: # 只处理前5个文件
read_calls.append({
"tool_name": "read_file",
"arguments": {"path": f"/tmp/{filename}"}
})
# 执行批量读取
read_results = store.batch_call(read_calls)
# 处理读取结果
total_content_length = 0
for i, result in enumerate(read_results):
if result.get('success'):
content = result.get('content', '')
total_content_length += len(content)
print(f" 📄 文件 {i+1}: {len(content)} 字符")
print(f"📊 总内容长度: {total_content_length} 字符")
# 执行示例
batch_file_creation_example()
batch_file_processing_example()
示例4: 混合服务批量调用¶
def mixed_service_batch_example():
"""混合服务批量调用示例"""
print("🔀 混合服务批量调用示例")
# 准备混合调用
mixed_calls = [
# 文件操作
{
"tool_name": "write_file",
"arguments": {
"path": "/tmp/report.txt",
"content": "Daily Report\n============\n"
}
},
# Web搜索
{
"tool_name": "web_search",
"arguments": {"query": "MCPStore latest news"}
},
# 数据库查询
{
"tool_name": "database_query",
"arguments": {"sql": "SELECT COUNT(*) as user_count FROM users"}
},
# 文件读取
{
"tool_name": "read_file",
"arguments": {"path": "/tmp/report.txt"}
}
]
# 执行混合批量调用
start_time = time.time()
results = store.batch_call(mixed_calls, parallel=True)
execution_time = time.time() - start_time
# 处理结果
print(f"⚡ 混合调用完成,耗时: {execution_time:.2f}s")
for i, result in enumerate(results):
call = mixed_calls[i]
if result.get('success'):
print(f" ✅ {call['tool_name']}: 成功")
else:
print(f" ❌ {call['tool_name']}: 失败 - {result.get('error')}")
# 执行混合服务示例
mixed_service_batch_example()
🔗 链式调用示例¶
示例5: 文件处理工作流¶
from mcpstore.chaining import ToolChain
def file_workflow_example():
"""文件处理工作流示例"""
print("🔄 文件处理工作流示例")
# 创建工具链
chain = ToolChain(store)
# 构建工作流
chain.add_step(
"create_directory",
arguments={"path": "/tmp/workflow_demo"}
).add_step(
"write_file",
arguments=lambda ctx: {
"path": "/tmp/workflow_demo/input.txt",
"content": "Original content for processing"
}
).add_step(
"read_file",
arguments={"path": "/tmp/workflow_demo/input.txt"},
transform=lambda result, ctx: {
**result,
"processed_content": result.get('content', '').upper()
}
).add_step(
"write_file",
arguments=lambda ctx: {
"path": "/tmp/workflow_demo/output.txt",
"content": ctx['last_result']['processed_content']
}
).add_step(
"list_directory",
arguments={"path": "/tmp/workflow_demo"}
)
# 执行工作流
try:
results = chain.execute()
print(f"✅ 工作流完成,共 {len(results)} 个步骤")
# 显示最终结果
final_result = results[-1]
if final_result.get('success'):
files = final_result.get('files', [])
print(f"📁 生成的文件: {files}")
except Exception as e:
print(f"❌ 工作流失败: {e}")
file_workflow_example()
示例6: 数据处理管道¶
from mcpstore.chaining import Pipeline
def data_processing_pipeline_example():
"""数据处理管道示例"""
print("\n🔧 数据处理管道示例")
# 定义处理器函数
def fetch_data(store, context):
"""获取数据"""
result = store.call_tool("database_query", {
"sql": "SELECT name, email FROM users LIMIT 10"
})
return {
**context,
"raw_data": result.get('rows', []),
"record_count": len(result.get('rows', []))
}
def validate_data(store, context):
"""验证数据"""
raw_data = context['raw_data']
valid_records = []
for record in raw_data:
if record.get('email') and '@' in record['email']:
valid_records.append(record)
return {
**context,
"valid_data": valid_records,
"validation_rate": len(valid_records) / len(raw_data) * 100
}
def save_processed_data(store, context):
"""保存处理后的数据"""
valid_data = context['valid_data']
# 转换为CSV格式
csv_content = "name,email\n"
for record in valid_data:
csv_content += f"{record['name']},{record['email']}\n"
# 保存到文件
result = store.call_tool("write_file", {
"path": "/tmp/processed_users.csv",
"content": csv_content
})
return {
**context,
"output_file": "/tmp/processed_users.csv",
"save_success": result.get('success', False)
}
# 创建管道
pipeline = Pipeline(store)
pipeline.add_processor(fetch_data) \
.add_processor(validate_data) \
.add_processor(save_processed_data)
# 执行管道
try:
initial_context = {"pipeline_id": "data_processing_001"}
final_result = pipeline.process(initial_context)
print(f"📊 数据处理完成:")
print(f" 原始记录: {final_result['record_count']}")
print(f" 有效记录: {len(final_result['valid_data'])}")
print(f" 验证率: {final_result['validation_rate']:.1f}%")
print(f" 输出文件: {final_result['output_file']}")
except Exception as e:
print(f"❌ 数据处理失败: {e}")
data_processing_pipeline_example()
🔧 高级功能示例¶
示例7: 监控和性能分析¶
from mcpstore.monitoring import MonitoringDashboard
from mcpstore.performance import PerformanceBenchmark
def monitoring_example():
"""监控和性能分析示例"""
print("📊 监控和性能分析示例")
# 启动监控
dashboard = MonitoringDashboard(store)
dashboard.start_monitoring(interval=5)
# 执行一些操作来生成监控数据
print("🔄 执行操作生成监控数据...")
for i in range(20):
try:
# 随机选择操作
import random
operations = [
lambda: store.call_tool("list_directory", {"path": "/tmp"}),
lambda: store.call_tool("read_file", {"path": "/tmp/test.txt"}),
lambda: store.call_tool("write_file", {
"path": f"/tmp/monitor_test_{i}.txt",
"content": f"Monitor test {i}"
})
]
operation = random.choice(operations)
operation()
time.sleep(0.5) # 短暂延迟
except Exception as e:
print(f"⚠️ 操作 {i} 失败: {e}")
# 等待一段时间收集数据
time.sleep(10)
# 显示监控仪表板
dashboard.print_dashboard()
# 停止监控
dashboard.stop_monitoring()
# 性能基准测试
print("\n🏃 性能基准测试:")
benchmark = PerformanceBenchmark(store)
# 测试简单调用
def simple_call_test():
store.call_tool("list_directory", {"path": "/tmp"})
# 测试批量调用
def batch_call_test():
calls = [
{"tool_name": "list_directory", "arguments": {"path": "/tmp"}}
for _ in range(3)
]
store.batch_call(calls)
# 运行基准测试
benchmark.run_benchmark("简单调用", simple_call_test, iterations=30)
benchmark.run_benchmark("批量调用", batch_call_test, iterations=10)
# 显示结果
benchmark.print_results()
monitoring_example()
示例8: 错误处理和恢复¶
from mcpstore.error_handling import RetryManager, RetryConfig, FallbackManager
def error_handling_example():
"""错误处理和恢复示例"""
print("\n🛡️ 错误处理和恢复示例")
# 配置重试机制
retry_config = RetryConfig(
max_attempts=3,
strategy=RetryStrategy.EXPONENTIAL,
base_delay=1.0,
exceptions=(Exception,)
)
retry_manager = RetryManager(retry_config)
# 模拟可能失败的操作
def unreliable_operation():
"""不可靠的操作(有时会失败)"""
import random
if random.random() < 0.7: # 70% 失败率
raise Exception("模拟的网络错误")
return store.call_tool("list_directory", {"path": "/tmp"})
# 使用重试机制
try:
print("🔄 尝试不可靠操作(带重试)...")
result = retry_manager.execute(unreliable_operation)
print(f"✅ 操作成功: {len(result.get('files', []))} 个文件")
except Exception as e:
print(f"❌ 操作最终失败: {e}")
# 配置降级机制
fallback_manager = FallbackManager()
# 添加缓存降级策略
from mcpstore.error_handling import CacheFallback, DefaultValueFallback
fallback_manager.add_strategy(CacheFallback(cache_duration=300))
fallback_manager.add_strategy(DefaultValueFallback({"files": [], "fallback": True}))
# 使用降级机制
def get_directory_listing():
"""获取目录列表(可能失败)"""
# 模拟服务不可用
raise Exception("服务暂时不可用")
try:
print("\n🔄 尝试获取目录列表(带降级)...")
result = fallback_manager.execute_with_fallback(get_directory_listing)
if result.get('fallback'):
print("📦 使用了降级策略")
else:
print(f"✅ 正常获取: {len(result.get('files', []))} 个文件")
except Exception as e:
print(f"❌ 所有策略都失败: {e}")
error_handling_example()
🎯 实际应用场景¶
示例9: 自动化报告生成¶
def automated_report_example():
"""自动化报告生成示例"""
print("📋 自动化报告生成示例")
from datetime import datetime
# 报告生成工作流
def generate_daily_report():
"""生成日常报告"""
# 1. 收集系统信息
system_info = store.call_tool("get_system_info", {})
# 2. 查询数据库统计
db_stats = store.call_tool("database_query", {
"sql": "SELECT COUNT(*) as total_users, MAX(created_at) as last_signup FROM users"
})
# 3. 检查文件系统使用情况
disk_usage = store.call_tool("get_disk_usage", {"path": "/tmp"})
# 4. 生成报告内容
report_content = f"""
日常系统报告
=============
生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
系统信息:
- CPU使用率: {system_info.get('cpu_percent', 'N/A')}%
- 内存使用率: {system_info.get('memory_percent', 'N/A')}%
数据库统计:
- 总用户数: {db_stats.get('total_users', 'N/A')}
- 最后注册: {db_stats.get('last_signup', 'N/A')}
磁盘使用:
- 已用空间: {disk_usage.get('used_gb', 'N/A')} GB
- 可用空间: {disk_usage.get('free_gb', 'N/A')} GB
报告生成完成。
"""
# 5. 保存报告
report_filename = f"/tmp/daily_report_{datetime.now().strftime('%Y%m%d')}.txt"
save_result = store.call_tool("write_file", {
"path": report_filename,
"content": report_content
})
if save_result.get('success'):
print(f"✅ 报告已保存到: {report_filename}")
# 6. 可选:发送邮件通知
# email_result = store.call_tool("send_email", {
# "to": "admin@example.com",
# "subject": "日常系统报告",
# "body": "请查看附件中的系统报告",
# "attachment": report_filename
# })
return report_filename
# 执行报告生成
try:
report_file = generate_daily_report()
print(f"📊 报告生成完成: {report_file}")
except Exception as e:
print(f"❌ 报告生成失败: {e}")
automated_report_example()
示例10: 文件同步系统¶
def file_sync_example():
"""文件同步系统示例"""
print("\n🔄 文件同步系统示例")
def sync_directories(source_dir, target_dir):
"""同步目录"""
# 1. 获取源目录文件列表
source_files = store.call_tool("list_directory", {"path": source_dir})
# 2. 获取目标目录文件列表
target_files = store.call_tool("list_directory", {"path": target_dir})
source_file_names = set(source_files.get('files', []))
target_file_names = set(target_files.get('files', []))
# 3. 找出需要同步的文件
files_to_copy = source_file_names - target_file_names
files_to_delete = target_file_names - source_file_names
print(f"📁 同步分析:")
print(f" 需要复制: {len(files_to_copy)} 个文件")
print(f" 需要删除: {len(files_to_delete)} 个文件")
# 4. 批量复制文件
if files_to_copy:
copy_calls = []
for filename in files_to_copy:
# 读取源文件
copy_calls.append({
"tool_name": "read_file",
"arguments": {"path": f"{source_dir}/{filename}"}
})
# 批量读取
read_results = store.batch_call(copy_calls)
# 批量写入
write_calls = []
for i, filename in enumerate(files_to_copy):
read_result = read_results[i]
if read_result.get('success'):
write_calls.append({
"tool_name": "write_file",
"arguments": {
"path": f"{target_dir}/{filename}",
"content": read_result.get('content', '')
}
})
if write_calls:
write_results = store.batch_call(write_calls)
successful_copies = sum(1 for r in write_results if r.get('success'))
print(f"✅ 成功复制: {successful_copies}/{len(write_calls)} 个文件")
# 5. 批量删除文件
if files_to_delete:
delete_calls = []
for filename in files_to_delete:
delete_calls.append({
"tool_name": "delete_file",
"arguments": {"path": f"{target_dir}/{filename}"}
})
delete_results = store.batch_call(delete_calls)
successful_deletes = sum(1 for r in delete_results if r.get('success'))
print(f"🗑️ 成功删除: {successful_deletes}/{len(delete_calls)} 个文件")
print("🎯 目录同步完成")
# 执行同步
try:
sync_directories("/tmp/source", "/tmp/backup")
except Exception as e:
print(f"❌ 同步失败: {e}")
file_sync_example()
🔗 相关文档¶
📚 最佳实践总结¶
- 服务管理:合理配置服务,定期检查服务状态
- 错误处理:实现完善的错误处理和重试机制
- 性能优化:使用批量调用和链式调用提高效率
- 监控分析:建立监控体系,分析使用模式
- 资源管理:及时清理临时文件和资源
- 安全考虑:验证输入参数,控制访问权限
更新时间: 2025-01-09
版本: 1.0.0