Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

并发模型:为什么不全用 async

本章核心源码run_agent.py_MAX_TOOL_WORKERS、6145-6153 并发工具执行)、model_tools.py:81_run_async())、hermes_state.py(164-214 jitter retry)、gateway/session_context.py(contextvars 会话隔离)

定位:本章解释 Hermes 为什么选择同步编排器 + 异步桥接的混合并发模型,而不是全 async——从 _run_async() 的三种路径到工具并发的 ThreadPoolExecutor 到 SQLite WAL 的锁竞争解法。 前置依赖:第 4 章(AIAgent 同步编排器)、第 6 章(工具系统)。适用场景:想理解 Hermes 为什么不全 async,或需要调试并发相关的问题。

为什么 AIAgent 是同步的

这个问题在第 4 章被简单提及,本章展开讨论。

一个 LLM agent 的核心循环是:构建消息 → 调用模型 → 解析工具调用 → 执行工具 → 循环。这个循环是天然串行的——你必须等模型返回才能知道要执行哪些工具,必须等工具执行完才能把结果喂给模型。async/await 在这里没有并发收益,只会增加复杂度。

但 Hermes 的入口层有两种截然不同的运行环境:

  • CLI 模式:Python 主线程,没有 event loop,直接调用 agent.run_conversation()
  • Gateway 模式:asyncio event loop 驱动,平台消息通过 await 接收

如果 AIAgent 是 async 的,CLI 模式需要 asyncio.run() 包装——看似简单,但引入了一个关键问题:async 上下文中不能调用同步阻塞函数(如 input()subprocess.run()),否则会阻塞 event loop。Hermes 的工具系统大量使用同步阻塞 API(subprocess、文件 IO、SQLite),把它们全部改成 async 的工作量和出错概率都很高。

Hermes 的解法是:编排层同步,异步工具通过桥接函数在同步上下文中运行

graph TB
    subgraph "CLI 入口"
        CLI["主线程<br/>无 event loop"]
    end

    subgraph "Gateway 入口"
        GW["asyncio event loop"]
        GW_THREAD["agent 运行在<br/>独立线程"]
    end

    subgraph "AIAgent (同步编排器)"
        AGENT["run_conversation()<br/>同步大循环"]
        TOOLS["工具执行<br/>ThreadPoolExecutor"]
        ASYNC_BRIDGE["_run_async()<br/>model_tools.py:81"]
    end

    subgraph "异步工具"
        BROWSER["browser_tool<br/>(playwright)"]
        MCP["mcp_tool<br/>(async transport)"]
    end

    CLI --> AGENT
    GW --> GW_THREAD --> AGENT
    AGENT --> TOOLS
    TOOLS --> ASYNC_BRIDGE
    ASYNC_BRIDGE --> BROWSER
    ASYNC_BRIDGE --> MCP

    style AGENT fill:#f96,stroke:#333,stroke-width:2px
    style ASYNC_BRIDGE fill:#ff9,stroke:#333,stroke-width:2px

_run_async():同步-异步桥接的唯一入口

_run_async()model_tools.py:81)是 Hermes 中所有 sync → async 转换的单一入口点。它需要处理三种运行上下文:

# model_tools.py:81-115(简化)
def _run_async(coro):
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    if loop and loop.is_running():
        # 路径 A:已有 event loop(Gateway / RL 环境)
        # → 在新线程中运行,避免嵌套 loop 冲突
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(asyncio.run, coro).result()
    else:
        # 路径 B/C:无 event loop(CLI 主线程 / worker 线程)
        # → 使用 per-thread 持久 loop
        loop = _get_or_create_persistent_loop()
        return loop.run_until_complete(coro)

三种路径的必要性

路径 A(Gateway 内部调用):Gateway 运行在 asyncio event loop 中,agent 在独立线程执行。如果 agent 的工具需要执行 async 代码,不能在当前线程的 loop 上 run_until_complete()——因为那个 loop 正在运行。解法是再开一个线程让 asyncio.run() 创建自己的 loop。

路径 B(CLI 主线程):CLI 没有 running loop。这里使用持久 event loop_get_or_create_persistent_loop()model_tools.py:66-78)而非 asyncio.run()。原因是 asyncio.run() 创建-销毁 loop 的生命周期会导致 httpx/AsyncOpenAI 等异步客户端在 GC 时触发 "Event loop is closed" 错误。

# model_tools.py:66-78
_worker_thread_local = threading.local()

def _get_or_create_persistent_loop() -> asyncio.AbstractEventLoop:
    loop = getattr(_worker_thread_local, 'loop', None)
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        _worker_thread_local.loop = loop
    return loop

路径 C(Worker 线程):并发工具执行在 ThreadPoolExecutor 的 worker 线程中。每个 worker 线程需要自己的持久 loop(通过 threading.local() 实现),避免与主线程的 loop 竞争,也避免 asyncio.run() 的 create-and-destroy 生命周期问题。

工具并发执行

ThreadPoolExecutor 策略(run_agent.py:6144-6159)

