Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

从用户输入到最终响应

本章核心源码cli.pygateway/run.pyrun_agent.pymodel_tools.py

定位:本章用一次完整请求串起系统全貌,追踪消息如何从用户输入流经 CLI/Gateway → AIAgent → 模型 → 工具 → 响应。 前置依赖:第 2 章(仓库地图)。适用场景:想理解"一条消息在系统里到底经历了什么"。

为什么需要追踪一次请求

第 2 章给出了静态的仓库地图——五层架构和关键文件。但光看地图不够,你还需要看到消息如何流动

一次看似简单的对话——用户说"帮我搜索一下最近的 Python CVE",agent 调用 web_search 工具,返回结果——实际上穿越了入口层、编排层、能力层、状态层四层代码,经历了 system prompt 拼装、plugin hook、memory prefetch、模型 API 调用、工具调度、结果回注、session 持久化等十余个步骤。

本章用一次完整请求作为线索,把这些步骤串起来。后续章节会深入每个环节,但这一章让你先看到全貌。

完整流程图

sequenceDiagram
    participant U as 用户
    participant E as 入口层<br/>(CLI/Gateway)
    participant A as AIAgent<br/>(run_agent.py)
    participant M as 模型 API
    participant T as 工具系统<br/>(model_tools.py)
    participant S as 状态层<br/>(SessionDB)

    U->>E: 输入消息
    E->>A: 构造 AIAgent + 注入回调(详见第 2 章)
    A->>A: run_conversation()
    A->>A: 安全防护 + 输入清洗
    A->>A: 构建 system prompt(首次)
    A->>A: 预飞行压缩(如需)
    A->>A: Plugin pre_llm_call hook
    A->>A: Memory prefetch
    A->>A: 组装 api_messages
    A->>M: API 调用(streaming)
    M-->>A: 响应 + tool_calls

    alt 包含 tool_calls
        A->>T: _execute_tool_calls()
        T->>T: 并行/串行执行工具
        T-->>A: 工具结果
        A->>A: 结果回注 messages
        A->>S: 保存 session log
        A->>M: 下一轮 API 调用
        Note over A,M: 循环直到无 tool_calls<br/>或 budget 耗尽
    end

    M-->>A: 最终文本响应
    A->>A: Plugin post_llm_call hook
    A->>S: 持久化完整会话
    A->>A: Memory sync_all()
    A->>A: on_session_end hook
    A-->>E: 返回结果
    E-->>U: 展示响应

第一步:消息进入入口层

第 2 章已经展示了 CLI 和 Gateway 如何构造 AIAgent 并注入各自的回调(详见第 2 章"源码走读"节)。这里不再重复构造代码,直接关注消息进入 run_conversation() 之后发生了什么。

核心要点回顾:五个入口(CLI、Gateway、Batch、Cron、ACP)都构造同一个 AIAgent,通过 11 个回调(stream_delta_callbackclarify_callbacktool_start_callback 等)适配各自的 IO 模式。编排逻辑完全共享。

第二步:run_conversation() 启动

消息进入 AIAgent.run_conversation()run_agent.py:6800)后,编排器在进入主循环前执行一系列准备工作。这些准备工作不是可选的"nice-to-have"——每一步都防止了一类生产环境的崩溃或性能问题。

2.1 安全防护与输入清洗

# run_agent.py:6830
_install_safe_stdio()  # 包装 stdout/stderr 防 broken pipe

# run_agent.py:6840-6841
if isinstance(user_message, str):
    user_message = _sanitize_surrogates(user_message)

第一行安装 _SafeWriter,防止 systemd/Docker 场景下 stdout 写入失败导致崩溃(详见第 21 章)。第二行清除剪贴板粘贴可能带入的无效 surrogate 字符——Google Docs 等富文本编辑器粘贴的内容可能包含无效 UTF-8,会导致 JSON 序列化崩溃。

