import { WebSocketServer, WebSocket } from 'ws'; import { randomUUID } from 'crypto'; import { createServer, type Server as HttpServer, type IncomingMessage, type ServerResponse } from 'http'; import { Router } from './router.js'; import { serveStatic } from './static.js'; import { SessionBridge } from './session-bridge.js'; import type { SessionBridgeConfig } from './session-bridge.js'; import { LaneQueue } from './lane-queue.js'; import type { LaneQueueConfig } from './lane-queue.js'; import { CanvasStore } from './canvas-store.js'; import { MetricsCollector } from './metrics.js'; import { authenticateRequest, authorizeNodeMethod } from './auth.js'; import type { AuthConfig } from './auth.js'; import { startGatewayDiscovery, type GatewayDiscoveryHandle } from './discovery.js'; import { parseMessage, makeError, makeResponse, ErrorCode, type OutboundMessage, } from './protocol.js'; import { createSystemHandlers, createSessionHandlers, createToolHandlers, createAgentHandlers, createConfigHandlers, createPairingHandlers, createIntentHandlers, createRoutingHandlers, createHistoryHandlers, createCanvasHandlers, createNodeHandlers, } from './handlers/index.js'; import { discoverServices } from './handlers/services.js'; import type { TokenUsageEntry, ContextUsageEntry } from './handlers/system.js'; import type { NodeConnectionState } from './handlers/node.js'; import type { SessionManager } from '../session/manager.js'; import type { Config } from '../config/index.js'; import type { ToolRegistry } from '../tools/registry.js'; import type { ToolExecutor } from '../tools/executor.js'; import type { WebhookHandler } from '../automation/webhooks.js'; import type { GmailWatcher } from '../automation/gmail.js'; import type { PairingManager } from '../channels/pairing.js'; import type { MemoryStore } from '../memory/store.js'; import type { CommandRegistry } from '../commands/index.js'; import type { ComponentRegistry } from '../intents/index.js'; import type { RoutingPolicy } from '../routing/index.js'; import type { ChannelRegistry } from '../channels/index.js'; import { RequestBodyTooLargeError, readRequestBody } from '../utils/httpBody.js'; import type { TeamsAdapter } from '../channels/teams/adapter.js'; import type { GoogleChatAdapter } from '../channels/googleChat/adapter.js'; import type { BlueBubblesAdapter } from '../channels/bluebubbles/adapter.js'; import type { LineAdapter } from '../channels/line/adapter.js'; import type { FeishuAdapter } from '../channels/feishu/adapter.js'; import type { ZaloAdapter } from '../channels/zalo/adapter.js'; export interface GatewayServerConfig { port: number; host?: string; sessionManager: SessionManager; modelClient: SessionBridgeConfig['modelClient']; systemPrompt: string; toolRegistry: ToolRegistry; toolExecutor: ToolExecutor; version?: string; auth?: AuthConfig; /** When true, only one WebSocket client can be connected at a time. */ lock?: boolean; /** Whether to apply token auth to HTTP requests too (default: true when token is set). */ authHttp?: boolean; uiDir?: string; config?: Config; /** Optional persistence callback for config.patch updates. */ persistConfig?: (nextConfig: Config) => Promise | void; /** Optional callback for system.restart. Should trigger graceful shutdown + process restart. */ restart?: () => Promise; channelRegistry?: ChannelRegistry; /** Optional webhook handler for inbound webhook HTTP routes. */ webhookHandler?: WebhookHandler; /** Optional Gmail handler for Pub/Sub push notifications. */ gmailHandler?: GmailWatcher; /** Optional callback to retrieve per-session token usage data for the dashboard. */ getTokenUsage?: () => TokenUsageEntry[]; /** Optional callback to retrieve per-session context usage data for the dashboard. */ getContextUsage?: () => ContextUsageEntry[]; /** Maximum allowed request body size for inbound HTTP POST bodies. */ maxRequestBodyBytes?: number; /** Per-connection WebSocket ingress rate limiting. */ wsRateLimit?: { enabled?: boolean; capacity?: number; refillPerSec?: number; maxViolations?: number; violationWindowMs?: number; }; queue?: Partial & { overrides?: { channels?: Record>; sessions?: Record>; }; }; nodes?: { enabled: boolean; allowedRoles: string[]; featureGates: Record; locationEnabled?: boolean; pushEnabled?: boolean; }; /** Optional pairing manager for DM pairing code management via gateway. */ pairingManager?: PairingManager; memoryStore?: MemoryStore; commandRegistry?: CommandRegistry; intentRegistry?: ComponentRegistry; routingPolicy?: RoutingPolicy; discovery?: { enabled: boolean; serviceName: string; serviceType: string; txtRecord?: Record; }; /** Optional Teams adapter for inbound Bot Framework activity webhooks. */ teamsHandler?: Pick; /** Optional Google Chat adapter for inbound Chat event webhooks. */ googleChatHandler?: Pick; /** Optional BlueBubbles adapter for inbound iMessage event webhooks. */ blueBubblesHandler?: Pick; /** Optional LINE adapter for inbound webhook events. */ lineHandler?: Pick; /** Optional Feishu adapter for inbound webhook events. */ feishuHandler?: Pick; /** Optional Zalo adapter for inbound webhook events. */ zaloHandler?: Pick; /** Optional WebChat PWA push-subscription settings. */ webchatPush?: { enabled?: boolean; vapidPublicKey?: string; maxSubscriptions?: number; }; } interface WebchatPushSubscriptionRecord { endpoint: string; keys: { p256dh: string; auth: string; }; userAgent?: string; createdAt: number; updatedAt: number; } export class GatewayServer { private static readonly DEFAULT_MAX_REQUEST_BODY_BYTES = 1_048_576; // 1 MiB private static readonly DEFAULT_WS_RATE_LIMIT = { enabled: true, capacity: 30, refillPerSec: 15, maxViolations: 8, violationWindowMs: 10_000, } as const; private wss: WebSocketServer | null = null; private httpServer: HttpServer | null = null; private router: Router; private sessionBridge: SessionBridge; private laneQueue: LaneQueue; private canvasStore: CanvasStore; private metrics: MetricsCollector; private discoveryHandle: GatewayDiscoveryHandle | null = null; private connectionMap: Map = new Map(); private connectionRateMap: Map = new Map(); private connectionStateMap: Map = new Map(); private webchatPushSubscriptions: Map = new Map(); private config: GatewayServerConfig; private startTime: number = Date.now(); constructor(config: GatewayServerConfig) { this.config = config; this.sessionBridge = new SessionBridge({ sessionManager: config.sessionManager, modelClient: config.modelClient, systemPrompt: config.systemPrompt, toolRegistry: config.toolRegistry, toolExecutor: config.toolExecutor, config: config.config, memoryStore: config.memoryStore, }); this.laneQueue = new LaneQueue(config.queue); this.canvasStore = new CanvasStore(); this.metrics = new MetricsCollector({ getQueueDepth: () => this.laneQueue.totalPending(), }); this.router = new Router(); this.registerHandlers(); } private registerHandlers(): void { const channelRegistry = this.config.channelRegistry; const runtimeConfig = this.config.config; const systemHandlers = createSystemHandlers({ startTime: this.startTime, version: this.config.version ?? '0.1.0', getSessionCount: () => this.sessionBridge.listSessions().length, getToolCount: () => this.config.toolRegistry.list().length, getConnectionCount: () => this.sessionBridge.connectionCount, getSessionAnalytics: ({ days, topLimit } = {}) => this.config.sessionManager.getSessionAnalytics({ days, topLimit }), restart: this.config.restart, getChannels: channelRegistry ? () => channelRegistry.list().map(a => ({ name: a.name, status: a.status })) : undefined, getServices: runtimeConfig && channelRegistry ? () => discoverServices(runtimeConfig, channelRegistry) : undefined, getPresence: channelRegistry ? (opts) => channelRegistry.getPresence(opts) : undefined, getNodeLocations: ({ role, nodeId, limit } = {}) => { const entries: Array<{ nodeId: string; role: string; connectionId: string; location: NonNullable; }> = []; for (const [connectionId, state] of this.connectionStateMap.entries()) { if (!state.node || !state.location) { continue; } if (role && state.node.role !== role) { continue; } if (nodeId && state.node.nodeId !== nodeId) { continue; } entries.push({ nodeId: state.node.nodeId, role: state.node.role, connectionId, location: state.location, }); } const sorted = entries.sort((a, b) => b.location.receivedAt - a.location.receivedAt); if (typeof limit === 'number' && Number.isFinite(limit) && limit > 0) { return sorted.slice(0, Math.floor(limit)); } return sorted; }, getNodes: ({ role, platform, limit } = {}) => { const entries: Array<{ connectionId: string; nodeId: string; role: string; identity?: string; protocolVersion: number; capabilities: string[]; registeredAt: number; location?: NodeConnectionState['location']; status?: NodeConnectionState['status']; push?: { provider: NonNullable['provider']; tokenPreview: string; topic?: string; environment?: NonNullable['environment']; registeredAt: number; }; }> = []; for (const [connectionId, state] of this.connectionStateMap.entries()) { if (!state.node) { continue; } if (role && state.node.role !== role) { continue; } if (platform && state.status?.platform !== platform) { continue; } entries.push({ connectionId, nodeId: state.node.nodeId, role: state.node.role, identity: state.identity, protocolVersion: state.node.protocolVersion, capabilities: state.node.capabilities, registeredAt: state.node.registeredAt, location: state.location, status: state.status, push: state.pushToken ? { provider: state.pushToken.provider, tokenPreview: maskToken(state.pushToken.token), topic: state.pushToken.topic, environment: state.pushToken.environment, registeredAt: state.pushToken.registeredAt, } : undefined, }); } const sorted = entries.sort((a, b) => b.registeredAt - a.registeredAt); if (typeof limit === 'number' && Number.isFinite(limit) && limit > 0) { return sorted.slice(0, Math.floor(limit)); } return sorted; }, getUsage: () => ({ totalSessions: this.config.sessionManager.listSessions().length, activeConnections: this.sessionBridge.connectionCount, }), getTokenUsage: this.config.getTokenUsage, getContextUsage: this.config.getContextUsage, getMetrics: () => this.metrics.getSnapshot(), getEvents: (opts) => this.metrics.getEvents(opts), getActiveRequests: () => this.metrics.getActiveRequests(), }); const sessionHandlers = createSessionHandlers({ sessionManager: this.config.sessionManager, sessionBridge: this.sessionBridge, }); const historyHandlers = createHistoryHandlers({ sessionManager: this.config.sessionManager, }); const canvasHandlers = createCanvasHandlers({ store: this.canvasStore, }); const toolHandlers = createToolHandlers({ toolRegistry: this.config.toolRegistry, toolExecutor: this.config.toolExecutor, }); const agentHandlers = createAgentHandlers({ sessionBridge: this.sessionBridge, laneQueue: this.laneQueue, resolveQueuePolicy: ({ sessionId, channel }) => { const resolved: Partial = {}; const channelPolicy = this.config.queue?.overrides?.channels?.[channel]; if (channelPolicy) { Object.assign(resolved, channelPolicy); } const configuredSessionPolicy = sessionId ? this.config.queue?.overrides?.sessions?.[sessionId] : undefined; if (configuredSessionPolicy) { Object.assign(resolved, configuredSessionPolicy); } if (sessionId) { const runtimeMode = this.config.sessionManager.getSessionConfig('ws', sessionId, 'queue.mode'); const runtimeCap = this.config.sessionManager.getSessionConfig('ws', sessionId, 'queue.cap'); const runtimeOverflow = this.config.sessionManager.getSessionConfig('ws', sessionId, 'queue.overflow'); const runtimeDebounce = this.config.sessionManager.getSessionConfig('ws', sessionId, 'queue.debounce_ms'); const runtimeSummarize = this.config.sessionManager.getSessionConfig('ws', sessionId, 'queue.summarize_overflow'); if (runtimeMode && ['collect', 'followup', 'steer', 'steer_backlog', 'interrupt'].includes(runtimeMode)) { resolved.mode = runtimeMode as LaneQueueConfig['mode']; } if (runtimeCap) { const cap = Number.parseInt(runtimeCap, 10); if (Number.isFinite(cap) && cap >= 1 && cap <= 1000) { resolved.cap = cap; } } if (runtimeOverflow && (runtimeOverflow === 'drop_old' || runtimeOverflow === 'drop_new')) { resolved.overflow = runtimeOverflow; } if (runtimeDebounce) { const debounceMs = Number.parseInt(runtimeDebounce, 10); if (Number.isFinite(debounceMs) && debounceMs >= 0 && debounceMs <= 60_000) { resolved.debounceMs = debounceMs; } } if (runtimeSummarize === 'true' || runtimeSummarize === 'false') { resolved.summarizeOverflow = runtimeSummarize === 'true'; } } return resolved; }, metrics: this.metrics, sessionManager: this.config.sessionManager, commandRegistry: this.config.commandRegistry, }); const intentHandlers = createIntentHandlers({ intentRegistry: this.config.intentRegistry, enabled: this.config.config?.intents.enabled ?? false, }); const routingHandlers = createRoutingHandlers({ intentRegistry: this.config.intentRegistry, routingPolicy: this.config.routingPolicy, }); const nodeHandlers = createNodeHandlers({ enabled: this.config.nodes?.enabled ?? false, locationEnabled: this.config.nodes?.locationEnabled ?? false, pushEnabled: this.config.nodes?.pushEnabled ?? false, allowedRoles: this.config.nodes?.allowedRoles ?? [], featureGates: this.config.nodes?.featureGates ?? {}, getConnectionState: (connectionId) => this.connectionStateMap.get(connectionId), setNodeRegistration: (connectionId, registration) => { const existing = this.connectionStateMap.get(connectionId); if (!existing) { return; } this.connectionStateMap.set(connectionId, { ...existing, node: registration, }); }, setNodeLocation: (connectionId, location) => { const existing = this.connectionStateMap.get(connectionId); if (!existing) { return; } this.connectionStateMap.set(connectionId, { ...existing, location, }); }, setNodeStatus: (connectionId, status) => { const existing = this.connectionStateMap.get(connectionId); if (!existing) { return; } this.connectionStateMap.set(connectionId, { ...existing, status, }); }, setNodePushToken: (connectionId, pushToken) => { const existing = this.connectionStateMap.get(connectionId); if (!existing) { return; } const provider = pushToken.provider === 'fcm' ? 'fcm' : 'apns'; const normalizedPushToken: NonNullable = { provider, token: pushToken.token, topic: pushToken.topic, environment: provider === 'apns' ? pushToken.environment : undefined, registeredAt: pushToken.registeredAt, }; this.connectionStateMap.set(connectionId, { ...existing, pushToken: normalizedPushToken, }); }, }); // Config handlers (only if config object is provided) if (this.config.config) { const configHandlers = createConfigHandlers({ config: this.config.config, persistConfig: this.config.persistConfig, }); for (const [method, handler] of Object.entries(configHandlers)) { this.router.register(method, handler); } } // Pairing handlers (only if pairing manager is provided) if (this.config.pairingManager) { const pairingHandlers = createPairingHandlers({ pairingManager: this.config.pairingManager }); for (const [method, handler] of Object.entries(pairingHandlers)) { this.router.register(method, handler); } } // Register all methods for (const [method, handler] of Object.entries(systemHandlers)) { this.router.register(method, handler); } for (const [method, handler] of Object.entries(sessionHandlers)) { this.router.register(method, handler); } for (const [method, handler] of Object.entries(historyHandlers)) { this.router.register(method, handler); } for (const [method, handler] of Object.entries(canvasHandlers)) { this.router.register(method, handler); } for (const [method, handler] of Object.entries(toolHandlers)) { this.router.register(method, handler); } for (const [method, handler] of Object.entries(agentHandlers)) { this.router.register(method, handler); } for (const [method, handler] of Object.entries(intentHandlers)) { this.router.register(method, handler); } for (const [method, handler] of Object.entries(routingHandlers)) { this.router.register(method, handler); } for (const [method, handler] of Object.entries(nodeHandlers)) { this.router.register(method, handler); } } async start(): Promise { const host = this.config.host ?? '127.0.0.1'; const { port } = this.config; return new Promise((resolve) => { // Create HTTP server first — handles static file requests this.httpServer = createServer((req: IncomingMessage, res: ServerResponse) => { this.handleHttpRequest(req, res); }); // Attach WebSocket server to the shared HTTP server (no separate port) this.wss = new WebSocketServer({ server: this.httpServer }); this.wss.on('connection', (ws: WebSocket, req: IncomingMessage) => { // Auth check on upgrade — only WS connections require auth const authResult = authenticateRequest(req, this.config.auth ?? {}); if (!authResult.authenticated) { ws.close(4001, authResult.error ?? 'Authentication failed'); return; } this.handleConnection(ws, authResult.identity); }); // Register system.lock handler (needs access to connectionMap) this.router.register('system.lock', async (request) => { return makeResponse(request.id, { locked: this.config.lock ?? false, activeClients: this.connectionMap.size, maxClients: this.config.lock ? 1 : null, }); }); this.httpServer.listen(port, host, () => { console.log(`Gateway server listening on ${host}:${port}`); void this.startDiscovery(host, port).finally(() => { resolve(); }); }); }); } async stop(): Promise { if (this.discoveryHandle) { try { await this.discoveryHandle.stop(); } catch (err) { console.error('Failed to stop mDNS discovery:', err instanceof Error ? err.message : err); } finally { this.discoveryHandle = null; } } // Close all WebSocket connections first. // Await disconnects so end-of-session summary/memory writes complete before process exit. const disconnects: Array> = []; for (const [ws, connectionId] of this.connectionMap) { disconnects.push(this.sessionBridge.disconnect(connectionId)); ws.close(1001, 'Server shutting down'); } await Promise.allSettled(disconnects); this.connectionMap.clear(); this.connectionStateMap.clear(); // Close WSS first, then the underlying HTTP server await new Promise((resolve) => { if (!this.wss) { resolve(); return; } this.wss.close(() => { this.wss = null; resolve(); }); }); await new Promise((resolve) => { if (!this.httpServer) { resolve(); return; } this.httpServer.close(() => { this.httpServer = null; resolve(); }); }); } private handleConnection(ws: WebSocket, identity?: string): void { // Gateway lock — reject if another client is already connected if (this.config.lock && this.connectionMap.size > 0) { ws.close(4003, 'Gateway locked — another client is already connected'); return; } const connectionId = randomUUID(); this.sessionBridge.connect(connectionId); this.connectionMap.set(ws, connectionId); this.connectionStateMap.set(connectionId, { identity }); this.connectionRateMap.set(connectionId, { tokens: this.getWsRateConfig().capacity, lastRefillMs: Date.now(), violations: 0, windowStartMs: Date.now(), }); ws.on('message', async (data) => { const limit = this.consumeConnectionRateToken(connectionId); if (!limit.allowed) { this.send(ws, makeError(0, ErrorCode.InternalError, `Rate limit exceeded. Retry in ${limit.retryMs}ms.`)); if (limit.close) { ws.close(4008, 'Rate limit exceeded'); } return; } const raw = data.toString(); await this.handleMessage(ws, connectionId, raw); }); ws.on('close', () => { void this.sessionBridge.disconnect(connectionId).catch((error) => { console.warn('Session disconnect failed:', error); }); this.connectionMap.delete(ws); this.connectionRateMap.delete(connectionId); this.connectionStateMap.delete(connectionId); }); ws.on('error', (err) => { console.error(`WebSocket error (${connectionId}):`, err.message); }); } private getWsRateConfig(): Required> { const raw = this.config.wsRateLimit ?? {}; return { enabled: raw.enabled ?? GatewayServer.DEFAULT_WS_RATE_LIMIT.enabled, capacity: raw.capacity ?? GatewayServer.DEFAULT_WS_RATE_LIMIT.capacity, refillPerSec: raw.refillPerSec ?? GatewayServer.DEFAULT_WS_RATE_LIMIT.refillPerSec, maxViolations: raw.maxViolations ?? GatewayServer.DEFAULT_WS_RATE_LIMIT.maxViolations, violationWindowMs: raw.violationWindowMs ?? GatewayServer.DEFAULT_WS_RATE_LIMIT.violationWindowMs, }; } private consumeConnectionRateToken(connectionId: string): { allowed: boolean; close: boolean; retryMs: number } { const cfg = this.getWsRateConfig(); if (!cfg.enabled) { return { allowed: true, close: false, retryMs: 0 }; } const now = Date.now(); const state = this.connectionRateMap.get(connectionId); if (!state) { return { allowed: true, close: false, retryMs: 0 }; } const elapsedMs = Math.max(0, now - state.lastRefillMs); state.tokens = Math.min(cfg.capacity, state.tokens + (elapsedMs / 1000) * cfg.refillPerSec); state.lastRefillMs = now; if (state.tokens >= 1) { state.tokens -= 1; this.connectionRateMap.set(connectionId, state); return { allowed: true, close: false, retryMs: 0 }; } if (now - state.windowStartMs > cfg.violationWindowMs) { state.windowStartMs = now; state.violations = 0; } state.violations += 1; this.connectionRateMap.set(connectionId, state); const deficit = 1 - state.tokens; const retryMs = Math.max(1, Math.ceil((deficit / cfg.refillPerSec) * 1000)); return { allowed: false, close: state.violations >= cfg.maxViolations, retryMs, }; } private getWebchatPushConfig(): { enabled: boolean; vapidPublicKey?: string; maxSubscriptions: number } { const runtimeConfig = this.config.config?.server.webchat_push; const override = this.config.webchatPush; const enabled = override?.enabled ?? runtimeConfig?.enabled ?? false; const vapidPublicKey = override?.vapidPublicKey ?? runtimeConfig?.vapid_public_key; const maxSubscriptions = override?.maxSubscriptions ?? runtimeConfig?.max_subscriptions ?? 5000; return { enabled, vapidPublicKey, maxSubscriptions }; } private async handleWebchatPushRequest(req: IncomingMessage, res: ServerResponse): Promise { if (!req.url?.startsWith('/webchat/push')) { return false; } const parsed = new URL(req.url, `http://${req.headers.host ?? 'localhost'}`); const pathname = parsed.pathname; const cfg = this.getWebchatPushConfig(); if (pathname === '/webchat/push/public-key' && req.method === 'GET') { res.writeHead(200, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ enabled: cfg.enabled, vapidPublicKey: cfg.vapidPublicKey ?? null, })); return true; } if (pathname === '/webchat/push/subscriptions' && req.method === 'GET') { res.writeHead(200, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ enabled: cfg.enabled, count: this.webchatPushSubscriptions.size, maxSubscriptions: cfg.maxSubscriptions, })); return true; } if (pathname === '/webchat/push/subscriptions' && req.method === 'POST') { if (!cfg.enabled) { res.writeHead(409, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ error: 'WebChat push is disabled' })); return true; } let rawBody: string; try { rawBody = await this.readRequestBody(req); } catch (err) { if (err instanceof RequestBodyTooLargeError) { res.writeHead(413, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ error: 'Payload too large' })); return true; } res.writeHead(400, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ error: 'Invalid request body' })); return true; } let parsedBody: unknown; try { parsedBody = JSON.parse(rawBody); } catch { res.writeHead(400, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ error: 'Invalid JSON' })); return true; } const body = parsedBody as { endpoint?: unknown; keys?: { p256dh?: unknown; auth?: unknown }; userAgent?: unknown; }; const endpoint = typeof body.endpoint === 'string' ? body.endpoint.trim() : ''; const p256dh = typeof body.keys?.p256dh === 'string' ? body.keys.p256dh.trim() : ''; const auth = typeof body.keys?.auth === 'string' ? body.keys.auth.trim() : ''; const userAgent = typeof body.userAgent === 'string' ? body.userAgent.trim() : undefined; if (!endpoint || !p256dh || !auth) { res.writeHead(400, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ error: 'Missing subscription endpoint or keys' })); return true; } if (!this.webchatPushSubscriptions.has(endpoint) && this.webchatPushSubscriptions.size >= cfg.maxSubscriptions) { res.writeHead(429, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ error: `Subscription cap reached (${cfg.maxSubscriptions})` })); return true; } const now = Date.now(); const previous = this.webchatPushSubscriptions.get(endpoint); this.webchatPushSubscriptions.set(endpoint, { endpoint, keys: { p256dh, auth }, userAgent, createdAt: previous?.createdAt ?? now, updatedAt: now, }); res.writeHead(200, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ stored: true, count: this.webchatPushSubscriptions.size, })); return true; } if (pathname === '/webchat/push/subscriptions' && req.method === 'DELETE') { let rawBody = ''; try { rawBody = await this.readRequestBody(req); } catch (err) { if (err instanceof RequestBodyTooLargeError) { res.writeHead(413, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ error: 'Payload too large' })); return true; } res.writeHead(400, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ error: 'Invalid request body' })); return true; } let endpoint = ''; if (rawBody.trim().length > 0) { try { const body = JSON.parse(rawBody) as { endpoint?: unknown }; endpoint = typeof body.endpoint === 'string' ? body.endpoint.trim() : ''; } catch { res.writeHead(400, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ error: 'Invalid JSON' })); return true; } } else { endpoint = parsed.searchParams.get('endpoint')?.trim() ?? ''; } if (!endpoint) { res.writeHead(400, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ error: 'Missing subscription endpoint' })); return true; } const removed = this.webchatPushSubscriptions.delete(endpoint); res.writeHead(200, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ removed, count: this.webchatPushSubscriptions.size, })); return true; } return false; } /** * Handle incoming HTTP requests. * Optionally applies auth (when authHttp is enabled and a token is configured). * Routes webhook requests before auth; delegates to serveStatic for UI files. */ private async handleHttpRequest(req: IncomingMessage, res: ServerResponse): Promise { // Webhook routes bypass gateway auth (they have their own HMAC auth) if (this.config.webhookHandler && req.method === 'POST' && req.url) { const match = req.url.match(/^\/webhooks\/([^/?]+)/); if (match) { const webhookName = decodeURIComponent(match[1]); await this.config.webhookHandler.handleRequest(webhookName, req, res); return; } } // Health endpoint — unauthenticated for Docker HEALTHCHECK / monitoring if (req.method === 'GET' && req.url === '/health') { const channelList = this.config.channelRegistry?.list().map(a => a.name) ?? []; const body = JSON.stringify({ status: 'ok', uptime: Math.floor((Date.now() - this.startTime) / 1000), version: this.config.version ?? '0.1.0', sessions: this.sessionBridge.listSessions().length, connections: this.sessionBridge.connectionCount, tools: this.config.toolRegistry.list().length, channels: channelList, }); res.writeHead(200, { 'Content-Type': 'application/json' }); res.end(body); return; } // Gmail Pub/Sub push route — bypass gateway auth (Google sends push notifications directly) if (this.config.gmailHandler && req.method === 'POST' && req.url?.startsWith('/gmail/push')) { try { const body = await this.readRequestBody(req); const parsed = JSON.parse(body) as { message?: { data?: string } }; const data = parsed?.message?.data; if (data) { await this.config.gmailHandler.handlePushNotification(data); } res.writeHead(200, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ ok: true })); } catch (err) { if (err instanceof RequestBodyTooLargeError) { res.writeHead(413, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ error: 'Payload too large' })); } else { console.error('Gmail push handler error:', err instanceof Error ? err.message : err); res.writeHead(400, { 'Content-Type': 'application/json' }); res.end(JSON.stringify({ error: 'Invalid request' })); } } return; } // Teams Bot Framework events route — bypass gateway auth (Bot Framework posts directly) if (this.config.teamsHandler && req.method === 'POST' && req.url?.startsWith('/teams/events')) { await this.config.teamsHandler.handleRequest(req, res); return; } // Google Chat events route — bypass gateway auth (Google Chat posts directly) if (this.config.googleChatHandler && req.method === 'POST' && req.url?.startsWith('/google-chat/events')) { await this.config.googleChatHandler.handleRequest(req, res); return; } // BlueBubbles events route — bypass gateway auth (BlueBubbles webhook posts directly) if (this.config.blueBubblesHandler && req.method === 'POST' && req.url?.startsWith('/bluebubbles/events')) { await this.config.blueBubblesHandler.handleRequest(req, res); return; } // LINE events route — bypass gateway auth (LINE webhook posts directly) if (this.config.lineHandler && req.method === 'POST' && req.url?.startsWith('/line/events')) { await this.config.lineHandler.handleRequest(req, res); return; } // Feishu events route — bypass gateway auth (Feishu webhook posts directly) if (this.config.feishuHandler && req.method === 'POST' && req.url?.startsWith('/feishu/events')) { await this.config.feishuHandler.handleRequest(req, res); return; } // Zalo events route — bypass gateway auth (Zalo webhook posts directly) if (this.config.zaloHandler && req.method === 'POST' && req.url?.startsWith('/zalo/events')) { await this.config.zaloHandler.handleRequest(req, res); return; } // Apply auth to HTTP requests when configured const authConfig = this.config.auth ?? {}; if (this.config.authHttp !== false && authConfig.token) { const authResult = authenticateRequest(req, authConfig); if (!authResult.authenticated) { res.writeHead(401, { 'Content-Type': 'text/plain', 'WWW-Authenticate': 'Bearer', }); res.end(authResult.error ?? 'Unauthorized'); return; } } // WebChat PWA push-subscription endpoints (auth-protected) if (await this.handleWebchatPushRequest(req, res)) { return; } const uiDir = this.config.uiDir; if (uiDir) { const served = await serveStatic(req, res, uiDir); if (served) {return;} } // No UI directory configured, or file not found res.writeHead(404, { 'Content-Type': 'text/plain' }); res.end('Not Found'); } private async handleMessage(ws: WebSocket, connectionId: string, raw: string): Promise { const request = parseMessage(raw); if (!request) { this.send(ws, makeError(0, ErrorCode.ParseError, 'Invalid JSON or missing required fields')); return; } // Inject connectionId into params so handlers can identify the client if (!request.params) {request.params = {};} request.params.connectionId = connectionId; const nodeAuth = authorizeNodeMethod({ enabled: this.config.nodes?.enabled ?? false, method: request.method, nodeRole: this.connectionStateMap.get(connectionId)?.node?.role, allowedRoles: this.config.nodes?.allowedRoles ?? [], roleScopes: { companion: ['node.capabilities.get', 'node.location.set', 'node.location.get', 'node.status.set', 'node.push_token.set'], observer: ['node.capabilities.get', 'node.location.get'], automation: ['node.capabilities.get', 'node.location.get'], }, }); if (!nodeAuth.authenticated) { this.send(ws, makeError(request.id, ErrorCode.AuthFailed, nodeAuth.error ?? 'Node authorization failed')); return; } const send = (msg: OutboundMessage) => this.send(ws, msg); const response = await this.router.dispatch(request, send); if (response) { this.send(ws, response); } } private send(ws: WebSocket, msg: OutboundMessage): void { if (ws.readyState === WebSocket.OPEN) { ws.send(JSON.stringify(msg)); } } /** Get the underlying WebSocketServer (for testing). */ getWss(): WebSocketServer | null { return this.wss; } /** Get the underlying HTTP server (for testing). */ getHttpServer(): HttpServer | null { return this.httpServer; } /** Get the session bridge (for testing/debugging). */ getSessionBridge(): SessionBridge { return this.sessionBridge; } /** Get the metrics collector (for external wiring). */ getMetrics(): MetricsCollector { return this.metrics; } /** Get list of registered methods. */ getMethods(): string[] { return this.router.listMethods(); } /** Set the webhook handler for inbound webhook HTTP routes (late binding). */ setWebhookHandler(handler: WebhookHandler): void { this.config.webhookHandler = handler; } /** Set the Gmail handler for Pub/Sub push notifications (late binding). */ setGmailHandler(handler: GmailWatcher): void { this.config.gmailHandler = handler; } /** Set the Teams handler for inbound Bot Framework activity HTTP routes (late binding). */ setTeamsHandler(handler: Pick): void { this.config.teamsHandler = handler; } /** Set the Google Chat handler for inbound event HTTP routes (late binding). */ setGoogleChatHandler(handler: Pick): void { this.config.googleChatHandler = handler; } /** Set the BlueBubbles handler for inbound webhook HTTP routes (late binding). */ setBlueBubblesHandler(handler: Pick): void { this.config.blueBubblesHandler = handler; } /** Set the LINE handler for inbound webhook HTTP routes (late binding). */ setLineHandler(handler: Pick): void { this.config.lineHandler = handler; } /** Set the Feishu handler for inbound webhook HTTP routes (late binding). */ setFeishuHandler(handler: Pick): void { this.config.feishuHandler = handler; } /** Set the Zalo handler for inbound webhook HTTP routes (late binding). */ setZaloHandler(handler: Pick): void { this.config.zaloHandler = handler; } private async startDiscovery(host: string, port: number): Promise { const discovery = this.config.discovery; if (!discovery?.enabled) { return; } if (host === '127.0.0.1' || host === '::1') { console.warn('mDNS discovery is enabled, but server.localhost=true restricts gateway to loopback; skipping advertisement'); return; } try { const txtRecord: Record = { instance: `pid-${process.pid}`, version: this.config.version ?? '0.1.0', ...(discovery.txtRecord ?? {}), }; this.discoveryHandle = await startGatewayDiscovery({ serviceName: discovery.serviceName, serviceType: discovery.serviceType, port, txtRecord, }); console.log(`mDNS discovery enabled: ${discovery.serviceName}.${discovery.serviceType}.local:${port}`); } catch (err) { console.warn(`mDNS discovery failed to start: ${err instanceof Error ? err.message : String(err)}`); } } /** Read the full request body as a string. */ private readRequestBody(req: IncomingMessage): Promise { const maxBytes = this.config.maxRequestBodyBytes ?? GatewayServer.DEFAULT_MAX_REQUEST_BODY_BYTES; return readRequestBody(req, { maxBytes }); } } function maskToken(token: string): string { if (token.length <= 8) { return '****'; } return `***${token.slice(-8)}`; }