错误处理机制¶
📋 概述¶
MCPStore 提供了完善的错误处理机制,包括异常分类、错误恢复、重试策略和错误日志记录。通过统一的错误处理框架,确保系统的稳定性和可靠性。
🏗️ 错误处理架构¶
graph TB
A[错误发生] --> B[错误捕获]
B --> C[错误分类]
C --> D[错误处理策略]
D --> E[立即重试]
D --> F[延迟重试]
D --> G[降级处理]
D --> H[错误上报]
E --> I[成功恢复]
F --> I
G --> J[部分功能]
H --> K[告警通知]
I --> L[继续执行]
J --> L
K --> M[人工介入]
🔧 异常类型体系¶
核心异常类¶
class MCPStoreError(Exception):
"""MCPStore 基础异常类"""
def __init__(self, message, error_code=None, details=None):
super().__init__(message)
self.message = message
self.error_code = error_code
self.details = details or {}
self.timestamp = time.time()
class ServiceError(MCPStoreError):
"""服务相关异常"""
pass
class ServiceNotFoundError(ServiceError):
"""服务不存在异常"""
def __init__(self, service_name):
super().__init__(
f"Service '{service_name}' not found",
error_code="SERVICE_NOT_FOUND",
details={"service_name": service_name}
)
class ServiceStartError(ServiceError):
"""服务启动异常"""
def __init__(self, service_name, reason):
super().__init__(
f"Failed to start service '{service_name}': {reason}",
error_code="SERVICE_START_FAILED",
details={"service_name": service_name, "reason": reason}
)
class ServiceStopError(ServiceError):
"""服务停止异常"""
pass
class ServiceTimeoutError(ServiceError):
"""服务超时异常"""
pass
class ToolError(MCPStoreError):
"""工具相关异常"""
pass
class ToolNotFoundError(ToolError):
"""工具不存在异常"""
def __init__(self, tool_name, service_name=None):
message = f"Tool '{tool_name}' not found"
if service_name:
message += f" in service '{service_name}'"
super().__init__(
message,
error_code="TOOL_NOT_FOUND",
details={"tool_name": tool_name, "service_name": service_name}
)
class ToolExecutionError(ToolError):
"""工具执行异常"""
def __init__(self, tool_name, reason, output=None):
super().__init__(
f"Tool '{tool_name}' execution failed: {reason}",
error_code="TOOL_EXECUTION_FAILED",
details={
"tool_name": tool_name,
"reason": reason,
"output": output
}
)
class ConfigurationError(MCPStoreError):
"""配置异常"""
pass
class ConnectionError(MCPStoreError):
"""连接异常"""
pass
异常使用示例¶
from mcpstore.exceptions import *
def safe_service_operation(store, service_name, operation):
"""安全的服务操作"""
try:
if operation == "start":
return store.start_service(service_name)
elif operation == "stop":
return store.stop_service(service_name)
elif operation == "restart":
return store.restart_service(service_name)
else:
raise ValueError(f"Unknown operation: {operation}")
except ServiceNotFoundError as e:
print(f"❌ 服务不存在: {e.details['service_name']}")
return False
except ServiceStartError as e:
print(f"❌ 服务启动失败: {e.details['reason']}")
return False
except ServiceTimeoutError as e:
print(f"⏰ 服务操作超时: {e.message}")
return False
except ServiceError as e:
print(f"💥 服务操作失败: {e.message}")
return False
except Exception as e:
print(f"🔥 未知错误: {e}")
return False
# 使用示例
success = safe_service_operation(store, "filesystem", "start")
🔄 重试机制¶
基础重试装饰器¶
import time
import random
from functools import wraps
def retry(max_attempts=3, delay=1.0, backoff=2.0, jitter=True, exceptions=(Exception,)):
"""重试装饰器
Args:
max_attempts: 最大重试次数
delay: 初始延迟时间(秒)
backoff: 退避倍数
jitter: 是否添加随机抖动
exceptions: 需要重试的异常类型
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt == max_attempts - 1:
# 最后一次尝试,抛出异常
raise e
# 计算延迟时间
wait_time = delay * (backoff ** attempt)
if jitter:
# 添加随机抖动(±25%)
jitter_range = wait_time * 0.25
wait_time += random.uniform(-jitter_range, jitter_range)
print(f"🔄 第 {attempt + 1} 次尝试失败,{wait_time:.1f}s 后重试: {e}")
time.sleep(wait_time)
except Exception as e:
# 不在重试范围内的异常,直接抛出
raise e
# 理论上不会到达这里
raise last_exception
return wrapper
return decorator
# 使用重试装饰器
@retry(max_attempts=3, delay=1.0, exceptions=(ServiceStartError, ServiceTimeoutError))
def start_service_with_retry(store, service_name):
"""带重试的服务启动"""
return store.start_service(service_name)
# 使用示例
try:
success = start_service_with_retry(store, "filesystem")
print(f"✅ 服务启动成功: {success}")
except Exception as e:
print(f"❌ 服务启动最终失败: {e}")
高级重试策略¶
from enum import Enum
from typing import Callable, Optional
class RetryStrategy(Enum):
FIXED = "fixed" # 固定间隔
LINEAR = "linear" # 线性增长
EXPONENTIAL = "exponential" # 指数退避
FIBONACCI = "fibonacci" # 斐波那契数列
class RetryConfig:
"""重试配置"""
def __init__(
self,
max_attempts: int = 3,
strategy: RetryStrategy = RetryStrategy.EXPONENTIAL,
base_delay: float = 1.0,
max_delay: float = 60.0,
jitter: bool = True,
exceptions: tuple = (Exception,),
should_retry: Optional[Callable] = None
):
self.max_attempts = max_attempts
self.strategy = strategy
self.base_delay = base_delay
self.max_delay = max_delay
self.jitter = jitter
self.exceptions = exceptions
self.should_retry = should_retry
class RetryManager:
"""重试管理器"""
def __init__(self, config: RetryConfig):
self.config = config
def execute(self, func, *args, **kwargs):
"""执行带重试的函数"""
last_exception = None
for attempt in range(self.config.max_attempts):
try:
result = func(*args, **kwargs)
# 检查是否需要重试(即使没有异常)
if self.config.should_retry and self.config.should_retry(result):
if attempt < self.config.max_attempts - 1:
delay = self._calculate_delay(attempt)
print(f"🔄 结果不满足条件,{delay:.1f}s 后重试")
time.sleep(delay)
continue
return result
except self.config.exceptions as e:
last_exception = e
if attempt == self.config.max_attempts - 1:
raise e
delay = self._calculate_delay(attempt)
print(f"🔄 第 {attempt + 1} 次尝试失败,{delay:.1f}s 后重试: {e}")
time.sleep(delay)
except Exception as e:
# 不在重试范围内的异常
raise e
raise last_exception
def _calculate_delay(self, attempt: int) -> float:
"""计算延迟时间"""
if self.config.strategy == RetryStrategy.FIXED:
delay = self.config.base_delay
elif self.config.strategy == RetryStrategy.LINEAR:
delay = self.config.base_delay * (attempt + 1)
elif self.config.strategy == RetryStrategy.EXPONENTIAL:
delay = self.config.base_delay * (2 ** attempt)
elif self.config.strategy == RetryStrategy.FIBONACCI:
fib_sequence = [1, 1]
for i in range(2, attempt + 2):
fib_sequence.append(fib_sequence[i-1] + fib_sequence[i-2])
delay = self.config.base_delay * fib_sequence[attempt]
else:
delay = self.config.base_delay
# 限制最大延迟
delay = min(delay, self.config.max_delay)
# 添加随机抖动
if self.config.jitter:
jitter_range = delay * 0.1
delay += random.uniform(-jitter_range, jitter_range)
return max(0, delay)
# 使用高级重试
def check_service_health(store, service_name):
"""检查服务健康状态"""
status = store.get_service_status(service_name)
return status == "running"
# 配置重试策略
retry_config = RetryConfig(
max_attempts=5,
strategy=RetryStrategy.EXPONENTIAL,
base_delay=1.0,
max_delay=30.0,
exceptions=(ServiceError, ConnectionError),
should_retry=lambda result: not result # 结果为 False 时重试
)
retry_manager = RetryManager(retry_config)
# 执行带重试的健康检查
try:
is_healthy = retry_manager.execute(check_service_health, store, "filesystem")
print(f"✅ 服务健康状态: {is_healthy}")
except Exception as e:
print(f"❌ 健康检查失败: {e}")
🛡️ 降级处理¶
服务降级策略¶
class FallbackStrategy:
"""降级策略基类"""
def execute(self, original_func, *args, **kwargs):
"""执行降级逻辑"""
raise NotImplementedError
class CacheFallback(FallbackStrategy):
"""缓存降级策略"""
def __init__(self, cache_duration=300):
self.cache = {}
self.cache_duration = cache_duration
def execute(self, original_func, *args, **kwargs):
"""使用缓存数据"""
cache_key = self._generate_cache_key(original_func.__name__, args, kwargs)
if cache_key in self.cache:
cached_data, timestamp = self.cache[cache_key]
if time.time() - timestamp < self.cache_duration:
print(f"📦 使用缓存数据: {original_func.__name__}")
return cached_data
# 缓存过期或不存在
raise Exception("No valid cache available")
def _generate_cache_key(self, func_name, args, kwargs):
"""生成缓存键"""
return f"{func_name}:{hash(str(args) + str(kwargs))}"
class DefaultValueFallback(FallbackStrategy):
"""默认值降级策略"""
def __init__(self, default_value):
self.default_value = default_value
def execute(self, original_func, *args, **kwargs):
"""返回默认值"""
print(f"🔄 使用默认值: {self.default_value}")
return self.default_value
class AlternativeServiceFallback(FallbackStrategy):
"""备用服务降级策略"""
def __init__(self, alternative_service):
self.alternative_service = alternative_service
def execute(self, original_func, *args, **kwargs):
"""使用备用服务"""
print(f"🔄 切换到备用服务: {self.alternative_service}")
# 这里实现切换到备用服务的逻辑
return None
class FallbackManager:
"""降级管理器"""
def __init__(self):
self.strategies = []
def add_strategy(self, strategy: FallbackStrategy):
"""添加降级策略"""
self.strategies.append(strategy)
def execute_with_fallback(self, func, *args, **kwargs):
"""执行带降级的函数"""
# 首先尝试正常执行
try:
result = func(*args, **kwargs)
# 如果成功,更新缓存
for strategy in self.strategies:
if isinstance(strategy, CacheFallback):
cache_key = strategy._generate_cache_key(func.__name__, args, kwargs)
strategy.cache[cache_key] = (result, time.time())
return result
except Exception as original_error:
print(f"⚠️ 原始调用失败: {original_error}")
# 尝试降级策略
for i, strategy in enumerate(self.strategies):
try:
result = strategy.execute(func, *args, **kwargs)
print(f"✅ 降级策略 {i+1} 成功")
return result
except Exception as fallback_error:
print(f"❌ 降级策略 {i+1} 失败: {fallback_error}")
continue
# 所有降级策略都失败
raise original_error
# 使用降级处理
def get_service_tools(store, service_name):
"""获取服务工具列表"""
return store.list_tools(service_name=service_name)
# 配置降级策略
fallback_manager = FallbackManager()
fallback_manager.add_strategy(CacheFallback(cache_duration=600)) # 10分钟缓存
fallback_manager.add_strategy(DefaultValueFallback([])) # 空列表作为默认值
# 执行带降级的操作
try:
tools = fallback_manager.execute_with_fallback(get_service_tools, store, "filesystem")
print(f"🛠️ 获取到工具: {len(tools)} 个")
except Exception as e:
print(f"❌ 所有策略都失败: {e}")
📊 错误监控和报告¶
错误收集器¶
import json
from collections import defaultdict, deque
from datetime import datetime
class ErrorCollector:
"""错误收集器"""
def __init__(self, max_errors=1000):
self.max_errors = max_errors
self.errors = deque(maxlen=max_errors)
self.error_stats = defaultdict(int)
self.error_trends = defaultdict(lambda: deque(maxlen=100))
def collect_error(self, error, context=None):
"""收集错误信息"""
error_info = {
'timestamp': time.time(),
'datetime': datetime.now().isoformat(),
'error_type': type(error).__name__,
'error_message': str(error),
'error_code': getattr(error, 'error_code', None),
'details': getattr(error, 'details', {}),
'context': context or {}
}
self.errors.append(error_info)
self.error_stats[error_info['error_type']] += 1
self.error_trends[error_info['error_type']].append(error_info['timestamp'])
# 触发错误处理
self._handle_error(error_info)
def _handle_error(self, error_info):
"""处理错误"""
# 记录日志
print(f"🔥 错误收集: {error_info['error_type']} - {error_info['error_message']}")
# 检查错误频率
error_type = error_info['error_type']
recent_errors = [
ts for ts in self.error_trends[error_type]
if time.time() - ts < 300 # 最近5分钟
]
if len(recent_errors) > 10: # 5分钟内超过10次同类错误
print(f"🚨 错误频率过高: {error_type} ({len(recent_errors)} 次/5分钟)")
def get_error_summary(self, hours=24):
"""获取错误摘要"""
cutoff_time = time.time() - hours * 3600
recent_errors = [e for e in self.errors if e['timestamp'] > cutoff_time]
summary = {
'total_errors': len(recent_errors),
'error_types': defaultdict(int),
'error_codes': defaultdict(int),
'hourly_distribution': defaultdict(int)
}
for error in recent_errors:
summary['error_types'][error['error_type']] += 1
if error['error_code']:
summary['error_codes'][error['error_code']] += 1
hour = int((error['timestamp'] % 86400) // 3600)
summary['hourly_distribution'][hour] += 1
return summary
def export_errors(self, filename=None):
"""导出错误数据"""
if not filename:
filename = f"errors_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
export_data = {
'export_time': datetime.now().isoformat(),
'total_errors': len(self.errors),
'errors': list(self.errors),
'statistics': dict(self.error_stats)
}
with open(filename, 'w', encoding='utf-8') as f:
json.dump(export_data, f, indent=2, ensure_ascii=False)
print(f"📁 错误数据已导出到: {filename}")
return filename
# 全局错误收集器
error_collector = ErrorCollector()
# 错误处理装饰器
def collect_errors(context=None):
"""错误收集装饰器"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
error_collector.collect_error(e, context)
raise e
return wrapper
return decorator
# 使用错误收集
@collect_errors(context={"operation": "service_management"})
def managed_start_service(store, service_name):
"""带错误收集的服务启动"""
return store.start_service(service_name)
# 使用示例
try:
success = managed_start_service(store, "nonexistent_service")
except Exception as e:
print(f"操作失败,错误已记录: {e}")
# 查看错误摘要
summary = error_collector.get_error_summary()
print(f"📊 错误摘要: {summary}")
🔗 相关文档¶
📚 最佳实践¶
- 异常分类:使用明确的异常类型,便于错误处理
- 重试策略:根据错误类型选择合适的重试策略
- 降级处理:为关键功能提供降级方案
- 错误监控:建立完善的错误收集和分析机制
- 日志记录:详细记录错误上下文信息
- 用户友好:提供清晰的错误信息和解决建议
更新时间: 2025-01-09
版本: 1.0.0