stop_service() 服务停止方法¶
📋 概述¶
stop_service()
方法用于优雅地停止正在运行的 MCP 服务。该方法会关闭服务连接,清理资源,并确保服务进程正确终止。
🔧 方法签名¶
def stop_service(
self,
service_name: str,
timeout: Optional[float] = 30.0,
force: bool = False
) -> bool:
"""
停止指定的 MCP 服务
Args:
service_name: 服务名称
timeout: 停止超时时间(秒)
force: 是否强制停止
Returns:
bool: 停止是否成功
Raises:
ServiceNotFoundError: 服务不存在
ServiceStopError: 服务停止失败
"""
📝 参数说明¶
service_name (str)¶
- 必需参数
- 要停止的服务名称
- 必须是已注册且正在运行的服务
timeout (Optional[float])¶
- 可选参数,默认
30.0
- 服务停止的超时时间(秒)
- 超时后将根据
force
参数决定是否强制终止
force (bool)¶
- 可选参数,默认
False
- 是否强制停止服务
True
: 立即终止进程,可能导致数据丢失False
: 优雅停止,等待服务完成当前操作
📊 返回值¶
True
: 服务停止成功False
: 服务停止失败
💡 使用示例¶
基础服务停止¶
from mcpstore import MCPStore
# 初始化 MCPStore
store = MCPStore()
# 假设已有运行中的服务
try:
success = store.stop_service("filesystem")
if success:
print("✅ 文件系统服务停止成功")
else:
print("❌ 文件系统服务停止失败")
except Exception as e:
print(f"停止服务时发生错误: {e}")
强制停止服务¶
# 强制停止无响应的服务
success = store.stop_service(
"filesystem",
timeout=10.0, # 较短的超时时间
force=True # 强制停止
)
if success:
print("🔥 服务已强制停止")
else:
print("❌ 强制停止失败")
批量停止服务¶
# 获取所有运行中的服务
running_services = []
all_services = store.list_services()
for service in all_services:
if service.get('status') == 'running':
running_services.append(service['name'])
print(f"发现 {len(running_services)} 个运行中的服务")
# 批量停止
stop_results = {}
for service_name in running_services:
try:
success = store.stop_service(service_name, timeout=20.0)
stop_results[service_name] = success
print(f"{'✅' if success else '❌'} {service_name}: {'停止成功' if success else '停止失败'}")
except Exception as e:
stop_results[service_name] = False
print(f"❌ {service_name}: 停止异常 - {e}")
# 统计结果
successful = sum(1 for success in stop_results.values() if success)
total = len(stop_results)
print(f"\n停止结果: {successful}/{total} 个服务停止成功")
🔍 高级用法¶
优雅停止与监控¶
import time
def graceful_stop_service(store, service_name, max_wait_time=60.0):
"""优雅停止服务并监控过程"""
print(f"🛑 开始停止服务: {service_name}")
start_time = time.time()
try:
# 检查服务当前状态
initial_status = store.get_service_status(service_name)
print(f"📊 当前状态: {initial_status}")
if initial_status != "running":
print(f"ℹ️ 服务 {service_name} 未在运行中")
return True
# 尝试优雅停止
print("🔄 尝试优雅停止...")
success = store.stop_service(
service_name,
timeout=max_wait_time * 0.8, # 80% 时间用于优雅停止
force=False
)
elapsed_time = time.time() - start_time
if success:
print(f"✅ 服务 {service_name} 优雅停止成功")
print(f"⏱️ 停止耗时: {elapsed_time:.2f}s")
# 验证服务已停止
final_status = store.get_service_status(service_name)
print(f"📊 最终状态: {final_status}")
return True
else:
print(f"⚠️ 优雅停止失败,尝试强制停止...")
# 强制停止
force_success = store.stop_service(
service_name,
timeout=max_wait_time * 0.2, # 20% 时间用于强制停止
force=True
)
total_time = time.time() - start_time
if force_success:
print(f"🔥 服务 {service_name} 强制停止成功")
print(f"⏱️ 总耗时: {total_time:.2f}s")
return True
else:
print(f"💥 服务 {service_name} 停止完全失败")
print(f"⏱️ 总耗时: {total_time:.2f}s")
return False
except Exception as e:
elapsed_time = time.time() - start_time
print(f"💥 停止服务 {service_name} 时发生异常: {e}")
print(f"⏱️ 异常耗时: {elapsed_time:.2f}s")
return False
# 使用优雅停止
success = graceful_stop_service(store, "filesystem", max_wait_time=45.0)
依赖服务停止¶
def stop_services_with_dependencies(store, service_dependencies):
"""按依赖关系停止服务(逆序)"""
# 服务依赖关系(与启动时相同)
# service_dependencies = {
# "database": [],
# "auth": ["database"],
# "api": ["database", "auth"]
# }
# 计算停止顺序(依赖关系的逆序)
def get_stop_order(dependencies):
"""计算服务停止顺序"""
# 简单的拓扑排序逆序
order = []
visited = set()
def visit(service):
if service in visited:
return
visited.add(service)
# 先访问依赖此服务的其他服务
for svc, deps in dependencies.items():
if service in deps and svc not in visited:
visit(svc)
order.append(service)
for service in dependencies:
visit(service)
return order
stop_order = get_stop_order(service_dependencies)
print(f"🔄 服务停止顺序: {stop_order}")
stopped_services = set()
failed_services = set()
for service_name in stop_order:
try:
print(f"🛑 停止服务: {service_name}")
# 检查服务状态
status = store.get_service_status(service_name)
if status != "running":
print(f"ℹ️ 服务 {service_name} 未在运行中")
stopped_services.add(service_name)
continue
# 停止服务
success = store.stop_service(service_name, timeout=30.0)
if success:
stopped_services.add(service_name)
print(f"✅ 服务 {service_name} 停止成功")
else:
failed_services.add(service_name)
print(f"❌ 服务 {service_name} 停止失败")
except Exception as e:
failed_services.add(service_name)
print(f"💥 停止服务 {service_name} 时发生异常: {e}")
return {
'stopped': list(stopped_services),
'failed': list(failed_services)
}
# 定义服务依赖关系
dependencies = {
"database": [],
"auth": ["database"],
"filesystem": [],
"api": ["database", "auth"]
}
# 按依赖关系停止
results = stop_services_with_dependencies(store, dependencies)
print(f"停止成功: {results['stopped']}")
print(f"停止失败: {results['failed']}")
服务停止前的清理工作¶
def stop_service_with_cleanup(store, service_name):
"""停止服务前执行清理工作"""
print(f"🧹 开始清理服务: {service_name}")
try:
# 1. 完成正在进行的操作
print("⏳ 等待正在进行的操作完成...")
# 获取服务的活跃连接数
service_info = store.get_service_info(service_name)
active_connections = service_info.get('active_connections', 0)
if active_connections > 0:
print(f"🔗 发现 {active_connections} 个活跃连接,等待完成...")
time.sleep(5) # 给一些时间让连接完成
# 2. 保存服务状态(如果需要)
print("💾 保存服务状态...")
# 这里可以调用服务特定的保存方法
# 3. 通知其他服务
print("📢 通知依赖服务...")
# 通知其他可能依赖此服务的服务
# 4. 停止服务
print("🛑 停止服务...")
success = store.stop_service(service_name, timeout=30.0)
if success:
print(f"✅ 服务 {service_name} 清理并停止成功")
# 5. 清理临时文件和资源
print("🗑️ 清理临时资源...")
# 清理服务相关的临时文件、缓存等
return success
except Exception as e:
print(f"💥 清理停止过程中发生异常: {e}")
# 紧急停止
print("🚨 执行紧急停止...")
try:
return store.stop_service(service_name, force=True)
except:
return False
# 使用带清理的停止
success = stop_service_with_cleanup(store, "filesystem")
⚠️ 错误处理¶
常见错误类型¶
from mcpstore.exceptions import (
ServiceNotFoundError,
ServiceStopError,
ServiceTimeoutError
)
def safe_stop_service(store, service_name, force_on_timeout=True):
"""安全停止服务"""
try:
success = store.stop_service(service_name, timeout=30.0)
return success
except ServiceNotFoundError:
print(f"❌ 服务 {service_name} 不存在")
return False
except ServiceTimeoutError:
print(f"⏰ 服务 {service_name} 停止超时")
if force_on_timeout:
print("🔥 尝试强制停止...")
try:
return store.stop_service(service_name, force=True)
except Exception as e:
print(f"💥 强制停止也失败: {e}")
return False
return False
except ServiceStopError as e:
print(f"💥 服务 {service_name} 停止失败: {e}")
return False
except Exception as e:
print(f"🔥 未知错误: {e}")
return False
# 使用安全停止
success = safe_stop_service(store, "filesystem")
停止状态监控¶
def monitor_service_stop(store, service_name, check_interval=1.0):
"""监控服务停止过程"""
print(f"👀 开始监控服务停止: {service_name}")
# 启动停止过程(异步)
import threading
stop_result = {"success": None, "error": None}
def stop_worker():
try:
stop_result["success"] = store.stop_service(service_name)
except Exception as e:
stop_result["error"] = str(e)
stop_thread = threading.Thread(target=stop_worker)
stop_thread.start()
# 监控状态变化
previous_status = None
start_time = time.time()
while stop_thread.is_alive():
try:
current_status = store.get_service_status(service_name)
if current_status != previous_status:
elapsed = time.time() - start_time
print(f"📊 [{elapsed:.1f}s] 状态变化: {previous_status} → {current_status}")
previous_status = current_status
time.sleep(check_interval)
except Exception as e:
print(f"⚠️ 监控过程中发生错误: {e}")
break
# 等待停止完成
stop_thread.join()
total_time = time.time() - start_time
if stop_result["error"]:
print(f"💥 停止失败: {stop_result['error']}")
print(f"⏱️ 总耗时: {total_time:.2f}s")
return False
else:
success = stop_result["success"]
print(f"{'✅' if success else '❌'} 停止{'成功' if success else '失败'}")
print(f"⏱️ 总耗时: {total_time:.2f}s")
return success
# 使用监控停止
success = monitor_service_stop(store, "filesystem")
🔗 相关方法¶
start_service()
- 启动服务restart_service()
- 重启服务get_service_status()
- 获取服务状态check_services()
- 检查服务健康状态
📚 最佳实践¶
- 优雅停止优先:总是先尝试优雅停止,避免数据丢失
- 设置合理超时:给服务足够时间完成当前操作
- 处理依赖关系:按正确顺序停止有依赖关系的服务
- 监控停止过程:记录停止时间和状态变化
- 资源清理:确保停止后正确清理相关资源
- 错误恢复:停止失败时提供强制停止选项
更新时间: 2025-01-09
版本: 1.0.0