当模型在一次响应中返回多个工具调用(parallel tool calling),Hermes 使用 ThreadPoolExecutor 并发执行:

# run_agent.py:235
_MAX_TOOL_WORKERS = 8

# run_agent.py:6144-6153
max_workers = min(num_tools, _MAX_TOOL_WORKERS)
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = []
    for i, (tc, name, args) in enumerate(parsed_calls):
        f = executor.submit(_run_tool, i, tc, name, args)
        futures.append(f)
    concurrent.futures.wait(futures)

_MAX_TOOL_WORKERS = 8 是一个经过调优的上限:

  • 太小(如 2):多个 terminal.execute() 调用无法并行,延迟累加
  • 太大(如 32):每个线程持有自己的 event loop 和可能的 subprocess,内存和文件描述符开销过高
  • 8 是在"典型模型输出 2-5 个并行工具调用"场景下的平衡点

为什么不用 asyncio.gather()

虽然 async 世界有 asyncio.gather() 做并发,但 Hermes 的大多数工具是同步阻塞的——subprocess.run() 执行 shell 命令、sqlite3 执行查询、os.path.exists() 检查文件。在 async context 中运行这些同步代码需要 loop.run_in_executor(),本质上还是线程池。直接用 ThreadPoolExecutor 更简单、更透明。

SQLite WAL 并发

问题场景

当 Gateway 同时处理多个平台的消息时,多个 agent 线程可能并发写入同一个 state.db。SQLite 的 WAL(Write-Ahead Logging)模式允许并发读取,但写入仍然是串行的。

Jitter Retry(hermes_state.py:115-214)

SessionDB 使用 application-level jitter retry 解决锁竞争:

# hermes_state.py:123-136
class SessionDB:
    _WRITE_MAX_RETRIES = 15
    _WRITE_RETRY_MIN_S = 0.020   # 20ms
    _WRITE_RETRY_MAX_S = 0.150   # 150ms
    _CHECKPOINT_EVERY_N_WRITES = 50
# hermes_state.py:164-214(简化)
def _execute_write(self, fn):
    for attempt in range(self._WRITE_MAX_RETRIES):
        try:
            with self._lock:
                self._conn.execute("BEGIN IMMEDIATE")
                result = fn(self._conn)
                self._conn.commit()
            # 成功后定期 WAL checkpoint
            self._write_count += 1
            if self._write_count % self._CHECKPOINT_EVERY_N_WRITES == 0:
                self._try_wal_checkpoint()
            return result
        except sqlite3.OperationalError as exc:
            if "locked" in str(exc).lower() or "busy" in str(exc).lower():
                if attempt < self._WRITE_MAX_RETRIES - 1:
                    jitter = random.uniform(
                        self._WRITE_RETRY_MIN_S,    # 20ms
                        self._WRITE_RETRY_MAX_S,    # 150ms
                    )
                    time.sleep(jitter)
                    continue
            raise

关键设计决策:

  1. BEGIN IMMEDIATEhermes_state.py:183):在事务开始时就获取写锁,而非等到 COMMIT。这让锁竞争在事务开始时就暴露,而不是在 COMMIT 时——后者会导致已完成的工作被浪费
  2. 随机 jitter 而非固定退避:SQLite 内置的 busy handler 使用确定性的退避调度,会导致 convoy effect——多个 writer 以相同的节奏 sleep 然后同时醒来重试。随机 jitter(20-150ms)自然地打散了竞争者的重试时机
  3. 短超时 + 多重试timeout=1.0hermes_state.py:150)让 SQLite 快速失败,然后由 application-level 的 15 次 jitter retry 接管。这比让 SQLite 在内置 busy handler 中等待 30 秒更好——因为我们能更精细地控制退避策略

WAL Checkpoint(hermes_state.py:216-229)

# hermes_state.py:216-229
def _try_wal_checkpoint(self) -> None:
    """Best-effort PASSIVE WAL checkpoint. Never blocks, never raises."""
    with self._lock:
        result = self._conn.execute("PRAGMA wal_checkpoint(PASSIVE)").fetchone()

每 50 次成功写入后执行 PASSIVE checkpoint,将 WAL 中已提交的 frames 刷回主数据库文件。PASSIVE 模式不阻塞其他连接的读取——它只刷那些当前没有读者需要的 frames。这防止 WAL 文件在多进程长期运行时无限增长。

Memory 后台预取

记忆系统的初始化(MemoryProvider 的 retrieve() 调用)可能涉及网络请求(如 Honcho API、向量数据库查询),耗时从几十毫秒到几秒不等。Hermes 在 AIAgent.__init__() 中将记忆预取放在后台线程:

# run_agent.py:1028-1054(第 4 章已讨论的记忆初始化)
# 外部 MemoryProvider 的 initialize_all() 在后台预取
self._memory_manager.initialize_all(
    session_id=self.session_id,
    platform=platform or "cli",
    user_id=self._user_id,
)

预取结果在首次构建 system prompt 时合并——如果预取尚未完成,prompt 构建会等待(带超时)。这让初始化和首次 API 调用的延迟尽可能重叠。

