跳转至

start_service() 服务启动方法

📋 概述

start_service() 方法用于启动已注册但未运行的 MCP 服务。该方法会初始化服务连接,建立通信通道,并验证服务的可用性。

🔧 方法签名

def start_service(
    self, 
    service_name: str,
    timeout: Optional[float] = 30.0,
    retry_count: int = 3,
    retry_delay: float = 1.0
) -> bool:
    """
    启动指定的 MCP 服务

    Args:
        service_name: 服务名称
        timeout: 启动超时时间(秒)
        retry_count: 重试次数
        retry_delay: 重试间隔(秒)

    Returns:
        bool: 启动是否成功

    Raises:
        ServiceNotFoundError: 服务不存在
        ServiceStartError: 服务启动失败
    """

📝 参数说明

service_name (str)

  • 必需参数
  • 要启动的服务名称
  • 必须是已通过 add_service() 注册的服务

timeout (Optional[float])

  • 可选参数,默认 30.0
  • 服务启动的超时时间(秒)
  • 超时后将抛出异常

retry_count (int)

  • 可选参数,默认 3
  • 启动失败时的重试次数
  • 设置为 0 表示不重试

retry_delay (float)

  • 可选参数,默认 1.0
  • 重试之间的延迟时间(秒)

📊 返回值

  • True: 服务启动成功
  • False: 服务启动失败(在不抛出异常的情况下)

💡 使用示例

基础服务启动

from mcpstore import MCPStore

# 初始化 MCPStore
store = MCPStore()

# 注册服务
store.add_service({
    "mcpServers": {
        "filesystem": {
            "command": "npx",
            "args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]
        }
    }
})

# 启动服务
try:
    success = store.start_service("filesystem")
    if success:
        print("✅ 文件系统服务启动成功")
    else:
        print("❌ 文件系统服务启动失败")
except Exception as e:
    print(f"启动服务时发生错误: {e}")

自定义启动参数

# 设置较长的超时时间和更多重试次数
success = store.start_service(
    "filesystem",
    timeout=60.0,      # 60秒超时
    retry_count=5,     # 重试5次
    retry_delay=2.0    # 每次重试间隔2秒
)

if success:
    print("服务启动成功")

批量启动服务

# 注册多个服务
services_config = {
    "mcpServers": {
        "filesystem": {
            "command": "npx",
            "args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"]
        },
        "web_search": {
            "command": "python",
            "args": ["-m", "web_search_server"]
        },
        "database": {
            "command": "python", 
            "args": ["-m", "database_server"]
        }
    }
}

store.add_service(services_config)

# 批量启动
service_names = ["filesystem", "web_search", "database"]
startup_results = {}

for service_name in service_names:
    try:
        success = store.start_service(service_name, timeout=45.0)
        startup_results[service_name] = success
        print(f"{'✅' if success else '❌'} {service_name}: {'成功' if success else '失败'}")
    except Exception as e:
        startup_results[service_name] = False
        print(f"❌ {service_name}: 启动异常 - {e}")

# 统计结果
successful = sum(1 for success in startup_results.values() if success)
total = len(startup_results)
print(f"\n启动结果: {successful}/{total} 个服务启动成功")

🔍 高级用法

服务启动状态监控

import time

def start_service_with_monitoring(store, service_name):
    """带监控的服务启动"""
    print(f"🚀 开始启动服务: {service_name}")
    start_time = time.time()

    try:
        # 启动服务
        success = store.start_service(service_name, timeout=30.0)

        elapsed_time = time.time() - start_time

        if success:
            print(f"✅ 服务 {service_name} 启动成功")
            print(f"⏱️ 启动耗时: {elapsed_time:.2f}s")

            # 验证服务状态
            status = store.get_service_status(service_name)
            print(f"📊 服务状态: {status}")

            # 获取可用工具
            tools = store.list_tools(service_name=service_name)
            print(f"🛠️ 可用工具: {len(tools)} 个")

        else:
            print(f"❌ 服务 {service_name} 启动失败")
            print(f"⏱️ 尝试耗时: {elapsed_time:.2f}s")

        return success

    except Exception as e:
        elapsed_time = time.time() - start_time
        print(f"💥 服务 {service_name} 启动异常: {e}")
        print(f"⏱️ 异常耗时: {elapsed_time:.2f}s")
        return False

# 使用监控启动
success = start_service_with_monitoring(store, "filesystem")

条件启动

