跳转至

生命周期管理示例

本文档提供 MCPStore 服务生命周期管理的完整实际示例,涵盖监控、故障恢复、配置优化等各种场景。

🚀 基础生命周期管理

服务状态监控

from mcpstore import MCPStore
from mcpstore.core.models.service import ServiceConnectionState
import time

def monitor_service_states():
    """监控服务状态变化"""
    store = MCPStore.setup_store()

    # 注册测试服务
    store.for_store().add_service({
        "name": "test_service",
        "url": "https://httpbin.org/delay/2"  # 模拟慢响应
    })

    print("🔍 开始监控服务状态变化...")
    last_states = {}

    for i in range(120):  # 监控2分钟
        services = store.for_store().list_services()

        for service in services:
            current_state = service.status
            last_state = last_states.get(service.name)

            if current_state != last_state:
                timestamp = time.strftime("%H:%M:%S")
                print(f"[{timestamp}] 🔄 {service.name}: {last_state}{current_state}")
                last_states[service.name] = current_state

                # 获取详细状态信息
                service_info = store.for_store().get_service_info(service.name)
                if service_info and service_info.state_metadata:
                    metadata = service_info.state_metadata
                    print(f"         失败次数: {metadata.consecutive_failures}")
                    print(f"         重连次数: {metadata.reconnect_attempts}")
                    if metadata.error_message:
                        print(f"         错误信息: {metadata.error_message}")

        time.sleep(1)

# 使用
monitor_service_states()

生命周期事件处理

def lifecycle_event_handler():
    """生命周期事件处理器"""
    store = MCPStore.setup_store()

    def on_service_state_change(service_name, old_state, new_state, metadata):
        """服务状态变化回调"""
        print(f"📢 服务状态变化事件:")
        print(f"   服务: {service_name}")
        print(f"   状态: {old_state}{new_state}")

        # 根据状态变化执行不同操作
        if new_state == ServiceConnectionState.WARNING:
            print(f"⚠️ 服务 {service_name} 进入警告状态,开始密切监控")

        elif new_state == ServiceConnectionState.RECONNECTING:
            print(f"🔄 服务 {service_name} 开始重连,预计恢复时间: 30-60秒")

        elif new_state == ServiceConnectionState.UNREACHABLE:
            print(f"❌ 服务 {service_name} 不可达,考虑手动干预")
            # 可以在这里发送告警
            # send_alert(service_name, new_state, metadata.error_message)

        elif new_state == ServiceConnectionState.HEALTHY:
            print(f"✅ 服务 {service_name} 恢复健康")

    # 注册事件处理器(伪代码,实际需要根据具体实现)
    # store._orchestrator.lifecycle_manager.on_state_change = on_service_state_change

    return on_service_state_change

# 使用
handler = lifecycle_event_handler()

🛡️ 故障恢复管理

自动故障恢复

def auto_recovery_system():
    """自动故障恢复系统"""
    store = MCPStore.setup_store()

    def check_and_recover():
        """检查并恢复故障服务"""
        services = store.for_store().list_services()

        for service in services:
            if service.status == ServiceConnectionState.UNREACHABLE:
                print(f"🔧 检测到不可达服务: {service.name}")

                # 获取详细信息
                service_info = store.for_store().get_service_info(service.name)
                if service_info and service_info.state_metadata:
                    metadata = service_info.state_metadata

                    # 检查服务不可达时间
                    if metadata.state_entered_time:
                        from datetime import datetime
                        duration = datetime.now() - metadata.state_entered_time

                        if duration.total_seconds() > 300:  # 5分钟
                            print(f"   服务已不可达 {duration.total_seconds():.0f} 秒,尝试重启")

                            try:
                                # 尝试重启服务
                                success = store.for_store().restart_service(service.name)
                                if success:
                                    print(f"   ✅ 服务 {service.name} 重启成功")
                                else:
                                    print(f"   ❌ 服务 {service.name} 重启失败")

                                    # 重启失败,尝试重新注册
                                    if service_info.config:
                                        print(f"   🔄 尝试重新注册服务 {service.name}")
                                        store.for_store().remove_service(service.name)
                                        store.for_store().add_service(service_info.config)

                            except Exception as e:
                                print(f"   ❌ 恢复服务 {service.name} 时出错: {e}")

    # 定期检查和恢复
    import threading
    import time

    def recovery_loop():
        while True:
            try:
                check_and_recover()
                time.sleep(60)  # 每分钟检查一次
            except Exception as e:
                print(f"自动恢复系统错误: {e}")
                time.sleep(120)  # 出错时等待更长时间

    recovery_thread = threading.Thread(target=recovery_loop, daemon=True)
    recovery_thread.start()

    print("🛡️ 自动故障恢复系统已启动")
    return recovery_thread

