Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

定时调度与批量运行

本章核心源码cron/scheduler.py(904 行)、cron/jobs.py(759 行)、hermes_cli/cron.py(290 行)、batch_runner.py(1287 行)、environments/

定位:本章拆解 Hermes 的定时调度(Cron)和批量运行(Batch)子系统——agent 如何在无人值守的情况下自动执行任务、投递结果到消息平台,以及如何批量生成训练轨迹。 前置依赖:第 14 章(Gateway)。适用场景:想理解 cron 调度机制,或构建自动化 agent 任务。

为什么需要定时调度

CLI 和 Gateway 都是被动的——等待用户发送消息。但一个真正有用的 personal agent 需要主动性:每天早上推送新闻摘要、每小时检查 GitHub Issues、每周生成项目报告。

Cron 子系统解决的核心问题是:如何让同一个 AIAgent 在无人交互的场景下自动执行任务,并将结果投递到用户选择的平台?

同时,Hermes 的训练和评估流程(Learning Loop 赌注的基础设施)需要批量运行 agent 并收集轨迹。batch_runner.py 提供了这个能力。

Cron 子系统架构

graph TB
    subgraph "用户创建 Job"
        U1["CLI: hermes cron create"]
        U2["Chat: /cron create"]
        U3["Agent: cronjob tool"]
    end
    
    subgraph "Job 存储"
        JF["{HERMES_HOME}/cron/jobs.json<br/>jobs.py"]
    end
    
    subgraph "调度器"
        TK["tick()<br/>scheduler.py:811"]
        FL["文件锁<br/>.tick.lock"]
    end
    
    subgraph "执行"
        RJ["run_job()<br/>scheduler.py:519"]
        AI["AIAgent<br/>quiet_mode=True"]
    end
    
    subgraph "投递"
        DR["_deliver_result()<br/>scheduler.py:198"]
        LP["Live Adapter 路径"]
        SP["Standalone HTTP 路径"]
    end
    
    subgraph "输出"
        OF["{HERMES_HOME}/cron/output/"]
        TG["Telegram"]
        DC["Discord"]
        SL["Slack"]
        OT["其他平台..."]
    end
    
    U1 --> JF
    U2 --> JF
    U3 --> JF
    
    JF --> TK
    TK --> FL
    FL --> RJ
    RJ --> AI
    AI --> DR
    DR --> LP
    DR --> SP
    LP --> TG
    LP --> DC
    SP --> SL
    DR --> OF

Job 存储:jobs.py

Job CRUD

所有 cron job 都存储在当前 profile 的 HERMES_HOME/cron/jobs.json(默认 profile 下是 ~/.hermes/cron/jobs.json,见 cron/jobs.py:36-37)。create_job()jobs.py:366-464)是创建 job 的核心函数:

# cron/jobs.py:366-379
def create_job(
    prompt: str,
    schedule: str,
    name: Optional[str] = None,
    repeat: Optional[int] = None,
    deliver: Optional[str] = None,
    origin: Optional[Dict[str, Any]] = None,
    skill: Optional[str] = None,
    skills: Optional[List[str]] = None,
    model: Optional[str] = None,
    provider: Optional[str] = None,
    base_url: Optional[str] = None,
    script: Optional[str] = None,
) -> Dict[str, Any]:

每个 job 包含以下核心字段:

字段说明示例
prompt发送给 agent 的指令"总结最近 24 小时的 GitHub Issues"
schedule调度配置{"kind": "cron", "expr": "0 9 * * *"}
deliver投递目标"origin", "telegram", "local"
origin创建来源(用于 origin 投递){"platform": "telegram", "chat_id": "123"}
skills预加载的技能列表["github-auth", "project-analyzer"]
model每 job 模型覆盖"claude-sonnet-4-20250514"
script数据收集脚本路径"check_prices.py"
repeat运行次数限制None(无限)或 1(一次性)

调度解析

parse_schedule()jobs.py:117-203)支持四种调度格式:

# jobs.py:117-203(简化)
def parse_schedule(schedule: str) -> Dict[str, Any]:
    """
    Parse schedule string into structured format.
    
    Examples:
        "30m"              → once in 30 minutes
        "every 30m"        → recurring every 30 minutes
        "0 9 * * *"        → cron expression
        "2026-02-03T14:00" → once at timestamp
    """
格式解析为示例
持续时间once(一次性)"30m" → 30 分钟后执行一次
every + 持续时间interval(周期)"every 2h" → 每 2 小时
Cron 表达式cron"0 9 * * *" → 每天早上 9 点
ISO 时间戳once(定时)"2026-02-03T14:00" → 指定时间

一次性 job 自动设置 repeat=1,完成后自动删除。