contextvars 替代 os.environ:Gateway 会话状态的并发修复

Gateway 并发处理多条平台消息时,每条消息需要携带自己的会话上下文(platform、chat_id、thread_id)。早期实现使用 os.environ 存储这些值:

# 旧代码(已移除)
os.environ["HERMES_SESSION_THREAD_ID"] = str(context.source.thread_id)

这在并发场景下是不安全的:os.environ进程全局的。当 Message A 和 Message B 同时到达时,Message B 的 os.environ 写入会覆盖 Message A 的值——导致 Message A 的 agent 使用了错误的 thread_id,工具调用和通知发送到错误的聊天线程。

gateway/session_context.pycontextvars.ContextVar 替换了 os.environ

# gateway/session_context.py:39-48
from contextvars import ContextVar

_SESSION_PLATFORM: ContextVar[str] = ContextVar("HERMES_SESSION_PLATFORM", default="")
_SESSION_CHAT_ID: ContextVar[str] = ContextVar("HERMES_SESSION_CHAT_ID", default="")
_SESSION_CHAT_NAME: ContextVar[str] = ContextVar("HERMES_SESSION_CHAT_NAME", default="")
_SESSION_THREAD_ID: ContextVar[str] = ContextVar("HERMES_SESSION_THREAD_ID", default="")

ContextVar任务局部的:每个 asyncio task(以及它通过 run_in_executor 派生的线程)获得自己的值副本,并发消息之间互不干扰。

设计细节:

  • Token 机制保证清理set_session_vars() 返回 reset tokens,handler 在 finally 块中调用 clear_session_vars(tokens) 恢复旧值,防止泄漏
  • 向后兼容get_session_env() 先查 ContextVar,若为空则回退到 os.environ——CLI 和 cron 等非并发入口仍然通过环境变量传递会话状态,无需改动
  • 最小侵入性:工具代码只需将 os.getenv("HERMES_SESSION_*") 替换为 get_session_env("HERMES_SESSION_*"),接口签名完全一致

这是一个典型的"正确的并发原语选择"修复:问题不在于缺少锁,而在于使用了错误作用域(进程全局 vs 任务局部)的状态存储。

并发安全的设计模式

IterationBudget 线程安全(run_agent.py:168-209)

第 4 章讨论了 IterationBudget 的功能,这里关注它的并发安全设计:

# run_agent.py:168-209
class IterationBudget:
    def __init__(self, max_total: int):
        self._used = 0
        self._lock = threading.Lock()

    def consume(self) -> bool:
        with self._lock:
            if self._used >= self.max_total:
                return False
            self._used += 1
            return True

虽然 _used 的读写在 CPython 的 GIL 下是原子的,显式锁仍然必要:consume() 需要读取-比较-写入三步操作的原子性——GIL 不保证这个组合是原子的。

中断传播的并发安全(run_agent.py:609-617)

# run_agent.py:609-611, 615-617
self._interrupt_requested = False
self._client_lock = threading.RLock()
self._active_children_lock = threading.Lock()

_client_lock 使用 RLock(可重入锁)而非普通 Lock,因为中断处理可能在持有锁的情况下被递归调用(顶层 agent 中断 → 遍历子代理设置中断 → 子代理的 handler 也需要获取同一个锁)。

设计启示

Hermes 的并发模型展示了一个实用主义的工程选择:

  1. 同步编排是正确的默认选择:agent 的核心循环是天然串行的,async 在这里增加复杂度而不增加吞吐。只在真正需要的地方(async SDK、并发工具执行)引入并发
  2. 桥接函数是必要的隔离层_run_async() 的三种路径处理了三种不同的运行上下文,让每个异步工具不需要知道自己运行在 CLI 还是 Gateway 还是 worker 线程中
  3. Application-level 锁优于数据库级锁:SQLite 的内置 busy handler 是通用解法,但 Hermes 的 jitter retry 是针对 agent 工作负载特性(短事务、突发写入、多进程)的定制解法,效果更好

设计赌注回扣:本章回扣 Run Anywhere 赌注——同步编排器让 Hermes 在没有 event loop 的 CLI 和有 event loop 的 Gateway 中用同一套代码运行;_run_async() 桥接让同步工具和异步工具在同一个编排循环中共存;jitter retry 让多个 Hermes 进程(CLI + Gateway + worktree agents)共享同一个 SQLite 数据库而不冲突。


版本演化说明

本章核心分析基于 Hermes Agent v0.8.x(2026 年 4 月)。 同步主循环加按需 async 桥接的总体形态,在首批公开 release 前后就已形成;v0.4.0-v0.8.0 之间的主要变化集中在 event loop 复用、线程池规模和 SQLite 重试参数的稳定性调优,而不是并发模型本身的彻底翻新。v0.8.x 将 Gateway 会话状态从 os.environ(进程全局)迁移到 contextvars.ContextVar(任务局部),修复了并发消息互相覆盖会话上下文的问题。