运行时防御与容错
本章核心源码:
run_agent.py(111-158 SafeWriter、7285+ API retry)、hermes_state.py(164-214 jitter retry)、agent/retry_utils.py、gateway/platforms/base.py(1046-1099 send retry)、gateway/run.py(1377-1463 reconnection)
定位:本章拆解 Hermes 的"第 2 层防御"——运行时容错机制。从 SafeWriter 的 broken pipe 保护到 API 重试与 fallback、平台消息投递重试、Gateway 断线重连,理解一个长时间运行的 agent 如何在网络抖动、provider 限流、管道断裂等故障中保持可用。 前置依赖:第 19 章(并发模型)、第 20 章(进程生命周期)。适用场景:需要理解 Hermes 的容错策略,或调试生产环境中的间歇性故障。
为什么需要运行时防御
第 20 章讨论了进程级的生命周期管理——信号处理、优雅关停、资源清理。但进程活着不等于系统健康。一个运行数小时的 agent 会遇到各种运行时故障:
- Broken pipe:systemd 服务的 stdout 管道断裂,
print()抛OSError - API 限流:OpenRouter 返回 429 Too Many Requests
- Provider 不可用:Anthropic API overloaded,所有请求超时
- 网络抖动:VPS 到 Telegram 的连接短暂中断
- SQLite 锁竞争:CLI 和 Gateway 同时写入 state.db
这些故障的共同特点是:它们是暂时的。如果 agent 因为一次 broken pipe 或一次 429 就崩溃,用户需要手动重启——这对一个号称"Run Anywhere"的系统是不可接受的。
SafeWriter:Broken Pipe 保护
问题场景
当 Hermes 作为 systemd service 或 Docker 容器运行时,stdout/stderr 连接到 journald 或 Docker log driver。如果日志系统重启、管道缓冲区满、或 socket 被 reset,任何 print() 调用都会抛出 OSError: [Errno 5] Input/output error。
更糟的情况:如果 except handler 内的 print() 也触发 broken pipe,就会产生双重故障——异常处理本身抛出了异常。
SafeWriter 实现(run_agent.py:111-158)
# run_agent.py:111-157
class _SafeWriter:
"""Transparent stdio wrapper that catches OSError/ValueError from broken pipes."""
__slots__ = ("_inner",)
def __init__(self, inner):
object.__setattr__(self, "_inner", inner)
def write(self, data):
try:
return self._inner.write(data)
except (OSError, ValueError):
return len(data) if isinstance(data, str) else 0
def flush(self):
try:
self._inner.flush()
except (OSError, ValueError):
pass
def isatty(self):
try:
return self._inner.isatty()
except (OSError, ValueError):
return False
def __getattr__(self, name):
return getattr(self._inner, name)
设计要点:
- 透明代理:
__getattr__转发所有未显式定义的属性访问到原始 stream,让 SafeWriter 对现有代码完全透明 - 捕获两种异常:
OSError(管道断裂)和ValueError(stream 被关闭后写入,常见于 ThreadPoolExecutor 线程清理时) - 静默失败:
write()返回数据长度(假装成功),而非返回 0 或 None。这防止调用方因为"写入 0 字节"而进入重试循环 - 安装时机(
run_agent.py:160-165):在AIAgent初始化早期包装sys.stdout和sys.stderr
# run_agent.py:160-165
def _install_safe_stdio() -> None:
for stream_name in ("stdout", "stderr"):
stream = getattr(sys, stream_name, None)
if stream is not None and not isinstance(stream, _SafeWriter):
setattr(sys, stream_name, _SafeWriter(stream))
isinstance 检查防止重复包装(子代理场景下 _install_safe_stdio() 可能被多次调用)。
asyncio 异常处理器(cli.py:8444-8449)
# cli.py:8444-8449
def _suppress_closed_loop_errors(loop, context):
# Suppress "Event loop is closed" RuntimeError from httpx transport cleanup
这是 SafeWriter 的补充——httpx 的 AsyncClient.__del__ 在 event loop 关闭后尝试关闭 transport,触发 RuntimeError。自定义的 asyncio exception handler 静默这些错误。
API 重试与 Fallback
重试循环(run_agent.py:7284-7339)
# run_agent.py:7284-7298
retry_count = 0
max_retries = 3
primary_recovery_attempted = False
max_compression_attempts = 3
codex_auth_retry_attempted = False
anthropic_auth_retry_attempted = False
nous_auth_retry_attempted = False
thinking_sig_retry_attempted = False
has_retried_429 = False
restart_with_compressed_messages = False
重试循环维护了多个一次性标志(*_attempted 变量),确保每种特殊恢复策略只尝试一次:
| 标志 | 触发条件 | 恢复策略 |
|---|---|---|
codex_auth_retry_attempted | Codex token 过期 | 刷新 OAuth token 后重试 |
anthropic_auth_retry_attempted | Anthropic token 过期 | 刷新 token 后重试 |
has_retried_429 | 429 Too Many Requests | 退避后重试,可能切 fallback |
restart_with_compressed_messages | 上下文超限 | 压缩消息后重试 |
thinking_sig_retry_attempted | 思考签名错误 | 调整参数后重试 |
Jittered Backoff(agent/retry_utils.py:1-57)
# agent/retry_utils.py:19-57
def jittered_backoff(
attempt: int,
*,
base_delay: float = 5.0,
max_delay: float = 120.0,
jitter_ratio: float = 0.5,
) -> float:
exponent = max(0, attempt - 1)
delay = min(base_delay * (2 ** exponent), max_delay)
# Seed from time + counter for decorrelation
seed = (time.time_ns() ^ (tick * 0x9E3779B9)) & 0xFFFFFFFF
rng = random.Random(seed)
jitter = rng.uniform(0, jitter_ratio * delay)
return delay + jitter
这个 jitter 实现有几个精妙之处:
- Decorrelated jitter:使用
time.time_ns() ^ (tick * 0x9E3779B9)作为 seed,确保即使在粗粒度时钟(某些虚拟机的时钟精度只到毫秒)上,并发的重试者也会得到不同的 jitter 值。0x9E3779B9是 golden ratio 的 32 位近似,提供良好的位散列特性 - Thread-safe counter:
_jitter_counter受_jitter_lock保护,确保每次调用得到唯一的 seed - Additive jitter:delay 是
base + jitter而非base * random——这确保最小延迟不为零
Fallback Provider 链
当主 provider 的所有重试耗尽后,编排器切换到 fallback chain(第 4 章已讨论配置结构)。Fallback 切换发生在重试循环的最外层:
# config.yaml 示例
fallback_providers:
- provider: anthropic
model: claude-sonnet-4-20250514
- provider: openai
model: gpt-4o
切换后的状态会在下一轮 run_conversation() 调用时通过 _restore_primary_runtime() 恢复——fallback 是临时的,不会永久改变用户的配置。
平台消息投递重试(base.py:1046-1099)
Gateway 向消息平台发送响应时也需要容错:
# gateway/platforms/base.py:1046-1105(简化)
async def send_with_retry(self, chat_id, content, *,
max_retries=2, base_delay=2.0) -> SendResult:
result = await self.send(chat_id=chat_id, content=content, ...)
if result.success:
return result
error_str = result.error or ""
is_network = result.retryable or self._is_retryable_error(error_str)
# Timeout errors are not safe to retry (message may have been delivered)
if not is_network and self._is_timeout_error(error_str):
return result
if is_network:
for attempt in range(1, max_retries + 1):
delay = base_delay * (2 ** (attempt - 1)) + random.uniform(0, 1)
await asyncio.sleep(delay)
result = await self.send(...)
if result.success:
return result
if not (result.retryable or self._is_retryable_error(result.error)):
break # 错误类型变了,不再重试
关键设计:
- 超时不重试:timeout 错误可能意味着消息已经发送但确认丢失,重试会导致重复发送
- 错误类型变化检测:如果重试过程中错误从"网络错误"变为"格式错误",立即停止重试——这不是暂时性故障
- 投递失败通知:所有重试耗尽后,向用户发送一条轻量级的故障通知(
base.py:1100-1105),让用户知道请求已处理但响应未能投递
# gateway/platforms/base.py:1100-1105
notice = (
"\u26a0\ufe0f Message delivery failed after multiple attempts. "
"Please try again — your request was processed but the response could not be sent."
)
await self.send(chat_id=chat_id, content=notice, ...)
Gateway 断线重连(gateway/run.py:1377-1463)
当某个平台适配器在运行时断开(网络故障、token 过期),Gateway 不会整体停止——它将断开的平台放入 _failed_platforms 队列,由 reconnection loop 在后台重试。
# gateway/run.py:1377-1378
_MAX_ATTEMPTS = 10
_BACKOFF_CAP = 300 # 5 minutes max between retries
重连逻辑(gateway/run.py:1380-1463)
# gateway/run.py:1389-1463(简化)
for platform in list(self._failed_platforms.keys()):
info = self._failed_platforms[platform]
if now < info["next_retry"]:
continue # 还没到重试时间
if info["attempts"] >= _MAX_ATTEMPTS:
# 10 次失败后放弃
del self._failed_platforms[platform]
continue
adapter = self._create_adapter(platform, platform_config)
success = await adapter.connect()
if success:
self.adapters[platform] = adapter
del self._failed_platforms[platform]
else:
if adapter.has_fatal_error and not adapter.fatal_error_retryable:
# 不可恢复的错误(如 token 被撤销),停止重试
del self._failed_platforms[platform]
else:
backoff = min(30 * (2 ** (attempt - 1)), _BACKOFF_CAP)
info["next_retry"] = time.monotonic() + backoff
graph LR
FAIL["平台断线"] --> QUEUE["加入 _failed_platforms<br/>backoff=30s"]
QUEUE --> RETRY{"重连成功?"}
RETRY -->|"成功"| RESTORE["恢复到 self.adapters"]
RETRY -->|"失败 + 可重试"| BACK["指数退避<br/>30s → 60s → ... → 300s"]
RETRY -->|"失败 + 不可重试"| GIVE_UP["永久移除"]
RETRY -->|"尝试 >= 10 次"| GIVE_UP
BACK --> QUEUE
style RESTORE fill:#9f9,stroke:#333
style GIVE_UP fill:#f99,stroke:#333
设计要点:
- 指数退避上限 300s:
min(30 * (2^(n-1)), 300)——最大每 5 分钟重试一次,不浪费资源 - 区分可恢复和不可恢复错误:token 被撤销是不可恢复的,继续重试没有意义
- 重连后重建:成功重连后重建 channel directory(
build_channel_directory()),确保消息路由正确 - 独立于其他平台:一个平台的断线不影响其他平台的正常运行
Graceful Degradation 三原则
从以上四个子系统中可以提炼出 Hermes 运行时防御的三个核心原则:
原则 1:静默降级优于崩溃退出
SafeWriter 是这个原则的极致体现——print() 失败不应该杀死 agent。类似地,单个平台的断线不终止 Gateway、单个工具的超时不终止会话、单次 API 429 不终止对话。
原则 2:重试必须有界
每个重试机制都有明确的上限:
| 子系统 | 最大重试次数 | 最大延迟 |
|---|---|---|
| API 调用 | 3 次 | max_delay=120s |
| SQLite 写入 | 15 次 | 150ms per attempt |
| 消息投递 | 2 次 | ~6s total |
| 平台重连 | 10 次 | 300s between |
无界重试会让系统在不可恢复的故障中无限等待,浪费资源并延迟用户感知到问题。
原则 3:恢复策略不重复尝试
API 重试中的 *_attempted 标志确保每种特殊恢复策略只执行一次。如果刷新 Codex OAuth token 后重试仍然失败,不会再次刷新 token——这意味着问题不在 token 过期,需要其他恢复路径(如 fallback provider)。
KeyboardInterrupt 传播安全
一个容易被忽略的防御点:KeyboardInterrupt 可能在任何时刻发生。Hermes 通过以下方式确保中断安全:
- SQLite 事务保护:
_execute_write()的except BaseException包含 rollback(hermes_state.py:187-190),即使 KeyboardInterrupt 打断了事务,数据库也不会处于不一致状态 - atexit 注册:Browser 和 Terminal 的清理通过
atexit.register()注册,即使 KeyboardInterrupt 导致异常传播到顶层,进程退出时仍会执行清理 - Daemon threads:清理线程标记为
daemon=True,当主线程退出时自动终止,不会阻止进程退出
设计启示
- 防御层次而非防御深度:Hermes 的容错不是"一个超级健壮的重试机制",而是在不同层次设置不同的防御——IO 层(SafeWriter)、API 层(jittered retry + fallback)、平台层(send_with_retry)、基础设施层(reconnection loop)。每一层只处理自己能处理的故障
- Jitter 是分布式系统的基本工具:从 SQLite 的 jitter retry 到 API 的 jittered backoff 到 reconnection 的指数退避,Hermes 在所有有竞争的重试场景中都使用了 jitter。确定性退避在并发环境中会形成 convoy effect
- 一次性恢复标志:API 重试中的
*_attempted模式是一种简洁的状态机设计——每种恢复策略只尝试一次,失败后让出路径给其他策略。这避免了重复执行无效的恢复操作
设计哲学:先分类,后恢复
Hermes 有一个显式的错误分类层(agent/error_classifier.py),将诊断与恢复分离。每个 API 错误首先被分类为一个 FailoverReason(billing、rate_limit、overloaded、auth_expired、context_too_long 等),然后从分类结果推导恢复动作:是否可重试?是否需要压缩上下文?是否需要轮换凭据?是否需要 fallback?这种分离意味着添加新的错误类型只需要更新分类逻辑,不需要改动恢复逻辑——而恢复策略可以独立于具体的错误消息进行测试。
设计赌注回扣:本章直接服务于 Run Anywhere 赌注——SafeWriter 让 Hermes 在 systemd/Docker 环境中不会因为管道断裂崩溃;Gateway 的断线重连让 VPS 上的 agent 能自动恢复网络中断;多层重试机制让 agent 在网络不稳定的环境中保持可用。这些都是"从 $5 VPS 到 GPU 集群"场景的必要条件。
版本演化说明
本章核心分析基于 Hermes Agent v0.8.0(2026 年 4 月)。 本章讨论的 safe stdio、消息投递重试、断线重连和随机退避不是一次性落地的,而是在 v0.5.0-v0.8.0 之间持续加固。其中
send_with_retry已在 v0.5.0 发布窗口出现,jittered_backoff()则是 v0.8.0 发布窗口内的新加强。