并发模型:为什么不全用 async
本章核心源码:
run_agent.py(_MAX_TOOL_WORKERS、6145-6153 并发工具执行)、model_tools.py:81(_run_async())、hermes_state.py(164-214 jitter retry)、gateway/session_context.py(contextvars 会话隔离)
定位:本章解释 Hermes 为什么选择同步编排器 + 异步桥接的混合并发模型,而不是全 async——从
_run_async()的三种路径到工具并发的 ThreadPoolExecutor 到 SQLite WAL 的锁竞争解法。 前置依赖:第 4 章(AIAgent 同步编排器)、第 6 章(工具系统)。适用场景:想理解 Hermes 为什么不全 async,或需要调试并发相关的问题。
为什么 AIAgent 是同步的
这个问题在第 4 章被简单提及,本章展开讨论。
一个 LLM agent 的核心循环是:构建消息 → 调用模型 → 解析工具调用 → 执行工具 → 循环。这个循环是天然串行的——你必须等模型返回才能知道要执行哪些工具,必须等工具执行完才能把结果喂给模型。async/await 在这里没有并发收益,只会增加复杂度。
但 Hermes 的入口层有两种截然不同的运行环境:
- CLI 模式:Python 主线程,没有 event loop,直接调用
agent.run_conversation() - Gateway 模式:asyncio event loop 驱动,平台消息通过
await接收
如果 AIAgent 是 async 的,CLI 模式需要 asyncio.run() 包装——看似简单,但引入了一个关键问题:async 上下文中不能调用同步阻塞函数(如 input()、subprocess.run()),否则会阻塞 event loop。Hermes 的工具系统大量使用同步阻塞 API(subprocess、文件 IO、SQLite),把它们全部改成 async 的工作量和出错概率都很高。
Hermes 的解法是:编排层同步,异步工具通过桥接函数在同步上下文中运行。
graph TB
subgraph "CLI 入口"
CLI["主线程<br/>无 event loop"]
end
subgraph "Gateway 入口"
GW["asyncio event loop"]
GW_THREAD["agent 运行在<br/>独立线程"]
end
subgraph "AIAgent (同步编排器)"
AGENT["run_conversation()<br/>同步大循环"]
TOOLS["工具执行<br/>ThreadPoolExecutor"]
ASYNC_BRIDGE["_run_async()<br/>model_tools.py:81"]
end
subgraph "异步工具"
BROWSER["browser_tool<br/>(playwright)"]
MCP["mcp_tool<br/>(async transport)"]
end
CLI --> AGENT
GW --> GW_THREAD --> AGENT
AGENT --> TOOLS
TOOLS --> ASYNC_BRIDGE
ASYNC_BRIDGE --> BROWSER
ASYNC_BRIDGE --> MCP
style AGENT fill:#f96,stroke:#333,stroke-width:2px
style ASYNC_BRIDGE fill:#ff9,stroke:#333,stroke-width:2px
_run_async():同步-异步桥接的唯一入口
_run_async()(model_tools.py:81)是 Hermes 中所有 sync → async 转换的单一入口点。它需要处理三种运行上下文:
# model_tools.py:81-115(简化)
def _run_async(coro):
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop and loop.is_running():
# 路径 A:已有 event loop(Gateway / RL 环境)
# → 在新线程中运行,避免嵌套 loop 冲突
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
return pool.submit(asyncio.run, coro).result()
else:
# 路径 B/C:无 event loop(CLI 主线程 / worker 线程)
# → 使用 per-thread 持久 loop
loop = _get_or_create_persistent_loop()
return loop.run_until_complete(coro)
三种路径的必要性
路径 A(Gateway 内部调用):Gateway 运行在 asyncio event loop 中,agent 在独立线程执行。如果 agent 的工具需要执行 async 代码,不能在当前线程的 loop 上 run_until_complete()——因为那个 loop 正在运行。解法是再开一个线程让 asyncio.run() 创建自己的 loop。
路径 B(CLI 主线程):CLI 没有 running loop。这里使用持久 event loop(_get_or_create_persistent_loop(),model_tools.py:66-78)而非 asyncio.run()。原因是 asyncio.run() 创建-销毁 loop 的生命周期会导致 httpx/AsyncOpenAI 等异步客户端在 GC 时触发 "Event loop is closed" 错误。
# model_tools.py:66-78
_worker_thread_local = threading.local()
def _get_or_create_persistent_loop() -> asyncio.AbstractEventLoop:
loop = getattr(_worker_thread_local, 'loop', None)
if loop is None or loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
_worker_thread_local.loop = loop
return loop
路径 C(Worker 线程):并发工具执行在 ThreadPoolExecutor 的 worker 线程中。每个 worker 线程需要自己的持久 loop(通过 threading.local() 实现),避免与主线程的 loop 竞争,也避免 asyncio.run() 的 create-and-destroy 生命周期问题。
工具并发执行
ThreadPoolExecutor 策略(run_agent.py:6144-6159)
当模型在一次响应中返回多个工具调用(parallel tool calling),Hermes 使用 ThreadPoolExecutor 并发执行:
# run_agent.py:235
_MAX_TOOL_WORKERS = 8
# run_agent.py:6144-6153
max_workers = min(num_tools, _MAX_TOOL_WORKERS)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = []
for i, (tc, name, args) in enumerate(parsed_calls):
f = executor.submit(_run_tool, i, tc, name, args)
futures.append(f)
concurrent.futures.wait(futures)
_MAX_TOOL_WORKERS = 8 是一个经过调优的上限:
- 太小(如 2):多个
terminal.execute()调用无法并行,延迟累加 - 太大(如 32):每个线程持有自己的 event loop 和可能的 subprocess,内存和文件描述符开销过高
- 8 是在"典型模型输出 2-5 个并行工具调用"场景下的平衡点
为什么不用 asyncio.gather()
虽然 async 世界有 asyncio.gather() 做并发,但 Hermes 的大多数工具是同步阻塞的——subprocess.run() 执行 shell 命令、sqlite3 执行查询、os.path.exists() 检查文件。在 async context 中运行这些同步代码需要 loop.run_in_executor(),本质上还是线程池。直接用 ThreadPoolExecutor 更简单、更透明。
SQLite WAL 并发
问题场景
当 Gateway 同时处理多个平台的消息时,多个 agent 线程可能并发写入同一个 state.db。SQLite 的 WAL(Write-Ahead Logging)模式允许并发读取,但写入仍然是串行的。
Jitter Retry(hermes_state.py:115-214)
SessionDB 使用 application-level jitter retry 解决锁竞争:
# hermes_state.py:123-136
class SessionDB:
_WRITE_MAX_RETRIES = 15
_WRITE_RETRY_MIN_S = 0.020 # 20ms
_WRITE_RETRY_MAX_S = 0.150 # 150ms
_CHECKPOINT_EVERY_N_WRITES = 50
# hermes_state.py:164-214(简化)
def _execute_write(self, fn):
for attempt in range(self._WRITE_MAX_RETRIES):
try:
with self._lock:
self._conn.execute("BEGIN IMMEDIATE")
result = fn(self._conn)
self._conn.commit()
# 成功后定期 WAL checkpoint
self._write_count += 1
if self._write_count % self._CHECKPOINT_EVERY_N_WRITES == 0:
self._try_wal_checkpoint()
return result
except sqlite3.OperationalError as exc:
if "locked" in str(exc).lower() or "busy" in str(exc).lower():
if attempt < self._WRITE_MAX_RETRIES - 1:
jitter = random.uniform(
self._WRITE_RETRY_MIN_S, # 20ms
self._WRITE_RETRY_MAX_S, # 150ms
)
time.sleep(jitter)
continue
raise
关键设计决策:
- BEGIN IMMEDIATE(
hermes_state.py:183):在事务开始时就获取写锁,而非等到 COMMIT。这让锁竞争在事务开始时就暴露,而不是在 COMMIT 时——后者会导致已完成的工作被浪费 - 随机 jitter 而非固定退避:SQLite 内置的 busy handler 使用确定性的退避调度,会导致 convoy effect——多个 writer 以相同的节奏 sleep 然后同时醒来重试。随机 jitter(20-150ms)自然地打散了竞争者的重试时机
- 短超时 + 多重试:
timeout=1.0(hermes_state.py:150)让 SQLite 快速失败,然后由 application-level 的 15 次 jitter retry 接管。这比让 SQLite 在内置 busy handler 中等待 30 秒更好——因为我们能更精细地控制退避策略
WAL Checkpoint(hermes_state.py:216-229)
# hermes_state.py:216-229
def _try_wal_checkpoint(self) -> None:
"""Best-effort PASSIVE WAL checkpoint. Never blocks, never raises."""
with self._lock:
result = self._conn.execute("PRAGMA wal_checkpoint(PASSIVE)").fetchone()
每 50 次成功写入后执行 PASSIVE checkpoint,将 WAL 中已提交的 frames 刷回主数据库文件。PASSIVE 模式不阻塞其他连接的读取——它只刷那些当前没有读者需要的 frames。这防止 WAL 文件在多进程长期运行时无限增长。
Memory 后台预取
记忆系统的初始化(MemoryProvider 的 retrieve() 调用)可能涉及网络请求(如 Honcho API、向量数据库查询),耗时从几十毫秒到几秒不等。Hermes 在 AIAgent.__init__() 中将记忆预取放在后台线程:
# run_agent.py:1028-1054(第 4 章已讨论的记忆初始化)
# 外部 MemoryProvider 的 initialize_all() 在后台预取
self._memory_manager.initialize_all(
session_id=self.session_id,
platform=platform or "cli",
user_id=self._user_id,
)
预取结果在首次构建 system prompt 时合并——如果预取尚未完成,prompt 构建会等待(带超时)。这让初始化和首次 API 调用的延迟尽可能重叠。
contextvars 替代 os.environ:Gateway 会话状态的并发修复
Gateway 并发处理多条平台消息时,每条消息需要携带自己的会话上下文(platform、chat_id、thread_id)。早期实现使用 os.environ 存储这些值:
# 旧代码(已移除)
os.environ["HERMES_SESSION_THREAD_ID"] = str(context.source.thread_id)
这在并发场景下是不安全的:os.environ 是进程全局的。当 Message A 和 Message B 同时到达时,Message B 的 os.environ 写入会覆盖 Message A 的值——导致 Message A 的 agent 使用了错误的 thread_id,工具调用和通知发送到错误的聊天线程。
gateway/session_context.py 用 contextvars.ContextVar 替换了 os.environ:
# gateway/session_context.py:39-48
from contextvars import ContextVar
_SESSION_PLATFORM: ContextVar[str] = ContextVar("HERMES_SESSION_PLATFORM", default="")
_SESSION_CHAT_ID: ContextVar[str] = ContextVar("HERMES_SESSION_CHAT_ID", default="")
_SESSION_CHAT_NAME: ContextVar[str] = ContextVar("HERMES_SESSION_CHAT_NAME", default="")
_SESSION_THREAD_ID: ContextVar[str] = ContextVar("HERMES_SESSION_THREAD_ID", default="")
ContextVar 是任务局部的:每个 asyncio task(以及它通过 run_in_executor 派生的线程)获得自己的值副本,并发消息之间互不干扰。
设计细节:
- Token 机制保证清理:
set_session_vars()返回 reset tokens,handler 在finally块中调用clear_session_vars(tokens)恢复旧值,防止泄漏 - 向后兼容:
get_session_env()先查 ContextVar,若为空则回退到os.environ——CLI 和 cron 等非并发入口仍然通过环境变量传递会话状态,无需改动 - 最小侵入性:工具代码只需将
os.getenv("HERMES_SESSION_*")替换为get_session_env("HERMES_SESSION_*"),接口签名完全一致
这是一个典型的"正确的并发原语选择"修复:问题不在于缺少锁,而在于使用了错误作用域(进程全局 vs 任务局部)的状态存储。
并发安全的设计模式
IterationBudget 线程安全(run_agent.py:168-209)
第 4 章讨论了 IterationBudget 的功能,这里关注它的并发安全设计:
# run_agent.py:168-209
class IterationBudget:
def __init__(self, max_total: int):
self._used = 0
self._lock = threading.Lock()
def consume(self) -> bool:
with self._lock:
if self._used >= self.max_total:
return False
self._used += 1
return True
虽然 _used 的读写在 CPython 的 GIL 下是原子的,显式锁仍然必要:consume() 需要读取-比较-写入三步操作的原子性——GIL 不保证这个组合是原子的。
中断传播的并发安全(run_agent.py:609-617)
# run_agent.py:609-611, 615-617
self._interrupt_requested = False
self._client_lock = threading.RLock()
self._active_children_lock = threading.Lock()
_client_lock 使用 RLock(可重入锁)而非普通 Lock,因为中断处理可能在持有锁的情况下被递归调用(顶层 agent 中断 → 遍历子代理设置中断 → 子代理的 handler 也需要获取同一个锁)。
设计启示
Hermes 的并发模型展示了一个实用主义的工程选择:
- 同步编排是正确的默认选择:agent 的核心循环是天然串行的,async 在这里增加复杂度而不增加吞吐。只在真正需要的地方(async SDK、并发工具执行)引入并发
- 桥接函数是必要的隔离层:
_run_async()的三种路径处理了三种不同的运行上下文,让每个异步工具不需要知道自己运行在 CLI 还是 Gateway 还是 worker 线程中 - Application-level 锁优于数据库级锁:SQLite 的内置 busy handler 是通用解法,但 Hermes 的 jitter retry 是针对 agent 工作负载特性(短事务、突发写入、多进程)的定制解法,效果更好
设计赌注回扣:本章回扣 Run Anywhere 赌注——同步编排器让 Hermes 在没有 event loop 的 CLI 和有 event loop 的 Gateway 中用同一套代码运行;
_run_async()桥接让同步工具和异步工具在同一个编排循环中共存;jitter retry 让多个 Hermes 进程(CLI + Gateway + worktree agents)共享同一个 SQLite 数据库而不冲突。
版本演化说明
本章核心分析基于 Hermes Agent v0.8.x(2026 年 4 月)。 同步主循环加按需 async 桥接的总体形态,在首批公开 release 前后就已形成;v0.4.0-v0.8.0 之间的主要变化集中在 event loop 复用、线程池规模和 SQLite 重试参数的稳定性调优,而不是并发模型本身的彻底翻新。v0.8.x 将 Gateway 会话状态从
os.environ(进程全局)迁移到contextvars.ContextVar(任务局部),修复了并发消息互相覆盖会话上下文的问题。