跳转至

stop_service() 服务停止方法

📋 概述

stop_service() 方法用于优雅地停止正在运行的 MCP 服务。该方法会关闭服务连接,清理资源,并确保服务进程正确终止。

🔧 方法签名

def stop_service(
    self, 
    service_name: str,
    timeout: Optional[float] = 30.0,
    force: bool = False
) -> bool:
    """
    停止指定的 MCP 服务

    Args:
        service_name: 服务名称
        timeout: 停止超时时间(秒)
        force: 是否强制停止

    Returns:
        bool: 停止是否成功

    Raises:
        ServiceNotFoundError: 服务不存在
        ServiceStopError: 服务停止失败
    """

📝 参数说明

service_name (str)

  • 必需参数
  • 要停止的服务名称
  • 必须是已注册且正在运行的服务

timeout (Optional[float])

  • 可选参数,默认 30.0
  • 服务停止的超时时间(秒)
  • 超时后将根据 force 参数决定是否强制终止

force (bool)

  • 可选参数,默认 False
  • 是否强制停止服务
  • True: 立即终止进程,可能导致数据丢失
  • False: 优雅停止,等待服务完成当前操作

📊 返回值

  • True: 服务停止成功
  • False: 服务停止失败

💡 使用示例

基础服务停止

from mcpstore import MCPStore

# 初始化 MCPStore
store = MCPStore()

# 假设已有运行中的服务
try:
    success = store.stop_service("filesystem")
    if success:
        print("✅ 文件系统服务停止成功")
    else:
        print("❌ 文件系统服务停止失败")
except Exception as e:
    print(f"停止服务时发生错误: {e}")

强制停止服务

# 强制停止无响应的服务
success = store.stop_service(
    "filesystem",
    timeout=10.0,  # 较短的超时时间
    force=True     # 强制停止
)

if success:
    print("🔥 服务已强制停止")
else:
    print("❌ 强制停止失败")

批量停止服务

# 获取所有运行中的服务
running_services = []
all_services = store.list_services()

for service in all_services:
    if service.get('status') == 'running':
        running_services.append(service['name'])

print(f"发现 {len(running_services)} 个运行中的服务")

# 批量停止
stop_results = {}
for service_name in running_services:
    try:
        success = store.stop_service(service_name, timeout=20.0)
        stop_results[service_name] = success
        print(f"{'✅' if success else '❌'} {service_name}: {'停止成功' if success else '停止失败'}")
    except Exception as e:
        stop_results[service_name] = False
        print(f"❌ {service_name}: 停止异常 - {e}")

# 统计结果
successful = sum(1 for success in stop_results.values() if success)
total = len(stop_results)
print(f"\n停止结果: {successful}/{total} 个服务停止成功")

🔍 高级用法

优雅停止与监控

import time

def graceful_stop_service(store, service_name, max_wait_time=60.0):
    """优雅停止服务并监控过程"""

    print(f"🛑 开始停止服务: {service_name}")
    start_time = time.time()

    try:
        # 检查服务当前状态
        initial_status = store.get_service_status(service_name)
        print(f"📊 当前状态: {initial_status}")

        if initial_status != "running":
            print(f"ℹ️ 服务 {service_name} 未在运行中")
            return True

        # 尝试优雅停止
        print("🔄 尝试优雅停止...")
        success = store.stop_service(
            service_name, 
            timeout=max_wait_time * 0.8,  # 80% 时间用于优雅停止
            force=False
        )

        elapsed_time = time.time() - start_time

        if success:
            print(f"✅ 服务 {service_name} 优雅停止成功")
            print(f"⏱️ 停止耗时: {elapsed_time:.2f}s")

            # 验证服务已停止
            final_status = store.get_service_status(service_name)
            print(f"📊 最终状态: {final_status}")

            return True
        else:
            print(f"⚠️ 优雅停止失败,尝试强制停止...")

            # 强制停止
            force_success = store.stop_service(
                service_name,
                timeout=max_wait_time * 0.2,  # 20% 时间用于强制停止
                force=True
            )

            total_time = time.time() - start_time

            if force_success:
                print(f"🔥 服务 {service_name} 强制停止成功")
                print(f"⏱️ 总耗时: {total_time:.2f}s")
                return True
            else:
                print(f"💥 服务 {service_name} 停止完全失败")
                print(f"⏱️ 总耗时: {total_time:.2f}s")
                return False

    except Exception as e:
        elapsed_time = time.time() - start_time
        print(f"💥 停止服务 {service_name} 时发生异常: {e}")
        print(f"⏱️ 异常耗时: {elapsed_time:.2f}s")
        return False

# 使用优雅停止
success = graceful_stop_service(store, "filesystem", max_wait_time=45.0)

依赖服务停止

def stop_services_with_dependencies(store, service_dependencies):
    """按依赖关系停止服务(逆序)"""

    # 服务依赖关系(与启动时相同)
    # service_dependencies = {
    #     "database": [],
    #     "auth": ["database"],
    #     "api": ["database", "auth"]
    # }

    # 计算停止顺序(依赖关系的逆序)
    def get_stop_order(dependencies):
        """计算服务停止顺序"""
        # 简单的拓扑排序逆序
        order = []
        visited = set()

        def visit(service):
            if service in visited:
                return
            visited.add(service)

            # 先访问依赖此服务的其他服务
            for svc, deps in dependencies.items():
                if service in deps and svc not in visited:
                    visit(svc)

            order.append(service)

        for service in dependencies:
            visit(service)

        return order

    stop_order = get_stop_order(service_dependencies)
    print(f"🔄 服务停止顺序: {stop_order}")

    stopped_services = set()
    failed_services = set()

    for service_name in stop_order:
        try:
            print(f"🛑 停止服务: {service_name}")

            # 检查服务状态
            status = store.get_service_status(service_name)
            if status != "running":
                print(f"ℹ️ 服务 {service_name} 未在运行中")
                stopped_services.add(service_name)
                continue

            # 停止服务
            success = store.stop_service(service_name, timeout=30.0)

            if success:
                stopped_services.add(service_name)
                print(f"✅ 服务 {service_name} 停止成功")
            else:
                failed_services.add(service_name)
                print(f"❌ 服务 {service_name} 停止失败")

        except Exception as e:
            failed_services.add(service_name)
            print(f"💥 停止服务 {service_name} 时发生异常: {e}")

    return {
        'stopped': list(stopped_services),
        'failed': list(failed_services)
    }

