diff --git a/README.md b/README.md index 1eea554..e5717c9 100644 --- a/README.md +++ b/README.md @@ -797,6 +797,24 @@ server: violation_window_ms: 10000 ``` +## Gateway Lane Queue Policy + +Per-session FIFO queue policy for concurrent gateway requests (`agent.send`). + +```yaml +server: + queue: + mode: collect # collect | steer | interrupt + cap: 50 # max pending requests per session lane + overflow: drop_old # drop_old | drop_new +``` + +Notes: +- `collect` keeps all queued requests (subject to `cap`). +- `steer` and `interrupt` keep only the latest pending request while one is active. +- `interrupt` currently does not force-stop already running work; use `agent.cancel` for active cancellation. +- On overflow, `drop_old` evicts the oldest pending request, `drop_new` rejects the new request. + ## Gateway Request Body Limit Cap inbound HTTP POST body size (webhooks and Gmail push) to reduce memory-DoS risk. diff --git a/config/default.yaml b/config/default.yaml index 478d814..2c0ca8d 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -60,6 +60,11 @@ server: refill_per_sec: 15 max_violations: 8 violation_window_ms: 10000 + # Per-session FIFO lane queue for gateway requests. + queue: + mode: collect # collect | steer | interrupt + cap: 50 # max queued (pending) requests per session lane + overflow: drop_old # drop_old | drop_new # Local-network service discovery (mDNS/Bonjour). Keep disabled by default. # Requires server.localhost: false so LAN clients can actually connect. discovery: diff --git a/docs/plans/state.json b/docs/plans/state.json index ace713f..23184f7 100644 --- a/docs/plans/state.json +++ b/docs/plans/state.json @@ -13,6 +13,23 @@ "date": "2026-02-15", "summary": "Added docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md to document how gateway connectionIds map to durable sessionIds, how per-session FIFO lane queueing works, and how agent.cancel behaves." 
}, + "gateway-lane-queue-policy-phase1": { + "status": "completed", + "date": "2026-02-16", + "updated": "2026-02-16", + "summary": "Added configurable gateway lane queue policy (`server.queue`) with mode (`collect|steer|interrupt`), per-lane cap, and overflow behavior (`drop_old|drop_new`). Wired through daemon -> gateway runtime, expanded LaneQueue behavior/tests, and documented the new config in README + default config.", + "files_modified": [ + "src/gateway/lane-queue.ts", + "src/gateway/lane-queue.test.ts", + "src/gateway/server.ts", + "src/daemon/services.ts", + "src/config/schema.ts", + "src/config/schema.test.ts", + "config/default.yaml", + "README.md" + ], + "test_status": "pnpm test:run src/gateway/lane-queue.test.ts src/config/schema.test.ts + pnpm typecheck passing" + }, "docs-gateway-auth-config-keys": { "status": "completed", "date": "2026-02-15", diff --git a/src/config/schema.test.ts b/src/config/schema.test.ts index bf384cf..89f014e 100644 --- a/src/config/schema.test.ts +++ b/src/config/schema.test.ts @@ -75,6 +75,29 @@ describe('configSchema — server', () => { expect(result.server.ws_rate_limit.violation_window_ms).toBe(2000); }); + it('defaults queue settings', () => { + const result = configSchema.parse(minimalConfig); + expect(result.server.queue.mode).toBe('collect'); + expect(result.server.queue.cap).toBe(50); + expect(result.server.queue.overflow).toBe('drop_old'); + }); + + it('accepts custom queue settings', () => { + const result = configSchema.parse({ + ...minimalConfig, + server: { + queue: { + mode: 'steer', + cap: 10, + overflow: 'drop_new', + }, + }, + }); + expect(result.server.queue.mode).toBe('steer'); + expect(result.server.queue.cap).toBe(10); + expect(result.server.queue.overflow).toBe('drop_new'); + }); + it('defaults discovery settings', () => { const result = configSchema.parse(minimalConfig); expect(result.server.discovery.enabled).toBe(false); diff --git a/src/config/schema.ts b/src/config/schema.ts index 8844eba..586d9f0 
100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -32,6 +32,15 @@ const wsRateLimitSchema = z.object({ violation_window_ms: z.number().min(1000).max(60000).default(10000), }).default({}); +const laneQueueSchema = z.object({ + /** Queue behavior for concurrent requests in the same session lane. */ + mode: z.enum(['collect', 'steer', 'interrupt']).default('collect'), + /** Max queued (pending) requests per lane. */ + cap: z.number().min(1).max(1000).default(50), + /** Overflow strategy when cap is reached. */ + overflow: z.enum(['drop_old', 'drop_new']).default('drop_old'), +}).default({}); + const serverDiscoverySchema = z.object({ /** Enable local-network service discovery (mDNS/Bonjour advertisement). */ enabled: z.boolean().default(false), @@ -59,6 +68,8 @@ const serverSchema = z.object({ max_request_body_bytes: z.number().min(1024).max(10 * 1024 * 1024).default(1_048_576), /** Per-connection WebSocket ingress rate limit settings. */ ws_rate_limit: wsRateLimitSchema, + /** Per-session gateway lane queue behavior. */ + queue: laneQueueSchema, /** Optional Bonjour/mDNS advertisement settings. 
*/ discovery: serverDiscoverySchema, }); diff --git a/src/daemon/services.ts b/src/daemon/services.ts index 72ca4f9..98b418d 100644 --- a/src/daemon/services.ts +++ b/src/daemon/services.ts @@ -321,6 +321,11 @@ export function createGateway(deps: GatewayDeps): GatewayServer { maxViolations: config.server.ws_rate_limit.max_violations, violationWindowMs: config.server.ws_rate_limit.violation_window_ms, }, + queue: { + mode: config.server.queue.mode, + cap: config.server.queue.cap, + overflow: config.server.queue.overflow, + }, discovery: { enabled: config.server.discovery.enabled, serviceName: config.server.discovery.service_name, diff --git a/src/gateway/lane-queue.test.ts b/src/gateway/lane-queue.test.ts index e91f834..ddd5d0c 100644 --- a/src/gateway/lane-queue.test.ts +++ b/src/gateway/lane-queue.test.ts @@ -191,4 +191,61 @@ describe('LaneQueue', () => { const r2 = await queue.enqueue('lane-a', async () => 'second'); expect(r2).toBe('second'); }); + + it('steer mode keeps only the most recent pending request', async () => { + const queue = new LaneQueue({ mode: 'steer' }); + let resolveFirst!: () => void; + const firstBlocks = new Promise<void>((r) => { resolveFirst = r; }); + + const p1 = queue.enqueue('lane-a', async () => { + await firstBlocks; + return 'active'; + }); + const p2 = queue.enqueue('lane-a', async () => 'old-pending'); + const p3 = queue.enqueue('lane-a', async () => 'latest-pending'); + + await expect(p2).rejects.toThrow('Superseded by newer request'); + resolveFirst(); + + await expect(p1).resolves.toBe('active'); + await expect(p3).resolves.toBe('latest-pending'); + }); + + it('drop_new overflow rejects newest request when cap is reached', async () => { + const queue = new LaneQueue({ cap: 1, overflow: 'drop_new' }); + let resolveFirst!: () => void; + const firstBlocks = new Promise<void>((r) => { resolveFirst = r; }); + + const p1 = queue.enqueue('lane-a', async () => { + await firstBlocks; + return 'active'; + }); + const p2 = queue.enqueue('lane-a', 
async () => 'pending-1'); + const p3 = queue.enqueue('lane-a', async () => 'pending-2'); + + await expect(p3).rejects.toThrow('Lane queue full (drop_new)'); + resolveFirst(); + + await expect(p1).resolves.toBe('active'); + await expect(p2).resolves.toBe('pending-1'); + }); + + it('drop_old overflow evicts oldest pending request when cap is reached', async () => { + const queue = new LaneQueue({ cap: 1, overflow: 'drop_old' }); + let resolveFirst!: () => void; + const firstBlocks = new Promise<void>((r) => { resolveFirst = r; }); + + const p1 = queue.enqueue('lane-a', async () => { + await firstBlocks; + return 'active'; + }); + const p2 = queue.enqueue('lane-a', async () => 'old-pending'); + const p3 = queue.enqueue('lane-a', async () => 'new-pending'); + + await expect(p2).rejects.toThrow('Lane queue overflow (drop_old)'); + resolveFirst(); + + await expect(p1).resolves.toBe('active'); + await expect(p3).resolves.toBe('new-pending'); + }); }); diff --git a/src/gateway/lane-queue.ts b/src/gateway/lane-queue.ts index 69a36ed..d2cf093 100644 --- a/src/gateway/lane-queue.ts +++ b/src/gateway/lane-queue.ts @@ -21,8 +21,26 @@ interface Lane { queue: QueueEntry[]; } +export type LaneQueueMode = 'collect' | 'steer' | 'interrupt'; +export type LaneQueueOverflow = 'drop_old' | 'drop_new'; + +export interface LaneQueueConfig { + mode: LaneQueueMode; + cap: number; + overflow: LaneQueueOverflow; +} + export class LaneQueue { private lanes: Map<string, Lane> = new Map(); + private config: LaneQueueConfig; + + constructor(config?: Partial<LaneQueueConfig>) { + this.config = { + mode: config?.mode ?? 'collect', + cap: Math.max(1, config?.cap ?? 50), + overflow: config?.overflow ?? 'drop_old', + }; + } /** * Enqueue a unit of work for the given lane. 
@@ -47,6 +65,19 @@ export class LaneQueue { } } + if (this.config.mode === 'steer' || this.config.mode === 'interrupt') { + this.rejectPending(lane, 'Superseded by newer request'); + } + + if (lane.queue.length >= this.config.cap) { + if (this.config.overflow === 'drop_new') { + return Promise.reject(new Error('Lane queue full (drop_new)')); + } + // drop_old + const dropped = lane.queue.shift(); + dropped?.reject(new Error('Lane queue overflow (drop_old)')); + } + // Otherwise, queue the work and return a deferred promise return new Promise((resolve, reject) => { lane.queue.push({ @@ -85,10 +116,7 @@ export class LaneQueue { const lane = this.lanes.get(laneId); if (!lane) {return;} - const pending = lane.queue.splice(0); - for (const entry of pending) { - entry.reject(new Error('Lane cancelled')); - } + this.rejectPending(lane, 'Lane cancelled'); // Clean up empty idle lanes if (!lane.active && lane.queue.length === 0) { @@ -96,6 +124,13 @@ export class LaneQueue { } } + private rejectPending(lane: Lane, reason: string): void { + const pending = lane.queue.splice(0); + for (const entry of pending) { + entry.reject(new Error(reason)); + } + } + /** * Process the next queued entry for a lane (called after current work finishes). * Runs asynchronously so the caller's finally block completes first. 
diff --git a/src/gateway/server.ts b/src/gateway/server.ts index fafafb5..259ba21 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -6,6 +6,7 @@ import { serveStatic } from './static.js'; import { SessionBridge } from './session-bridge.js'; import type { SessionBridgeConfig } from './session-bridge.js'; import { LaneQueue } from './lane-queue.js'; +import type { LaneQueueConfig } from './lane-queue.js'; import { MetricsCollector } from './metrics.js'; import { authenticateRequest } from './auth.js'; import type { AuthConfig } from './auth.js'; @@ -84,6 +85,7 @@ export interface GatewayServerConfig { maxViolations?: number; violationWindowMs?: number; }; + queue?: Partial<LaneQueueConfig>; /** Optional pairing manager for DM pairing code management via gateway. */ pairingManager?: PairingManager; memoryStore?: MemoryStore; @@ -143,7 +145,7 @@ export class GatewayServer { memoryStore: config.memoryStore, }); - this.laneQueue = new LaneQueue(); + this.laneQueue = new LaneQueue(config.queue); this.metrics = new MetricsCollector({ getQueueDepth: () => this.laneQueue.totalPending(), });