def conditional_start_service(store, service_name, force_restart=False):
    """条件启动服务"""

    # 检查服务是否已经运行
    try:
        status = store.get_service_status(service_name)

        if status == "running" and not force_restart:
            print(f"ℹ️ 服务 {service_name} 已在运行中")
            return True

        elif status == "running" and force_restart:
            print(f"🔄 强制重启服务 {service_name}")
            store.stop_service(service_name)
            time.sleep(2)  # 等待服务完全停止

    except Exception as e:
        print(f"⚠️ 无法获取服务状态: {e}")

    # 启动服务
    return store.start_service(service_name)

# 使用条件启动
success = conditional_start_service(store, "filesystem", force_restart=True)

依赖服务启动

def start_services_with_dependencies(store, service_dependencies):
    """按依赖关系启动服务"""

    # 服务依赖关系定义
    # service_dependencies = {
    #     "database": [],  # 无依赖
    #     "auth": ["database"],  # 依赖 database
    #     "api": ["database", "auth"]  # 依赖 database 和 auth
    # }

    started_services = set()
    failed_services = set()

    def start_service_recursive(service_name):
        """递归启动服务及其依赖"""

        if service_name in started_services:
            return True

        if service_name in failed_services:
            return False

        # 先启动依赖服务
        dependencies = service_dependencies.get(service_name, [])
        for dep_service in dependencies:
            if not start_service_recursive(dep_service):
                print(f"❌ 依赖服务 {dep_service} 启动失败,无法启动 {service_name}")
                failed_services.add(service_name)
                return False

        # 启动当前服务
        try:
            print(f"🚀 启动服务: {service_name}")
            success = store.start_service(service_name)

            if success:
                started_services.add(service_name)
                print(f"✅ 服务 {service_name} 启动成功")
                return True
            else:
                failed_services.add(service_name)
                print(f"❌ 服务 {service_name} 启动失败")
                return False

        except Exception as e:
            failed_services.add(service_name)
            print(f"💥 服务 {service_name} 启动异常: {e}")
            return False

    # 启动所有服务
    all_services = list(service_dependencies.keys())
    results = {}

    for service_name in all_services:
        results[service_name] = start_service_recursive(service_name)

    return results

# 定义服务依赖关系
dependencies = {
    "database": [],
    "auth": ["database"], 
    "filesystem": [],
    "api": ["database", "auth"]
}

# 按依赖关系启动
results = start_services_with_dependencies(store, dependencies)
print(f"启动结果: {results}")

⚠️ 错误处理

常见错误类型

from mcpstore.exceptions import (
    ServiceNotFoundError, 
    ServiceStartError,
    ServiceTimeoutError
)

def safe_start_service(store, service_name):
    """安全启动服务"""
    try:
        success = store.start_service(service_name)
        return success

    except ServiceNotFoundError:
        print(f"❌ 服务 {service_name} 不存在")
        return False

    except ServiceTimeoutError:
        print(f"⏰ 服务 {service_name} 启动超时")
        return False

    except ServiceStartError as e:
        print(f"💥 服务 {service_name} 启动失败: {e}")
        return False

    except Exception as e:
        print(f"🔥 未知错误: {e}")
        return False

# 使用安全启动
success = safe_start_service(store, "filesystem")

启动重试机制

import time
import random

def robust_start_service(store, service_name, max_attempts=5):
    """健壮的服务启动(带指数退避重试)"""

    for attempt in range(max_attempts):
        try:
            print(f"🔄 第 {attempt + 1} 次尝试启动 {service_name}")

            success = store.start_service(
                service_name,
                timeout=30.0,
                retry_count=1  # 内部重试设为1,外部控制重试
            )

            if success:
                print(f"✅ 服务 {service_name} 启动成功")
                return True

        except Exception as e:
            print(f"❌ 第 {attempt + 1} 次尝试失败: {e}")

        # 指数退避延迟
        if attempt < max_attempts - 1:
            delay = (2 ** attempt) + random.uniform(0, 1)
            print(f"⏳ 等待 {delay:.1f}s 后重试...")
            time.sleep(delay)

    print(f"💥 服务 {service_name} 启动最终失败")
    return False

# 使用健壮启动
success = robust_start_service(store, "filesystem")

🔗 相关方法

📚 最佳实践

  1. 设置合理超时:根据服务复杂度设置适当的启动超时时间
  2. 处理依赖关系:按正确顺序启动有依赖关系的服务
  3. 监控启动过程:记录启动时间和状态变化
  4. 错误恢复:实现重试机制和错误处理
  5. 资源管理:确保启动失败时正确清理资源

更新时间: 2025-01-09
版本: 1.0.0