# 定义服务依赖关系
dependencies = {
    "database": [],
    "auth": ["database"],
    "filesystem": [],
    "api": ["database", "auth"]
}

# 按依赖关系停止
results = stop_services_with_dependencies(store, dependencies)
print(f"停止成功: {results['stopped']}")
print(f"停止失败: {results['failed']}")

服务停止前的清理工作

def stop_service_with_cleanup(store, service_name):
    """停止服务前执行清理工作"""

    print(f"🧹 开始清理服务: {service_name}")

    try:
        # 1. 完成正在进行的操作
        print("⏳ 等待正在进行的操作完成...")

        # 获取服务的活跃连接数
        service_info = store.get_service_info(service_name)
        active_connections = service_info.get('active_connections', 0)

        if active_connections > 0:
            print(f"🔗 发现 {active_connections} 个活跃连接,等待完成...")
            time.sleep(5)  # 给一些时间让连接完成

        # 2. 保存服务状态(如果需要)
        print("💾 保存服务状态...")
        # 这里可以调用服务特定的保存方法

        # 3. 通知其他服务
        print("📢 通知依赖服务...")
        # 通知其他可能依赖此服务的服务

        # 4. 停止服务
        print("🛑 停止服务...")
        success = store.stop_service(service_name, timeout=30.0)

        if success:
            print(f"✅ 服务 {service_name} 清理并停止成功")

            # 5. 清理临时文件和资源
            print("🗑️ 清理临时资源...")
            # 清理服务相关的临时文件、缓存等

        return success

    except Exception as e:
        print(f"💥 清理停止过程中发生异常: {e}")

        # 紧急停止
        print("🚨 执行紧急停止...")
        try:
            return store.stop_service(service_name, force=True)
        except:
            return False

# 使用带清理的停止
success = stop_service_with_cleanup(store, "filesystem")

⚠️ 错误处理

常见错误类型

from mcpstore.exceptions import (
    ServiceNotFoundError,
    ServiceStopError, 
    ServiceTimeoutError
)

def safe_stop_service(store, service_name, force_on_timeout=True):
    """安全停止服务"""
    try:
        success = store.stop_service(service_name, timeout=30.0)
        return success

    except ServiceNotFoundError:
        print(f"❌ 服务 {service_name} 不存在")
        return False

    except ServiceTimeoutError:
        print(f"⏰ 服务 {service_name} 停止超时")

        if force_on_timeout:
            print("🔥 尝试强制停止...")
            try:
                return store.stop_service(service_name, force=True)
            except Exception as e:
                print(f"💥 强制停止也失败: {e}")
                return False
        return False

    except ServiceStopError as e:
        print(f"💥 服务 {service_name} 停止失败: {e}")
        return False

    except Exception as e:
        print(f"🔥 未知错误: {e}")
        return False

# 使用安全停止
success = safe_stop_service(store, "filesystem")

停止状态监控

def monitor_service_stop(store, service_name, check_interval=1.0):
    """监控服务停止过程"""

    print(f"👀 开始监控服务停止: {service_name}")

    # 启动停止过程(异步)
    import threading

    stop_result = {"success": None, "error": None}

    def stop_worker():
        try:
            stop_result["success"] = store.stop_service(service_name)
        except Exception as e:
            stop_result["error"] = str(e)

    stop_thread = threading.Thread(target=stop_worker)
    stop_thread.start()

    # 监控状态变化
    previous_status = None
    start_time = time.time()

    while stop_thread.is_alive():
        try:
            current_status = store.get_service_status(service_name)

            if current_status != previous_status:
                elapsed = time.time() - start_time
                print(f"📊 [{elapsed:.1f}s] 状态变化: {previous_status}{current_status}")
                previous_status = current_status

            time.sleep(check_interval)

        except Exception as e:
            print(f"⚠️ 监控过程中发生错误: {e}")
            break

    # 等待停止完成
    stop_thread.join()

    total_time = time.time() - start_time

    if stop_result["error"]:
        print(f"💥 停止失败: {stop_result['error']}")
        print(f"⏱️ 总耗时: {total_time:.2f}s")
        return False
    else:
        success = stop_result["success"]
        print(f"{'✅' if success else '❌'} 停止{'成功' if success else '失败'}")
        print(f"⏱️ 总耗时: {total_time:.2f}s")
        return success

# 使用监控停止
success = monitor_service_stop(store, "filesystem")

🔗 相关方法

📚 最佳实践

  1. 优雅停止优先:总是先尝试优雅停止,避免数据丢失
  2. 设置合理超时:给服务足够时间完成当前操作
  3. 处理依赖关系:按正确顺序停止有依赖关系的服务
  4. 监控停止过程:记录停止时间和状态变化
  5. 资源清理:确保停止后正确清理相关资源
  6. 错误恢复:停止失败时提供强制停止选项

更新时间: 2025-01-09
版本: 1.0.0