feat(gateway): add WebSocket gateway with JSON-RPC protocol and auth

Phase 2 of the Flynn roadmap. Adds a WebSocket gateway server that
starts alongside the Telegram bot, providing real-time API access to
the agent, sessions, and tools.

Protocol: JSON-RPC-like (request/response/event) over WebSocket.
8 methods: agent.send, agent.cancel, sessions.list, sessions.history,
sessions.create, tools.list, tools.invoke, system.health.

Auth: Bearer token + Tailscale identity header support.
Session bridge: per-connection agent instances with shared model router.

New files: src/gateway/ (protocol, router, server, auth, session-bridge,
handlers for agent/sessions/tools/system).
57 new tests (181 total), typecheck clean.
This commit is contained in:
William Valentin
2026-02-05 19:11:25 -08:00
parent ad7fc241f1
commit f30a8bc318
21 changed files with 1878 additions and 2 deletions
+77
View File
@@ -0,0 +1,77 @@
import type { GatewayRequest, OutboundMessage } from '../protocol.js';
import type { SendFn } from '../router.js';
import { makeEvent, makeError, ErrorCode } from '../protocol.js';
import type { SessionBridge } from '../session-bridge.js';
export interface AgentHandlerDeps {
sessionBridge: SessionBridge;
}
export function createAgentHandlers(deps: AgentHandlerDeps) {
return {
'agent.send': async (request: GatewayRequest, send: SendFn): Promise<OutboundMessage | void> => {
const params = request.params as { message?: string; connectionId?: string } | undefined;
if (!params?.message) {
return makeError(request.id, ErrorCode.InvalidRequest, 'message is required');
}
const connectionId = params.connectionId as string;
if (!connectionId) {
return makeError(request.id, ErrorCode.InvalidRequest, 'connectionId is required (set by server)');
}
if (deps.sessionBridge.isBusy(connectionId)) {
return makeError(request.id, ErrorCode.AgentBusy, 'Agent is already processing a request');
}
const agent = deps.sessionBridge.getAgent(connectionId);
if (!agent) {
return makeError(request.id, ErrorCode.SessionNotFound, 'No agent for this connection');
}
deps.sessionBridge.setBusy(connectionId, true);
// Set up tool use callback to emit streaming events
deps.sessionBridge.setOnToolUse(connectionId, (event) => {
if (event.type === 'start') {
send(makeEvent(request.id, 'tool_start', { tool: event.tool, args: event.args }));
} else if (event.type === 'end') {
send(makeEvent(request.id, 'tool_end', {
tool: event.tool,
result: event.result ? {
success: event.result.success,
output: event.result.output,
error: event.result.error,
} : undefined,
}));
}
});
try {
const response = await agent.process(params.message);
send(makeEvent(request.id, 'done', { content: response }));
} catch (err) {
const message = err instanceof Error ? err.message : 'Unknown error';
send(makeEvent(request.id, 'error', { code: ErrorCode.InternalError, message }));
} finally {
deps.sessionBridge.setBusy(connectionId, false);
deps.sessionBridge.setOnToolUse(connectionId, undefined);
}
},
'agent.cancel': async (request: GatewayRequest): Promise<OutboundMessage> => {
// Cancel is a placeholder — proper cancellation requires abort controller support in NativeAgent.
// For now, just report whether the agent was busy.
const params = request.params as { connectionId?: string } | undefined;
const connectionId = params?.connectionId as string;
if (!connectionId) {
return makeError(request.id, ErrorCode.InvalidRequest, 'connectionId is required');
}
const wasBusy = deps.sessionBridge.isBusy(connectionId);
// TODO: Wire AbortController into NativeAgent for actual cancellation
return { id: request.id, result: { cancelled: wasBusy } };
},
};
}
+271
View File
@@ -0,0 +1,271 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { createSystemHandlers } from './system.js';
import { createSessionHandlers } from './sessions.js';
import { createToolHandlers } from './tools.js';
import { createAgentHandlers } from './agent.js';
import { ErrorCode } from '../protocol.js';
import type { GatewayRequest, GatewayResponse, GatewayError, GatewayEvent, OutboundMessage } from '../protocol.js';
describe('system handlers', () => {
const deps = {
startTime: Date.now() - 60_000,
version: '0.1.0',
getSessionCount: () => 3,
getToolCount: () => 6,
getConnectionCount: () => 2,
};
const handlers = createSystemHandlers(deps);
it('system.health returns status info', async () => {
const req: GatewayRequest = { id: 1, method: 'system.health' };
const result = await handlers['system.health'](req) as GatewayResponse;
expect(result.id).toBe(1);
const r = result.result as Record<string, unknown>;
expect(r.status).toBe('ok');
expect(r.version).toBe('0.1.0');
expect(r.sessions).toBe(3);
expect(r.tools).toBe(6);
expect(r.connections).toBe(2);
expect(typeof r.uptime).toBe('number');
expect(r.uptime).toBeGreaterThanOrEqual(59);
});
});
describe('session handlers', () => {
const mockHistory = [
{ role: 'user' as const, content: 'hello' },
{ role: 'assistant' as const, content: 'hi' },
];
const mockSession = {
id: 'ws:test',
addMessage: vi.fn(),
getHistory: vi.fn(() => mockHistory),
clear: vi.fn(),
};
const mockSessionManager = {
listSessions: vi.fn(() => ['ws:test']),
getSession: vi.fn(() => mockSession),
transferSession: vi.fn(),
closeSession: vi.fn(),
};
const handlers = createSessionHandlers({
sessionManager: mockSessionManager as any,
});
beforeEach(() => {
vi.clearAllMocks();
mockSessionManager.listSessions.mockReturnValue(['ws:test']);
mockSessionManager.getSession.mockReturnValue(mockSession);
mockSession.getHistory.mockReturnValue(mockHistory);
});
it('sessions.list returns session list with message counts', async () => {
const req: GatewayRequest = { id: 1, method: 'sessions.list' };
const result = await handlers['sessions.list'](req) as GatewayResponse;
expect(result.id).toBe(1);
const r = result.result as { sessions: Array<{ id: string; messageCount: number }> };
expect(r.sessions).toHaveLength(1);
expect(r.sessions[0].id).toBe('ws:test');
expect(r.sessions[0].messageCount).toBe(2);
});
it('sessions.history returns messages with pagination', async () => {
const req: GatewayRequest = { id: 2, method: 'sessions.history', params: { sessionId: 'ws:test', limit: 1, offset: 0 } };
const result = await handlers['sessions.history'](req) as GatewayResponse;
const r = result.result as { messages: unknown[]; total: number };
expect(r.messages).toHaveLength(1);
expect(r.total).toBe(2);
});
it('sessions.history requires sessionId', async () => {
const req: GatewayRequest = { id: 3, method: 'sessions.history', params: {} };
const result = await handlers['sessions.history'](req) as GatewayError;
expect(result.error.code).toBe(ErrorCode.InvalidRequest);
});
it('sessions.create creates a new session', async () => {
const req: GatewayRequest = { id: 4, method: 'sessions.create', params: { sessionId: 'ws:new' } };
const result = await handlers['sessions.create'](req) as GatewayResponse;
const r = result.result as { sessionId: string };
expect(r.sessionId).toBe('ws:new');
expect(mockSessionManager.getSession).toHaveBeenCalledWith('ws', 'new');
});
it('sessions.create auto-generates session ID', async () => {
const req: GatewayRequest = { id: 5, method: 'sessions.create' };
const result = await handlers['sessions.create'](req) as GatewayResponse;
const r = result.result as { sessionId: string };
expect(r.sessionId).toMatch(/^ws:\d+$/);
});
});
describe('tool handlers', () => {
const mockTool = {
name: 'test.tool',
description: 'A test tool',
inputSchema: { type: 'object' as const, properties: {} },
execute: vi.fn(),
};
const mockRegistry = {
list: vi.fn(() => [mockTool]),
get: vi.fn((name: string) => (name === 'test.tool' ? mockTool : undefined)),
register: vi.fn(),
toAnthropicFormat: vi.fn(),
toOpenAIFormat: vi.fn(),
};
const mockExecutor = {
execute: vi.fn(async () => ({ success: true, output: 'done' })),
};
const handlers = createToolHandlers({
toolRegistry: mockRegistry as any,
toolExecutor: mockExecutor as any,
});
beforeEach(() => {
vi.clearAllMocks();
mockRegistry.list.mockReturnValue([mockTool]);
mockRegistry.get.mockImplementation((name: string) => (name === 'test.tool' ? mockTool : undefined));
mockExecutor.execute.mockResolvedValue({ success: true, output: 'done' });
});
it('tools.list returns tool definitions', async () => {
const req: GatewayRequest = { id: 1, method: 'tools.list' };
const result = await handlers['tools.list'](req) as GatewayResponse;
const r = result.result as { tools: Array<{ name: string }> };
expect(r.tools).toHaveLength(1);
expect(r.tools[0].name).toBe('test.tool');
});
it('tools.invoke executes a tool', async () => {
const req: GatewayRequest = { id: 2, method: 'tools.invoke', params: { tool: 'test.tool', args: {} } };
const result = await handlers['tools.invoke'](req) as GatewayResponse;
expect(result.result).toEqual({ success: true, output: 'done' });
expect(mockExecutor.execute).toHaveBeenCalledWith('test.tool', {});
});
it('tools.invoke errors on missing tool name', async () => {
const req: GatewayRequest = { id: 3, method: 'tools.invoke', params: {} };
const result = await handlers['tools.invoke'](req) as GatewayError;
expect(result.error.code).toBe(ErrorCode.InvalidRequest);
});
it('tools.invoke errors on unknown tool', async () => {
const req: GatewayRequest = { id: 4, method: 'tools.invoke', params: { tool: 'unknown' } };
const result = await handlers['tools.invoke'](req) as GatewayError;
expect(result.error.code).toBe(ErrorCode.ToolNotFound);
});
});
describe('agent handlers', () => {
const mockAgent = {
process: vi.fn(async () => 'response text'),
setOnToolUse: vi.fn(),
};
const mockBridge = {
getAgent: vi.fn(() => mockAgent),
getSessionId: vi.fn(() => 'ws:conn-1'),
isBusy: vi.fn(() => false),
setBusy: vi.fn(),
setOnToolUse: vi.fn(),
};
const handlers = createAgentHandlers({
sessionBridge: mockBridge as any,
});
beforeEach(() => {
vi.clearAllMocks();
mockBridge.isBusy.mockReturnValue(false);
mockBridge.getAgent.mockReturnValue(mockAgent);
mockAgent.process.mockResolvedValue('response text');
});
it('agent.send processes message and sends done event', async () => {
const req: GatewayRequest = { id: 1, method: 'agent.send', params: { message: 'hello', connectionId: 'conn-1' } };
const sent: OutboundMessage[] = [];
const send = vi.fn((msg: OutboundMessage) => sent.push(msg));
await handlers['agent.send'](req, send);
expect(mockAgent.process).toHaveBeenCalledWith('hello');
expect(sent).toHaveLength(1);
const doneEvent = sent[0] as GatewayEvent;
expect(doneEvent.event).toBe('done');
expect((doneEvent.data as any).content).toBe('response text');
});
it('agent.send requires message', async () => {
const req: GatewayRequest = { id: 2, method: 'agent.send', params: { connectionId: 'conn-1' } };
const send = vi.fn();
const result = await handlers['agent.send'](req, send) as GatewayError;
expect(result.error.code).toBe(ErrorCode.InvalidRequest);
expect(result.error.message).toContain('message');
});
it('agent.send rejects when busy', async () => {
mockBridge.isBusy.mockReturnValue(true);
const req: GatewayRequest = { id: 3, method: 'agent.send', params: { message: 'hi', connectionId: 'conn-1' } };
const send = vi.fn();
const result = await handlers['agent.send'](req, send) as GatewayError;
expect(result.error.code).toBe(ErrorCode.AgentBusy);
});
it('agent.send handles errors gracefully', async () => {
mockAgent.process.mockRejectedValue(new Error('model failed'));
const req: GatewayRequest = { id: 4, method: 'agent.send', params: { message: 'hi', connectionId: 'conn-1' } };
const sent: OutboundMessage[] = [];
const send = vi.fn((msg: OutboundMessage) => sent.push(msg));
await handlers['agent.send'](req, send);
const errorEvent = sent[0] as GatewayEvent;
expect(errorEvent.event).toBe('error');
expect((errorEvent.data as any).message).toBe('model failed');
});
it('agent.send sets and cleans up tool use callback', async () => {
const req: GatewayRequest = { id: 5, method: 'agent.send', params: { message: 'hi', connectionId: 'conn-1' } };
const send = vi.fn();
await handlers['agent.send'](req, send);
// setOnToolUse called twice: once to set callback, once to clear it
expect(mockBridge.setOnToolUse).toHaveBeenCalledTimes(2);
expect(mockBridge.setOnToolUse).toHaveBeenLastCalledWith('conn-1', undefined);
});
it('agent.send sets busy state correctly', async () => {
const req: GatewayRequest = { id: 6, method: 'agent.send', params: { message: 'hi', connectionId: 'conn-1' } };
const send = vi.fn();
await handlers['agent.send'](req, send);
expect(mockBridge.setBusy).toHaveBeenCalledWith('conn-1', true);
expect(mockBridge.setBusy).toHaveBeenCalledWith('conn-1', false);
});
it('agent.cancel returns cancelled state', async () => {
mockBridge.isBusy.mockReturnValue(true);
const req: GatewayRequest = { id: 7, method: 'agent.cancel', params: { connectionId: 'conn-1' } };
const result = await handlers['agent.cancel'](req) as GatewayResponse;
expect((result.result as any).cancelled).toBe(true);
});
});
+8
View File
@@ -0,0 +1,8 @@
export { createSystemHandlers } from './system.js';
export type { SystemHandlerDeps } from './system.js';
export { createSessionHandlers } from './sessions.js';
export type { SessionHandlerDeps } from './sessions.js';
export { createToolHandlers } from './tools.js';
export type { ToolHandlerDeps } from './tools.js';
export { createAgentHandlers } from './agent.js';
export type { AgentHandlerDeps } from './agent.js';
+59
View File
@@ -0,0 +1,59 @@
import type { GatewayRequest, OutboundMessage } from '../protocol.js';
import { makeResponse, makeError, ErrorCode } from '../protocol.js';
import type { SessionManager } from '../../session/manager.js';
export interface SessionHandlerDeps {
sessionManager: SessionManager;
}
export function createSessionHandlers(deps: SessionHandlerDeps) {
return {
'sessions.list': async (request: GatewayRequest): Promise<OutboundMessage> => {
const sessionIds = deps.sessionManager.listSessions();
const sessions = sessionIds.map(id => ({
id,
messageCount: deps.sessionManager.getSession(
id.split(':')[0],
id.split(':').slice(1).join(':')
).getHistory().length,
}));
return makeResponse(request.id, { sessions });
},
'sessions.history': async (request: GatewayRequest): Promise<OutboundMessage> => {
const params = request.params as { sessionId?: string; limit?: number; offset?: number } | undefined;
if (!params?.sessionId) {
return makeError(request.id, ErrorCode.InvalidRequest, 'sessionId is required');
}
const { sessionId, limit, offset } = params;
const parts = sessionId.split(':');
const frontend = parts[0];
const userId = parts.slice(1).join(':');
const session = deps.sessionManager.getSession(frontend, userId);
const allMessages = session.getHistory();
const start = offset ?? 0;
const end = limit ? start + limit : allMessages.length;
const messages = allMessages.slice(start, end);
return makeResponse(request.id, {
messages,
total: allMessages.length,
});
},
'sessions.create': async (request: GatewayRequest): Promise<OutboundMessage> => {
const params = request.params as { sessionId?: string } | undefined;
const sessionId = params?.sessionId ?? `ws:${Date.now()}`;
const parts = sessionId.split(':');
const frontend = parts[0];
const userId = parts.slice(1).join(':');
// Creating a session via getSession is idempotent
deps.sessionManager.getSession(frontend, userId);
return makeResponse(request.id, { sessionId });
},
};
}
+25
View File
@@ -0,0 +1,25 @@
import type { GatewayRequest, OutboundMessage } from '../protocol.js';
import { makeResponse } from '../protocol.js';
export interface SystemHandlerDeps {
startTime: number;
version: string;
getSessionCount: () => number;
getToolCount: () => number;
getConnectionCount: () => number;
}
export function createSystemHandlers(deps: SystemHandlerDeps) {
return {
'system.health': async (request: GatewayRequest): Promise<OutboundMessage> => {
return makeResponse(request.id, {
status: 'ok',
uptime: Math.floor((Date.now() - deps.startTime) / 1000),
version: deps.version,
sessions: deps.getSessionCount(),
tools: deps.getToolCount(),
connections: deps.getConnectionCount(),
});
},
};
}
+37
View File
@@ -0,0 +1,37 @@
import type { GatewayRequest, OutboundMessage } from '../protocol.js';
import { makeResponse, makeError, ErrorCode } from '../protocol.js';
import type { ToolRegistry } from '../../tools/registry.js';
import type { ToolExecutor } from '../../tools/executor.js';
export interface ToolHandlerDeps {
toolRegistry: ToolRegistry;
toolExecutor: ToolExecutor;
}
export function createToolHandlers(deps: ToolHandlerDeps) {
return {
'tools.list': async (request: GatewayRequest): Promise<OutboundMessage> => {
const tools = deps.toolRegistry.list().map(t => ({
name: t.name,
description: t.description,
inputSchema: t.inputSchema,
}));
return makeResponse(request.id, { tools });
},
'tools.invoke': async (request: GatewayRequest): Promise<OutboundMessage> => {
const params = request.params as { tool?: string; args?: Record<string, unknown> } | undefined;
if (!params?.tool) {
return makeError(request.id, ErrorCode.InvalidRequest, 'tool name is required');
}
const tool = deps.toolRegistry.get(params.tool);
if (!tool) {
return makeError(request.id, ErrorCode.ToolNotFound, `Tool not found: ${params.tool}`);
}
const result = await deps.toolExecutor.execute(params.tool, params.args ?? {});
return makeResponse(request.id, result);
},
};
}