Gateway:把同一个 Agent 投射到 17 个平台
本章核心源码:
gateway/run.py(7620 行)、gateway/session.py(1081 行)、gateway/platforms/base.py(1696 行)、gateway/config.py(957 行)
定位:本章拆解 Hermes 的 Gateway 子系统——如何通过一个
BasePlatformAdapterABC 将同一个 AIAgent 投射到 Telegram、Discord、Slack、WhatsApp 等 17 个平台类型,以及 session 管理如何在多平台场景下保持一致。 前置依赖:第 4 章(AIAgent 内核)、第 13 章(CLI/TUI)。适用场景:想理解 Gateway 的架构,或准备接入新的消息平台。
为什么需要 Gateway
CLI 是 Hermes 的第一等入口,但不是唯一入口。当你希望在手机上通过 Telegram 和 agent 对话,或者在团队 Slack 频道中共享 agent 能力,你需要的不是另一个 agent 实现,而是一个协议适配层。
Gateway 解决三个问题:
- 协议桥接:将 Telegram 的 Bot API、Discord 的 WebSocket、Slack 的 Events API 等不同协议统一为
MessageEvent → AIAgent → SendResult的标准流程 - 会话管理:在无状态的消息平台上维护有状态的对话——session 创建、过期重置、跨平台隔离
- 持久运行:作为 systemd 服务或后台进程持续运行,不需要用户保持终端连接
Gateway 架构总览
graph TB
subgraph "消息平台"
T["Telegram Bot API"]
D["Discord WebSocket"]
S["Slack Events API"]
W["WhatsApp Cloud API"]
SG["Signal"]
MX["Matrix"]
MM["Mattermost"]
HA["Home Assistant"]
DT["DingTalk"]
FS["Feishu"]
WC["WeCom"]
WX["Weixin"]
EM["Email IMAP"]
SM["SMS Twilio"]
WH["Webhook"]
AS["API Server"]
end
subgraph "Gateway Layer"
BA["BasePlatformAdapter ABC<br/>base.py:470"]
GR["GatewayRunner<br/>run.py:461"]
SS["SessionStore<br/>session.py:503"]
end
subgraph "Agent Layer"
AI["AIAgent<br/>run_agent.py"]
end
T --> BA
D --> BA
S --> BA
W --> BA
SG --> BA
MX --> BA
MM --> BA
HA --> BA
DT --> BA
FS --> BA
WC --> BA
WX --> BA
EM --> BA
SM --> BA
WH --> BA
AS --> BA
BA --> GR
GR --> SS
GR --> AI
BasePlatformAdapter:17 个平台类型的公约数
BasePlatformAdapter(gateway/platforms/base.py:470)是所有平台适配器的抽象基类。它定义了平台适配器必须实现的三个核心方法:
# gateway/platforms/base.py:470-632
class BasePlatformAdapter(ABC):
"""Base class for platform adapters."""
def __init__(self, config: PlatformConfig, platform: Platform):
self.config = config
self.platform = platform
self._message_handler: Optional[MessageHandler] = None
self._running = False
self._active_sessions: Dict[str, asyncio.Event] = {}
self._pending_messages: Dict[str, MessageEvent] = {}
self._background_tasks: set[asyncio.Task] = set()
@abstractmethod
async def connect(self) -> bool:
"""Connect to the platform and start receiving messages."""
pass
@abstractmethod
async def disconnect(self) -> None:
"""Disconnect from the platform."""
pass
@abstractmethod
async def send(self, chat_id: str, content: str,
reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None) -> SendResult:
"""Send a message to a chat."""
pass
可选方法与渐进增强
除了三个必须实现的抽象方法,BasePlatformAdapter 还定义了一系列可选方法,每个都有合理的默认行为:
| 方法 | 默认行为 | 覆盖后效果 |
|---|---|---|
send_typing() | 无操作 | 显示"正在输入..."指示器 |
edit_message() | 返回 success=False | 流式编辑消息 |
send_image() | 将 URL 作为文本发送 | 原生图片附件 |
send_voice() | 将路径作为文本发送 | 原生语音消息 |
send_video() | 将路径作为文本发送 | 原生视频播放 |
send_document() | 将路径作为文本发送 | 原生文件下载 |
send_image_file() | 将路径作为文本发送 | 本地图片附件 |
send_animation() | 委托给 send_image() | GIF 自动播放 |
这个设计让新平台可以用最少的代码启动(只需 connect + disconnect + send),然后逐步增强富媒体能力。
消息标准化
所有平台的入站消息都被转换为 MessageEvent(base.py:380-433):
# gateway/platforms/base.py:380-411
@dataclass
class MessageEvent:
text: str
message_type: MessageType = MessageType.TEXT
source: SessionSource = None
raw_message: Any = None
message_id: Optional[str] = None
media_urls: List[str] = field(default_factory=list)
media_types: List[str] = field(default_factory=list)
reply_to_message_id: Optional[str] = None
reply_to_text: Optional[str] = None
auto_skill: Optional[str] = None
timestamp: datetime = field(default_factory=datetime.now)
MessageType 枚举(base.py:367-378)涵盖了所有消息类型:TEXT、LOCATION、PHOTO、VIDEO、AUDIO、VOICE、DOCUMENT、STICKER、COMMAND。
媒体缓存
消息平台的媒体 URL 通常是临时的(如 Telegram 的文件 URL 一小时后过期)。BasePlatformAdapter 提供了三套缓存工具(base.py:84-364):
- 图片缓存:
cache_image_from_url()(base.py:112)——下载并缓存到当前HERMES_HOME/cache/images/(默认 profile 下表现为~/.hermes/cache/images/),带重试和 SSRF 防护 - 音频缓存:
cache_audio_from_url()(base.py:228)——同样模式,用于语音消息的 STT 转写 - 文档缓存:
cache_document_from_bytes()(base.py:314)——文件名净化 + 路径遍历防护
缓存文件定期清理(cleanup_image_cache(max_age_hours=24)),防止磁盘膨胀。
发送重试
_RETRYABLE_ERROR_PATTERNS(base.py:453-463)定义了可重试的错误模式:
# gateway/platforms/base.py:453-463
_RETRYABLE_ERROR_PATTERNS = (
"connecterror", "connectionerror", "connectionreset",
"connectionrefused", "connecttimeout", "network",
"broken pipe", "remotedisconnected", "eoferror",
)
注意,普通的 read/write timeout 不在重试列表中——因为非幂等操作(如 send_message)在超时时可能已经到达服务器,重试会导致重复发送。只有 connect timeout(连接未建立)才安全重试。平台适配器可以通过设置 SendResult.retryable = True 显式标记安全重试的场景。
Typing 指示器
_keep_typing() 方法(base.py:957-986)持续发送 typing 指示器:
# gateway/platforms/base.py:957-975
async def _keep_typing(self, chat_id: str, interval: float = 2.0, metadata=None):
"""Continuously send typing indicator until cancelled."""
try:
while True:
if chat_id not in self._typing_paused:
await self.send_typing(chat_id, metadata=metadata)
await asyncio.sleep(interval)
except asyncio.CancelledError:
pass
Telegram/Discord 的 typing 状态 5 秒后自动消失,所以需要每 2 秒刷新一次。_typing_paused 集合在危险命令审批等待期间暂停 typing——对 Slack 尤其重要,因为 Slack 的 assistant_threads_setStatus 会禁用用户的输入框。
17 个平台类型
Gateway 当前支持的 17 个 Platform 枚举值定义在 gateway/config.py:47-66:
# gateway/config.py:47-66
class Platform(Enum):
LOCAL = "local"
TELEGRAM = "telegram"
DISCORD = "discord"
WHATSAPP = "whatsapp"
SLACK = "slack"
SIGNAL = "signal"
MATTERMOST = "mattermost"
MATRIX = "matrix"
HOMEASSISTANT = "homeassistant"
EMAIL = "email"
SMS = "sms"
DINGTALK = "dingtalk"
API_SERVER = "api_server"
WEBHOOK = "webhook"
FEISHU = "feishu"
WECOM = "wecom"
WEIXIN = "weixin"
每个平台都有一个对应的适配器文件(gateway/platforms/ 目录)。LOCAL 代表本地 CLI 终端——它不是消息平台,而是用于统一 session 管理的虚拟平台。
注意 WEIXIN 和 WECOM 的区别:WECOM 是企业微信(WeCom),面向企业内部通信,通过企业微信 API 接入;WEIXIN 是个人微信(WeChat),面向个人用户,通过独立的个人微信适配器(gateway/platforms/weixin.py)接入。两者的协议、认证方式和消息格式完全不同。
Matrix 适配器:mautrix 迁移与 E2EE
Matrix 适配器(gateway/platforms/matrix.py)已从 matrix-nio 重写为 mautrix-python(matrix.py:4-5)。这次迁移带来了几个重要改进:
- 可选的端到端加密(E2EE):支持持久化的 crypto state,加密会话在 Gateway 重启后无需重新验证
- 防 bot 循环:适配器忽略
m.notice类型的消息,防止 bot-to-bot 的消息回环——这在多个 bot 共存的 Matrix room 中尤为重要
Weixin 适配器:个人微信的接入架构
gateway/platforms/weixin.py(1669 行)是 Gateway 中最新也是最独特的适配器之一。它通过腾讯的 iLink Bot API 接入个人微信账号,与企业微信(WeCom)的接入方式完全不同。
认证:QR 码扫码登录
个人微信没有 OAuth 或 API Key 认证。Hermes 实现了完整的 QR 码交互登录流程(weixin.py:839):
sequenceDiagram
participant U as 用户手机微信
participant H as hermes gateway setup
participant I as iLink Bot API
H->>I: GET /ilink/bot/get_bot_qrcode
I-->>H: 返回 qrcode + qrcode_img_content
H->>H: 终端打印 QR 码(qrcode 库)
U->>I: 手机扫码
H->>I: 轮询 /ilink/bot/get_qrcode_status
I-->>H: status=scaned
Note over U: 用户在手机上确认
I-->>H: status=success + token + account_id
H->>H: 保存凭据到 ~/.hermes/weixin/accounts/
扫码状态轮询支持超时(480 秒)、二维码过期自动刷新(最多 3 次)、以及跨区域重定向(scaned_but_redirect 状态切换 API 端点)。
消息收发:Long-poll + context_token
Weixin 适配器使用 long-poll 模式接收消息(weixin.py:1078,getupdates 端点,35 秒超时)。每条入站消息携带一个 context_token,出站回复时必须回传当前对话对象的最新 context_token——这是 iLink Bot API 的核心约束。
ContextTokenStore(每个账号独立存储)负责维护 {sender_id → context_token} 映射,确保并发对话不会混淆 token。
媒体处理:AES-128-ECB 加密 CDN
微信的媒体文件(图片、视频、文件、语音)通过加密 CDN 传输。适配器实现了完整的下载-解密流程(weixin.py:537):
- 从消息中提取
encrypted_query_param和 AES key - 从微信 CDN(
novac2c.cdn.weixin.qq.com)下载加密数据 - 使用 AES-128-ECB 解密(
_aes128_ecb_decrypt,weixin.py:144) - 缓存到本地临时文件,传递给 agent 处理
上传方向类似:先请求上传 URL,加密后上传到 CDN,获取 encrypted_param 用于消息引用。
安全策略
# weixin.py:984-993
self._dm_policy = "open" # DM 策略:open / allowlist / disabled
self._group_policy = "disabled" # 群聊默认禁用(安全保守)
self._allow_from = [...] # DM 白名单
self._group_allow_from = [...] # 群聊白名单
默认 DM 开放但群聊禁用——这是个人微信场景的合理默认。群聊环境中 bot 被大量 @mention 可能导致 API 成本失控。
Token 冲突保护
连接时使用 acquire_scoped_lock("weixin-bot-token", ...) 防止同一个微信 token 被多个 Gateway 进程同时使用(weixin.py:1029-1046)。iLink Bot API 的 long-poll 连接是排他性的,多个进程争抢会导致消息丢失。
GatewayRunner:生命周期管理
GatewayRunner(gateway/run.py:461)是 Gateway 的主控制器,管理所有平台适配器的生命周期和消息路由。
初始化
# gateway/run.py:461-569
class GatewayRunner:
def __init__(self, config: Optional[GatewayConfig] = None):
self.config = config or load_gateway_config()
self.adapters: Dict[Platform, BasePlatformAdapter] = {}
# Session 管理
self.session_store = SessionStore(
self.config.sessions_dir, self.config,
has_active_processes_fn=lambda key: process_registry.has_active_for_session(key),
)
# Agent 缓存——保留 prompt cache
self._agent_cache: Dict[str, tuple] = {}
# DM 配对存储
from gateway.pairing import PairingStore
self.pairing_store = PairingStore()
三个关键设计决策:
-
Agent 缓存(
run.py:506-513):_agent_cache按 session key 缓存 AIAgent 实例。没有这个缓存,每条消息都会创建新的 AIAgent,重建 system prompt——打破 Anthropic 的 prompt caching,成本增加约 10 倍 -
Process Registry 集成(
run.py:488-492):SessionStore 接收一个has_active_processes_fn回调,用于检查 session 是否有活跃的后台进程。有活跃进程的 session 不会被自动重置 -
DM 配对(
run.py:557-559):未授权用户发送 DM 时,系统生成一个配对码。用户在 CLI 输入配对码完成授权,此后该平台的 DM 才会被路由到 agent
Agent 缓存的失败运行保护
_agent_cache 有一个重要的改进:failed-run no-evict gating。当一次 agent 运行失败(例如模型不可用、fallback 错误等)时,缓存的 agent 实例不会被驱逐。这看似违反直觉——失败了为什么要保留?原因是 MCP 重启循环问题:如果驱逐了缓存的 agent,下一条消息会重建 AIAgent,重新初始化所有 MCP server 连接。如果 MCP server 本身就是导致失败的原因(如超时或不可用),重新初始化只会再次触发失败,形成"失败→驱逐→重建→失败"的循环。保留缓存的 agent 实例让下一次消息可以跳过重建,直接使用已有的(可能部分可用的)agent。
优雅重启:drain-and-restart
Gateway 支持不停机重启——不是传统的 kill-and-relaunch,而是 drain-and-restart 模式(gateway/restart.py + gateway/run.py:491-532)。
当收到重启请求时,Gateway 进入 _draining 状态:
- 设置 draining 标志:新到达的消息被拒绝或延迟,正在处理的消息允许完成
- 等待 drain 超时:
restart_drain_timeout控制等待在途请求完成的最大时间 - detached restart:当前进程以退出码 75(
EX_TEMPFAIL,POSIX 标准的"临时失败")退出。外部进程管理器(如 systemd)看到这个退出码后知道应该立即重启服务,而非按指数退避延迟重启 - 服务自请求重启:Gateway 可以在进程内部触发自身的重启(例如配置变更后),通过
restart.py中的 detached restart 逻辑实现,不依赖外部信号
这个设计让 Gateway 在配置变更、代码更新或 MCP server 重新初始化时能零丢消息地完成重启。退出码 75 的选择也值得注意——它区分于退出码 0(正常退出,systemd 不会重启)和退出码 1(异常退出,systemd 会按配置的 RestartSec 延迟重启)。
Session 管理
SessionSource:消息来源
SessionSource(gateway/session.py:72-154)描述一条消息的来源:
# gateway/session.py:72-91
@dataclass
class SessionSource:
platform: Platform
chat_id: str
chat_name: Optional[str] = None
chat_type: str = "dm" # "dm", "group", "channel", "thread"
user_id: Optional[str] = None
user_name: Optional[str] = None
thread_id: Optional[str] = None
chat_topic: Optional[str] = None
user_id_alt: Optional[str] = None # Signal UUID
chat_id_alt: Optional[str] = None # Signal group internal ID
Session Key 构建
build_session_key()(gateway/session.py:444-500)是 session 隔离的核心逻辑:
# gateway/session.py:444-500(简化)
def build_session_key(source: SessionSource,
group_sessions_per_user: bool = True,
thread_sessions_per_user: bool = False) -> str:
platform = source.platform.value
if source.chat_type == "dm":
if source.chat_id:
if source.thread_id:
return f"agent:main:{platform}:dm:{source.chat_id}:{source.thread_id}"
return f"agent:main:{platform}:dm:{source.chat_id}"
return f"agent:main:{platform}:dm"
# Group/channel: optionally isolate per user
key_parts = ["agent:main", platform, source.chat_type]
if source.chat_id:
key_parts.append(source.chat_id)
if source.thread_id:
key_parts.append(source.thread_id)
# Thread sessions default to shared (all participants share context)
isolate_user = group_sessions_per_user
if source.thread_id and not thread_sessions_per_user:
isolate_user = False
if isolate_user and participant_id:
key_parts.append(str(participant_id))
return ":".join(key_parts)
关键规则:
- DM:每个 chat_id(+ 可选的 thread_id)一个 session
- 群组/频道:默认按用户隔离(每个用户独立上下文)
- 线程:默认共享(所有参与者看到同一个对话),可配置为按用户隔离
Session 重置策略
SessionResetPolicy(gateway/config.py:95-135)控制 session 何时被重置:
# gateway/config.py:95-109
@dataclass
class SessionResetPolicy:
mode: str = "both" # "daily", "idle", "both", "none"
at_hour: int = 4 # 每日重置时间(0-23 小时)
idle_minutes: int = 1440 # 空闲超时(默认 24 小时)
notify: bool = True # 重置时通知用户
notify_exclude_platforms: tuple = ("api_server", "webhook")
四种模式:
| 模式 | 触发条件 | 适用场景 |
|---|---|---|
daily | 每天 at_hour 时重置 | 日报类助手 |
idle | 空闲超过 idle_minutes 后重置 | 一般对话 |
both | daily 或 idle 任一触发(默认) | 推荐默认 |
none | 永不自动重置 | 由压缩器管理上下文 |
SessionStore._should_reset()(gateway/session.py:626-668)在每次消息到达时评估重置策略:
# gateway/session.py:649-668(简化)
def _should_reset(self, entry: SessionEntry, source: SessionSource) -> Optional[str]:
policy = self.config.get_reset_policy(
platform=source.platform,
session_type=source.chat_type
)
if policy.mode == "none":
return None
now = _now()
if policy.mode in ("idle", "both"):
idle_deadline = entry.updated_at + timedelta(minutes=policy.idle_minutes)
if now > idle_deadline:
return "idle"
if policy.mode in ("daily", "both"):
today_reset = now.replace(hour=policy.at_hour, minute=0, second=0)
if now.hour < policy.at_hour:
today_reset -= timedelta(days=1)
if entry.updated_at < today_reset:
return "daily"
return None
PII 脱敏
当 redact_pii=True 时,Gateway 在构建 system prompt 之前对用户 ID 和聊天 ID 进行哈希脱敏(gateway/session.py:34-57):
# gateway/session.py:37-39
def _hash_id(value: str) -> str:
"""Deterministic 12-char hex hash of an identifier."""
return hashlib.sha256(value.encode("utf-8")).hexdigest()[:12]
脱敏仅对 _PII_SAFE_PLATFORMS(WhatsApp、Signal、Telegram)生效(session.py:191-198)。Discord 被排除在外,因为 Discord 的 mention 格式 <@user_id> 需要真实 ID——如果脱敏,LLM 就无法正确 @ 用户。
SessionContext 与 System Prompt 注入
build_session_context_prompt()(gateway/session.py:202-340)构建注入 agent system prompt 的上下文信息:
# gateway/session.py:202-206
def build_session_context_prompt(
context: SessionContext,
*,
redact_pii: bool = False,
) -> str:
"""Build the dynamic system prompt section that tells the agent about its context."""
注入的信息包括:
- 消息来源平台和描述
- 频道主题(为 agent 提供频道用途上下文)
- 是否为多用户线程(
session.py:263-271) - 平台行为限制(如 Slack/Discord 不能搜索频道历史)
- 已连接的平台列表
- 各平台的 home channel
- Cron 任务的投递选项
设计启示
Gateway 子系统的设计可以提炼出三个原则:
-
公约数 ABC + 渐进增强:
BasePlatformAdapter的三个抽象方法(connect/disconnect/send)是所有平台的最小公约数,可选方法(send_image/send_voice/edit_message 等)允许平台按能力渐进增强。新平台用 50 行代码就能启动。Weixin 适配器的加入再次验证了这个模式的可扩展性 -
Session 隔离策略是可配置的:DM/群组/线程各有不同的隔离规则,重置策略有四种模式。这些不是硬编码的——因为不同的使用场景(个人助手 vs 团队 bot)需要不同的隔离语义
-
脱敏与平台行为耦合:PII 脱敏不是全局开关,而是根据平台特性选择性应用。这种务实的做法避免了"脱敏后功能损坏"的问题
设计赌注回扣:本章是 Run Anywhere 赌注的核心体现。17 个平台类型通过一个 ABC 共享同一个 AIAgent,session 管理策略让同一个 agent 既能做私人助手(DM 隔离),又能做团队 bot(线程共享)。PII 脱敏和平台行为注入也回扣了 Personal Long-Term 赌注——agent 需要知道自己在哪个平台、和谁对话,才能提供个性化服务。
版本演化说明
本章核心分析基于 Hermes Agent v0.8.0(2026 年 4 月)。 Gateway 与
BasePlatformAdapter的基础骨架早于 v0.3.0 就已出现;之后 v0.3.0-v0.8.0 之间持续扩张平台覆盖,并不断补充 session 重置、脱敏和投递行为。_agent_cache可以明确定位到 v0.4.0 发布窗口,它是 prompt cache 相关优化的重要节点。v0.8.0 阶段新增了 drain-and-restart 优雅重启机制、agent cache 的 failed-run no-evict 保护、Matrix mautrix 迁移(含可选 E2EE)以及个人微信(Weixin)适配器,平台总数从 16 增至 17。