# 使用
recovery_thread = auto_recovery_system()

手动故障诊断和恢复

def manual_recovery_toolkit():
    """手动故障恢复工具包"""
    store = MCPStore.setup_store()

    def diagnose_service(service_name):
        """诊断单个服务"""
        print(f"🔍 诊断服务: {service_name}")
        print("=" * 40)

        service_info = store.for_store().get_service_info(service_name)
        if not service_info:
            print("❌ 服务不存在")
            return False

        print(f"当前状态: {service_info.status}")
        print(f"服务类型: {'远程服务' if service_info.url else '本地服务'}")

        if service_info.state_metadata:
            metadata = service_info.state_metadata
            print(f"连续失败: {metadata.consecutive_failures}")
            print(f"重连次数: {metadata.reconnect_attempts}")
            print(f"最后成功: {metadata.last_success_time}")
            print(f"最后失败: {metadata.last_failure_time}")
            print(f"响应时间: {metadata.response_time}ms")

            if metadata.error_message:
                print(f"错误信息: {metadata.error_message}")

        # 执行健康检查
        print("\n🏥 执行健康检查...")
        health_info = store.for_store().check_services()

        # 查找当前服务的健康信息
        for health in health_info:
            if health.name == service_name:
                print(f"健康状态: {health.status}")
                print(f"响应时间: {health.response_time:.2f}ms")
                print(f"成功率: {health.success_rate:.1f}%")
                break

        return True

    def recover_service(service_name):
        """恢复单个服务"""
        print(f"🔧 恢复服务: {service_name}")

        # 方法1: 重启服务
        print("尝试重启服务...")
        success = store.for_store().restart_service(service_name)
        if success:
            print("✅ 重启成功")
            return True

        # 方法2: 重新注册服务
        print("重启失败,尝试重新注册...")
        service_info = store.for_store().get_service_info(service_name)
        if service_info and service_info.config:
            try:
                store.for_store().remove_service(service_name)
                store.for_store().add_service(service_info.config)
                print("✅ 重新注册成功")
                return True
            except Exception as e:
                print(f"❌ 重新注册失败: {e}")

        print("❌ 所有恢复方法都失败了")
        return False

    def batch_recovery():
        """批量恢复故障服务"""
        services = store.for_store().list_services()
        problem_services = [
            s for s in services 
            if s.status in [
                ServiceConnectionState.UNREACHABLE,
                ServiceConnectionState.RECONNECTING
            ]
        ]

        if not problem_services:
            print("✅ 没有发现问题服务")
            return

        print(f"🚨 发现 {len(problem_services)} 个问题服务")

        for service in problem_services:
            print(f"\n处理服务: {service.name}")
            diagnose_service(service.name)

            user_input = input(f"是否尝试恢复服务 {service.name}? (y/n): ")
            if user_input.lower() == 'y':
                recover_service(service.name)

    return {
        'diagnose': diagnose_service,
        'recover': recover_service,
        'batch_recovery': batch_recovery
    }

# 使用
toolkit = manual_recovery_toolkit()

