命令队列(2026-01-16)
我们通过一个轻量的进程内队列来序列化所有渠道的入站自动回复运行,防止多个 agent 运行之间相互冲突,同时允许跨会话的安全并行。
为什么需要队列
- 自动回复运行可能很耗资源(LLM 调用),多条消息同时到达时容易碰撞。
- 序列化可以避免对共享资源的争抢(会话文件、日志、CLI stdin),也能降低触发上游速率限制的概率。
工作原理
- 一个感知通道(lane)的 FIFO 队列,每条 lane 有可配置的并发上限(未配置的 lane 默认为 1;main 默认 4,subagent 默认 8)。
runEmbeddedPiAgent按会话 key 入队(lanesession:<key>),保证每个会话同一时刻只有一个活跃运行。- 每个会话运行随后排入全局 lane(默认
main),整体并行度受agents.defaults.maxConcurrent限制。 - 开启 verbose 日志时,如果排队等待超过约 2 秒,会输出一条简短提示。
- 入队时就会立即触发输入指示器(如果渠道支持),等待期间用户体验不受影响。
队列模式(按渠道)
入站消息可以干预当前运行、等待下一轮,或两者兼顾:
steer:立即注入当前运行(在下一个工具边界后取消待执行的工具调用)。如果未在流式输出,则回退为 followup。followup:当前运行结束后排入下一轮 agent 执行。collect:把所有排队消息合并为一次 followup 轮次(默认模式)。如果消息指向不同的渠道/线程,则逐条处理以保持路由正确。steer-backlog(即steer+backlog):立即干预当前运行,同时保留消息用于后续 followup。interrupt(旧版):中止该会话的活跃运行,然后执行最新消息。queue(旧版别名):等同于steer。
steer-backlog 意味着干预运行结束后还会有一次 followup 响应,在流式场景下可能看起来像是重复回复。如果你希望每条消息只产生一次响应,建议用 collect 或 steer。
发送 /queue collect 作为独立命令(按会话生效),或在配置中设置 messages.queue.byChannel.discord: "collect"。
默认值(未在配置中设置时):
- 所有渠道 →
collect
通过 messages.queue 全局或按渠道配置:
{
messages: {
queue: {
mode: "collect",
debounceMs: 1000,
cap: 20,
drop: "summarize",
byChannel: { discord: "collect" },
},
},
}
队列选项
以下选项适用于 followup、collect 和 steer-backlog(以及 steer 回退为 followup 时):
debounceMs:等待一段静默期后再开始 followup(避免连续”继续、继续”)。cap:每个会话的最大排队消息数。drop:溢出策略(old、new、summarize)。
summarize 会保留被丢弃消息的简要列表,并作为合成的 followup 提示注入。
默认值:debounceMs: 1000,cap: 20,drop: summarize。
按会话覆盖
- 发送
/queue <mode>作为独立命令,为当前会话设置队列模式。 - 选项可以组合使用:
/queue collect debounce:2s cap:25 drop:summarize /queue default或/queue reset清除当前会话的覆盖配置。
适用范围与保证
- 适用于所有使用 Gateway 回复管线的入站渠道的自动回复 agent 运行(WhatsApp web、Telegram、Slack、Discord、Signal、iMessage、webchat 等)。
- 默认 lane(
main)是进程级别的,覆盖入站请求和 main 心跳;设置agents.defaults.maxConcurrent可允许多会话并行。 - 可能存在额外的 lane(如
cron、subagent),后台任务可以并行运行而不阻塞入站回复。 - 按会话的 lane 保证同一时刻只有一个 agent 运行操作某个会话。
- 无外部依赖,无后台工作线程;纯 TypeScript + Promise 实现。
排查问题
- 如果命令看起来卡住了,开启 verbose 日志,查找”queued for …ms”日志行确认队列正在消耗。
- 如需了解队列深度,开启 verbose 日志并关注队列计时日志。