此外,编排器还会执行 fallback provider 恢复(_restore_primary_runtime(),如果上一轮触发了降级则恢复主模型)和死连接清理(_cleanup_dead_connections(),检测并清除 provider 故障遗留的僵尸 socket)。

2.2 System Prompt 构建(首次)

# run_agent.py:6952-6968
if self._cached_system_prompt is None:
    stored_prompt = None
    if conversation_history and self._session_db:
        session_row = self._session_db.get_session(self.session_id)
        if session_row:
            stored_prompt = session_row.get("system_prompt")
    if stored_prompt:
        self._cached_system_prompt = stored_prompt                      # 续会话:复用缓存
    else:
        self._cached_system_prompt = self._build_system_prompt(system_message)  # 新会话:构建

System prompt 只在新会话的首次调用时构建,后续调用复用缓存。这不是简单的性能优化——Anthropic 的 prompt caching 要求 system prompt 逐字节一致才能命中缓存。如果每次调用都重新构建(比如 memory 变化导致内容不同),缓存命中率会大幅下降。这个"prompt cache 敏感"的设计贯穿整个请求流程——后续的 memory context 和 plugin context 都注入 user message 而非 system prompt,同样是为了保护缓存。

详见第 5 章(提示词系统)。

2.3 预飞行压缩

# run_agent.py:7000-7049(简化,实际支持最多 3 轮多 pass 压缩)
if self.compression_enabled:
    _preflight_tokens = estimate_request_tokens_rough(
        messages, system_prompt=active_system_prompt, tools=self.tools
    )
    if _preflight_tokens >= self.context_compressor.threshold_tokens:
        for _pass in range(3):  # 多 pass:处理超大历史 + 小上下文窗口
            messages, active_system_prompt = self._compress_context(messages, ...)
            if _preflight_tokens < self.context_compressor.threshold_tokens:
                break

在进入主循环前,编排器估算当前消息的 token 数(包括工具 schema 占用的 20-30K+ token)。如果已经接近模型的上下文窗口限制——比如用户切换到了一个窗口更小的模型——立即执行压缩。支持最多 3 轮多 pass 压缩,处理超大历史记录的极端情况。

详见第 12 章(上下文压缩)。

2.4 Plugin Hook

# run_agent.py:7063-7082
_plugin_user_context = ""
_pre_results = _invoke_hook("pre_llm_call", session_id=..., user_message=..., ...)
# 插件返回的上下文拼接到 _plugin_user_context

插件的 pre_llm_call 钩子在主循环前触发。插件可以返回额外的上下文信息,这些信息会被注入到 user message(不是 system prompt),保护 prompt cache。

2.5 Memory Prefetch

# run_agent.py:7098-7108
if self._memory_manager:
    _ext_prefetch_cache = self._memory_manager.prefetch_all(original_user_message) or ""

如果配置了外部 memory provider(如 Honcho 或 Mem0),在进入主循环前预取一次相关记忆。预取结果被缓存,在整个主循环中复用——不会每次工具调用都重新预取(10 次工具调用 = 10 倍延迟 + 成本)。

详见第 11 章(Memory Provider)。

第三步:主循环——模型调用与工具执行

准备工作完成后,进入核心循环(run_agent.py:7111):

# run_agent.py:7111(伪代码,实际逻辑内联在 while 循环中)
while api_call_count < self.max_iterations and self.iteration_budget.remaining > 0:
    # 1. 检查中断请求
    if self._interrupt_requested:
        break

    # 2. 消耗 iteration budget
    api_call_count += 1
    if not self.iteration_budget.consume():
        break  # 预算耗尽

    # 3. 组装 api_messages(内联,非独立方法)
    #    - 复制 messages,在 user message 中注入 memory + plugin context
    #    - 前置 system prompt
    #    - 应用 prompt caching
    #    - 清理孤立的 tool results

    # 4. 调用模型 API(优先 streaming)
    response = self._interruptible_streaming_api_call(api_messages, ...)

    # 5. 解析响应
    if assistant_message.tool_calls:
        self._execute_tool_calls(assistant_message, messages, ...)
        continue  # 继续循环
    else:
        final_response = assistant_message.content  # 最终响应
        break

