从用户输入到最终响应
本章核心源码:
cli.py、gateway/run.py、run_agent.py、model_tools.py
定位:本章用一次完整请求串起系统全貌,追踪消息如何从用户输入流经 CLI/Gateway → AIAgent → 模型 → 工具 → 响应。 前置依赖:第 2 章(仓库地图)。适用场景:想理解"一条消息在系统里到底经历了什么"。
为什么需要追踪一次请求
第 2 章给出了静态的仓库地图——五层架构和关键文件。但光看地图不够,你还需要看到消息如何流动。
一次看似简单的对话——用户说"帮我搜索一下最近的 Python CVE",agent 调用 web_search 工具,返回结果——实际上穿越了入口层、编排层、能力层、状态层四层代码,经历了 system prompt 拼装、plugin hook、memory prefetch、模型 API 调用、工具调度、结果回注、session 持久化等十余个步骤。
本章用一次完整请求作为线索,把这些步骤串起来。后续章节会深入每个环节,但这一章让你先看到全貌。
完整流程图
sequenceDiagram
participant U as 用户
participant E as 入口层<br/>(CLI/Gateway)
participant A as AIAgent<br/>(run_agent.py)
participant M as 模型 API
participant T as 工具系统<br/>(model_tools.py)
participant S as 状态层<br/>(SessionDB)
U->>E: 输入消息
E->>A: 构造 AIAgent + 注入回调(详见第 2 章)
A->>A: run_conversation()
A->>A: 安全防护 + 输入清洗
A->>A: 构建 system prompt(首次)
A->>A: 预飞行压缩(如需)
A->>A: Plugin pre_llm_call hook
A->>A: Memory prefetch
A->>A: 组装 api_messages
A->>M: API 调用(streaming)
M-->>A: 响应 + tool_calls
alt 包含 tool_calls
A->>T: _execute_tool_calls()
T->>T: 并行/串行执行工具
T-->>A: 工具结果
A->>A: 结果回注 messages
A->>S: 保存 session log
A->>M: 下一轮 API 调用
Note over A,M: 循环直到无 tool_calls<br/>或 budget 耗尽
end
M-->>A: 最终文本响应
A->>A: Plugin post_llm_call hook
A->>S: 持久化完整会话
A->>A: Memory sync_all()
A->>A: on_session_end hook
A-->>E: 返回结果
E-->>U: 展示响应
第一步:消息进入入口层
第 2 章已经展示了 CLI 和 Gateway 如何构造 AIAgent 并注入各自的回调(详见第 2 章"源码走读"节)。这里不再重复构造代码,直接关注消息进入 run_conversation() 之后发生了什么。
核心要点回顾:五个入口(CLI、Gateway、Batch、Cron、ACP)都构造同一个 AIAgent,通过 11 个回调(stream_delta_callback、clarify_callback、tool_start_callback 等)适配各自的 IO 模式。编排逻辑完全共享。
第二步:run_conversation() 启动
消息进入 AIAgent.run_conversation()(run_agent.py:6800)后,编排器在进入主循环前执行一系列准备工作。这些准备工作不是可选的"nice-to-have"——每一步都防止了一类生产环境的崩溃或性能问题。
2.1 安全防护与输入清洗
# run_agent.py:6830
_install_safe_stdio() # 包装 stdout/stderr 防 broken pipe
# run_agent.py:6840-6841
if isinstance(user_message, str):
user_message = _sanitize_surrogates(user_message)
第一行安装 _SafeWriter,防止 systemd/Docker 场景下 stdout 写入失败导致崩溃(详见第 21 章)。第二行清除剪贴板粘贴可能带入的无效 surrogate 字符——Google Docs 等富文本编辑器粘贴的内容可能包含无效 UTF-8,会导致 JSON 序列化崩溃。
此外,编排器还会执行 fallback provider 恢复(_restore_primary_runtime(),如果上一轮触发了降级则恢复主模型)和死连接清理(_cleanup_dead_connections(),检测并清除 provider 故障遗留的僵尸 socket)。
2.2 System Prompt 构建(首次)
# run_agent.py:6952-6968
if self._cached_system_prompt is None:
stored_prompt = None
if conversation_history and self._session_db:
session_row = self._session_db.get_session(self.session_id)
if session_row:
stored_prompt = session_row.get("system_prompt")
if stored_prompt:
self._cached_system_prompt = stored_prompt # 续会话:复用缓存
else:
self._cached_system_prompt = self._build_system_prompt(system_message) # 新会话:构建
System prompt 只在新会话的首次调用时构建,后续调用复用缓存。这不是简单的性能优化——Anthropic 的 prompt caching 要求 system prompt 逐字节一致才能命中缓存。如果每次调用都重新构建(比如 memory 变化导致内容不同),缓存命中率会大幅下降。这个"prompt cache 敏感"的设计贯穿整个请求流程——后续的 memory context 和 plugin context 都注入 user message 而非 system prompt,同样是为了保护缓存。
详见第 5 章(提示词系统)。
2.3 预飞行压缩
# run_agent.py:7000-7049(简化,实际支持最多 3 轮多 pass 压缩)
if self.compression_enabled:
_preflight_tokens = estimate_request_tokens_rough(
messages, system_prompt=active_system_prompt, tools=self.tools
)
if _preflight_tokens >= self.context_compressor.threshold_tokens:
for _pass in range(3): # 多 pass:处理超大历史 + 小上下文窗口
messages, active_system_prompt = self._compress_context(messages, ...)
if _preflight_tokens < self.context_compressor.threshold_tokens:
break
在进入主循环前,编排器估算当前消息的 token 数(包括工具 schema 占用的 20-30K+ token)。如果已经接近模型的上下文窗口限制——比如用户切换到了一个窗口更小的模型——立即执行压缩。支持最多 3 轮多 pass 压缩,处理超大历史记录的极端情况。
详见第 12 章(上下文压缩)。
2.4 Plugin Hook
# run_agent.py:7063-7082
_plugin_user_context = ""
_pre_results = _invoke_hook("pre_llm_call", session_id=..., user_message=..., ...)
# 插件返回的上下文拼接到 _plugin_user_context
插件的 pre_llm_call 钩子在主循环前触发。插件可以返回额外的上下文信息,这些信息会被注入到 user message(不是 system prompt),保护 prompt cache。
2.5 Memory Prefetch
# run_agent.py:7098-7108
if self._memory_manager:
_ext_prefetch_cache = self._memory_manager.prefetch_all(original_user_message) or ""
如果配置了外部 memory provider(如 Honcho 或 Mem0),在进入主循环前预取一次相关记忆。预取结果被缓存,在整个主循环中复用——不会每次工具调用都重新预取(10 次工具调用 = 10 倍延迟 + 成本)。
详见第 11 章(Memory Provider)。
第三步:主循环——模型调用与工具执行
准备工作完成后,进入核心循环(run_agent.py:7111):
# run_agent.py:7111(伪代码,实际逻辑内联在 while 循环中)
while api_call_count < self.max_iterations and self.iteration_budget.remaining > 0:
# 1. 检查中断请求
if self._interrupt_requested:
break
# 2. 消耗 iteration budget
api_call_count += 1
if not self.iteration_budget.consume():
break # 预算耗尽
# 3. 组装 api_messages(内联,非独立方法)
# - 复制 messages,在 user message 中注入 memory + plugin context
# - 前置 system prompt
# - 应用 prompt caching
# - 清理孤立的 tool results
# 4. 调用模型 API(优先 streaming)
response = self._interruptible_streaming_api_call(api_messages, ...)
# 5. 解析响应
if assistant_message.tool_calls:
self._execute_tool_calls(assistant_message, messages, ...)
continue # 继续循环
else:
final_response = assistant_message.content # 最终响应
break
退出条件
主循环有以下退出路径:
| 退出条件 | 触发场景 | 类型 |
|---|---|---|
| 无 tool_calls | 模型给出了最终文本响应 | 正常退出(最常见) |
| 中断请求 | 用户发送了新消息或按了 Ctrl+C | 正常退出 |
| Budget 耗尽 | iteration_budget 用完(默认 90 次) | 正常退出 |
| max_iterations | 达到最大 API 调用次数 | 正常退出 |
| context_length_exceeded | 压缩后仍超限 | 错误退出 |
| 不可重试的 4xx 错误 | 认证失败、模型不存在等 | 错误退出 |
| 所有重试耗尽 | 3 次重试全部失败 | 错误退出 |
3.1 消息组装
每次 API 调用前,编排器将内部 messages 列表转换为 api_messages(run_agent.py:7168-7249)。关键操作:
# run_agent.py:7168-7249(简化)
api_messages = []
for idx, msg in enumerate(messages):
api_msg = msg.copy()
# 在当前 turn 的 user message 中注入 memory context + plugin context
if idx == current_turn_user_idx and msg.get("role") == "user":
if _ext_prefetch_cache:
api_msg["content"] += build_memory_context_block(_ext_prefetch_cache)
if _plugin_user_context:
api_msg["content"] += "\n\n" + _plugin_user_context
api_messages.append(api_msg)
# 前置 system prompt
api_messages = [{"role": "system", "content": effective_system}] + api_messages
# 应用 prompt caching + 清理孤立 tool results
api_messages = apply_anthropic_cache_control(api_messages, ...)
api_messages = self._sanitize_api_messages(api_messages)
重要设计:原始 messages 列表不被修改——memory context 和 plugin context 的注入只存在于 api_messages 副本中,不会持久化到 session DB。这保证了下一轮对话加载历史时不会看到上一轮的注入内容。
3.2 模型 API 调用
编排器优先使用 streaming 路径(_interruptible_streaming_api_call,run_agent.py:7358),即使没有 stream consumer。原因不是为了流式渲染,而是为了健康检测:streaming 模式提供 90 秒 stale-stream 检测和 60 秒 read timeout,防止 provider 保持连接但不返回响应时无限挂起。非 streaming 的 _interruptible_api_call 仅作为 fallback。
API 调用失败时,有最多 3 次重试(run_agent.py:7285+),包含:
- 指数退避
- 429 限流处理
context_length_exceeded时自动压缩并重试- 主模型不可用时切换到 fallback provider
详见第 21 章(运行时防御与容错)。
3.3 工具执行
当模型响应包含 tool_calls 时,编排器通过 _execute_tool_calls()(run_agent.py:5930)执行工具。它首先判断是否可以并行执行:
# run_agent.py:5930(简化)
def _execute_tool_calls(self, assistant_message, messages, ...):
tool_calls = assistant_message.tool_calls
if not _should_parallelize_tool_batch(tool_calls):
return self._execute_tool_calls_sequential(...)
return self._execute_tool_calls_concurrent(...)
如果一批工具调用都是只读的(如 read_file、web_search),或者虽然有写操作但目标路径不重叠,就使用 ThreadPoolExecutor 并行执行。否则串行执行。
每个工具的实际调用通过 _invoke_tool()(run_agent.py:5953)分发。工具被分为四类路由:
# run_agent.py:5953(简化,展示四类路由)
def _invoke_tool(self, function_name, function_args, ...):
if function_name in ("todo", "session_search", "memory", "clarify"):
return self._handle_builtin_tool(...) # 1. Agent 内置工具
elif self._memory_manager and self._memory_manager.has_tool(function_name):
return self._memory_manager.handle_tool_call(...) # 2. 外部 Memory Provider 工具
elif function_name == "delegate_task":
return delegate_task(...) # 3. 子代理委托
else:
return handle_function_call(...) # 4. ToolRegistry 分发
| 路由类别 | 示例工具 | 原因 |
|---|---|---|
| Agent 内置 | memory、todo、session_search、clarify | 需要访问 agent 内部状态(memory store、todo store、session DB、clarify callback) |
| Memory Provider 工具 | Honcho/Mem0 自定义工具 | 外部 memory provider 动态注册的工具 |
| 子代理委托 | delegate_task | 创建新 AIAgent 实例,分配独立 iteration budget |
| ToolRegistry | 其余所有工具 | 通过 model_tools.py 的 handle_function_call() 分发 |
详见第 6 章(工具系统)、第 9 章(子代理委托)、第 11 章(Memory Provider)。
第四步:循环结束与响应返回
当模型返回不包含 tool_calls 的响应时,主循环退出(run_agent.py:8876-8878)。退出循环后,编排器执行收尾工作(run_agent.py:9080+):
- Trajectory 保存:如果启用了轨迹记录(batch/RL 训练场景),保存完整对话轨迹
- Task 资源清理:
_cleanup_task_resources()清理本轮对话使用的临时资源 - Plugin hook:触发
post_llm_call钩子 - Session 持久化:将完整的 messages 列表写入 SQLite SessionDB
- Memory sync:
self._memory_manager.sync_all()通知所有 memory provider 同步本轮对话 - Memory prefetch 排队:
self._memory_manager.queue_prefetch_all()为下一轮对话预取记忆(后台执行,不阻塞当前响应) - Background review:如果达到 nudge 阈值,后台检查是否需要持久化记忆或创建技能
- Plugin hook:触发
on_session_end钩子 - 返回结果:将 final_response 和 messages 打包返回给入口层
入口层拿到结果后,CLI 渲染最终响应,Gateway 推送到消息平台。
设计选择:为什么是同步循环
整个 run_conversation() 是一个同步方法——它在调用线程上阻塞运行,直到对话完成才返回。这个选择看起来反直觉(为什么不用 async?),但有三个实际理由:
- 工具执行依赖顺序:大多数工具调用之间有隐含依赖(先读文件再修改),同步循环让控制流天然有序
- 错误处理简单:try/except 即可,不需要处理 async 取消、event loop 生命周期等问题
- Gateway 并发由线程实现:Gateway 为每个用户会话在独立线程中运行
run_conversation(),不需要 async 来实现并发
异步工具通过 _run_async()(model_tools.py:81)桥接到同步循环中——这个桥接机制在每个线程维护一个持久 event loop,避免反复创建和销毁。
详见第 19 章(并发模型)。
设计启示
追踪一次请求的旅程,可以看到三个贯穿全流程的设计原则:
- 回调驱动的多入口适配:同一个
AIAgent通过 11 个回调适配 CLI、Gateway、Batch 等场景,编排逻辑零冗余 - 防御性的每一步:SafeWriter、surrogate 清洗、死连接检测、预飞行压缩、interrupt 检查——主循环的每一步都有对应的防御措施
- 缓存与注入的分离:所有动态内容(memory context、plugin context)注入 user message 而非 system prompt,保护 Anthropic prompt cache 命中率
第 4 章将深入 AIAgent 类本身,拆解它的初始化链、callback 体系和 iteration budget 设计。
设计赌注回扣:本章展示的请求旅程同时回扣了四个设计赌注:CLI-First(CLI 是第一等入口)、Run Anywhere(同一编排逻辑扩展到 Gateway/Cron/Batch)、Personal Long-Term(memory prefetch/sync 贯穿请求生命周期)、Learning Loop(收尾阶段的 background review 检查是否需要创建技能)。
版本演化说明
本章核心分析基于 Hermes Agent v0.8.0(2026 年 4 月)。
run_conversation()的主循环骨架在早期公开版本时就已经成形;v0.4.0-v0.8.0 期间的主要变化集中在压缩前处理、memory/plugin 注入点,以及为了 prompt cache 稳定性而添加的保护逻辑。