定时调度与批量运行
本章核心源码:
cron/scheduler.py(904 行)、cron/jobs.py(759 行)、hermes_cli/cron.py(290 行)、batch_runner.py(1287 行)、environments/
定位:本章拆解 Hermes 的定时调度(Cron)和批量运行(Batch)子系统——agent 如何在无人值守的情况下自动执行任务、投递结果到消息平台,以及如何批量生成训练轨迹。 前置依赖:第 14 章(Gateway)。适用场景:想理解 cron 调度机制,或构建自动化 agent 任务。
为什么需要定时调度
CLI 和 Gateway 都是被动的——等待用户发送消息。但一个真正有用的 personal agent 需要主动性:每天早上推送新闻摘要、每小时检查 GitHub Issues、每周生成项目报告。
Cron 子系统解决的核心问题是:如何让同一个 AIAgent 在无人交互的场景下自动执行任务,并将结果投递到用户选择的平台?
同时,Hermes 的训练和评估流程(Learning Loop 赌注的基础设施)需要批量运行 agent 并收集轨迹。batch_runner.py 提供了这个能力。
Cron 子系统架构
graph TB
subgraph "用户创建 Job"
U1["CLI: hermes cron create"]
U2["Chat: /cron create"]
U3["Agent: cronjob tool"]
end
subgraph "Job 存储"
JF["{HERMES_HOME}/cron/jobs.json<br/>jobs.py"]
end
subgraph "调度器"
TK["tick()<br/>scheduler.py:811"]
FL["文件锁<br/>.tick.lock"]
end
subgraph "执行"
RJ["run_job()<br/>scheduler.py:519"]
AI["AIAgent<br/>quiet_mode=True"]
end
subgraph "投递"
DR["_deliver_result()<br/>scheduler.py:198"]
LP["Live Adapter 路径"]
SP["Standalone HTTP 路径"]
end
subgraph "输出"
OF["{HERMES_HOME}/cron/output/"]
TG["Telegram"]
DC["Discord"]
SL["Slack"]
OT["其他平台..."]
end
U1 --> JF
U2 --> JF
U3 --> JF
JF --> TK
TK --> FL
FL --> RJ
RJ --> AI
AI --> DR
DR --> LP
DR --> SP
LP --> TG
LP --> DC
SP --> SL
DR --> OF
Job 存储:jobs.py
Job CRUD
所有 cron job 都存储在当前 profile 的 HERMES_HOME/cron/jobs.json(默认 profile 下是 ~/.hermes/cron/jobs.json,见 cron/jobs.py:36-37)。create_job()(jobs.py:366-464)是创建 job 的核心函数:
# cron/jobs.py:366-379
def create_job(
prompt: str,
schedule: str,
name: Optional[str] = None,
repeat: Optional[int] = None,
deliver: Optional[str] = None,
origin: Optional[Dict[str, Any]] = None,
skill: Optional[str] = None,
skills: Optional[List[str]] = None,
model: Optional[str] = None,
provider: Optional[str] = None,
base_url: Optional[str] = None,
script: Optional[str] = None,
) -> Dict[str, Any]:
每个 job 包含以下核心字段:
| 字段 | 说明 | 示例 |
|---|---|---|
prompt | 发送给 agent 的指令 | "总结最近 24 小时的 GitHub Issues" |
schedule | 调度配置 | {"kind": "cron", "expr": "0 9 * * *"} |
deliver | 投递目标 | "origin", "telegram", "local" |
origin | 创建来源(用于 origin 投递) | {"platform": "telegram", "chat_id": "123"} |
skills | 预加载的技能列表 | ["github-auth", "project-analyzer"] |
model | 每 job 模型覆盖 | "claude-sonnet-4-20250514" |
script | 数据收集脚本路径 | "check_prices.py" |
repeat | 运行次数限制 | None(无限)或 1(一次性) |
调度解析
parse_schedule()(jobs.py:117-203)支持四种调度格式:
# jobs.py:117-203(简化)
def parse_schedule(schedule: str) -> Dict[str, Any]:
"""
Parse schedule string into structured format.
Examples:
"30m" → once in 30 minutes
"every 30m" → recurring every 30 minutes
"0 9 * * *" → cron expression
"2026-02-03T14:00" → once at timestamp
"""
| 格式 | 解析为 | 示例 |
|---|---|---|
| 持续时间 | once(一次性) | "30m" → 30 分钟后执行一次 |
every + 持续时间 | interval(周期) | "every 2h" → 每 2 小时 |
| Cron 表达式 | cron | "0 9 * * *" → 每天早上 9 点 |
| ISO 时间戳 | once(定时) | "2026-02-03T14:00" → 指定时间 |
一次性 job 自动设置 repeat=1,完成后自动删除。
防重入的 At-Most-Once 语义
advance_next_run()(jobs.py:627-652)在 job 执行之前将 next_run_at 推进到下一个周期:
# jobs.py:627-652
def advance_next_run(job_id: str) -> bool:
"""Preemptively advance next_run_at for a recurring job before execution.
Call this BEFORE run_job() so that if the process crashes mid-execution,
the job won't re-fire on the next gateway restart. This converts the
scheduler from at-least-once to at-most-once for recurring jobs.
"""
这是一个有意识的 trade-off:丢失一次运行比在崩溃循环中重复执行几十次要好。一次性 job 不受影响——它们可以在重启后重试。
过期 Job 快进
当 Gateway 宕机一段时间后重启,积压的 job 不会全部立即触发。get_due_jobs()(jobs.py:655-731)实现了智能快进:
# jobs.py:700-724(简化)
grace = _compute_grace_seconds(schedule)
if kind in ("cron", "interval") and (now - next_run_dt).total_seconds() > grace:
# Job missed its window — fast-forward to next future run
new_next = compute_next_run(schedule, now.isoformat())
logger.info("Job '%s' missed its scheduled time. Fast-forwarding to: %s",
job.get("name"), new_next)
continue # Skip this run
Grace window 根据调度周期动态计算(_compute_grace_seconds,jobs.py:252-281):周期的一半,限制在 120 秒到 2 小时之间。这意味着日任务在宕机 2 小时内重启可以补跑,但超过 2 小时就跳过。
调度器:scheduler.py
tick() 函数
tick()(scheduler.py:811-901)是调度器的核心——Gateway 每 60 秒在后台线程调用一次:
# scheduler.py:811-825
def tick(verbose: bool = True, adapters=None, loop=None) -> int:
"""Check and run all due jobs.
Uses a file lock so only one tick runs at a time.
"""
_LOCK_DIR.mkdir(parents=True, exist_ok=True)
lock_fd = open(_LOCK_FILE, "w")
if fcntl:
fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
文件锁(位于当前 HERMES_HOME/cron/.tick.lock)确保即使 Gateway 进程内的 ticker、独立守护进程和手动 hermes cron tick 同时运行,也只有一个 tick 实际执行。
run_job():执行单个 Job
run_job()(scheduler.py:519-809)创建一个独立的 AIAgent 来执行 job:
# scheduler.py:648-669(简化)
agent = AIAgent(
model=turn_route["model"],
api_key=turn_route["runtime"].get("api_key"),
base_url=turn_route["runtime"].get("base_url"),
provider=turn_route["runtime"].get("provider"),
max_iterations=max_iterations,
disabled_toolsets=["cronjob", "messaging", "clarify"],
quiet_mode=True,
skip_memory=True, # Cron 不使用记忆
platform="cron",
session_id=_cron_session_id,
session_db=_session_db,
)
三个关键设计决策:
-
禁用 cronjob/messaging/clarify 工具集(
scheduler.py:664):cron job 不能自己创建新 job(防止递归),不能发送消息(由_deliver_result统一处理),不能请求用户确认(无人在线) -
skip_memory=True(scheduler.py:667):cron 的 system prompt 不包含用户记忆。因为 cron 任务的上下文和交互式对话不同,注入记忆会干扰 cron prompt 的意图 -
Inactivity-based timeout(
scheduler.py:675-742):job 不是用固定超时,而是用不活跃超时——只要 agent 持续在调用工具和 API,就可以运行数小时;但如果 API 挂起或工具卡住超过 600 秒无活动,就被终止
SILENT 标记
当 cron agent 判断没有新内容需要报告时,它可以返回 [SILENT](scheduler.py:53-54):
# scheduler.py:53-54
SILENT_MARKER = "[SILENT]"
调度器检测到 [SILENT] 后跳过投递(scheduler.py:872-874),但输出仍保存到本地文件用于审计。这避免了"每天 9 点推送一条'没有新内容'"的噪音。
数据收集脚本
_run_job_script()(scheduler.py:351-426)在 agent 执行前运行一个 Python 脚本,将其输出注入 prompt:
# scheduler.py:370-382
scripts_dir = get_hermes_home() / "scripts"
raw = Path(script_path).expanduser()
if raw.is_absolute():
path = raw.resolve()
else:
path = (scripts_dir / raw).resolve()
# Guard against path traversal
try:
path.relative_to(scripts_dir_resolved)
except ValueError:
return False, "Blocked: script path resolves outside the scripts directory"
安全约束:脚本必须位于当前 HERMES_HOME/scripts/ 目录内(默认 profile 下通常表现为 ~/.hermes/scripts/)。路径遍历(../../etc/passwd)和符号链接逃逸都被检测和拒绝。脚本输出在注入 prompt 前经过 redact_sensitive_text() 脱敏。
投递路由
_deliver_result()(scheduler.py:198-345)将 job 输出投递到目标平台。投递有两条路径:
- Live Adapter 路径(
scheduler.py:287-313):如果 Gateway 正在运行,通过已连接的平台适配器发送。这支持端到端加密(如 Matrix E2EE 房间) - Standalone HTTP 路径(
scheduler.py:321-337):如果 Live Adapter 不可用,直接通过 HTTP API 发送。作为降级方案
Live Adapter 失败时自动回退到 Standalone 路径——双路径设计确保了投递的可靠性。
CLI 管理界面
hermes_cli/cron.py(290 行)提供了 cron 的命令行管理界面:
# hermes_cli/cron.py:253-290
def cron_command(args):
"""Handle cron subcommands."""
subcmd = getattr(args, 'cron_command', None)
if subcmd is None or subcmd == "list":
cron_list(show_all)
elif subcmd == "status":
cron_status()
elif subcmd in {"create", "add"}:
return cron_create(args)
elif subcmd == "edit":
return cron_edit(args)
elif subcmd == "pause":
return _job_action("pause", args.job_id, "Paused")
elif subcmd == "resume":
return _job_action("resume", args.job_id, "Resumed")
支持的子命令:list、create、edit、pause、resume、run(立即触发)、remove、status、tick(手动执行一次调度)。
cron_status()(hermes_cli/cron.py:127-158)检查 Gateway 进程是否运行——如果没有,cron job 不会自动触发,需要提醒用户启动 Gateway。
批量运行:batch_runner.py
batch_runner.py(1287 行)是 Learning Loop 赌注的基础设施。它支持从数据集批量运行 agent 并收集训练轨迹:
# batch_runner.py:0-20(文档字符串)
"""
Batch Agent Runner
- Dataset loading and batching
- Parallel batch processing with multiprocessing
- Checkpointing for fault tolerance and resumption
- Trajectory saving in the proper format (from/value pairs)
- Tool usage statistics aggregation across all batches
"""
主要能力:
| 特性 | 说明 |
|---|---|
| 并行处理 | multiprocessing.Pool 并行执行多个 prompt |
| 检查点恢复 | 中断后可从上次完成的批次继续 |
| 轨迹记录 | 保存完整的对话轨迹(from/value 格式),用于 SFT/RLHF |
| 工具统计 | 跨批次汇总工具调用的成功/失败次数 |
| 工具集分布 | 支持按概率分布随机分配工具集,增加训练数据多样性 |
RL 环境:environments/
environments/ 目录包含多个强化学习训练环境(environments/hermes_swe_env/、environments/web_research_env.py、environments/terminal_test_env/ 等)。这些环境将 AIAgent 封装在 OpenAI Gym 风格的接口中,用于 RL 训练和评估。
environments/hermes_base_env.py 定义了基础环境类,提供 step() / reset() 接口。agent_loop.py 实现了 agent 在环境中的执行循环。
设计启示
拆解 Cron 和 Batch 子系统,可以提炼出三个设计原则:
-
At-Most-Once 语义:通过在执行前推进
next_run_at,Cron 选择了"宁可漏跑一次也不重复执行"的策略。这对连接外部 API、发送消息等有副作用的任务至关重要 -
双路径投递:Live Adapter + Standalone HTTP 的双路径设计让投递既能利用 Gateway 的实时连接(E2EE 支持),又能在 Gateway 不完全可用时降级为独立 HTTP 调用
-
共享 AIAgent,差异化配置:Cron 使用和 CLI/Gateway 完全相同的
AIAgent,但通过disabled_toolsets、skip_memory、quiet_mode等参数定制行为。这比维护一个独立的"cron agent"更简单、更一致
设计赌注回扣:本章同时回扣了 Run Anywhere 和 Learning Loop 两个赌注。Cron 子系统让 agent 在无人值守场景下自动执行任务并投递到任意平台(Run Anywhere)。Batch runner 和 RL environments 为 agent 的训练和评估提供基础设施(Learning Loop)。SILENT 标记和数据收集脚本也体现了 Hermes 对自动化场景的深入思考。
版本演化说明
本章核心分析基于 Hermes Agent v0.8.0(2026 年 4 月)。 Cron 和 batch 基础设施在 v0.3.0 之前就已经出现;v0.3.0-v0.8.0 之间逐步补齐了调度形式、投递目标、job 状态和守护逻辑。可以明确确认的是,
script预执行数据收集字段属于 v0.8.0 发布窗口的新能力。