退出条件

主循环有以下退出路径:

退出条件触发场景类型
无 tool_calls模型给出了最终文本响应正常退出(最常见)
中断请求用户发送了新消息或按了 Ctrl+C正常退出
Budget 耗尽iteration_budget 用完(默认 90 次)正常退出
max_iterations达到最大 API 调用次数正常退出
context_length_exceeded压缩后仍超限错误退出
不可重试的 4xx 错误认证失败、模型不存在等错误退出
所有重试耗尽3 次重试全部失败错误退出

3.1 消息组装

每次 API 调用前,编排器将内部 messages 列表转换为 api_messagesrun_agent.py:7168-7249)。关键操作:

# run_agent.py:7168-7249(简化)
api_messages = []
for idx, msg in enumerate(messages):
    api_msg = msg.copy()
    # 在当前 turn 的 user message 中注入 memory context + plugin context
    if idx == current_turn_user_idx and msg.get("role") == "user":
        if _ext_prefetch_cache:
            api_msg["content"] += build_memory_context_block(_ext_prefetch_cache)
        if _plugin_user_context:
            api_msg["content"] += "\n\n" + _plugin_user_context
    api_messages.append(api_msg)

# 前置 system prompt
api_messages = [{"role": "system", "content": effective_system}] + api_messages

# 应用 prompt caching + 清理孤立 tool results
api_messages = apply_anthropic_cache_control(api_messages, ...)
api_messages = self._sanitize_api_messages(api_messages)

重要设计:原始 messages 列表不被修改——memory context 和 plugin context 的注入只存在于 api_messages 副本中,不会持久化到 session DB。这保证了下一轮对话加载历史时不会看到上一轮的注入内容。

3.2 模型 API 调用

编排器优先使用 streaming 路径(_interruptible_streaming_api_callrun_agent.py:7358),即使没有 stream consumer。原因不是为了流式渲染,而是为了健康检测:streaming 模式提供 90 秒 stale-stream 检测和 60 秒 read timeout,防止 provider 保持连接但不返回响应时无限挂起。非 streaming 的 _interruptible_api_call 仅作为 fallback。

API 调用失败时,有最多 3 次重试(run_agent.py:7285+),包含:

  • 指数退避
  • 429 限流处理
  • context_length_exceeded 时自动压缩并重试
  • 主模型不可用时切换到 fallback provider

详见第 21 章(运行时防御与容错)。

3.3 工具执行

当模型响应包含 tool_calls 时,编排器通过 _execute_tool_calls()run_agent.py:5930)执行工具。它首先判断是否可以并行执行:

# run_agent.py:5930(简化)
def _execute_tool_calls(self, assistant_message, messages, ...):
    tool_calls = assistant_message.tool_calls
    if not _should_parallelize_tool_batch(tool_calls):
        return self._execute_tool_calls_sequential(...)
    return self._execute_tool_calls_concurrent(...)

如果一批工具调用都是只读的(如 read_fileweb_search),或者虽然有写操作但目标路径不重叠,就使用 ThreadPoolExecutor 并行执行。否则串行执行。

每个工具的实际调用通过 _invoke_tool()run_agent.py:5953)分发。工具被分为四类路由

# run_agent.py:5953(简化,展示四类路由)
def _invoke_tool(self, function_name, function_args, ...):
    if function_name in ("todo", "session_search", "memory", "clarify"):
        return self._handle_builtin_tool(...)       # 1. Agent 内置工具
    elif self._memory_manager and self._memory_manager.has_tool(function_name):
        return self._memory_manager.handle_tool_call(...)  # 2. 外部 Memory Provider 工具
    elif function_name == "delegate_task":
        return delegate_task(...)                    # 3. 子代理委托
    else:
        return handle_function_call(...)             # 4. ToolRegistry 分发
