Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Gateway:把同一个 Agent 投射到 17 个平台

本章核心源码gateway/run.py(7620 行)、gateway/session.py(1081 行)、gateway/platforms/base.py(1696 行)、gateway/config.py(957 行)

定位:本章拆解 Hermes 的 Gateway 子系统——如何通过一个 BasePlatformAdapter ABC 将同一个 AIAgent 投射到 Telegram、Discord、Slack、WhatsApp 等 17 个平台类型,以及 session 管理如何在多平台场景下保持一致。 前置依赖:第 4 章(AIAgent 内核)、第 13 章(CLI/TUI)。适用场景:想理解 Gateway 的架构,或准备接入新的消息平台。

为什么需要 Gateway

CLI 是 Hermes 的第一等入口,但不是唯一入口。当你希望在手机上通过 Telegram 和 agent 对话,或者在团队 Slack 频道中共享 agent 能力,你需要的不是另一个 agent 实现,而是一个协议适配层

Gateway 解决三个问题:

  1. 协议桥接:将 Telegram 的 Bot API、Discord 的 WebSocket、Slack 的 Events API 等不同协议统一为 MessageEvent → AIAgent → SendResult 的标准流程
  2. 会话管理:在无状态的消息平台上维护有状态的对话——session 创建、过期重置、跨平台隔离
  3. 持久运行:作为 systemd 服务或后台进程持续运行,不需要用户保持终端连接

Gateway 架构总览

graph TB
    subgraph "消息平台"
        T["Telegram Bot API"]
        D["Discord WebSocket"]
        S["Slack Events API"]
        W["WhatsApp Cloud API"]
        SG["Signal"]
        MX["Matrix"]
        MM["Mattermost"]
        HA["Home Assistant"]
        DT["DingTalk"]
        FS["Feishu"]
        WC["WeCom"]
        WX["Weixin"]
        EM["Email IMAP"]
        SM["SMS Twilio"]
        WH["Webhook"]
        AS["API Server"]
    end
    
    subgraph "Gateway Layer"
        BA["BasePlatformAdapter ABC<br/>base.py:470"]
        GR["GatewayRunner<br/>run.py:461"]
        SS["SessionStore<br/>session.py:503"]
    end
    
    subgraph "Agent Layer"
        AI["AIAgent<br/>run_agent.py"]
    end
    
    T --> BA
    D --> BA
    S --> BA
    W --> BA
    SG --> BA
    MX --> BA
    MM --> BA
    HA --> BA
    DT --> BA
    FS --> BA
    WC --> BA
    WX --> BA
    EM --> BA
    SM --> BA
    WH --> BA
    AS --> BA
    
    BA --> GR
    GR --> SS
    GR --> AI

BasePlatformAdapter:17 个平台类型的公约数

BasePlatformAdaptergateway/platforms/base.py:470)是所有平台适配器的抽象基类。它定义了平台适配器必须实现的三个核心方法:

# gateway/platforms/base.py:470-632
class BasePlatformAdapter(ABC):
    """Base class for platform adapters."""
    
    def __init__(self, config: PlatformConfig, platform: Platform):
        self.config = config
        self.platform = platform
        self._message_handler: Optional[MessageHandler] = None
        self._running = False
        self._active_sessions: Dict[str, asyncio.Event] = {}
        self._pending_messages: Dict[str, MessageEvent] = {}
        self._background_tasks: set[asyncio.Task] = set()
    
    @abstractmethod
    async def connect(self) -> bool:
        """Connect to the platform and start receiving messages."""
        pass
    
    @abstractmethod
    async def disconnect(self) -> None:
        """Disconnect from the platform."""
        pass
    
    @abstractmethod
    async def send(self, chat_id: str, content: str,
                   reply_to: Optional[str] = None,
                   metadata: Optional[Dict[str, Any]] = None) -> SendResult:
        """Send a message to a chat."""
        pass

可选方法与渐进增强

除了三个必须实现的抽象方法,BasePlatformAdapter 还定义了一系列可选方法,每个都有合理的默认行为:

方法默认行为覆盖后效果
send_typing()无操作显示"正在输入..."指示器
edit_message()返回 success=False流式编辑消息
send_image()将 URL 作为文本发送原生图片附件
send_voice()将路径作为文本发送原生语音消息
send_video()将路径作为文本发送原生视频播放
send_document()将路径作为文本发送原生文件下载
send_image_file()将路径作为文本发送本地图片附件
send_animation()委托给 send_image()GIF 自动播放

这个设计让新平台可以用最少的代码启动(只需 connect + disconnect + send),然后逐步增强富媒体能力。