防重入的 At-Most-Once 语义

advance_next_run()jobs.py:627-652)在 job 执行之前next_run_at 推进到下一个周期:

# jobs.py:627-652
def advance_next_run(job_id: str) -> bool:
    """Preemptively advance next_run_at for a recurring job before execution.

    Call this BEFORE run_job() so that if the process crashes mid-execution,
    the job won't re-fire on the next gateway restart.  This converts the
    scheduler from at-least-once to at-most-once for recurring jobs.
    """

这是一个有意识的 trade-off:丢失一次运行比在崩溃循环中重复执行几十次要好。一次性 job 不受影响——它们可以在重启后重试。

过期 Job 快进

当 Gateway 宕机一段时间后重启,积压的 job 不会全部立即触发。get_due_jobs()jobs.py:655-731)实现了智能快进:

# jobs.py:700-724(简化)
grace = _compute_grace_seconds(schedule)
if kind in ("cron", "interval") and (now - next_run_dt).total_seconds() > grace:
    # Job missed its window — fast-forward to next future run
    new_next = compute_next_run(schedule, now.isoformat())
    logger.info("Job '%s' missed its scheduled time. Fast-forwarding to: %s",
                job.get("name"), new_next)
    continue  # Skip this run

Grace window 根据调度周期动态计算(_compute_grace_secondsjobs.py:252-281):周期的一半,限制在 120 秒到 2 小时之间。这意味着日任务在宕机 2 小时内重启可以补跑,但超过 2 小时就跳过。

调度器:scheduler.py

tick() 函数

tick()scheduler.py:811-901)是调度器的核心——Gateway 每 60 秒在后台线程调用一次:

# scheduler.py:811-825
def tick(verbose: bool = True, adapters=None, loop=None) -> int:
    """Check and run all due jobs.
    
    Uses a file lock so only one tick runs at a time.
    """
    _LOCK_DIR.mkdir(parents=True, exist_ok=True)
    lock_fd = open(_LOCK_FILE, "w")
    if fcntl:
        fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)

文件锁(位于当前 HERMES_HOME/cron/.tick.lock)确保即使 Gateway 进程内的 ticker、独立守护进程和手动 hermes cron tick 同时运行,也只有一个 tick 实际执行。

run_job():执行单个 Job

run_job()scheduler.py:519-809)创建一个独立的 AIAgent 来执行 job:

# scheduler.py:648-669(简化)
agent = AIAgent(
    model=turn_route["model"],
    api_key=turn_route["runtime"].get("api_key"),
    base_url=turn_route["runtime"].get("base_url"),
    provider=turn_route["runtime"].get("provider"),
    max_iterations=max_iterations,
    disabled_toolsets=["cronjob", "messaging", "clarify"],
    quiet_mode=True,
    skip_memory=True,      # Cron 不使用记忆
    platform="cron",
    session_id=_cron_session_id,
    session_db=_session_db,
)

三个关键设计决策:

  1. 禁用 cronjob/messaging/clarify 工具集scheduler.py:664):cron job 不能自己创建新 job(防止递归),不能发送消息(由 _deliver_result 统一处理),不能请求用户确认(无人在线)

  2. skip_memory=Truescheduler.py:667):cron 的 system prompt 不包含用户记忆。因为 cron 任务的上下文和交互式对话不同,注入记忆会干扰 cron prompt 的意图

  3. Inactivity-based timeoutscheduler.py:675-742):job 不是用固定超时,而是用不活跃超时——只要 agent 持续在调用工具和 API,就可以运行数小时;但如果 API 挂起或工具卡住超过 600 秒无活动,就被终止

SILENT 标记

当 cron agent 判断没有新内容需要报告时,它可以返回 [SILENT]scheduler.py:53-54):

# scheduler.py:53-54
SILENT_MARKER = "[SILENT]"

调度器检测到 [SILENT] 后跳过投递(scheduler.py:872-874),但输出仍保存到本地文件用于审计。这避免了"每天 9 点推送一条'没有新内容'"的噪音。

数据收集脚本

_run_job_script()scheduler.py:351-426)在 agent 执行前运行一个 Python 脚本,将其输出注入 prompt:

# scheduler.py:370-382
scripts_dir = get_hermes_home() / "scripts"
raw = Path(script_path).expanduser()
if raw.is_absolute():
    path = raw.resolve()
else:
    path = (scripts_dir / raw).resolve()

# Guard against path traversal
try:
    path.relative_to(scripts_dir_resolved)
except ValueError:
    return False, "Blocked: script path resolves outside the scripts directory"