路由类别示例工具原因
Agent 内置memorytodosession_searchclarify需要访问 agent 内部状态(memory store、todo store、session DB、clarify callback)
Memory Provider 工具Honcho/Mem0 自定义工具外部 memory provider 动态注册的工具
子代理委托delegate_task创建新 AIAgent 实例,分配独立 iteration budget
ToolRegistry其余所有工具通过 model_tools.pyhandle_function_call() 分发

详见第 6 章(工具系统)、第 9 章(子代理委托)、第 11 章(Memory Provider)。

第四步:循环结束与响应返回

当模型返回不包含 tool_calls 的响应时,主循环退出(run_agent.py:8876-8878)。退出循环后,编排器执行收尾工作(run_agent.py:9080+):

  1. Trajectory 保存:如果启用了轨迹记录(batch/RL 训练场景),保存完整对话轨迹
  2. Task 资源清理_cleanup_task_resources() 清理本轮对话使用的临时资源
  3. Plugin hook:触发 post_llm_call 钩子
  4. Session 持久化:将完整的 messages 列表写入 SQLite SessionDB
  5. Memory syncself._memory_manager.sync_all() 通知所有 memory provider 同步本轮对话
  6. Memory prefetch 排队self._memory_manager.queue_prefetch_all() 为下一轮对话预取记忆(后台执行,不阻塞当前响应)
  7. Background review:如果达到 nudge 阈值,后台检查是否需要持久化记忆或创建技能
  8. Plugin hook:触发 on_session_end 钩子
  9. 返回结果:将 final_response 和 messages 打包返回给入口层

入口层拿到结果后,CLI 渲染最终响应,Gateway 推送到消息平台。

设计选择:为什么是同步循环

整个 run_conversation() 是一个同步方法——它在调用线程上阻塞运行,直到对话完成才返回。这个选择看起来反直觉(为什么不用 async?),但有三个实际理由:

  1. 工具执行依赖顺序:大多数工具调用之间有隐含依赖(先读文件再修改),同步循环让控制流天然有序
  2. 错误处理简单:try/except 即可,不需要处理 async 取消、event loop 生命周期等问题
  3. Gateway 并发由线程实现:Gateway 为每个用户会话在独立线程中运行 run_conversation(),不需要 async 来实现并发

异步工具通过 _run_async()model_tools.py:81)桥接到同步循环中——这个桥接机制在每个线程维护一个持久 event loop,避免反复创建和销毁。

详见第 19 章(并发模型)。

设计启示

追踪一次请求的旅程,可以看到三个贯穿全流程的设计原则:

  1. 回调驱动的多入口适配:同一个 AIAgent 通过 11 个回调适配 CLI、Gateway、Batch 等场景,编排逻辑零冗余
  2. 防御性的每一步:SafeWriter、surrogate 清洗、死连接检测、预飞行压缩、interrupt 检查——主循环的每一步都有对应的防御措施
  3. 缓存与注入的分离:所有动态内容(memory context、plugin context)注入 user message 而非 system prompt,保护 Anthropic prompt cache 命中率

第 4 章将深入 AIAgent 类本身,拆解它的初始化链、callback 体系和 iteration budget 设计。


设计赌注回扣:本章展示的请求旅程同时回扣了四个设计赌注:CLI-First(CLI 是第一等入口)、Run Anywhere(同一编排逻辑扩展到 Gateway/Cron/Batch)、Personal Long-Term(memory prefetch/sync 贯穿请求生命周期)、Learning Loop(收尾阶段的 background review 检查是否需要创建技能)。


版本演化说明

本章核心分析基于 Hermes Agent v0.8.0(2026 年 4 月)。 run_conversation() 的主循环骨架在早期公开版本时就已经成形;v0.4.0-v0.8.0 期间的主要变化集中在压缩前处理、memory/plugin 注入点,以及为了 prompt cache 稳定性而添加的保护逻辑。