消息标准化

所有平台的入站消息都被转换为 MessageEventbase.py:380-433):

# gateway/platforms/base.py:380-411
@dataclass
class MessageEvent:
    text: str
    message_type: MessageType = MessageType.TEXT
    source: SessionSource = None
    raw_message: Any = None
    message_id: Optional[str] = None
    media_urls: List[str] = field(default_factory=list)
    media_types: List[str] = field(default_factory=list)
    reply_to_message_id: Optional[str] = None
    reply_to_text: Optional[str] = None
    auto_skill: Optional[str] = None
    timestamp: datetime = field(default_factory=datetime.now)

MessageType 枚举(base.py:367-378)涵盖了所有消息类型:TEXT、LOCATION、PHOTO、VIDEO、AUDIO、VOICE、DOCUMENT、STICKER、COMMAND。

媒体缓存

消息平台的媒体 URL 通常是临时的(如 Telegram 的文件 URL 一小时后过期)。BasePlatformAdapter 提供了三套缓存工具(base.py:84-364):

  • 图片缓存cache_image_from_url()base.py:112)——下载并缓存到当前 HERMES_HOME/cache/images/(默认 profile 下表现为 ~/.hermes/cache/images/),带重试和 SSRF 防护
  • 音频缓存cache_audio_from_url()base.py:228)——同样模式,用于语音消息的 STT 转写
  • 文档缓存cache_document_from_bytes()base.py:314)——文件名净化 + 路径遍历防护

缓存文件定期清理(cleanup_image_cache(max_age_hours=24)),防止磁盘膨胀。

发送重试

_RETRYABLE_ERROR_PATTERNSbase.py:453-463)定义了可重试的错误模式:

# gateway/platforms/base.py:453-463
_RETRYABLE_ERROR_PATTERNS = (
    "connecterror", "connectionerror", "connectionreset",
    "connectionrefused", "connecttimeout", "network",
    "broken pipe", "remotedisconnected", "eoferror",
)

注意,普通的 read/write timeout 不在重试列表中——因为非幂等操作(如 send_message)在超时时可能已经到达服务器,重试会导致重复发送。只有 connect timeout(连接未建立)才安全重试。平台适配器可以通过设置 SendResult.retryable = True 显式标记安全重试的场景。

Typing 指示器

_keep_typing() 方法(base.py:957-986)持续发送 typing 指示器:

# gateway/platforms/base.py:957-975
async def _keep_typing(self, chat_id: str, interval: float = 2.0, metadata=None):
    """Continuously send typing indicator until cancelled."""
    try:
        while True:
            if chat_id not in self._typing_paused:
                await self.send_typing(chat_id, metadata=metadata)
            await asyncio.sleep(interval)
    except asyncio.CancelledError:
        pass

Telegram/Discord 的 typing 状态 5 秒后自动消失,所以需要每 2 秒刷新一次。_typing_paused 集合在危险命令审批等待期间暂停 typing——对 Slack 尤其重要,因为 Slack 的 assistant_threads_setStatus 会禁用用户的输入框。

17 个平台类型

Gateway 当前支持的 17 个 Platform 枚举值定义在 gateway/config.py:47-66

# gateway/config.py:47-66
class Platform(Enum):
    LOCAL = "local"
    TELEGRAM = "telegram"
    DISCORD = "discord"
    WHATSAPP = "whatsapp"
    SLACK = "slack"
    SIGNAL = "signal"
    MATTERMOST = "mattermost"
    MATRIX = "matrix"
    HOMEASSISTANT = "homeassistant"
    EMAIL = "email"
    SMS = "sms"
    DINGTALK = "dingtalk"
    API_SERVER = "api_server"
    WEBHOOK = "webhook"
    FEISHU = "feishu"
    WECOM = "wecom"
    WEIXIN = "weixin"

每个平台都有一个对应的适配器文件(gateway/platforms/ 目录)。LOCAL 代表本地 CLI 终端——它不是消息平台,而是用于统一 session 管理的虚拟平台。

注意 WEIXINWECOM 的区别:WECOM 是企业微信(WeCom),面向企业内部通信,通过企业微信 API 接入;WEIXIN 是个人微信(WeChat),面向个人用户,通过独立的个人微信适配器(gateway/platforms/weixin.py)接入。两者的协议、认证方式和消息格式完全不同。

Matrix 适配器:mautrix 迁移与 E2EE

