diff --git a/README.md b/README.md index e02fbb9..f52bc90 100644 --- a/README.md +++ b/README.md @@ -804,27 +804,39 @@ Per-session FIFO queue policy for concurrent gateway requests (`agent.send`). ```yaml server: queue: - mode: collect # collect | steer | interrupt + mode: collect # collect | followup | steer | steer_backlog | interrupt cap: 50 # max pending requests per session lane overflow: drop_old # drop_old | drop_new + debounce_ms: 0 # delay before running next queued item + summarize_overflow: true overrides: channels: ws: - mode: steer + mode: followup cap: 10 sessions: ws:vip-user: mode: interrupt overflow: drop_new + debounce_ms: 100 ``` Notes: - `collect` keeps all queued requests (subject to `cap`). -- `steer` and `interrupt` keep only the latest pending request while one is active. +- `followup` keeps at most one pending item while a request is active; newer followups replace older pending items. +- `steer` and `steer_backlog` replace pending backlog with the newest request while one is active. +- `interrupt` uses steer-backlog queueing behavior; active work still requires `agent.cancel` for best-effort cancellation. - `interrupt` currently does not force-stop already running work; use `agent.cancel` for active cancellation. +- `debounce_ms` delays the next queued execution, helping collapse bursty same-session traffic. +- `summarize_overflow` enables richer overflow error messages and payload metadata. - On overflow, `drop_old` evicts the oldest pending request, `drop_new` rejects the new request. - Override precedence: exact `sessions` match first, then `channels`. +Runtime session controls from chat commands: +- `/queue` shows effective session queue policy. +- `/queue set ` sets a per-session override. +- `/queue reset` clears per-session queue overrides. + ## Gateway Request Body Limit Cap inbound HTTP POST body size (webhooks and Gmail push) to reduce memory-DoS risk. diff --git a/config/default.yaml b/config/default.yaml index a805212..8755377 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -62,11 +62,13 @@ server: violation_window_ms: 10000 # Per-session FIFO lane queue for gateway requests. queue: - mode: collect # collect | steer | interrupt + mode: collect # collect | followup | steer | steer_backlog | interrupt cap: 50 # max queued (pending) requests per session lane overflow: drop_old # drop_old | drop_new + debounce_ms: 0 # delay before starting next queued request + summarize_overflow: true overrides: - channels: {} # e.g. ws: { mode: steer, cap: 10 } + channels: {} # e.g. ws: { mode: followup, cap: 10, debounce_ms: 100 } sessions: {} # e.g. ws:vip-user: { mode: interrupt, overflow: drop_new } # Local-network service discovery (mDNS/Bonjour). Keep disabled by default. # Requires server.localhost: false so LAN clients can actually connect. diff --git a/docs/api/PROTOCOL.md b/docs/api/PROTOCOL.md index 003c079..d77a7da 100644 --- a/docs/api/PROTOCOL.md +++ b/docs/api/PROTOCOL.md @@ -33,6 +33,8 @@ The gateway serialises agent work **per session**, not per WebSocket connection: - Requests that target the same `sessionId` run one-at-a-time (FIFO) in a per-session lane. - Requests for different sessions can run in parallel. +- Lane policy is configurable (`collect`, `followup`, `steer_backlog`, `interrupt`) with per-channel and per-session overrides. +- Session-local overrides can be managed at runtime via `agent.send` commands: `/queue`, `/queue set ...`, `/queue reset`. This is implemented via a per-lane queue (`LaneQueue`) in the gateway server, and used by `agent.send` and `agent.cancel`. @@ -555,6 +557,8 @@ Send a message to the agent and stream response. } ``` +When queue policy rejects/supersedes a request before execution, the server emits an `error` event with `code: 3` (`AgentBusy`) and includes `data.queue` metadata (`code`, `laneId`, `mode`, `overflow`, `droppedCount`). + #### `agent.cancel` Cancel the current agent operation. @@ -876,11 +880,20 @@ Error occurred during processing. "event": "error", "data": { "code": 5, - "message": "Internal error: ..." + "message": "Internal error: ...", + "queue": { + "code": "overflow", + "laneId": "ws:abc123", + "mode": "followup", + "overflow": "drop_new", + "droppedCount": 1 + } } } ``` +`data.queue` is optional and only present for queue policy rejections/superseded requests. + ## Error Codes | Code | Name | Description | diff --git a/docs/plans/state.json b/docs/plans/state.json index e5add47..cb13ca8 100644 --- a/docs/plans/state.json +++ b/docs/plans/state.json @@ -33,12 +33,39 @@ "test_status": "pnpm test:run src/gateway/lane-queue.test.ts src/gateway/handlers/agent.test.ts src/config/schema.test.ts + pnpm typecheck passing" }, "openclaw-gap-next-steps-3phase": { - "status": "planned", + "status": "in_progress", "date": "2026-02-16", "updated": "2026-02-16", - "summary": "Defined a concrete 3-phase implementation plan for remaining high-impact OpenClaw parity work: queue policy parity v2, channel reach expansion (Mattermost first), and companion-node capability negotiation foundation.", + "summary": "Defined and began executing a concrete 3-phase implementation plan for remaining high-impact OpenClaw parity work: queue policy parity v2, channel reach expansion (Mattermost first), and companion-node capability negotiation foundation.", "file": "2026-02-16-openclaw-gap-next-steps-3phase.md" }, + "openclaw-gap-phase1-queue-parity-v2": { + "status": "completed", + "date": "2026-02-16", + "updated": "2026-02-16", + "summary": "Completed queue parity v2: added followup/steer_backlog semantics, lane debounce, overflow summarization metadata, runtime session queue controls (`/queue` show/set/reset), queue config patch keys, and docs/protocol updates.", + "files_modified": [ + "src/gateway/lane-queue.ts", + "src/gateway/lane-queue.test.ts", + "src/gateway/handlers/agent.ts", + "src/gateway/handlers/agent.test.ts", + "src/gateway/server.ts", + "src/gateway/protocol.ts", + "src/gateway/handlers/config.ts", + "src/gateway/handlers/handlers.test.ts", + "src/commands/types.ts", + "src/commands/builtin/index.ts", + "src/commands/builtin/index.test.ts", + "src/commands/index.ts", + "src/config/schema.ts", + "src/config/schema.test.ts", + "src/daemon/services.ts", + "config/default.yaml", + "README.md", + "docs/api/PROTOCOL.md" + ], + "test_status": "pnpm test:run src/gateway/lane-queue.test.ts src/gateway/handlers/agent.test.ts src/gateway/handlers/handlers.test.ts src/commands/builtin/index.test.ts src/config/schema.test.ts + pnpm typecheck + pnpm build passing" + }, "docs-gateway-auth-config-keys": { "status": "completed", "date": "2026-02-15", @@ -2962,12 +2989,12 @@ "tier2_completion": "4/4 (100%) — inbound webhooks, vector memory search, Dockerfile, heartbeat monitor", "tier3_completion": "5/5 (100%) — lane queue, credential redaction, web UI token dashboard, xAI (Grok) provider, Voyage AI embeddings", "tier4_completion": "4/4 (100%) — gateway lock, shell completion, Tailscale Serve/Funnel, DM pairing codes", - "feature_gap_scorecard": "115/128 match (90%), 0 partial (0%), 13 missing (10%)", + "feature_gap_scorecard": "116/128 match (91%), 0 partial (0%), 12 missing (9%)", "operator_dx_milestone": "Phase 3 (Live Ops Dashboard): 2/2 plans complete — milestone done", "gmail_auth_cli": "flynn gmail-auth command implemented with OAuth2 flow, doctor check, config routed to Telegram", "native_audio_support": "completed — smart routing for native audio (Gemini/OpenAI/GitHub) vs Whisper transcription fallback", "remaining_phases_completion": "Phase 1: 3/3 (100%) — context levels, command registry, memory structure. Phase 2: 3/3 (100%) — component registry, confidence routing, history index. Phase 3: 2/2 (100%) — adaptive memory/compaction, truthfulness/autonomy hardening", - "next_up": "OpenClaw gap: Location access (open next scoped implementation checklist)" + "next_up": "OpenClaw gap phase 2: Mattermost channel adapter + wiring/docs/tests" }, "soul_md_and_cron_create": { "date": "2026-02-11", diff --git a/src/commands/builtin/index.test.ts b/src/commands/builtin/index.test.ts index 4ed74c1..69fac64 100644 --- a/src/commands/builtin/index.test.ts +++ b/src/commands/builtin/index.test.ts @@ -1,6 +1,6 @@ import { describe, it, expect, vi } from 'vitest'; -import { createElevateCommand, createModelCommand } from './index.js'; +import { createElevateCommand, createModelCommand, createQueueCommand } from './index.js'; describe('builtin /model command', () => { it('passes through the full argument string', async () => { @@ -67,3 +67,47 @@ describe('builtin /elevate command', () => { expect(result).toEqual({ handled: true, text: 'status' }); }); }); + +describe('builtin /queue command', () => { + it('shows queue status with no args', async () => { + const cmd = createQueueCommand(); + const getQueue = vi.fn(() => 'queue status'); + const result = await cmd.execute([], { + channel: 'test', + senderId: 'user', + sessionId: 's1', + rawInput: '/queue', + services: { getQueue }, + }); + expect(getQueue).toHaveBeenCalledOnce(); + expect(result).toEqual({ handled: true, text: 'queue status' }); + }); + + it('passes through set operations', async () => { + const cmd = createQueueCommand(); + const setQueue = vi.fn(() => 'updated'); + const result = await cmd.execute(['set', 'mode', 'followup'], { + channel: 'test', + senderId: 'user', + sessionId: 's1', + rawInput: '/queue set mode followup', + services: { setQueue }, + }); + expect(setQueue).toHaveBeenCalledWith('mode followup'); + expect(result).toEqual({ handled: true, text: 'updated' }); + }); + + it('calls reset operation', async () => { + const cmd = createQueueCommand(); + const resetQueue = vi.fn(() => 'reset'); + const result = await cmd.execute(['reset'], { + channel: 'test', + senderId: 'user', + sessionId: 's1', + rawInput: '/queue reset', + services: { resetQueue }, + }); + expect(resetQueue).toHaveBeenCalledOnce(); + expect(result).toEqual({ handled: true, text: 'reset' }); + }); +}); diff --git a/src/commands/builtin/index.ts b/src/commands/builtin/index.ts index 1d89c12..3ec24bc 100644 --- a/src/commands/builtin/index.ts +++ b/src/commands/builtin/index.ts @@ -149,6 +149,36 @@ export function createElevateCommand(): CommandDefinition { }; } +export function createQueueCommand(): CommandDefinition { + return { + name: 'queue', + description: 'Inspect or update per-session queue policy', + execute: async (args, ctx) => { + if (args.length === 0 || args[0] === 'show') { + if (!ctx.services?.getQueue) { + return notAvailable('Queue command'); + } + return { handled: true, text: await ctx.services.getQueue() }; + } + + const [action, ...rest] = args; + if (action === 'reset') { + if (!ctx.services?.resetQueue) { + return notAvailable('Queue command'); + } + return { handled: true, text: await ctx.services.resetQueue() }; + } + + if (!ctx.services?.setQueue) { + return notAvailable('Queue command'); + } + + const input = action === 'set' ? rest.join(' ') : args.join(' '); + return { handled: true, text: await ctx.services.setQueue(input) }; + }, + }; +} + export function registerBuiltinCommands(registry: CommandRegistry): void { registry.register(createHelpCommand(registry)); registry.register(createStatusCommand()); @@ -157,4 +187,5 @@ export function registerBuiltinCommands(registry: CommandRegistry): void { registry.register(createCompactCommand()); registry.register(createResetCommand()); registry.register(createElevateCommand()); + registry.register(createQueueCommand()); } diff --git a/src/commands/index.ts b/src/commands/index.ts index ee13182..fda9d72 100644 --- a/src/commands/index.ts +++ b/src/commands/index.ts @@ -7,5 +7,6 @@ export { createModelCommand, createCompactCommand, createResetCommand, + createQueueCommand, registerBuiltinCommands, } from './builtin/index.js'; diff --git a/src/commands/types.ts b/src/commands/types.ts index cd7a2d3..c738a04 100644 --- a/src/commands/types.ts +++ b/src/commands/types.ts @@ -28,4 +28,8 @@ export interface CommandServices { getElevation?: () => Promise | string; setElevation?: (input: string) => Promise | string; + + getQueue?: () => Promise | string; + setQueue?: (input: string) => Promise | string; + resetQueue?: () => Promise | string; } diff --git a/src/config/schema.test.ts b/src/config/schema.test.ts index 1e3832c..4d15df0 100644 --- a/src/config/schema.test.ts +++ b/src/config/schema.test.ts @@ -80,6 +80,8 @@ describe('configSchema — server', () => { expect(result.server.queue.mode).toBe('collect'); expect(result.server.queue.cap).toBe(50); expect(result.server.queue.overflow).toBe('drop_old'); + expect(result.server.queue.debounce_ms).toBe(0); + expect(result.server.queue.summarize_overflow).toBe(true); expect(result.server.queue.overrides.channels).toEqual({}); expect(result.server.queue.overrides.sessions).toEqual({}); }); @@ -89,15 +91,19 @@ describe('configSchema — server', () => { ...minimalConfig, server: { queue: { - mode: 'steer', + mode: 'steer_backlog', cap: 10, overflow: 'drop_new', + debounce_ms: 250, + summarize_overflow: false, }, }, }); - expect(result.server.queue.mode).toBe('steer'); + expect(result.server.queue.mode).toBe('steer_backlog'); expect(result.server.queue.cap).toBe(10); expect(result.server.queue.overflow).toBe('drop_new'); + expect(result.server.queue.debounce_ms).toBe(250); + expect(result.server.queue.summarize_overflow).toBe(false); }); it('accepts queue override settings', () => { @@ -110,7 +116,7 @@ describe('configSchema — server', () => { ws: { mode: 'collect', cap: 5 }, }, sessions: { - 'ws:vip-user': { mode: 'interrupt', overflow: 'drop_new' }, + 'ws:vip-user': { mode: 'interrupt', overflow: 'drop_new', debounce_ms: 1000, summarize_overflow: false }, }, }, }, @@ -120,6 +126,8 @@ describe('configSchema — server', () => { expect(result.server.queue.overrides.channels.ws.cap).toBe(5); expect(result.server.queue.overrides.sessions['ws:vip-user'].mode).toBe('interrupt'); expect(result.server.queue.overrides.sessions['ws:vip-user'].overflow).toBe('drop_new'); + expect(result.server.queue.overrides.sessions['ws:vip-user'].debounce_ms).toBe(1000); + expect(result.server.queue.overrides.sessions['ws:vip-user'].summarize_overflow).toBe(false); }); it('defaults discovery settings', () => { diff --git a/src/config/schema.ts b/src/config/schema.ts index de616e7..b74f1db 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -34,27 +34,35 @@ const wsRateLimitSchema = z.object({ const laneQueueSchema = z.object({ /** Queue behavior for concurrent requests in the same session lane. */ - mode: z.enum(['collect', 'steer', 'interrupt']).default('collect'), + mode: z.enum(['collect', 'followup', 'steer', 'steer_backlog', 'interrupt']).default('collect'), /** Max queued (pending) requests per lane. */ cap: z.number().min(1).max(1000).default(50), /** Overflow strategy when cap is reached. */ overflow: z.enum(['drop_old', 'drop_new']).default('drop_old'), + /** Debounce window before starting next queued request (ms). */ + debounce_ms: z.number().min(0).max(60_000).default(0), + /** Include contextual summary details in overflow rejections. */ + summarize_overflow: z.boolean().default(true), /** Optional per-channel/per-session queue policy overrides. */ overrides: z.object({ channels: z.record( z.string(), z.object({ - mode: z.enum(['collect', 'steer', 'interrupt']).optional(), + mode: z.enum(['collect', 'followup', 'steer', 'steer_backlog', 'interrupt']).optional(), cap: z.number().min(1).max(1000).optional(), overflow: z.enum(['drop_old', 'drop_new']).optional(), + debounce_ms: z.number().min(0).max(60_000).optional(), + summarize_overflow: z.boolean().optional(), }), ).default({}), sessions: z.record( z.string(), z.object({ - mode: z.enum(['collect', 'steer', 'interrupt']).optional(), + mode: z.enum(['collect', 'followup', 'steer', 'steer_backlog', 'interrupt']).optional(), cap: z.number().min(1).max(1000).optional(), overflow: z.enum(['drop_old', 'drop_new']).optional(), + debounce_ms: z.number().min(0).max(60_000).optional(), + summarize_overflow: z.boolean().optional(), }), ).default({}), }).default({}), diff --git a/src/daemon/services.ts b/src/daemon/services.ts index aaa378a..1686a91 100644 --- a/src/daemon/services.ts +++ b/src/daemon/services.ts @@ -325,9 +325,33 @@ export function createGateway(deps: GatewayDeps): GatewayServer { mode: config.server.queue.mode, cap: config.server.queue.cap, overflow: config.server.queue.overflow, + debounceMs: config.server.queue.debounce_ms, + summarizeOverflow: config.server.queue.summarize_overflow, overrides: { - channels: config.server.queue.overrides.channels, - sessions: config.server.queue.overrides.sessions, + channels: Object.fromEntries( + Object.entries(config.server.queue.overrides.channels).map(([key, value]) => [ + key, + { + mode: value.mode, + cap: value.cap, + overflow: value.overflow, + debounceMs: value.debounce_ms, + summarizeOverflow: value.summarize_overflow, + }, + ]), + ), + sessions: Object.fromEntries( + Object.entries(config.server.queue.overrides.sessions).map(([key, value]) => [ + key, + { + mode: value.mode, + cap: value.cap, + overflow: value.overflow, + debounceMs: value.debounce_ms, + summarizeOverflow: value.summarize_overflow, + }, + ]), + ), }, }, discovery: { diff --git a/src/gateway/handlers/agent.test.ts b/src/gateway/handlers/agent.test.ts index 3ac22e8..dfdb462 100644 --- a/src/gateway/handlers/agent.test.ts +++ b/src/gateway/handlers/agent.test.ts @@ -1,6 +1,6 @@ import { describe, it, expect, vi, beforeEach } from 'vitest'; import type { GatewayEvent, GatewayRequest, OutboundMessage } from '../protocol.js'; -import { LaneQueue } from '../lane-queue.js'; +import { LaneQueue, LaneQueueRejectedError } from '../lane-queue.js'; import { createAgentHandlers } from './agent.js'; import type { AgentHandlerDeps } from './agent.js'; import { CommandRegistry, registerBuiltinCommands } from '../../commands/index.js'; @@ -28,6 +28,7 @@ describe('createAgentHandlers command fast-path', () => { }; const sessionManager = { + getSessionConfig: vi.fn(), setSessionConfig: vi.fn(), deleteSessionConfig: vi.fn(), }; @@ -123,6 +124,26 @@ describe('createAgentHandlers command fast-path', () => { expect((sent[0] as GatewayEvent).event).toBe('done'); expect(((sent[0] as GatewayEvent).data as { content: string }).content).toBe('agent response'); }); + + it('handles /queue command via fast-path and persists queue session config', async () => { + const sent: OutboundMessage[] = []; + const send = vi.fn((msg: OutboundMessage) => sent.push(msg)); + const req: GatewayRequest = { + id: 5, + method: 'agent.send', + params: { + message: '/queue set mode followup', + connectionId: 'conn-1', + metadata: { isCommand: true, command: 'queue', commandArgs: 'set mode followup' }, + }, + }; + + await handlers['agent.send'](req, send); + + expect(sessionManager.setSessionConfig).toHaveBeenCalledWith('ws', 'ws:conn-1', 'queue.mode', 'followup'); + expect(mockAgent.process).not.toHaveBeenCalled(); + expect(((sent[0] as GatewayEvent).data as { content: string }).content).toContain('Set queue.mode=followup'); + }); }); describe('createAgentHandlers queue policy resolution', () => { @@ -154,7 +175,7 @@ describe('createAgentHandlers queue policy resolution', () => { cancel: vi.fn(), } as unknown as LaneQueue; - const resolveQueuePolicy = vi.fn(() => ({ mode: 'steer' as const, cap: 3 })); + const resolveQueuePolicy = vi.fn(() => ({ mode: 'steer_backlog' as const, cap: 3, debounceMs: 25 })); const handlers = createAgentHandlers({ sessionBridge: sessionBridge as unknown as AgentHandlerDeps['sessionBridge'], @@ -178,8 +199,64 @@ describe('createAgentHandlers queue policy resolution', () => { channel: 'ws', }); expect((laneQueue.enqueue as unknown as ReturnType).mock.calls[0][2]).toEqual({ - mode: 'steer', + mode: 'steer_backlog', cap: 3, + debounceMs: 25, }); }); + + it('emits structured queue error events for lane rejections', async () => { + const sessionBridge = { + getAgent: vi.fn(() => ({ + process: vi.fn(async () => 'ok'), + getUsage: vi.fn(() => ({ + primary: { inputTokens: 0, outputTokens: 0, calls: 0 }, + delegation: {}, + total: { inputTokens: 0, outputTokens: 0, calls: 0, estimatedCost: 0 }, + })), + getModelTier: vi.fn(() => 'default'), + setModelTier: vi.fn(), + compact: vi.fn(async () => null), + reset: vi.fn(), + })), + getSessionId: vi.fn(() => 'ws:s1'), + setBusy: vi.fn(), + setOnToolUse: vi.fn(), + isBusy: vi.fn(() => false), + }; + + const laneQueue = { + enqueue: vi.fn(async () => { + throw new LaneQueueRejectedError({ + code: 'overflow', + laneId: 'ws:s1', + mode: 'followup', + overflow: 'drop_new', + droppedCount: 1, + message: 'Lane queue full (drop_new)', + }); + }), + cancel: vi.fn(), + } as unknown as LaneQueue; + + const handlers = createAgentHandlers({ + sessionBridge: sessionBridge as unknown as AgentHandlerDeps['sessionBridge'], + laneQueue, + }); + + const sent: OutboundMessage[] = []; + const send = vi.fn((msg: OutboundMessage) => sent.push(msg)); + + await handlers['agent.send']({ + id: 6, + method: 'agent.send', + params: { message: 'hello', connectionId: 'conn-1' }, + }, send); + + expect(sent).toHaveLength(1); + const event = sent[0] as GatewayEvent; + expect(event.event).toBe('error'); + expect((event.data as { code: number }).code).toBe(3); + expect((event.data as { queue?: { code: string } }).queue?.code).toBe('overflow'); + }); }); diff --git a/src/gateway/handlers/agent.ts b/src/gateway/handlers/agent.ts index da1c915..8a5bb77 100644 --- a/src/gateway/handlers/agent.ts +++ b/src/gateway/handlers/agent.ts @@ -4,6 +4,7 @@ import { makeEvent, makeError, ErrorCode } from '../protocol.js'; import type { SessionBridge } from '../session-bridge.js'; import type { LaneQueue } from '../lane-queue.js'; import type { LaneQueueConfig } from '../lane-queue.js'; +import { LaneQueueRejectedError } from '../lane-queue.js'; import type { MetricsCollector } from '../metrics.js'; import type { Attachment } from '../../channels/types.js'; import type { SessionManager } from '../../session/manager.js'; @@ -56,13 +57,15 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { const sessionId = deps.sessionBridge.getSessionId(connectionId); const laneId = sessionId ?? connectionId; const channel = sessionId?.split(':', 1)[0] ?? 'ws'; + const resolvedPolicy = deps.resolveQueuePolicy?.({ laneId, sessionId, connectionId, channel }); // Enqueue the work — if the lane is idle it runs immediately, // otherwise it waits for earlier requests on the same session to finish. const requestId = request.id.toString(); deps.metrics?.startRequest(requestId, { sessionId: laneId, channel: 'ws' }); - return deps.laneQueue.enqueue(laneId, async () => { + try { + return await deps.laneQueue.enqueue(laneId, async () => { deps.sessionBridge.setBusy(connectionId, true); const commandInput = safeParams.metadata?.isCommand && typeof safeParams.metadata.command === 'string' @@ -256,6 +259,89 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { return `Elevated mode: on until ${new Date(untilMs).toISOString()}`; }, + + getQueue: () => { + const mode = resolvedPolicy?.mode ?? 'collect'; + const cap = resolvedPolicy?.cap ?? 50; + const overflow = resolvedPolicy?.overflow ?? 'drop_old'; + const debounceMs = resolvedPolicy?.debounceMs ?? 0; + const summarizeOverflow = resolvedPolicy?.summarizeOverflow ?? true; + const source = deps.sessionManager && sessionId + ? deps.sessionManager.getSessionConfig('ws', sessionId, 'queue.mode') ? 'session override' : 'default/channel' + : 'default/channel'; + return [ + '**Queue policy**', + `mode: ${mode}`, + `cap: ${cap}`, + `overflow: ${overflow}`, + `debounce_ms: ${debounceMs}`, + `summarize_overflow: ${summarizeOverflow}`, + `source: ${source}`, + ].join('\n'); + }, + + setQueue: (input: string) => { + if (!deps.sessionManager || !sessionId) { + return 'Queue command is not available in this session.'; + } + const [rawKey, ...rest] = input.trim().split(/\s+/); + const value = rest.join(' ').trim(); + if (!rawKey || !value) { + return 'Usage: /queue '; + } + const key = rawKey.toLowerCase(); + if (key === 'mode') { + if (!['collect', 'followup', 'steer', 'steer_backlog', 'interrupt'].includes(value)) { + return 'Invalid mode. Use one of: collect, followup, steer, steer_backlog, interrupt'; + } + deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.mode', value); + return `Set queue.mode=${value} for this session`; + } + if (key === 'cap') { + const cap = Number.parseInt(value, 10); + if (!Number.isFinite(cap) || cap < 1 || cap > 1000) { + return 'Invalid cap. Use an integer between 1 and 1000'; + } + deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.cap', String(cap)); + return `Set queue.cap=${cap} for this session`; + } + if (key === 'overflow') { + if (value !== 'drop_old' && value !== 'drop_new') { + return 'Invalid overflow. Use drop_old or drop_new'; + } + deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.overflow', value); + return `Set queue.overflow=${value} for this session`; + } + if (key === 'debounce_ms') { + const debounceMs = Number.parseInt(value, 10); + if (!Number.isFinite(debounceMs) || debounceMs < 0 || debounceMs > 60_000) { + return 'Invalid debounce_ms. Use an integer between 0 and 60000'; + } + deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.debounce_ms', String(debounceMs)); + return `Set queue.debounce_ms=${debounceMs} for this session`; + } + if (key === 'summarize_overflow') { + const normalized = value.toLowerCase(); + if (normalized !== 'true' && normalized !== 'false') { + return 'Invalid summarize_overflow. Use true or false'; + } + deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.summarize_overflow', normalized); + return `Set queue.summarize_overflow=${normalized} for this session`; + } + return 'Unknown queue key. Use one of: mode, cap, overflow, debounce_ms, summarize_overflow'; + }, + + resetQueue: () => { + if (!deps.sessionManager || !sessionId) { + return 'Queue command is not available in this session.'; + } + deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.mode'); + deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.cap'); + deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.overflow'); + deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.debounce_ms'); + deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.summarize_overflow'); + return 'Reset session queue overrides.'; + }, }, }); @@ -320,7 +406,18 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { deps.sessionBridge.setOnToolUse(connectionId, undefined); deps.metrics?.endRequest(requestId); } - }, deps.resolveQueuePolicy?.({ laneId, sessionId, connectionId, channel })); + }, resolvedPolicy); + } catch (err) { + if (err instanceof LaneQueueRejectedError) { + send(makeEvent(request.id, 'error', { + code: ErrorCode.AgentBusy, + message: err.message, + queue: err.details, + })); + return; + } + throw err; + } }, 'agent.cancel': async (request: GatewayRequest): Promise => { diff --git a/src/gateway/handlers/config.ts b/src/gateway/handlers/config.ts index 08eb6e4..05a1654 100644 --- a/src/gateway/handlers/config.ts +++ b/src/gateway/handlers/config.ts @@ -125,6 +125,31 @@ const PATCHABLE_KEYS: Record boolean config.server.localhost = value; return true; }, + 'server.queue.mode': (config, value) => { + if (!['collect', 'followup', 'steer', 'steer_backlog', 'interrupt'].includes(String(value))) {return false;} + config.server.queue.mode = value as typeof config.server.queue.mode; + return true; + }, + 'server.queue.cap': (config, value) => { + if (typeof value !== 'number' || !Number.isFinite(value) || value < 1 || value > 1000) {return false;} + config.server.queue.cap = Math.floor(value); + return true; + }, + 'server.queue.overflow': (config, value) => { + if (value !== 'drop_old' && value !== 'drop_new') {return false;} + config.server.queue.overflow = value; + return true; + }, + 'server.queue.debounce_ms': (config, value) => { + if (typeof value !== 'number' || !Number.isFinite(value) || value < 0 || value > 60_000) {return false;} + config.server.queue.debounce_ms = Math.floor(value); + return true; + }, + 'server.queue.summarize_overflow': (config, value) => { + if (typeof value !== 'boolean') {return false;} + config.server.queue.summarize_overflow = value; + return true; + }, }; export function createConfigHandlers(deps: ConfigHandlerDeps) { diff --git a/src/gateway/handlers/handlers.test.ts b/src/gateway/handlers/handlers.test.ts index 05ac45b..43ebbed 100644 --- a/src/gateway/handlers/handlers.test.ts +++ b/src/gateway/handlers/handlers.test.ts @@ -720,7 +720,18 @@ describe('config handlers', () => { function makeConfig() { return { telegram: { bot_token: 'secret-token-123', allowed_chat_ids: [12345] }, - server: { tailscale: {}, localhost: true, port: 18800 }, + server: { + tailscale: {}, + localhost: true, + port: 18800, + queue: { + mode: 'collect' as const, + cap: 50, + overflow: 'drop_old' as const, + debounce_ms: 0, + summarize_overflow: true, + }, + }, models: { default: { provider: 'anthropic' as const, model: 'claude-3-haiku', api_key: 'sk-secret-key' }, fallback_chain: ['anthropic'], @@ -754,18 +765,22 @@ describe('config handlers', () => { patches: { 'hooks.confirm': ['shell.exec', 'file.write'], 'hooks.log': ['file.read'], + 'server.queue.mode': 'followup', + 'server.queue.debounce_ms': 100, }, }, }; const result = await handlers['config.patch'](req) as GatewayResponse; const r = result.result as { applied: string[]; rejected: string[]; persisted: boolean }; - expect(r.applied).toEqual(['hooks.confirm', 'hooks.log']); + expect(r.applied).toEqual(['hooks.confirm', 'hooks.log', 'server.queue.mode', 'server.queue.debounce_ms']); expect(r.rejected).toEqual([]); expect(r.persisted).toBe(false); // Verify the config was actually mutated expect(config.hooks.confirm).toEqual(['shell.exec', 'file.write']); expect(config.hooks.log).toEqual(['file.read']); + expect(config.server.queue.mode).toBe('followup'); + expect(config.server.queue.debounce_ms).toBe(100); }); it('config.patch rejects unknown keys', async () => { @@ -798,6 +813,7 @@ describe('config handlers', () => { params: { patches: { 'hooks.confirm': 'not-an-array', + 'server.queue.cap': 0, }, }, }; @@ -805,7 +821,7 @@ describe('config handlers', () => { const r = result.result as { applied: string[]; rejected: string[]; persisted: boolean }; expect(r.applied).toEqual([]); - expect(r.rejected).toEqual(['hooks.confirm']); + expect(r.rejected).toEqual(['hooks.confirm', 'server.queue.cap']); expect(r.persisted).toBe(false); }); diff --git a/src/gateway/lane-queue.test.ts b/src/gateway/lane-queue.test.ts index bf0cdd6..e31c638 100644 --- a/src/gateway/lane-queue.test.ts +++ b/src/gateway/lane-queue.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect } from 'vitest'; -import { LaneQueue } from './lane-queue.js'; +import { LaneQueue, LaneQueueRejectedError } from './lane-queue.js'; describe('LaneQueue', () => { it('executes a single item immediately', async () => { @@ -192,8 +192,8 @@ describe('LaneQueue', () => { expect(r2).toBe('second'); }); - it('steer mode keeps only the most recent pending request', async () => { - const queue = new LaneQueue({ mode: 'steer' }); + it('followup mode keeps only the most recent pending request', async () => { + const queue = new LaneQueue({ mode: 'followup' }); let resolveFirst!: () => void; const firstBlocks = new Promise((r) => { resolveFirst = r; }); @@ -204,13 +204,32 @@ describe('LaneQueue', () => { const p2 = queue.enqueue('lane-a', async () => 'old-pending'); const p3 = queue.enqueue('lane-a', async () => 'latest-pending'); - await expect(p2).rejects.toThrow('Superseded by newer request'); + await expect(p2).rejects.toThrow('Superseded by newer follow-up request'); resolveFirst(); await expect(p1).resolves.toBe('active'); await expect(p3).resolves.toBe('latest-pending'); }); + it('steer_backlog mode replaces existing pending backlog with latest request', async () => { + const queue = new LaneQueue({ mode: 'steer_backlog' }); + let resolveFirst!: () => void; + const firstBlocks = new Promise((r) => { resolveFirst = r; }); + + const p1 = queue.enqueue('lane-a', async () => { + await firstBlocks; + return 'active'; + }); + const p2 = queue.enqueue('lane-a', async () => 'old-pending'); + const p3 = queue.enqueue('lane-a', async () => 'new-pending'); + + await expect(p2).rejects.toThrow('Superseded by newer request'); + resolveFirst(); + + await expect(p1).resolves.toBe('active'); + await expect(p3).resolves.toBe('new-pending'); + }); + it('drop_new overflow rejects newest request when cap is reached', async () => { const queue = new LaneQueue({ cap: 1, overflow: 'drop_new' }); let resolveFirst!: () => void; @@ -223,7 +242,7 @@ describe('LaneQueue', () => { const p2 = queue.enqueue('lane-a', async () => 'pending-1'); const p3 = queue.enqueue('lane-a', async () => 'pending-2'); - await expect(p3).rejects.toThrow('Lane queue full (drop_new)'); + await expect(p3).rejects.toThrow('Lane queue full (drop_new): request rejected with 1 pending'); resolveFirst(); await expect(p1).resolves.toBe('active'); @@ -242,7 +261,7 @@ describe('LaneQueue', () => { const p2 = queue.enqueue('lane-a', async () => 'old-pending'); const p3 = queue.enqueue('lane-a', async () => 'new-pending'); - await expect(p2).rejects.toThrow('Lane queue overflow (drop_old)'); + await expect(p2).rejects.toThrow('Lane queue overflow (drop_old): oldest pending request dropped'); resolveFirst(); await expect(p1).resolves.toBe('active'); @@ -267,4 +286,54 @@ describe('LaneQueue', () => { await expect(p1).resolves.toBe('active'); await expect(p3).resolves.toBe('latest-pending'); }); + + it('applies debounce before starting queued work', async () => { + const queue = new LaneQueue({ debounceMs: 25 }); + const events: string[] = []; + let resolveFirst!: () => void; + const firstBlocks = new Promise((r) => { resolveFirst = r; }); + + const p1 = queue.enqueue('lane-a', async () => { + events.push('active:start'); + await firstBlocks; + events.push('active:end'); + return 'active'; + }); + + const p2 = queue.enqueue('lane-a', async () => { + events.push('next:start'); + return 'next'; + }); + + resolveFirst(); + await p1; + expect(queue.isProcessing('lane-a')).toBe(true); + expect(events).toEqual(['active:start', 'active:end']); + await new Promise((r) => setTimeout(r, 40)); + await expect(p2).resolves.toBe('next'); + expect(events).toEqual(['active:start', 'active:end', 'next:start']); + }); + + it('returns structured queue rejection errors', async () => { + const queue = new LaneQueue({ cap: 1, overflow: 'drop_new' }); + let resolveFirst!: () => void; + const firstBlocks = new Promise((r) => { resolveFirst = r; }); + + const p1 = queue.enqueue('lane-a', async () => { + await firstBlocks; + return 'active'; + }); + const p2 = queue.enqueue('lane-a', async () => 'pending'); + const p3 = queue.enqueue('lane-a', async () => 'dropped'); + + const err = await p3.catch((e) => e) as LaneQueueRejectedError; + expect(err).toBeInstanceOf(LaneQueueRejectedError); + expect(err.details.code).toBe('overflow'); + expect(err.details.overflow).toBe('drop_new'); + expect(err.details.laneId).toBe('lane-a'); + + resolveFirst(); + await expect(p1).resolves.toBe('active'); + await expect(p2).resolves.toBe('pending'); + }); }); diff --git a/src/gateway/lane-queue.ts b/src/gateway/lane-queue.ts index 3d74548..885e139 100644 --- a/src/gateway/lane-queue.ts +++ b/src/gateway/lane-queue.ts @@ -14,20 +14,56 @@ interface QueueEntry { work: () => Promise; resolve: (value: T) => void; reject: (reason: unknown) => void; + policy: LaneQueueConfig; + metadata?: LaneQueueEnqueueMetadata; } interface Lane { active: boolean; queue: QueueEntry[]; + debounceTimer?: ReturnType; } -export type LaneQueueMode = 'collect' | 'steer' | 'interrupt'; +export type LaneQueueMode = 'collect' | 'followup' | 'steer' | 'steer_backlog' | 'interrupt'; export type LaneQueueOverflow = 'drop_old' | 'drop_new'; export interface LaneQueueConfig { mode: LaneQueueMode; cap: number; overflow: LaneQueueOverflow; + debounceMs: number; + summarizeOverflow: boolean; +} + +export interface LaneQueueEnqueueMetadata { + requestId?: string; + label?: string; +} + +export interface LaneQueueEnqueueOptions { + policy?: Partial; + metadata?: LaneQueueEnqueueMetadata; +} + +export type LaneQueueRejectCode = 'superseded' | 'overflow' | 'cancelled'; + +export interface LaneQueueRejectDetails { + code: LaneQueueRejectCode; + laneId: string; + mode: LaneQueueMode; + overflow?: LaneQueueOverflow; + droppedCount?: number; + message: string; +} + +export class LaneQueueRejectedError extends Error { + readonly details: LaneQueueRejectDetails; + + constructor(details: LaneQueueRejectDetails) { + super(details.message); + this.name = 'LaneQueueRejectedError'; + this.details = details; + } } export class LaneQueue { @@ -39,6 +75,8 @@ export class LaneQueue { mode: config?.mode ?? 'collect', cap: Math.max(1, config?.cap ?? 50), overflow: config?.overflow ?? 'drop_old', + debounceMs: Math.max(0, config?.debounceMs ?? 0), + summarizeOverflow: config?.summarizeOverflow ?? true, }; } @@ -47,8 +85,13 @@ export class LaneQueue { * Returns a promise that resolves with the work's return value * once it has been executed (which may be immediately if the lane is idle). */ - async enqueue(laneId: string, work: () => Promise, policy?: Partial): Promise { - const effective = this.resolvePolicy(policy); + async enqueue( + laneId: string, + work: () => Promise, + policyOrOptions?: Partial | LaneQueueEnqueueOptions, + ): Promise { + const options = this.normalizeEnqueueOptions(policyOrOptions); + const effective = this.resolvePolicy(options.policy); let lane = this.lanes.get(laneId); if (!lane) { lane = { active: false, queue: [] }; @@ -56,7 +99,7 @@ export class LaneQueue { } // If nothing is running on this lane, execute immediately - if (!lane.active) { + if (!lane.active && !lane.debounceTimer) { lane.active = true; try { return await work(); @@ -66,17 +109,51 @@ export class LaneQueue { } } - if (effective.mode === 'steer' || effective.mode === 'interrupt') { - this.rejectPending(lane, 'Superseded by newer request'); + if (effective.mode === 'steer' || effective.mode === 'steer_backlog' || effective.mode === 'interrupt') { + this.rejectPending(laneId, lane, { + code: 'superseded', + laneId, + mode: effective.mode, + message: 'Superseded by newer request', + }); + } else if (effective.mode === 'followup' && lane.queue.length > 0) { + this.rejectPending(laneId, lane, { + code: 'superseded', + laneId, + mode: effective.mode, + message: 'Superseded by newer follow-up request', + }); } if (lane.queue.length >= effective.cap) { if (effective.overflow === 'drop_new') { - return Promise.reject(new Error('Lane queue full (drop_new)')); + return Promise.reject( + new LaneQueueRejectedError({ + code: 'overflow', + laneId, + mode: effective.mode, + overflow: 'drop_new', + droppedCount: 1, + message: effective.summarizeOverflow + ? `Lane queue full (drop_new): request rejected with ${lane.queue.length} pending` + : 'Lane queue full (drop_new)', + }), + ); } // drop_old const dropped = lane.queue.shift(); - dropped?.reject(new Error('Lane queue overflow (drop_old)')); + dropped?.reject( + new LaneQueueRejectedError({ + code: 'overflow', + laneId, + mode: effective.mode, + overflow: 'drop_old', + droppedCount: 1, + message: effective.summarizeOverflow + ? 'Lane queue overflow (drop_old): oldest pending request dropped' + : 'Lane queue overflow (drop_old)', + }), + ); } // Otherwise, queue the work and return a deferred promise @@ -85,13 +162,16 @@ export class LaneQueue { work: work as () => Promise, resolve: resolve as (value: unknown) => void, reject, + policy: effective, + metadata: options.metadata, }); }); } /** Check if a lane currently has active work executing. */ isProcessing(laneId: string): boolean { - return this.lanes.get(laneId)?.active ?? false; + const lane = this.lanes.get(laneId); + return (lane?.active ?? false) || Boolean(lane?.debounceTimer); } /** Get the number of pending (not yet started) items in a lane. */ @@ -117,18 +197,28 @@ export class LaneQueue { const lane = this.lanes.get(laneId); if (!lane) {return;} - this.rejectPending(lane, 'Lane cancelled'); + if (lane.debounceTimer) { + clearTimeout(lane.debounceTimer); + lane.debounceTimer = undefined; + } + + this.rejectPending(laneId, lane, { + code: 'cancelled', + laneId, + mode: this.config.mode, + message: 'Lane cancelled', + }); // Clean up empty idle lanes - if (!lane.active && lane.queue.length === 0) { + if (!lane.active && lane.queue.length === 0 && !lane.debounceTimer) { this.lanes.delete(laneId); } } - private rejectPending(lane: Lane, reason: string): void { + private rejectPending(laneId: string, lane: Lane, details: LaneQueueRejectDetails): void { const pending = lane.queue.splice(0); for (const entry of pending) { - entry.reject(new Error(reason)); + entry.reject(new LaneQueueRejectedError({ ...details, laneId, mode: entry.policy.mode })); } } @@ -137,6 +227,8 @@ export class LaneQueue { mode: policy?.mode ?? this.config.mode, cap: Math.max(1, policy?.cap ?? this.config.cap), overflow: policy?.overflow ?? this.config.overflow, + debounceMs: Math.max(0, policy?.debounceMs ?? this.config.debounceMs), + summarizeOverflow: policy?.summarizeOverflow ?? this.config.summarizeOverflow, }; } @@ -144,10 +236,28 @@ export class LaneQueue { * Process the next queued entry for a lane (called after current work finishes). * Runs asynchronously so the caller's finally block completes first. */ - private processNext(laneId: string): void { + private processNext(laneId: string, skipDebounce = false): void { const lane = this.lanes.get(laneId); if (!lane) {return;} + if (lane.active || lane.debounceTimer) { + return; + } + + const next = lane.queue[0]; + if (!next) { + this.lanes.delete(laneId); + return; + } + + if (!skipDebounce && next.policy.debounceMs > 0) { + lane.debounceTimer = setTimeout(() => { + lane.debounceTimer = undefined; + this.processNext(laneId, true); + }, next.policy.debounceMs); + return; + } + const entry = lane.queue.shift(); if (!entry) { // Lane is empty — clean up @@ -164,4 +274,20 @@ export class LaneQueue { this.processNext(laneId); }); } + + private normalizeEnqueueOptions( + policyOrOptions?: Partial | LaneQueueEnqueueOptions, + ): LaneQueueEnqueueOptions { + if (!policyOrOptions) { + return {}; + } + + if ('policy' in policyOrOptions || 'metadata' in policyOrOptions) { + return policyOrOptions as LaneQueueEnqueueOptions; + } + + return { + policy: policyOrOptions as Partial, + }; + } } diff --git a/src/gateway/protocol.ts b/src/gateway/protocol.ts index 7f124b8..7864b5a 100644 --- a/src/gateway/protocol.ts +++ b/src/gateway/protocol.ts @@ -85,6 +85,7 @@ export interface DoneEventData { export interface ErrorEventData { code: ErrorCode; message: string; + queue?: Record; } // ── Error codes ──────────────────────────────────────────────── diff --git a/src/gateway/server.ts b/src/gateway/server.ts index f2ebcd8..d9a75ee 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -205,14 +205,54 @@ export class GatewayServer { sessionBridge: this.sessionBridge, laneQueue: this.laneQueue, resolveQueuePolicy: ({ sessionId, channel }) => { - const sessionPolicy = sessionId - ? this.config.queue?.overrides?.sessions?.[sessionId] - : undefined; - if (sessionPolicy) { - return sessionPolicy; + const resolved: Partial = {}; + const channelPolicy = this.config.queue?.overrides?.channels?.[channel]; + if (channelPolicy) { + Object.assign(resolved, channelPolicy); } - return this.config.queue?.overrides?.channels?.[channel]; + const configuredSessionPolicy = sessionId + ? this.config.queue?.overrides?.sessions?.[sessionId] + : undefined; + if (configuredSessionPolicy) { + Object.assign(resolved, configuredSessionPolicy); + } + + if (sessionId) { + const runtimeMode = this.config.sessionManager.getSessionConfig('ws', sessionId, 'queue.mode'); + const runtimeCap = this.config.sessionManager.getSessionConfig('ws', sessionId, 'queue.cap'); + const runtimeOverflow = this.config.sessionManager.getSessionConfig('ws', sessionId, 'queue.overflow'); + const runtimeDebounce = this.config.sessionManager.getSessionConfig('ws', sessionId, 'queue.debounce_ms'); + const runtimeSummarize = this.config.sessionManager.getSessionConfig('ws', sessionId, 'queue.summarize_overflow'); + + if (runtimeMode && ['collect', 'followup', 'steer', 'steer_backlog', 'interrupt'].includes(runtimeMode)) { + resolved.mode = runtimeMode as LaneQueueConfig['mode']; + } + + if (runtimeCap) { + const cap = Number.parseInt(runtimeCap, 10); + if (Number.isFinite(cap) && cap >= 1 && cap <= 1000) { + resolved.cap = cap; + } + } + + if (runtimeOverflow && (runtimeOverflow === 'drop_old' || runtimeOverflow === 'drop_new')) { + resolved.overflow = runtimeOverflow; + } + + if (runtimeDebounce) { + const debounceMs = Number.parseInt(runtimeDebounce, 10); + if (Number.isFinite(debounceMs) && debounceMs >= 0 && debounceMs <= 60_000) { + resolved.debounceMs = debounceMs; + } + } + + if (runtimeSummarize === 'true' || runtimeSummarize === 'false') { + resolved.summarizeOverflow = runtimeSummarize === 'true'; + } + } + + return resolved; }, metrics: this.metrics, sessionManager: this.config.sessionManager,