# 诊断特定服务
# toolkit['diagnose']('weather')

# 恢复特定服务
# toolkit['recover']('weather')

# 批量恢复
# toolkit['batch_recovery']()

📊 高级监控和分析

性能分析仪表板

def performance_dashboard():
    """性能分析仪表板"""
    import time
    import os
    from collections import defaultdict, deque

    store = MCPStore.setup_store()

    # 性能数据收集器
    performance_data = defaultdict(lambda: {
        'response_times': deque(maxlen=100),
        'success_count': 0,
        'failure_count': 0,
        'state_history': deque(maxlen=50)
    })

    def collect_performance_data():
        """收集性能数据"""
        services = store.for_store().list_services()

        for service in services:
            service_data = performance_data[service.name]

            # 记录状态历史
            service_data['state_history'].append({
                'timestamp': time.time(),
                'state': service.status
            })

            # 获取详细信息
            service_info = store.for_store().get_service_info(service.name)
            if service_info and service_info.state_metadata:
                metadata = service_info.state_metadata

                if metadata.response_time:
                    service_data['response_times'].append(metadata.response_time)

                if service.status == ServiceConnectionState.HEALTHY:
                    service_data['success_count'] += 1
                else:
                    service_data['failure_count'] += 1

    def display_dashboard():
        """显示仪表板"""
        os.system('clear' if os.name == 'posix' else 'cls')

        print("📊 MCPStore 性能分析仪表板")
        print("=" * 60)
        print(f"更新时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
        print()

        services = store.for_store().list_services()

        # 总体统计
        total_services = len(services)
        healthy_services = sum(1 for s in services if s.status == ServiceConnectionState.HEALTHY)
        health_rate = (healthy_services / total_services * 100) if total_services > 0 else 0

        print(f"📈 总体状态:")
        print(f"   总服务数: {total_services}")
        print(f"   健康服务: {healthy_services}")
        print(f"   健康率: {health_rate:.1f}%")
        print()

        # 服务详情
        print(f"📋 服务性能详情:")
        for service in services:
            service_data = performance_data[service.name]

            # 计算平均响应时间
            avg_response_time = 0
            if service_data['response_times']:
                avg_response_time = sum(service_data['response_times']) / len(service_data['response_times'])

            # 计算可用性
            total_checks = service_data['success_count'] + service_data['failure_count']
            availability = (service_data['success_count'] / total_checks * 100) if total_checks > 0 else 0

            status_icon = {
                ServiceConnectionState.HEALTHY: "✅",
                ServiceConnectionState.WARNING: "⚠️",
                ServiceConnectionState.RECONNECTING: "🔄",
                ServiceConnectionState.UNREACHABLE: "❌",
                ServiceConnectionState.INITIALIZING: "🔧"
            }.get(service.status, "❓")

            print(f"   {status_icon} {service.name}")
            print(f"      状态: {service.status}")
            print(f"      平均响应: {avg_response_time:.2f}ms")
            print(f"      可用性: {availability:.1f}%")
            print()

    # 主循环
    while True:
        try:
            collect_performance_data()
            display_dashboard()
            time.sleep(5)  # 每5秒更新一次
        except KeyboardInterrupt:
            print("\n仪表板已停止")
            break
        except Exception as e:
            print(f"仪表板错误: {e}")
            time.sleep(10)

# 使用
# performance_dashboard()  # 启动仪表板

生命周期报告生成

def generate_lifecycle_report():
    """生成生命周期报告"""
    store = MCPStore.setup_store()
    from datetime import datetime, timedelta

    def collect_report_data():
        """收集报告数据"""
        services = store.for_store().list_services()

        report_data = {
            'timestamp': datetime.now(),
            'total_services': len(services),
            'services': [],
            'summary': {
                'healthy': 0,
                'warning': 0,
                'reconnecting': 0,
                'unreachable': 0,
                'other': 0
            }
        }

        for service in services:
            service_info = store.for_store().get_service_info(service.name)

            service_data = {
                'name': service.name,
                'status': service.status,
                'type': 'remote' if service.url else 'local',
                'url': service.url or '',
                'command': service.command or '',
                'tool_count': service.tool_count,
                'uptime': None,
                'last_failure': None,
                'failure_count': 0,
                'reconnect_count': 0
            }

            if service_info and service_info.state_metadata:
                metadata = service_info.state_metadata
                service_data.update({
                    'failure_count': metadata.consecutive_failures,
                    'reconnect_count': metadata.reconnect_attempts,
                    'last_failure': metadata.last_failure_time,
                    'response_time': metadata.response_time
                })

                # 计算运行时间
                if metadata.state_entered_time and service.status == ServiceConnectionState.HEALTHY:
                    uptime = datetime.now() - metadata.state_entered_time
                    service_data['uptime'] = uptime.total_seconds()

            report_data['services'].append(service_data)

            # 统计状态分布
            if service.status == ServiceConnectionState.HEALTHY:
                report_data['summary']['healthy'] += 1
            elif service.status == ServiceConnectionState.WARNING:
                report_data['summary']['warning'] += 1
            elif service.status == ServiceConnectionState.RECONNECTING:
                report_data['summary']['reconnecting'] += 1
            elif service.status == ServiceConnectionState.UNREACHABLE:
                report_data['summary']['unreachable'] += 1
            else:
                report_data['summary']['other'] += 1

        return report_data

    def format_report(data):
        """格式化报告"""
        report = []
        report.append("📊 MCPStore 生命周期报告")
        report.append("=" * 50)
        report.append(f"生成时间: {data['timestamp'].strftime('%Y-%m-%d %H:%M:%S')}")
        report.append(f"总服务数: {data['total_services']}")
        report.append("")

        # 状态摘要
        report.append("📈 状态摘要:")
        summary = data['summary']
        total = data['total_services']

        if total > 0:
            report.append(f"   ✅ 健康: {summary['healthy']} ({summary['healthy']/total*100:.1f}%)")
            report.append(f"   ⚠️ 警告: {summary['warning']} ({summary['warning']/total*100:.1f}%)")
            report.append(f"   🔄 重连中: {summary['reconnecting']} ({summary['reconnecting']/total*100:.1f}%)")
            report.append(f"   ❌ 不可达: {summary['unreachable']} ({summary['unreachable']/total*100:.1f}%)")
            report.append(f"   ❓ 其他: {summary['other']} ({summary['other']/total*100:.1f}%)")

        report.append("")

        # 服务详情
        report.append("📋 服务详情:")
        for service in data['services']:
            report.append(f"   🔸 {service['name']}")
            report.append(f"      状态: {service['status']}")
            report.append(f"      类型: {service['type']}")
            report.append(f"      工具数: {service['tool_count']}")

            if service['uptime']:
                uptime_hours = service['uptime'] / 3600
                report.append(f"      运行时间: {uptime_hours:.1f} 小时")

            if service['failure_count'] > 0:
                report.append(f"      失败次数: {service['failure_count']}")

            if service['reconnect_count'] > 0:
                report.append(f"      重连次数: {service['reconnect_count']}")

            report.append("")

        return "\n".join(report)

    def save_report(report_text, filename=None):
        """保存报告"""
        if not filename:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"mcpstore_lifecycle_report_{timestamp}.txt"

        with open(filename, 'w', encoding='utf-8') as f:
            f.write(report_text)

        print(f"📄 报告已保存到: {filename}")
        return filename

    # 生成报告
    data = collect_report_data()
    report_text = format_report(data)

    print(report_text)

    # 询问是否保存
    save_choice = input("\n是否保存报告到文件? (y/n): ")
    if save_choice.lower() == 'y':
        filename = save_report(report_text)
        return filename

    return report_text

# 使用
# report = generate_lifecycle_report()

🔗 相关文档

🎯 下一步