安全约束:脚本必须位于当前 HERMES_HOME/scripts/ 目录内(默认 profile 下通常表现为 ~/.hermes/scripts/)。路径遍历(../../etc/passwd)和符号链接逃逸都被检测和拒绝。脚本输出在注入 prompt 前经过 redact_sensitive_text() 脱敏。

投递路由

_deliver_result()scheduler.py:198-345)将 job 输出投递到目标平台。投递有两条路径:

  1. Live Adapter 路径scheduler.py:287-313):如果 Gateway 正在运行,通过已连接的平台适配器发送。这支持端到端加密(如 Matrix E2EE 房间)
  2. Standalone HTTP 路径scheduler.py:321-337):如果 Live Adapter 不可用,直接通过 HTTP API 发送。作为降级方案

Live Adapter 失败时自动回退到 Standalone 路径——双路径设计确保了投递的可靠性。

CLI 管理界面

hermes_cli/cron.py(290 行)提供了 cron 的命令行管理界面:

# hermes_cli/cron.py:253-290
def cron_command(args):
    """Handle cron subcommands."""
    subcmd = getattr(args, 'cron_command', None)
    if subcmd is None or subcmd == "list":
        cron_list(show_all)
    elif subcmd == "status":
        cron_status()
    elif subcmd in {"create", "add"}:
        return cron_create(args)
    elif subcmd == "edit":
        return cron_edit(args)
    elif subcmd == "pause":
        return _job_action("pause", args.job_id, "Paused")
    elif subcmd == "resume":
        return _job_action("resume", args.job_id, "Resumed")

支持的子命令:listcreateeditpauseresumerun(立即触发)、removestatustick(手动执行一次调度)。

cron_status()hermes_cli/cron.py:127-158)检查 Gateway 进程是否运行——如果没有,cron job 不会自动触发,需要提醒用户启动 Gateway。

批量运行:batch_runner.py

batch_runner.py(1287 行)是 Learning Loop 赌注的基础设施。它支持从数据集批量运行 agent 并收集训练轨迹:

# batch_runner.py:0-20(文档字符串)
"""
Batch Agent Runner

- Dataset loading and batching
- Parallel batch processing with multiprocessing
- Checkpointing for fault tolerance and resumption
- Trajectory saving in the proper format (from/value pairs)
- Tool usage statistics aggregation across all batches
"""

主要能力:

特性说明
并行处理multiprocessing.Pool 并行执行多个 prompt
检查点恢复中断后可从上次完成的批次继续
轨迹记录保存完整的对话轨迹(from/value 格式),用于 SFT/RLHF
工具统计跨批次汇总工具调用的成功/失败次数
工具集分布支持按概率分布随机分配工具集,增加训练数据多样性

RL 环境:environments/

environments/ 目录包含多个强化学习训练环境(environments/hermes_swe_env/environments/web_research_env.pyenvironments/terminal_test_env/ 等)。这些环境将 AIAgent 封装在 OpenAI Gym 风格的接口中,用于 RL 训练和评估。

environments/hermes_base_env.py 定义了基础环境类,提供 step() / reset() 接口。agent_loop.py 实现了 agent 在环境中的执行循环。

设计启示

拆解 Cron 和 Batch 子系统,可以提炼出三个设计原则:

  1. At-Most-Once 语义:通过在执行前推进 next_run_at,Cron 选择了"宁可漏跑一次也不重复执行"的策略。这对连接外部 API、发送消息等有副作用的任务至关重要

  2. 双路径投递:Live Adapter + Standalone HTTP 的双路径设计让投递既能利用 Gateway 的实时连接(E2EE 支持),又能在 Gateway 不完全可用时降级为独立 HTTP 调用

  3. 共享 AIAgent,差异化配置:Cron 使用和 CLI/Gateway 完全相同的 AIAgent,但通过 disabled_toolsetsskip_memoryquiet_mode 等参数定制行为。这比维护一个独立的"cron agent"更简单、更一致


设计赌注回扣:本章同时回扣了 Run AnywhereLearning Loop 两个赌注。Cron 子系统让 agent 在无人值守场景下自动执行任务并投递到任意平台(Run Anywhere)。Batch runner 和 RL environments 为 agent 的训练和评估提供基础设施(Learning Loop)。SILENT 标记和数据收集脚本也体现了 Hermes 对自动化场景的深入思考。


版本演化说明

本章核心分析基于 Hermes Agent v0.8.0(2026 年 4 月)。 Cron 和 batch 基础设施在 v0.3.0 之前就已经出现;v0.3.0-v0.8.0 之间逐步补齐了调度形式、投递目标、job 状态和守护逻辑。可以明确确认的是,script 预执行数据收集字段属于 v0.8.0 发布窗口的新能力。