Files
flynn/src/gateway/server.ts
T
William Valentin 9f81c01603 feat(session): persist model tier overrides per session
Store per-session config in SQLite and route /model and /reset through command fast-paths so channel sessions keep independent model selection across reconnects and restarts.
2026-02-13 01:04:26 -08:00

447 lines
15 KiB
TypeScript

import { WebSocketServer, WebSocket } from 'ws';
import { randomUUID } from 'crypto';
import { createServer, type Server as HttpServer, type IncomingMessage, type ServerResponse } from 'http';
import { Router } from './router.js';
import { serveStatic } from './static.js';
import { SessionBridge } from './session-bridge.js';
import type { SessionBridgeConfig } from './session-bridge.js';
import { LaneQueue } from './lane-queue.js';
import { MetricsCollector } from './metrics.js';
import { authenticateRequest } from './auth.js';
import type { AuthConfig } from './auth.js';
import {
parseMessage,
makeError,
makeResponse,
ErrorCode,
type OutboundMessage,
} from './protocol.js';
import {
createSystemHandlers,
createSessionHandlers,
createToolHandlers,
createAgentHandlers,
createConfigHandlers,
createPairingHandlers,
createIntentHandlers,
createRoutingHandlers,
createHistoryHandlers,
} from './handlers/index.js';
import type { TokenUsageEntry } from './handlers/system.js';
import type { SessionManager } from '../session/manager.js';
import type { Config } from '../config/index.js';
import type { ToolRegistry } from '../tools/registry.js';
import type { ToolExecutor } from '../tools/executor.js';
import type { WebhookHandler } from '../automation/webhooks.js';
import type { GmailWatcher } from '../automation/gmail.js';
import type { PairingManager } from '../channels/pairing.js';
import type { MemoryStore } from '../memory/store.js';
import type { CommandRegistry } from '../commands/index.js';
import type { ComponentRegistry } from '../intents/index.js';
import type { RoutingPolicy } from '../routing/index.js';
export interface GatewayServerConfig {
port: number;
host?: string;
sessionManager: SessionManager;
modelClient: SessionBridgeConfig['modelClient'];
systemPrompt: string;
toolRegistry: ToolRegistry;
toolExecutor: ToolExecutor;
version?: string;
auth?: AuthConfig;
/** When true, only one WebSocket client can be connected at a time. */
lock?: boolean;
/** Whether to apply token auth to HTTP requests too (default: true when token is set). */
authHttp?: boolean;
uiDir?: string;
config?: Config;
/** Optional callback for system.restart. Should trigger graceful shutdown + process restart. */
restart?: () => Promise<void>;
channelRegistry?: { list(): Array<{ readonly name: string; readonly status: string }> };
/** Optional webhook handler for inbound webhook HTTP routes. */
webhookHandler?: WebhookHandler;
/** Optional Gmail handler for Pub/Sub push notifications. */
gmailHandler?: GmailWatcher;
/** Optional callback to retrieve per-session token usage data for the dashboard. */
getTokenUsage?: () => TokenUsageEntry[];
/** Optional pairing manager for DM pairing code management via gateway. */
pairingManager?: PairingManager;
memoryStore?: MemoryStore;
commandRegistry?: CommandRegistry;
intentRegistry?: ComponentRegistry;
routingPolicy?: RoutingPolicy;
}
export class GatewayServer {
private wss: WebSocketServer | null = null;
private httpServer: HttpServer | null = null;
private router: Router;
private sessionBridge: SessionBridge;
private laneQueue: LaneQueue;
private metrics: MetricsCollector;
private connectionMap: Map<WebSocket, string> = new Map();
private config: GatewayServerConfig;
private startTime: number = Date.now();
constructor(config: GatewayServerConfig) {
this.config = config;
this.sessionBridge = new SessionBridge({
sessionManager: config.sessionManager,
modelClient: config.modelClient,
systemPrompt: config.systemPrompt,
toolRegistry: config.toolRegistry,
toolExecutor: config.toolExecutor,
config: config.config,
memoryStore: config.memoryStore,
});
this.laneQueue = new LaneQueue();
this.metrics = new MetricsCollector({
getQueueDepth: () => this.laneQueue.totalPending(),
});
this.router = new Router();
this.registerHandlers();
}
private registerHandlers(): void {
const systemHandlers = createSystemHandlers({
startTime: this.startTime,
version: this.config.version ?? '0.1.0',
getSessionCount: () => this.sessionBridge.listSessions().length,
getToolCount: () => this.config.toolRegistry.list().length,
getConnectionCount: () => this.sessionBridge.connectionCount,
restart: this.config.restart,
getChannels: this.config.channelRegistry
? () => this.config.channelRegistry!.list().map(a => ({ name: a.name, status: a.status }))
: undefined,
getUsage: () => ({
totalSessions: this.config.sessionManager.listSessions().length,
activeConnections: this.sessionBridge.connectionCount,
}),
getTokenUsage: this.config.getTokenUsage,
getMetrics: () => this.metrics.getSnapshot(),
getEvents: (opts) => this.metrics.getEvents(opts),
getActiveRequests: () => this.metrics.getActiveRequests(),
});
const sessionHandlers = createSessionHandlers({
sessionManager: this.config.sessionManager,
sessionBridge: this.sessionBridge,
});
const historyHandlers = createHistoryHandlers({
sessionManager: this.config.sessionManager,
});
const toolHandlers = createToolHandlers({
toolRegistry: this.config.toolRegistry,
toolExecutor: this.config.toolExecutor,
});
const agentHandlers = createAgentHandlers({
sessionBridge: this.sessionBridge,
laneQueue: this.laneQueue,
metrics: this.metrics,
sessionManager: this.config.sessionManager,
commandRegistry: this.config.commandRegistry,
});
const intentHandlers = createIntentHandlers({
intentRegistry: this.config.intentRegistry,
enabled: this.config.config?.intents.enabled ?? false,
});
const routingHandlers = createRoutingHandlers({
intentRegistry: this.config.intentRegistry,
routingPolicy: this.config.routingPolicy,
});
// Config handlers (only if config object is provided)
if (this.config.config) {
const configHandlers = createConfigHandlers({ config: this.config.config });
for (const [method, handler] of Object.entries(configHandlers)) {
this.router.register(method, handler);
}
}
// Pairing handlers (only if pairing manager is provided)
if (this.config.pairingManager) {
const pairingHandlers = createPairingHandlers({ pairingManager: this.config.pairingManager });
for (const [method, handler] of Object.entries(pairingHandlers)) {
this.router.register(method, handler);
}
}
// Register all methods
for (const [method, handler] of Object.entries(systemHandlers)) {
this.router.register(method, handler);
}
for (const [method, handler] of Object.entries(sessionHandlers)) {
this.router.register(method, handler);
}
for (const [method, handler] of Object.entries(historyHandlers)) {
this.router.register(method, handler);
}
for (const [method, handler] of Object.entries(toolHandlers)) {
this.router.register(method, handler);
}
for (const [method, handler] of Object.entries(agentHandlers)) {
this.router.register(method, handler);
}
for (const [method, handler] of Object.entries(intentHandlers)) {
this.router.register(method, handler);
}
for (const [method, handler] of Object.entries(routingHandlers)) {
this.router.register(method, handler);
}
}
async start(): Promise<void> {
const host = this.config.host ?? '127.0.0.1';
const { port } = this.config;
return new Promise((resolve) => {
// Create HTTP server first — handles static file requests
this.httpServer = createServer((req: IncomingMessage, res: ServerResponse) => {
this.handleHttpRequest(req, res);
});
// Attach WebSocket server to the shared HTTP server (no separate port)
this.wss = new WebSocketServer({ server: this.httpServer });
this.wss.on('connection', (ws: WebSocket, req: IncomingMessage) => {
// Auth check on upgrade — only WS connections require auth
const authResult = authenticateRequest(req, this.config.auth ?? {});
if (!authResult.authenticated) {
ws.close(4001, authResult.error ?? 'Authentication failed');
return;
}
this.handleConnection(ws, authResult.identity);
});
// Register system.lock handler (needs access to connectionMap)
this.router.register('system.lock', async (request) => {
return makeResponse(request.id, {
locked: this.config.lock ?? false,
activeClients: this.connectionMap.size,
maxClients: this.config.lock ? 1 : null,
});
});
this.httpServer.listen(port, host, () => {
console.log(`Gateway server listening on ${host}:${port}`);
resolve();
});
});
}
async stop(): Promise<void> {
// Close all WebSocket connections first
for (const [ws, connectionId] of this.connectionMap) {
this.sessionBridge.disconnect(connectionId);
ws.close(1001, 'Server shutting down');
}
this.connectionMap.clear();
// Close WSS first, then the underlying HTTP server
await new Promise<void>((resolve) => {
if (!this.wss) {
resolve();
return;
}
this.wss.close(() => {
this.wss = null;
resolve();
});
});
await new Promise<void>((resolve) => {
if (!this.httpServer) {
resolve();
return;
}
this.httpServer.close(() => {
this.httpServer = null;
resolve();
});
});
}
private handleConnection(ws: WebSocket, identity?: string): void {
// Gateway lock — reject if another client is already connected
if (this.config.lock && this.connectionMap.size > 0) {
ws.close(4003, 'Gateway locked — another client is already connected');
return;
}
const connectionId = randomUUID();
this.sessionBridge.connect(connectionId);
this.connectionMap.set(ws, connectionId);
ws.on('message', async (data) => {
const raw = data.toString();
await this.handleMessage(ws, connectionId, raw);
});
ws.on('close', () => {
this.sessionBridge.disconnect(connectionId);
this.connectionMap.delete(ws);
});
ws.on('error', (err) => {
console.error(`WebSocket error (${connectionId}):`, err.message);
});
}
/**
* Handle incoming HTTP requests.
* Optionally applies auth (when authHttp is enabled and a token is configured).
* Routes webhook requests before auth; delegates to serveStatic for UI files.
*/
private async handleHttpRequest(req: IncomingMessage, res: ServerResponse): Promise<void> {
// Webhook routes bypass gateway auth (they have their own HMAC auth)
if (this.config.webhookHandler && req.method === 'POST' && req.url) {
const match = req.url.match(/^\/webhooks\/([^/?]+)/);
if (match) {
const webhookName = decodeURIComponent(match[1]);
await this.config.webhookHandler.handleRequest(webhookName, req, res);
return;
}
}
// Health endpoint — unauthenticated for Docker HEALTHCHECK / monitoring
if (req.method === 'GET' && req.url === '/health') {
const channelList = this.config.channelRegistry?.list().map(a => a.name) ?? [];
const body = JSON.stringify({
status: 'ok',
uptime: Math.floor((Date.now() - this.startTime) / 1000),
version: this.config.version ?? '0.1.0',
sessions: this.sessionBridge.listSessions().length,
connections: this.sessionBridge.connectionCount,
tools: this.config.toolRegistry.list().length,
channels: channelList,
});
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(body);
return;
}
// Gmail Pub/Sub push route — bypass gateway auth (Google sends push notifications directly)
if (this.config.gmailHandler && req.method === 'POST' && req.url?.startsWith('/gmail/push')) {
try {
const body = await this.readRequestBody(req);
const parsed = JSON.parse(body) as { message?: { data?: string } };
const data = parsed?.message?.data;
if (data) {
await this.config.gmailHandler.handlePushNotification(data);
}
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ ok: true }));
} catch (err) {
console.error('Gmail push handler error:', err instanceof Error ? err.message : err);
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Invalid request' }));
}
return;
}
// Apply auth to HTTP requests when configured
const authConfig = this.config.auth ?? {};
if (this.config.authHttp !== false && authConfig.token) {
const authResult = authenticateRequest(req, authConfig);
if (!authResult.authenticated) {
res.writeHead(401, {
'Content-Type': 'text/plain',
'WWW-Authenticate': 'Bearer',
});
res.end(authResult.error ?? 'Unauthorized');
return;
}
}
const uiDir = this.config.uiDir;
if (uiDir) {
const served = await serveStatic(req, res, uiDir);
if (served) {return;}
}
// No UI directory configured, or file not found
res.writeHead(404, { 'Content-Type': 'text/plain' });
res.end('Not Found');
}
private async handleMessage(ws: WebSocket, connectionId: string, raw: string): Promise<void> {
const request = parseMessage(raw);
if (!request) {
this.send(ws, makeError(0, ErrorCode.ParseError, 'Invalid JSON or missing required fields'));
return;
}
// Inject connectionId into params so handlers can identify the client
if (!request.params) {request.params = {};}
request.params.connectionId = connectionId;
const send = (msg: OutboundMessage) => this.send(ws, msg);
const response = await this.router.dispatch(request, send);
if (response) {
this.send(ws, response);
}
}
private send(ws: WebSocket, msg: OutboundMessage): void {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(msg));
}
}
/** Get the underlying WebSocketServer (for testing). */
getWss(): WebSocketServer | null {
return this.wss;
}
/** Get the underlying HTTP server (for testing). */
getHttpServer(): HttpServer | null {
return this.httpServer;
}
/** Get the session bridge (for testing/debugging). */
getSessionBridge(): SessionBridge {
return this.sessionBridge;
}
/** Get the metrics collector (for external wiring). */
getMetrics(): MetricsCollector {
return this.metrics;
}
/** Get list of registered methods. */
getMethods(): string[] {
return this.router.listMethods();
}
/** Set the webhook handler for inbound webhook HTTP routes (late binding). */
setWebhookHandler(handler: WebhookHandler): void {
this.config.webhookHandler = handler;
}
/** Set the Gmail handler for Pub/Sub push notifications (late binding). */
setGmailHandler(handler: GmailWatcher): void {
this.config.gmailHandler = handler;
}
/** Read the full request body as a string. */
private readRequestBody(req: IncomingMessage): Promise<string> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
req.on('data', (chunk: Buffer) => chunks.push(chunk));
req.on('end', () => resolve(Buffer.concat(chunks).toString('utf-8')));
req.on('error', reject);
});
}
}