From 63d645bd8723ed7ea808d40deb0cbf9b0f7affb5 Mon Sep 17 00:00:00 2001 From: William Valentin Date: Sun, 15 Feb 2026 21:56:13 -0800 Subject: [PATCH] feat(gateway): add websocket ingress rate limiting --- README.md | 14 +++ config/default.yaml | 6 ++ docs/deployment/PRODUCTION.md | 6 ++ .../2026-02-16-codebase-audit-report.md | 1 + docs/plans/state.json | 18 ++++ src/config/schema.test.ts | 28 ++++++ src/config/schema.ts | 10 +++ src/daemon/services.ts | 7 ++ src/gateway/server.test.ts | 74 ++++++++++++++++ src/gateway/server.ts | 85 +++++++++++++++++++ 10 files changed, 249 insertions(+) diff --git a/README.md b/README.md index 6c66416..f00dd02 100644 --- a/README.md +++ b/README.md @@ -686,6 +686,20 @@ server: The web UI detects the locked state and disables auto-reconnect when rejected. +## Gateway WebSocket Rate Limit + +Per-connection ingress throttling for WebSocket requests. Excess bursts are rejected; repeated violations close the connection (`4008`). + +```yaml +server: + ws_rate_limit: + enabled: true + capacity: 30 + refill_per_sec: 15 + max_violations: 8 + violation_window_ms: 10000 +``` + ## Gateway Request Body Limit Cap inbound HTTP POST body size (webhooks and Gmail push) to reduce memory-DoS risk. diff --git a/config/default.yaml b/config/default.yaml index cec82e4..b6c9028 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -18,6 +18,12 @@ server: port: 18800 # Maximum inbound HTTP request body size (bytes) for webhooks/Gmail push. 
max_request_body_bytes: 1048576 + ws_rate_limit: + enabled: true + capacity: 30 + refill_per_sec: 15 + max_violations: 8 + violation_window_ms: 10000 models: # ── Model tiers ──────────────────────────────────────────────────── diff --git a/docs/deployment/PRODUCTION.md b/docs/deployment/PRODUCTION.md index 749d56e..9f71c60 100644 --- a/docs/deployment/PRODUCTION.md +++ b/docs/deployment/PRODUCTION.md @@ -249,6 +249,12 @@ server: auth_http: true lock: false max_request_body_bytes: 1048576 + ws_rate_limit: + enabled: true + capacity: 30 + refill_per_sec: 15 + max_violations: 8 + violation_window_ms: 10000 ``` Generate a secure token: diff --git a/docs/plans/analysis/2026-02-16-codebase-audit-report.md b/docs/plans/analysis/2026-02-16-codebase-audit-report.md index c67d330..8502ad0 100644 --- a/docs/plans/analysis/2026-02-16-codebase-audit-report.md +++ b/docs/plans/analysis/2026-02-16-codebase-audit-report.md @@ -12,6 +12,7 @@ Scope: Production-risk-first audit of bugs, code improvements, and feature oppor - ✅ F-005 addressed: ESLint JS globals now include `FileReader`, removing UI false-positive lint failures for attachment handling code. - ✅ F-010 addressed: `session.compact` audit events now emit actual message counts for `messages_before/messages_after` (tokens remain in token fields). - ✅ F-012 addressed: synthetic repeated-tool nudge no longer emits invalid `tool_result.tool_use_id`; nudge is injected as plain user text guidance. +- ✅ F-009 addressed: gateway now enforces per-connection WebSocket ingress rate limits with deterministic throttle errors and close-on-repeated-violation behavior. 
## Executive Summary diff --git a/docs/plans/state.json b/docs/plans/state.json index 4265ebc..f28ffe2 100644 --- a/docs/plans/state.json +++ b/docs/plans/state.json @@ -2478,6 +2478,24 @@ "docs/plans/analysis/2026-02-16-codebase-audit-report.md" ], "test_status": "pnpm test:run src/backends/native/agent.test.ts src/backends/native/orchestrator.test.ts + pnpm typecheck passing" + }, + "audit-followup-ws-rate-limiting": { + "status": "completed", + "date": "2026-02-16", + "updated": "2026-02-16", + "summary": "Implemented gateway WebSocket ingress rate limiting (token bucket per connection) with deterministic throttle errors and close-on-repeated-violation enforcement; added schema/config wiring and tests.", + "files_modified": [ + "src/gateway/server.ts", + "src/gateway/server.test.ts", + "src/config/schema.ts", + "src/config/schema.test.ts", + "src/daemon/services.ts", + "config/default.yaml", + "README.md", + "docs/deployment/PRODUCTION.md", + "docs/plans/analysis/2026-02-16-codebase-audit-report.md" + ], + "test_status": "pnpm test:run src/gateway/server.test.ts src/config/schema.test.ts + pnpm typecheck passing" } }, "overall_progress": { diff --git a/src/config/schema.test.ts b/src/config/schema.test.ts index c5cbfe8..547f408 100644 --- a/src/config/schema.test.ts +++ b/src/config/schema.test.ts @@ -46,6 +46,34 @@ describe('configSchema — server', () => { }); expect(result.server.max_request_body_bytes).toBe(2048); }); + + it('defaults ws_rate_limit settings', () => { + const result = configSchema.parse(minimalConfig); + expect(result.server.ws_rate_limit.enabled).toBe(true); + expect(result.server.ws_rate_limit.capacity).toBe(30); + expect(result.server.ws_rate_limit.refill_per_sec).toBe(15); + expect(result.server.ws_rate_limit.max_violations).toBe(8); + expect(result.server.ws_rate_limit.violation_window_ms).toBe(10_000); + }); + + it('accepts custom ws_rate_limit settings', () => { + const result = configSchema.parse({ + ...minimalConfig, + server: { + 
ws_rate_limit: { + enabled: true, + capacity: 5, + refill_per_sec: 2, + max_violations: 3, + violation_window_ms: 2000, + }, + }, + }); + expect(result.server.ws_rate_limit.capacity).toBe(5); + expect(result.server.ws_rate_limit.refill_per_sec).toBe(2); + expect(result.server.ws_rate_limit.max_violations).toBe(3); + expect(result.server.ws_rate_limit.violation_window_ms).toBe(2000); + }); }); describe('configSchema — agent_configs', () => { diff --git a/src/config/schema.ts b/src/config/schema.ts index 6ceb4f3..9683ed3 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -24,6 +24,14 @@ const pairingSchema = z.object({ code_length: z.number().default(6), }).default({}); +const wsRateLimitSchema = z.object({ + enabled: z.boolean().default(true), + capacity: z.number().min(1).max(1000).default(30), + refill_per_sec: z.number().min(1).max(1000).default(15), + max_violations: z.number().min(1).max(100).default(8), + violation_window_ms: z.number().min(1000).max(60000).default(10000), +}).default({}); + const serverSchema = z.object({ tailscale: tailscaleSchema, localhost: z.boolean().default(true), @@ -38,6 +46,8 @@ const serverSchema = z.object({ lock: z.boolean().default(false), /** Maximum size (bytes) for inbound HTTP request bodies (webhooks/Gmail push). */ max_request_body_bytes: z.number().min(1024).max(10 * 1024 * 1024).default(1_048_576), + /** Per-connection WebSocket ingress rate limit settings. */ + ws_rate_limit: wsRateLimitSchema, }); /** All supported model provider identifiers. Used by the config schema and TUI autocompletion. 
*/ diff --git a/src/daemon/services.ts b/src/daemon/services.ts index 58875e5..0c93263 100644 --- a/src/daemon/services.ts +++ b/src/daemon/services.ts @@ -312,6 +312,13 @@ export function createGateway(deps: GatewayDeps): GatewayServer { authHttp: config.server.auth_http, lock: config.server.lock, maxRequestBodyBytes: config.server.max_request_body_bytes, + wsRateLimit: { + enabled: config.server.ws_rate_limit.enabled, + capacity: config.server.ws_rate_limit.capacity, + refillPerSec: config.server.ws_rate_limit.refill_per_sec, + maxViolations: config.server.ws_rate_limit.max_violations, + violationWindowMs: config.server.ws_rate_limit.violation_window_ms, + }, commandRegistry: deps.commandRegistry, intentRegistry: deps.intentRegistry, routingPolicy: deps.routingPolicy, diff --git a/src/gateway/server.test.ts b/src/gateway/server.test.ts index 332e756..e2fe7c7 100644 --- a/src/gateway/server.test.ts +++ b/src/gateway/server.test.ts @@ -503,3 +503,77 @@ describe('GatewayServer request body limits', () => { expect(gmailHandler.handlePushNotification).not.toHaveBeenCalled(); }); }); + +describe('GatewayServer WebSocket ingress rate limiting', () => { + const RATE_PORT = 18895; + let rateServer: GatewayServer; + + beforeAll(async () => { + if (!LISTEN_ALLOWED) { + return; + } + rateServer = new GatewayServer({ + port: RATE_PORT, + sessionManager: mockSessionManager as unknown as GatewayServerConfig['sessionManager'], + modelClient: mockModelClient, + systemPrompt: 'Test prompt', + toolRegistry: mockToolRegistry as unknown as GatewayServerConfig['toolRegistry'], + toolExecutor: mockToolExecutor as unknown as GatewayServerConfig['toolExecutor'], + uiDir: resolve(import.meta.dirname, 'ui'), + wsRateLimit: { + enabled: true, + capacity: 2, + refillPerSec: 1, + maxViolations: 3, + violationWindowMs: 10_000, + }, + }); + await rateServer.start(); + }); + + afterAll(async () => { + if (!LISTEN_ALLOWED) { + return; + } + await rateServer.stop(); + }); + + it('throttles bursts 
and closes repeated offenders', async () => { + if (!LISTEN_ALLOWED) { + return; + } + + const ws = await new Promise<WebSocket>((resolve, reject) => { + const c = new WebSocket(`ws://127.0.0.1:${RATE_PORT}`); + c.on('open', () => resolve(c)); + c.on('error', reject); + }); + + try { + const first = await sendAndReceive(ws, { id: 1, method: 'system.health' }); + expect((first as GatewayResponse).id).toBe(1); + + const second = await sendAndReceive(ws, { id: 2, method: 'system.health' }); + expect((second as GatewayResponse).id).toBe(2); + + const third = await sendAndReceive(ws, { id: 3, method: 'system.health' }); + const rateErr = third as GatewayError; + expect(rateErr.error.code).toBe(ErrorCode.InternalError); + expect(rateErr.error.message).toContain('Rate limit exceeded'); + + // Trigger additional violations; server should close on max violation threshold. + ws.send(JSON.stringify({ id: 4, method: 'system.health' })); + ws.send(JSON.stringify({ id: 5, method: 'system.health' })); + + const close = await new Promise<{ code: number; reason: string }>((resolve) => { + ws.on('close', (code, reason) => resolve({ code, reason: reason.toString() })); + }); + expect(close.code).toBe(4008); + expect(close.reason).toContain('Rate limit exceeded'); + } finally { + if (ws.readyState === WebSocket.OPEN) { + ws.close(); + } + } + }); +}); diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 66dff05..f440c1b 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -70,6 +70,14 @@ export interface GatewayServerConfig { getTokenUsage?: () => TokenUsageEntry[]; /** Maximum allowed request body size for inbound HTTP POST bodies. */ maxRequestBodyBytes?: number; + /** Per-connection WebSocket ingress rate limiting. */ + wsRateLimit?: { + enabled?: boolean; + capacity?: number; + refillPerSec?: number; + maxViolations?: number; + violationWindowMs?: number; + }; /** Optional pairing manager for DM pairing code management via gateway. 
*/ pairingManager?: PairingManager; memoryStore?: MemoryStore; @@ -80,6 +88,13 @@ export interface GatewayServerConfig { export class GatewayServer { private static readonly DEFAULT_MAX_REQUEST_BODY_BYTES = 1_048_576; // 1 MiB + private static readonly DEFAULT_WS_RATE_LIMIT = { + enabled: true, + capacity: 30, + refillPerSec: 15, + maxViolations: 8, + violationWindowMs: 10_000, + } as const; private wss: WebSocketServer | null = null; private httpServer: HttpServer | null = null; private router: Router; @@ -87,6 +102,12 @@ export class GatewayServer { private laneQueue: LaneQueue; private metrics: MetricsCollector; private connectionMap: Map<WebSocket, string> = new Map(); + private connectionRateMap: Map<string, { + tokens: number; + lastRefillMs: number; + violations: number; + windowStartMs: number; + }> = new Map(); private config: GatewayServerConfig; private startTime: number = Date.now(); @@ -291,8 +312,22 @@ export class GatewayServer { const connectionId = randomUUID(); this.sessionBridge.connect(connectionId); this.connectionMap.set(ws, connectionId); + this.connectionRateMap.set(connectionId, { + tokens: this.getWsRateConfig().capacity, + lastRefillMs: Date.now(), + violations: 0, + windowStartMs: Date.now(), + }); ws.on('message', async (data) => { + const limit = this.consumeConnectionRateToken(connectionId); + if (!limit.allowed) { + this.send(ws, makeError(0, ErrorCode.InternalError, `Rate limit exceeded. Retry in ${limit.retryMs}ms.`)); + if (limit.close) { + ws.close(4008, 'Rate limit exceeded'); + } + return; + } const raw = data.toString(); await this.handleMessage(ws, connectionId, raw); }); @@ -300,6 +335,7 @@ export class GatewayServer { ws.on('close', () => { this.sessionBridge.disconnect(connectionId); this.connectionMap.delete(ws); + this.connectionRateMap.delete(connectionId); }); ws.on('error', (err) => { @@ -307,6 +343,55 @@ export class GatewayServer { }); } + private getWsRateConfig(): Required<NonNullable<GatewayServerConfig['wsRateLimit']>> { + const raw = this.config.wsRateLimit ?? {}; + return { + enabled: raw.enabled ?? GatewayServer.DEFAULT_WS_RATE_LIMIT.enabled, + capacity: raw.capacity ?? 
GatewayServer.DEFAULT_WS_RATE_LIMIT.capacity, + refillPerSec: raw.refillPerSec ?? GatewayServer.DEFAULT_WS_RATE_LIMIT.refillPerSec, + maxViolations: raw.maxViolations ?? GatewayServer.DEFAULT_WS_RATE_LIMIT.maxViolations, + violationWindowMs: raw.violationWindowMs ?? GatewayServer.DEFAULT_WS_RATE_LIMIT.violationWindowMs, + }; + } + + private consumeConnectionRateToken(connectionId: string): { allowed: boolean; close: boolean; retryMs: number } { + const cfg = this.getWsRateConfig(); + if (!cfg.enabled) { + return { allowed: true, close: false, retryMs: 0 }; + } + + const now = Date.now(); + const state = this.connectionRateMap.get(connectionId); + if (!state) { + return { allowed: true, close: false, retryMs: 0 }; + } + + const elapsedMs = Math.max(0, now - state.lastRefillMs); + state.tokens = Math.min(cfg.capacity, state.tokens + (elapsedMs / 1000) * cfg.refillPerSec); + state.lastRefillMs = now; + + if (state.tokens >= 1) { + state.tokens -= 1; + this.connectionRateMap.set(connectionId, state); + return { allowed: true, close: false, retryMs: 0 }; + } + + if (now - state.windowStartMs > cfg.violationWindowMs) { + state.windowStartMs = now; + state.violations = 0; + } + state.violations += 1; + this.connectionRateMap.set(connectionId, state); + + const deficit = 1 - state.tokens; + const retryMs = Math.max(1, Math.ceil((deficit / cfg.refillPerSec) * 1000)); + return { + allowed: false, + close: state.violations >= cfg.maxViolations, + retryMs, + }; + } + /** * Handle incoming HTTP requests. * Optionally applies auth (when authHttp is enabled and a token is configured).