Matrix 适配器(gateway/platforms/matrix.py)已从 matrix-nio 重写为 mautrix-pythonmatrix.py:4-5)。这次迁移带来了几个重要改进:

  • 可选的端到端加密(E2EE):支持持久化的 crypto state,加密会话在 Gateway 重启后无需重新验证
  • 防 bot 循环:适配器忽略 m.notice 类型的消息,防止 bot-to-bot 的消息回环——这在多个 bot 共存的 Matrix room 中尤为重要

Weixin 适配器:个人微信的接入架构

gateway/platforms/weixin.py(1669 行)是 Gateway 中最新也是最独特的适配器之一。它通过腾讯的 iLink Bot API 接入个人微信账号,与企业微信(WeCom)的接入方式完全不同。

认证:QR 码扫码登录

个人微信没有 OAuth 或 API Key 认证。Hermes 实现了完整的 QR 码交互登录流程(weixin.py:839):

sequenceDiagram
    participant U as 用户手机微信
    participant H as hermes gateway setup
    participant I as iLink Bot API

    H->>I: GET /ilink/bot/get_bot_qrcode
    I-->>H: 返回 qrcode + qrcode_img_content
    H->>H: 终端打印 QR 码(qrcode 库)
    U->>I: 手机扫码
    H->>I: 轮询 /ilink/bot/get_qrcode_status
    I-->>H: status=scaned
    Note over U: 用户在手机上确认
    I-->>H: status=success + token + account_id
    H->>H: 保存凭据到 ~/.hermes/weixin/accounts/

扫码状态轮询支持超时(480 秒)、二维码过期自动刷新(最多 3 次)、以及跨区域重定向(scaned_but_redirect 状态切换 API 端点)。

消息收发:Long-poll + context_token

Weixin 适配器使用 long-poll 模式接收消息(weixin.py:1078getupdates 端点,35 秒超时)。每条入站消息携带一个 context_token,出站回复时必须回传当前对话对象的最新 context_token——这是 iLink Bot API 的核心约束。

ContextTokenStore(每个账号独立存储)负责维护 {sender_id → context_token} 映射,确保并发对话不会混淆 token。

媒体处理:AES-128-ECB 加密 CDN

微信的媒体文件(图片、视频、文件、语音)通过加密 CDN 传输。适配器实现了完整的下载-解密流程(weixin.py:537):

  1. 从消息中提取 encrypted_query_param 和 AES key
  2. 从微信 CDN(novac2c.cdn.weixin.qq.com)下载加密数据
  3. 使用 AES-128-ECB 解密(_aes128_ecb_decryptweixin.py:144
  4. 缓存到本地临时文件,传递给 agent 处理

上传方向类似:先请求上传 URL,加密后上传到 CDN,获取 encrypted_param 用于消息引用。

安全策略

# weixin.py:984-993
self._dm_policy = "open"              # DM 策略:open / allowlist / disabled
self._group_policy = "disabled"       # 群聊默认禁用(安全保守)
self._allow_from = [...]              # DM 白名单
self._group_allow_from = [...]        # 群聊白名单

默认 DM 开放但群聊禁用——这是个人微信场景的合理默认。群聊环境中 bot 被大量 @mention 可能导致 API 成本失控。

Token 冲突保护

连接时使用 acquire_scoped_lock("weixin-bot-token", ...) 防止同一个微信 token 被多个 Gateway 进程同时使用(weixin.py:1029-1046)。iLink Bot API 的 long-poll 连接是排他性的,多个进程争抢会导致消息丢失。

GatewayRunner:生命周期管理

GatewayRunnergateway/run.py:461)是 Gateway 的主控制器,管理所有平台适配器的生命周期和消息路由。

初始化

# gateway/run.py:461-569
class GatewayRunner:
    def __init__(self, config: Optional[GatewayConfig] = None):
        self.config = config or load_gateway_config()
        self.adapters: Dict[Platform, BasePlatformAdapter] = {}
        
        # Session 管理
        self.session_store = SessionStore(
            self.config.sessions_dir, self.config,
            has_active_processes_fn=lambda key: process_registry.has_active_for_session(key),
        )
        
        # Agent 缓存——保留 prompt cache
        self._agent_cache: Dict[str, tuple] = {}
        
        # DM 配对存储
        from gateway.pairing import PairingStore
        self.pairing_store = PairingStore()

三个关键设计决策:

  1. Agent 缓存run.py:506-513):_agent_cache 按 session key 缓存 AIAgent 实例。没有这个缓存,每条消息都会创建新的 AIAgent,重建 system prompt——打破 Anthropic 的 prompt caching,成本增加约 10 倍

  2. Process Registry 集成run.py:488-492):SessionStore 接收一个 has_active_processes_fn 回调,用于检查 session 是否有活跃的后台进程。有活跃进程的 session 不会被自动重置

  3. DM 配对run.py:557-559):未授权用户发送 DM 时,系统生成一个配对码。用户在 CLI 输入配对码完成授权,此后该平台的 DM 才会被路由到 agent

