feat(gateway): add websocket ingress rate limiting

This commit is contained in:
William Valentin
2026-02-15 21:56:13 -08:00
parent 948d589ac3
commit 63d645bd87
10 changed files with 249 additions and 0 deletions
+28
View File
@@ -46,6 +46,34 @@ describe('configSchema — server', () => {
});
expect(result.server.max_request_body_bytes).toBe(2048);
});
it('defaults ws_rate_limit settings', () => {
const result = configSchema.parse(minimalConfig);
expect(result.server.ws_rate_limit.enabled).toBe(true);
expect(result.server.ws_rate_limit.capacity).toBe(30);
expect(result.server.ws_rate_limit.refill_per_sec).toBe(15);
expect(result.server.ws_rate_limit.max_violations).toBe(8);
expect(result.server.ws_rate_limit.violation_window_ms).toBe(10_000);
});
it('accepts custom ws_rate_limit settings', () => {
const result = configSchema.parse({
...minimalConfig,
server: {
ws_rate_limit: {
enabled: true,
capacity: 5,
refill_per_sec: 2,
max_violations: 3,
violation_window_ms: 2000,
},
},
});
expect(result.server.ws_rate_limit.capacity).toBe(5);
expect(result.server.ws_rate_limit.refill_per_sec).toBe(2);
expect(result.server.ws_rate_limit.max_violations).toBe(3);
expect(result.server.ws_rate_limit.violation_window_ms).toBe(2000);
});
});
describe('configSchema — agent_configs', () => {
+10
View File
@@ -24,6 +24,14 @@ const pairingSchema = z.object({
code_length: z.number().default(6),
}).default({});
const wsRateLimitSchema = z.object({
enabled: z.boolean().default(true),
capacity: z.number().min(1).max(1000).default(30),
refill_per_sec: z.number().min(1).max(1000).default(15),
max_violations: z.number().min(1).max(100).default(8),
violation_window_ms: z.number().min(1000).max(60000).default(10000),
}).default({});
const serverSchema = z.object({
tailscale: tailscaleSchema,
localhost: z.boolean().default(true),
@@ -38,6 +46,8 @@ const serverSchema = z.object({
lock: z.boolean().default(false),
/** Maximum size (bytes) for inbound HTTP request bodies (webhooks/Gmail push). */
max_request_body_bytes: z.number().min(1024).max(10 * 1024 * 1024).default(1_048_576),
/** Per-connection WebSocket ingress rate limit settings. */
ws_rate_limit: wsRateLimitSchema,
});
/** All supported model provider identifiers. Used by the config schema and TUI autocompletion. */
+7
View File
@@ -312,6 +312,13 @@ export function createGateway(deps: GatewayDeps): GatewayServer {
authHttp: config.server.auth_http,
lock: config.server.lock,
maxRequestBodyBytes: config.server.max_request_body_bytes,
wsRateLimit: {
enabled: config.server.ws_rate_limit.enabled,
capacity: config.server.ws_rate_limit.capacity,
refillPerSec: config.server.ws_rate_limit.refill_per_sec,
maxViolations: config.server.ws_rate_limit.max_violations,
violationWindowMs: config.server.ws_rate_limit.violation_window_ms,
},
commandRegistry: deps.commandRegistry,
intentRegistry: deps.intentRegistry,
routingPolicy: deps.routingPolicy,
+74
View File
@@ -503,3 +503,77 @@ describe('GatewayServer request body limits', () => {
expect(gmailHandler.handlePushNotification).not.toHaveBeenCalled();
});
});
describe('GatewayServer WebSocket ingress rate limiting', () => {
const RATE_PORT = 18895;
let rateServer: GatewayServer;
beforeAll(async () => {
if (!LISTEN_ALLOWED) {
return;
}
rateServer = new GatewayServer({
port: RATE_PORT,
sessionManager: mockSessionManager as unknown as GatewayServerConfig['sessionManager'],
modelClient: mockModelClient,
systemPrompt: 'Test prompt',
toolRegistry: mockToolRegistry as unknown as GatewayServerConfig['toolRegistry'],
toolExecutor: mockToolExecutor as unknown as GatewayServerConfig['toolExecutor'],
uiDir: resolve(import.meta.dirname, 'ui'),
wsRateLimit: {
enabled: true,
capacity: 2,
refillPerSec: 1,
maxViolations: 3,
violationWindowMs: 10_000,
},
});
await rateServer.start();
});
afterAll(async () => {
if (!LISTEN_ALLOWED) {
return;
}
await rateServer.stop();
});
it('throttles bursts and closes repeated offenders', async () => {
if (!LISTEN_ALLOWED) {
return;
}
const ws = await new Promise<WebSocket>((resolve, reject) => {
const c = new WebSocket(`ws://127.0.0.1:${RATE_PORT}`);
c.on('open', () => resolve(c));
c.on('error', reject);
});
try {
const first = await sendAndReceive(ws, { id: 1, method: 'system.health' });
expect((first as GatewayResponse).id).toBe(1);
const second = await sendAndReceive(ws, { id: 2, method: 'system.health' });
expect((second as GatewayResponse).id).toBe(2);
const third = await sendAndReceive(ws, { id: 3, method: 'system.health' });
const rateErr = third as GatewayError;
expect(rateErr.error.code).toBe(ErrorCode.InternalError);
expect(rateErr.error.message).toContain('Rate limit exceeded');
// Trigger additional violations; server should close on max violation threshold.
ws.send(JSON.stringify({ id: 4, method: 'system.health' }));
ws.send(JSON.stringify({ id: 5, method: 'system.health' }));
const close = await new Promise<{ code: number; reason: string }>((resolve) => {
ws.on('close', (code, reason) => resolve({ code, reason: reason.toString() }));
});
expect(close.code).toBe(4008);
expect(close.reason).toContain('Rate limit exceeded');
} finally {
if (ws.readyState === WebSocket.OPEN) {
ws.close();
}
}
});
});
+85
View File
@@ -70,6 +70,14 @@ export interface GatewayServerConfig {
getTokenUsage?: () => TokenUsageEntry[];
/** Maximum allowed request body size for inbound HTTP POST bodies. */
maxRequestBodyBytes?: number;
/** Per-connection WebSocket ingress rate limiting. */
wsRateLimit?: {
enabled?: boolean;
capacity?: number;
refillPerSec?: number;
maxViolations?: number;
violationWindowMs?: number;
};
/** Optional pairing manager for DM pairing code management via gateway. */
pairingManager?: PairingManager;
memoryStore?: MemoryStore;
@@ -80,6 +88,13 @@ export interface GatewayServerConfig {
export class GatewayServer {
private static readonly DEFAULT_MAX_REQUEST_BODY_BYTES = 1_048_576; // 1 MiB
private static readonly DEFAULT_WS_RATE_LIMIT = {
enabled: true,
capacity: 30,
refillPerSec: 15,
maxViolations: 8,
violationWindowMs: 10_000,
} as const;
private wss: WebSocketServer | null = null;
private httpServer: HttpServer | null = null;
private router: Router;
@@ -87,6 +102,12 @@ export class GatewayServer {
private laneQueue: LaneQueue;
private metrics: MetricsCollector;
private connectionMap: Map<WebSocket, string> = new Map();
private connectionRateMap: Map<string, {
tokens: number;
lastRefillMs: number;
violations: number;
windowStartMs: number;
}> = new Map();
private config: GatewayServerConfig;
private startTime: number = Date.now();
@@ -291,8 +312,22 @@ export class GatewayServer {
const connectionId = randomUUID();
this.sessionBridge.connect(connectionId);
this.connectionMap.set(ws, connectionId);
this.connectionRateMap.set(connectionId, {
tokens: this.getWsRateConfig().capacity,
lastRefillMs: Date.now(),
violations: 0,
windowStartMs: Date.now(),
});
ws.on('message', async (data) => {
const limit = this.consumeConnectionRateToken(connectionId);
if (!limit.allowed) {
this.send(ws, makeError(0, ErrorCode.InternalError, `Rate limit exceeded. Retry in ${limit.retryMs}ms.`));
if (limit.close) {
ws.close(4008, 'Rate limit exceeded');
}
return;
}
const raw = data.toString();
await this.handleMessage(ws, connectionId, raw);
});
@@ -300,6 +335,7 @@ export class GatewayServer {
ws.on('close', () => {
this.sessionBridge.disconnect(connectionId);
this.connectionMap.delete(ws);
this.connectionRateMap.delete(connectionId);
});
ws.on('error', (err) => {
@@ -307,6 +343,55 @@ export class GatewayServer {
});
}
private getWsRateConfig(): Required<NonNullable<GatewayServerConfig['wsRateLimit']>> {
const raw = this.config.wsRateLimit ?? {};
return {
enabled: raw.enabled ?? GatewayServer.DEFAULT_WS_RATE_LIMIT.enabled,
capacity: raw.capacity ?? GatewayServer.DEFAULT_WS_RATE_LIMIT.capacity,
refillPerSec: raw.refillPerSec ?? GatewayServer.DEFAULT_WS_RATE_LIMIT.refillPerSec,
maxViolations: raw.maxViolations ?? GatewayServer.DEFAULT_WS_RATE_LIMIT.maxViolations,
violationWindowMs: raw.violationWindowMs ?? GatewayServer.DEFAULT_WS_RATE_LIMIT.violationWindowMs,
};
}
private consumeConnectionRateToken(connectionId: string): { allowed: boolean; close: boolean; retryMs: number } {
const cfg = this.getWsRateConfig();
if (!cfg.enabled) {
return { allowed: true, close: false, retryMs: 0 };
}
const now = Date.now();
const state = this.connectionRateMap.get(connectionId);
if (!state) {
return { allowed: true, close: false, retryMs: 0 };
}
const elapsedMs = Math.max(0, now - state.lastRefillMs);
state.tokens = Math.min(cfg.capacity, state.tokens + (elapsedMs / 1000) * cfg.refillPerSec);
state.lastRefillMs = now;
if (state.tokens >= 1) {
state.tokens -= 1;
this.connectionRateMap.set(connectionId, state);
return { allowed: true, close: false, retryMs: 0 };
}
if (now - state.windowStartMs > cfg.violationWindowMs) {
state.windowStartMs = now;
state.violations = 0;
}
state.violations += 1;
this.connectionRateMap.set(connectionId, state);
const deficit = 1 - state.tokens;
const retryMs = Math.max(1, Math.ceil((deficit / cfg.refillPerSec) * 1000));
return {
allowed: false,
close: state.violations >= cfg.maxViolations,
retryMs,
};
}
/**
* Handle incoming HTTP requests.
* Optionally applies auth (when authHttp is enabled and a token is configured).