From fbd24d43793e8d00ecb705ccf4cb218512773ef8 Mon Sep 17 00:00:00 2001 From: William Valentin Date: Mon, 16 Feb 2026 11:51:26 -0800 Subject: [PATCH] feat(gateway): support per-channel and per-session queue policy overrides --- README.md | 10 +++++ config/default.yaml | 3 ++ docs/plans/state.json | 8 ++-- src/config/schema.test.ts | 24 ++++++++++++ src/config/schema.ts | 19 ++++++++++ src/daemon/services.ts | 4 ++ src/gateway/handlers/agent.test.ts | 59 ++++++++++++++++++++++++++++++ src/gateway/handlers/agent.ts | 10 ++++- src/gateway/lane-queue.test.ts | 19 ++++++++++ src/gateway/lane-queue.ts | 17 +++++++-- src/gateway/server.ts | 17 ++++++++- 11 files changed, 181 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index e5717c9..e02fbb9 100644 --- a/README.md +++ b/README.md @@ -807,6 +807,15 @@ server: mode: collect # collect | steer | interrupt cap: 50 # max pending requests per session lane overflow: drop_old # drop_old | drop_new + overrides: + channels: + ws: + mode: steer + cap: 10 + sessions: + ws:vip-user: + mode: interrupt + overflow: drop_new ``` Notes: @@ -814,6 +823,7 @@ Notes: - `steer` and `interrupt` keep only the latest pending request while one is active. - `interrupt` currently does not force-stop already running work; use `agent.cancel` for active cancellation. - On overflow, `drop_old` evicts the oldest pending request, `drop_new` rejects the new request. +- Override precedence: exact `sessions` match first, then `channels`. ## Gateway Request Body Limit diff --git a/config/default.yaml b/config/default.yaml index 2c0ca8d..a805212 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -65,6 +65,9 @@ server: mode: collect # collect | steer | interrupt cap: 50 # max queued (pending) requests per session lane overflow: drop_old # drop_old | drop_new + overrides: + channels: {} # e.g. ws: { mode: steer, cap: 10 } + sessions: {} # e.g. 
ws:vip-user: { mode: interrupt, overflow: drop_new } # Local-network service discovery (mDNS/Bonjour). Keep disabled by default. # Requires server.localhost: false so LAN clients can actually connect. discovery: diff --git a/docs/plans/state.json b/docs/plans/state.json index 23184f7..146adff 100644 --- a/docs/plans/state.json +++ b/docs/plans/state.json @@ -17,7 +17,7 @@ "status": "completed", "date": "2026-02-16", "updated": "2026-02-16", - "summary": "Added configurable gateway lane queue policy (`server.queue`) with mode (`collect|steer|interrupt`), per-lane cap, and overflow behavior (`drop_old|drop_new`). Wired through daemon -> gateway runtime, expanded LaneQueue behavior/tests, and documented the new config in README + default config.", + "summary": "Added configurable gateway lane queue policy (`server.queue`) with mode (`collect|steer|interrupt`), per-lane cap, overflow behavior (`drop_old|drop_new`), and per-channel/per-session overrides. Wired through daemon -> gateway runtime, applied override resolution in `agent.send`, expanded queue + handler tests, and documented config usage in README + default config.", "files_modified": [ "src/gateway/lane-queue.ts", "src/gateway/lane-queue.test.ts", @@ -26,9 +26,11 @@ "src/config/schema.ts", "src/config/schema.test.ts", "config/default.yaml", - "README.md" + "README.md", + "src/gateway/handlers/agent.ts", + "src/gateway/handlers/agent.test.ts" ], - "test_status": "pnpm test:run src/gateway/lane-queue.test.ts src/config/schema.test.ts + pnpm typecheck passing" + "test_status": "pnpm test:run src/gateway/lane-queue.test.ts src/gateway/handlers/agent.test.ts src/config/schema.test.ts + pnpm typecheck passing" }, "docs-gateway-auth-config-keys": { "status": "completed", diff --git a/src/config/schema.test.ts b/src/config/schema.test.ts index 89f014e..1e3832c 100644 --- a/src/config/schema.test.ts +++ b/src/config/schema.test.ts @@ -80,6 +80,8 @@ describe('configSchema — server', () => { 
expect(result.server.queue.mode).toBe('collect'); expect(result.server.queue.cap).toBe(50); expect(result.server.queue.overflow).toBe('drop_old'); + expect(result.server.queue.overrides.channels).toEqual({}); + expect(result.server.queue.overrides.sessions).toEqual({}); }); it('accepts custom queue settings', () => { @@ -98,6 +100,28 @@ describe('configSchema — server', () => { expect(result.server.queue.overflow).toBe('drop_new'); }); + it('accepts queue override settings', () => { + const result = configSchema.parse({ + ...minimalConfig, + server: { + queue: { + overrides: { + channels: { + ws: { mode: 'collect', cap: 5 }, + }, + sessions: { + 'ws:vip-user': { mode: 'interrupt', overflow: 'drop_new' }, + }, + }, + }, + }, + }); + expect(result.server.queue.overrides.channels.ws.mode).toBe('collect'); + expect(result.server.queue.overrides.channels.ws.cap).toBe(5); + expect(result.server.queue.overrides.sessions['ws:vip-user'].mode).toBe('interrupt'); + expect(result.server.queue.overrides.sessions['ws:vip-user'].overflow).toBe('drop_new'); + }); + it('defaults discovery settings', () => { const result = configSchema.parse(minimalConfig); expect(result.server.discovery.enabled).toBe(false); diff --git a/src/config/schema.ts b/src/config/schema.ts index 586d9f0..de616e7 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -39,6 +39,25 @@ const laneQueueSchema = z.object({ cap: z.number().min(1).max(1000).default(50), /** Overflow strategy when cap is reached. */ overflow: z.enum(['drop_old', 'drop_new']).default('drop_old'), + /** Optional per-channel/per-session queue policy overrides. 
*/ + overrides: z.object({ + channels: z.record( + z.string(), + z.object({ + mode: z.enum(['collect', 'steer', 'interrupt']).optional(), + cap: z.number().min(1).max(1000).optional(), + overflow: z.enum(['drop_old', 'drop_new']).optional(), + }), + ).default({}), + sessions: z.record( + z.string(), + z.object({ + mode: z.enum(['collect', 'steer', 'interrupt']).optional(), + cap: z.number().min(1).max(1000).optional(), + overflow: z.enum(['drop_old', 'drop_new']).optional(), + }), + ).default({}), + }).default({}), }).default({}); const serverDiscoverySchema = z.object({ diff --git a/src/daemon/services.ts b/src/daemon/services.ts index 98b418d..aaa378a 100644 --- a/src/daemon/services.ts +++ b/src/daemon/services.ts @@ -325,6 +325,10 @@ export function createGateway(deps: GatewayDeps): GatewayServer { mode: config.server.queue.mode, cap: config.server.queue.cap, overflow: config.server.queue.overflow, + overrides: { + channels: config.server.queue.overrides.channels, + sessions: config.server.queue.overrides.sessions, + }, }, discovery: { enabled: config.server.discovery.enabled, diff --git a/src/gateway/handlers/agent.test.ts b/src/gateway/handlers/agent.test.ts index 50e8239..3ac22e8 100644 --- a/src/gateway/handlers/agent.test.ts +++ b/src/gateway/handlers/agent.test.ts @@ -124,3 +124,62 @@ describe('createAgentHandlers command fast-path', () => { expect(((sent[0] as GatewayEvent).data as { content: string }).content).toBe('agent response'); }); }); + +describe('createAgentHandlers queue policy resolution', () => { + it('passes resolved per-request queue policy into lane enqueue', async () => { + const mockAgent = { + process: vi.fn(async () => 'ok'), + getUsage: vi.fn(() => ({ + primary: { inputTokens: 0, outputTokens: 0, calls: 0 }, + delegation: {}, + total: { inputTokens: 0, outputTokens: 0, calls: 0, estimatedCost: 0 }, + })), + getModelTier: vi.fn(() => 'default'), + setModelTier: vi.fn(), + compact: vi.fn(async () => null), + reset: vi.fn(), + 
setOnToolUse: vi.fn(), + }; + + const sessionBridge = { + getAgent: vi.fn(() => mockAgent), + getSessionId: vi.fn(() => 'ws:s1'), + setBusy: vi.fn(), + setOnToolUse: vi.fn(), + isBusy: vi.fn(() => false), + }; + + const laneQueue = { + enqueue: vi.fn(async (_laneId: string, work: () => Promise) => work()), + cancel: vi.fn(), + } as unknown as LaneQueue; + + const resolveQueuePolicy = vi.fn(() => ({ mode: 'steer' as const, cap: 3 })); + + const handlers = createAgentHandlers({ + sessionBridge: sessionBridge as unknown as AgentHandlerDeps['sessionBridge'], + laneQueue, + resolveQueuePolicy, + }); + + const sent: OutboundMessage[] = []; + const send = vi.fn((msg: OutboundMessage) => sent.push(msg)); + + await handlers['agent.send']({ + id: 1, + method: 'agent.send', + params: { message: 'hello', connectionId: 'conn-1' }, + }, send); + + expect(resolveQueuePolicy).toHaveBeenCalledWith({ + laneId: 'ws:s1', + sessionId: 'ws:s1', + connectionId: 'conn-1', + channel: 'ws', + }); + expect((laneQueue.enqueue as unknown as ReturnType).mock.calls[0][2]).toEqual({ + mode: 'steer', + cap: 3, + }); + }); +}); diff --git a/src/gateway/handlers/agent.ts b/src/gateway/handlers/agent.ts index 9a21da8..da1c915 100644 --- a/src/gateway/handlers/agent.ts +++ b/src/gateway/handlers/agent.ts @@ -3,6 +3,7 @@ import type { SendFn } from '../router.js'; import { makeEvent, makeError, ErrorCode } from '../protocol.js'; import type { SessionBridge } from '../session-bridge.js'; import type { LaneQueue } from '../lane-queue.js'; +import type { LaneQueueConfig } from '../lane-queue.js'; import type { MetricsCollector } from '../metrics.js'; import type { Attachment } from '../../channels/types.js'; import type { SessionManager } from '../../session/manager.js'; @@ -14,6 +15,12 @@ import { randomUUID } from 'crypto'; export interface AgentHandlerDeps { sessionBridge: SessionBridge; laneQueue: LaneQueue; + resolveQueuePolicy?: (ctx: { + laneId: string; + sessionId?: string; + connectionId: string; 
+ channel: string; + }) => Partial<LaneQueueConfig> | undefined; metrics?: MetricsCollector; sessionManager?: SessionManager; commandRegistry?: CommandRegistry; @@ -48,6 +55,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { // Falls back to connectionId if session lookup fails (shouldn't happen). const sessionId = deps.sessionBridge.getSessionId(connectionId); const laneId = sessionId ?? connectionId; + const channel = sessionId?.split(':', 1)[0] ?? 'ws'; // Enqueue the work — if the lane is idle it runs immediately, // otherwise it waits for earlier requests on the same session to finish. @@ -312,7 +320,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { deps.sessionBridge.setOnToolUse(connectionId, undefined); deps.metrics?.endRequest(requestId); } - }); + }, deps.resolveQueuePolicy?.({ laneId, sessionId, connectionId, channel })); }, 'agent.cancel': async (request: GatewayRequest): Promise => { diff --git a/src/gateway/lane-queue.test.ts b/src/gateway/lane-queue.test.ts index ddd5d0c..bf0cdd6 100644 --- a/src/gateway/lane-queue.test.ts +++ b/src/gateway/lane-queue.test.ts @@ -248,4 +248,23 @@ describe('LaneQueue', () => { await expect(p1).resolves.toBe('active'); await expect(p3).resolves.toBe('new-pending'); }); + + it('supports per-enqueue policy overrides', async () => { + const queue = new LaneQueue({ mode: 'collect', cap: 10, overflow: 'drop_old' }); + let resolveFirst!: () => void; + const firstBlocks = new Promise<void>((r) => { resolveFirst = r; }); + + const p1 = queue.enqueue('lane-a', async () => { + await firstBlocks; + return 'active'; + }); + const p2 = queue.enqueue('lane-a', async () => 'old-pending', { mode: 'steer' }); + const p3 = queue.enqueue('lane-a', async () => 'latest-pending', { mode: 'steer' }); + + await expect(p2).rejects.toThrow('Superseded by newer request'); + resolveFirst(); + + await expect(p1).resolves.toBe('active'); + await expect(p3).resolves.toBe('latest-pending'); + }); }); diff --git a/src/gateway/lane-queue.ts
b/src/gateway/lane-queue.ts index d2cf093..3d74548 100644 --- a/src/gateway/lane-queue.ts +++ b/src/gateway/lane-queue.ts @@ -47,7 +47,8 @@ export class LaneQueue { * Returns a promise that resolves with the work's return value * once it has been executed (which may be immediately if the lane is idle). */ - async enqueue<T>(laneId: string, work: () => Promise<T>): Promise<T> { + async enqueue<T>(laneId: string, work: () => Promise<T>, policy?: Partial<LaneQueueConfig>): Promise<T> { + const effective = this.resolvePolicy(policy); let lane = this.lanes.get(laneId); if (!lane) { lane = { active: false, queue: [] }; @@ -65,12 +66,12 @@ export class LaneQueue { } } - if (this.config.mode === 'steer' || this.config.mode === 'interrupt') { + if (effective.mode === 'steer' || effective.mode === 'interrupt') { this.rejectPending(lane, 'Superseded by newer request'); } - if (lane.queue.length >= this.config.cap) { - if (this.config.overflow === 'drop_new') { + if (lane.queue.length >= effective.cap) { + if (effective.overflow === 'drop_new') { return Promise.reject(new Error('Lane queue full (drop_new)')); } // drop_old @@ -131,6 +132,14 @@ export class LaneQueue { } } + private resolvePolicy(policy?: Partial<LaneQueueConfig>): LaneQueueConfig { + return { + mode: policy?.mode ?? this.config.mode, + cap: Math.max(1, policy?.cap ?? this.config.cap), + overflow: policy?.overflow ?? this.config.overflow, + }; + } + /** * Process the next queued entry for a lane (called after current work finishes). * Runs asynchronously so the caller's finally block completes first. diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 259ba21..f2ebcd8 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -85,7 +85,12 @@ export interface GatewayServerConfig { maxViolations?: number; violationWindowMs?: number; }; - queue?: Partial<LaneQueueConfig>; + queue?: Partial<LaneQueueConfig> & { + overrides?: { + channels?: Record<string, Partial<LaneQueueConfig>>; + sessions?: Record<string, Partial<LaneQueueConfig>>; + }; + }; /** Optional pairing manager for DM pairing code management via gateway.
*/ pairingManager?: PairingManager; memoryStore?: MemoryStore; @@ -199,6 +204,16 @@ export class GatewayServer { const agentHandlers = createAgentHandlers({ sessionBridge: this.sessionBridge, laneQueue: this.laneQueue, + resolveQueuePolicy: ({ sessionId, channel }) => { + const sessionPolicy = sessionId + ? this.config.queue?.overrides?.sessions?.[sessionId] + : undefined; + if (sessionPolicy) { + return sessionPolicy; + } + + return this.config.queue?.overrides?.channels?.[channel]; + }, metrics: this.metrics, sessionManager: this.config.sessionManager, commandRegistry: this.config.commandRegistry,