Agent 缓存的失败运行保护

_agent_cache 有一个重要的改进:failed-run no-evict gating。当一次 agent 运行失败(例如模型不可用、fallback 错误等)时,缓存的 agent 实例不会被驱逐。这看似违反直觉——失败了为什么要保留?原因是 MCP 重启循环问题:如果驱逐了缓存的 agent,下一条消息会重建 AIAgent,重新初始化所有 MCP server 连接。如果 MCP server 本身就是导致失败的原因(如超时或不可用),重新初始化只会再次触发失败,形成"失败→驱逐→重建→失败"的循环。保留缓存的 agent 实例让下一次消息可以跳过重建,直接使用已有的(可能部分可用的)agent。

优雅重启:drain-and-restart

Gateway 支持不停机重启——不是传统的 kill-and-relaunch,而是 drain-and-restart 模式(gateway/restart.py + gateway/run.py:491-532)。

当收到重启请求时,Gateway 进入 _draining 状态:

  1. 设置 draining 标志:新到达的消息被拒绝或延迟,正在处理的消息允许完成
  2. 等待 drain 超时restart_drain_timeout 控制等待在途请求完成的最大时间
  3. detached restart:当前进程以退出码 75EX_TEMPFAIL,POSIX 标准的"临时失败")退出。外部进程管理器(如 systemd)看到这个退出码后知道应该立即重启服务,而非按指数退避延迟重启
  4. 服务自请求重启:Gateway 可以在进程内部触发自身的重启(例如配置变更后),通过 restart.py 中的 detached restart 逻辑实现,不依赖外部信号

这个设计让 Gateway 在配置变更、代码更新或 MCP server 重新初始化时能零丢消息地完成重启。退出码 75 的选择也值得注意——它区分于退出码 0(正常退出,systemd 不会重启)和退出码 1(异常退出,systemd 会按配置的 RestartSec 延迟重启)。

Session 管理

SessionSource:消息来源

SessionSourcegateway/session.py:72-154)描述一条消息的来源:

# gateway/session.py:72-91
@dataclass
class SessionSource:
    platform: Platform
    chat_id: str
    chat_name: Optional[str] = None
    chat_type: str = "dm"  # "dm", "group", "channel", "thread"
    user_id: Optional[str] = None
    user_name: Optional[str] = None
    thread_id: Optional[str] = None
    chat_topic: Optional[str] = None
    user_id_alt: Optional[str] = None  # Signal UUID
    chat_id_alt: Optional[str] = None  # Signal group internal ID

Session Key 构建

build_session_key()gateway/session.py:444-500)是 session 隔离的核心逻辑:

# gateway/session.py:444-500(简化)
def build_session_key(source: SessionSource,
                      group_sessions_per_user: bool = True,
                      thread_sessions_per_user: bool = False) -> str:
    platform = source.platform.value
    if source.chat_type == "dm":
        if source.chat_id:
            if source.thread_id:
                return f"agent:main:{platform}:dm:{source.chat_id}:{source.thread_id}"
            return f"agent:main:{platform}:dm:{source.chat_id}"
        return f"agent:main:{platform}:dm"
    
    # Group/channel: optionally isolate per user
    key_parts = ["agent:main", platform, source.chat_type]
    if source.chat_id:
        key_parts.append(source.chat_id)
    if source.thread_id:
        key_parts.append(source.thread_id)
    
    # Thread sessions default to shared (all participants share context)
    isolate_user = group_sessions_per_user
    if source.thread_id and not thread_sessions_per_user:
        isolate_user = False
    if isolate_user and participant_id:
        key_parts.append(str(participant_id))
    
    return ":".join(key_parts)

关键规则:

  • DM:每个 chat_id(+ 可选的 thread_id)一个 session
  • 群组/频道:默认按用户隔离(每个用户独立上下文)
  • 线程:默认共享(所有参与者看到同一个对话),可配置为按用户隔离

Session 重置策略

SessionResetPolicygateway/config.py:95-135)控制 session 何时被重置:

# gateway/config.py:95-109
@dataclass
class SessionResetPolicy:
    mode: str = "both"         # "daily", "idle", "both", "none"
    at_hour: int = 4           # 每日重置时间(0-23 小时)
    idle_minutes: int = 1440   # 空闲超时(默认 24 小时)
    notify: bool = True        # 重置时通知用户
    notify_exclude_platforms: tuple = ("api_server", "webhook")

四种模式:

模式触发条件适用场景
daily每天 at_hour 时重置日报类助手
idle空闲超过 idle_minutes 后重置一般对话
bothdaily 或 idle 任一触发(默认)推荐默认
none永不自动重置由压缩器管理上下文

SessionStore._should_reset()gateway/session.py:626-668)在每次消息到达时评估重置策略:

# gateway/session.py:649-668(简化)
def _should_reset(self, entry: SessionEntry, source: SessionSource) -> Optional[str]:
    policy = self.config.get_reset_policy(
        platform=source.platform,
        session_type=source.chat_type
    )
    if policy.mode == "none":
        return None
    
    now = _now()
    if policy.mode in ("idle", "both"):
        idle_deadline = entry.updated_at + timedelta(minutes=policy.idle_minutes)
        if now > idle_deadline:
            return "idle"
    
    if policy.mode in ("daily", "both"):
        today_reset = now.replace(hour=policy.at_hour, minute=0, second=0)
        if now.hour < policy.at_hour:
            today_reset -= timedelta(days=1)
        if entry.updated_at < today_reset:
            return "daily"
    
    return None

PII 脱敏

redact_pii=True 时,Gateway 在构建 system prompt 之前对用户 ID 和聊天 ID 进行哈希脱敏(gateway/session.py:34-57):

# gateway/session.py:37-39
def _hash_id(value: str) -> str:
    """Deterministic 12-char hex hash of an identifier."""
    return hashlib.sha256(value.encode("utf-8")).hexdigest()[:12]

脱敏仅对 _PII_SAFE_PLATFORMS(WhatsApp、Signal、Telegram)生效(session.py:191-198)。Discord 被排除在外,因为 Discord 的 mention 格式 <@user_id> 需要真实 ID——如果脱敏,LLM 就无法正确 @ 用户。

SessionContext 与 System Prompt 注入

build_session_context_prompt()gateway/session.py:202-340)构建注入 agent system prompt 的上下文信息:

# gateway/session.py:202-206
def build_session_context_prompt(
    context: SessionContext,
    *,
    redact_pii: bool = False,
) -> str:
    """Build the dynamic system prompt section that tells the agent about its context."""

注入的信息包括:

  • 消息来源平台和描述
  • 频道主题(为 agent 提供频道用途上下文)
  • 是否为多用户线程(session.py:263-271
  • 平台行为限制(如 Slack/Discord 不能搜索频道历史)
  • 已连接的平台列表
  • 各平台的 home channel
  • Cron 任务的投递选项

设计启示

Gateway 子系统的设计可以提炼出三个原则:

  1. 公约数 ABC + 渐进增强BasePlatformAdapter 的三个抽象方法(connect/disconnect/send)是所有平台的最小公约数,可选方法(send_image/send_voice/edit_message 等)允许平台按能力渐进增强。新平台用 50 行代码就能启动。Weixin 适配器的加入再次验证了这个模式的可扩展性

  2. Session 隔离策略是可配置的:DM/群组/线程各有不同的隔离规则,重置策略有四种模式。这些不是硬编码的——因为不同的使用场景(个人助手 vs 团队 bot)需要不同的隔离语义

  3. 脱敏与平台行为耦合:PII 脱敏不是全局开关,而是根据平台特性选择性应用。这种务实的做法避免了"脱敏后功能损坏"的问题


设计赌注回扣:本章是 Run Anywhere 赌注的核心体现。17 个平台类型通过一个 ABC 共享同一个 AIAgent,session 管理策略让同一个 agent 既能做私人助手(DM 隔离),又能做团队 bot(线程共享)。PII 脱敏和平台行为注入也回扣了 Personal Long-Term 赌注——agent 需要知道自己在哪个平台、和谁对话,才能提供个性化服务。


版本演化说明

本章核心分析基于 Hermes Agent v0.8.0(2026 年 4 月)。 Gateway 与 BasePlatformAdapter 的基础骨架早于 v0.3.0 就已出现;之后 v0.3.0-v0.8.0 之间持续扩张平台覆盖,并不断补充 session 重置、脱敏和投递行为。_agent_cache 可以明确定位到 v0.4.0 发布窗口,它是 prompt cache 相关优化的重要节点。v0.8.0 阶段新增了 drain-and-restart 优雅重启机制、agent cache 的 failed-run no-evict 保护、Matrix mautrix 迁移(含可选 E2EE)以及个人微信(Weixin)适配器,平台总数从 16 增至 17。