From ac60fa5be3bc9cb19b33b2d1bc20ad494e94495b Mon Sep 17 00:00:00 2001 From: William Valentin Date: Wed, 25 Feb 2026 11:12:21 -0800 Subject: [PATCH] feat(companion): add reconnect resilience --- docs/api/PROTOCOL.md | 5 +- docs/architecture/AGENT_DIAGRAM.md | 1 + .../GATEWAY_SESSIONS_AND_QUEUE.md | 1 + docs/plans/state.json | 20 +++- src/cli/companion.test.ts | 22 +++- src/cli/companion.ts | 94 ++++++++++++---- src/companion/index.ts | 2 + src/companion/runtimeClient.test.ts | 75 +++++++++++++ src/companion/runtimeClient.ts | 104 ++++++++++++++++++ 9 files changed, 297 insertions(+), 27 deletions(-) diff --git a/docs/api/PROTOCOL.md b/docs/api/PROTOCOL.md index 8a30732..fd25957 100644 --- a/docs/api/PROTOCOL.md +++ b/docs/api/PROTOCOL.md @@ -1067,6 +1067,9 @@ Flynn now propagates a run-level abort signal into model/tool execution, so prov Register node role/capabilities for the current WebSocket connection. +Registration is scoped to the connection. If a companion reconnects it must call `node.register` again +to restore node identity, capabilities, and access to `node.*` methods. + **Request:** ```json { @@ -1840,5 +1843,5 @@ For more implementation details, see: - Protocol types: `src/gateway/protocol.ts` - Handlers: `src/gateway/handlers/` - Gateway server: `src/gateway/server.ts` -- Companion runtime client helper: `src/companion/runtimeClient.ts` (node + system + `canvas.*` typed RPC wrappers, optional `autoConnect`) +- Companion runtime client helper: `src/companion/runtimeClient.ts` (node + system + `canvas.*` typed RPC wrappers, optional `autoConnect`/`autoReconnect`, connection event subscriptions) - Platform companion wrappers: `src/companion/platformClients.ts` diff --git a/docs/architecture/AGENT_DIAGRAM.md b/docs/architecture/AGENT_DIAGRAM.md index 5f33795..a2865a8 100644 --- a/docs/architecture/AGENT_DIAGRAM.md +++ b/docs/architecture/AGENT_DIAGRAM.md @@ -143,6 +143,7 @@ Gateway streaming UX signals: - WebSocket `agent.send` emits `run_state` lifecycle events (`start`, `cancel_requested`, `cancelled`, `complete`, `error`) for UI/state rendering. - Routing applies reaction rules with deterministic priority/cooldown (and recursion guard) before intent routing. +- Companion nodes re-register `node.*` capabilities after reconnect; runtime clients can auto-reconnect and surface connection events. Key files: diff --git a/docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md b/docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md index 99bb730..74bcdca 100644 --- a/docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md +++ b/docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md @@ -16,6 +16,7 @@ If you only want the protocol surface, see `docs/api/PROTOCOL.md`. - Backend routing outcomes are auditable via `backend.route` / `backend.success` / `backend.fallback`, which enables offline canary evaluation without changing gateway protocol methods. - Run lifecycle/cancel intent and reaction decisions are emitted to audit logs, and aggregated into `system.metrics` counters (runStates, cancelLatencyMs, reactions) for dashboards. - Reaction matching is deterministic (priority + cooldown + recursion guard) before intent/agent routing. +- Companion `node.*` registration is per WebSocket connection; reconnects must re-register capabilities before invoking node RPC methods. ## Component Map diff --git a/docs/plans/state.json b/docs/plans/state.json index 99ffde5..25a793a 100644 --- a/docs/plans/state.json +++ b/docs/plans/state.json @@ -6699,10 +6699,28 @@ "docs/plans/state.json" ], "test_status": "pnpm test:run src/automation/reactions.test.ts src/config/schema.test.ts src/daemon/routing.test.ts passing" + }, + "deeper-surfaces-phase3-companion-reconnect": { + "status": "completed", + "date": "2026-02-25", + "updated": "2026-02-25", + "summary": "Hardened companion runtime connectivity with auto-reconnect support, connection event subscriptions, CLI re-registration/heartbeat resilience, and updated protocol/architecture notes plus targeted tests.", + "files_modified": [ + "src/companion/runtimeClient.ts", + "src/companion/runtimeClient.test.ts", + "src/companion/index.ts", + "src/cli/companion.ts", + "src/cli/companion.test.ts", + "docs/api/PROTOCOL.md", + "docs/architecture/AGENT_DIAGRAM.md", + "docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md", + "docs/plans/state.json" + ], + "test_status": "pnpm test:run src/companion/runtimeClient.test.ts src/cli/companion.test.ts passing" } }, "overall_progress": { - "total_test_count": 2018, + "total_test_count": 2020, "all_tests_passing": true, "p0_completion": "3/3 (100%)", "p1_completion": "4/4 (100%)", diff --git a/src/cli/companion.test.ts b/src/cli/companion.test.ts index 286e813..3842cd4 100644 --- a/src/cli/companion.test.ts +++ b/src/cli/companion.test.ts @@ -7,13 +7,14 @@ const { mockRuntimeCtorArgs, mockRuntimeInstances, } = vi.hoisted(() => { - const runtimeCtorArgs: Array<{ url: string; token?: string }> = []; + const runtimeCtorArgs: Array<{ url: string; token?: string; autoReconnect?: boolean }> = []; const runtimeInstances: Array<{ connect: ReturnType; registerNode: ReturnType; setNodeStatus: ReturnType; subscribeAgentStream: ReturnType; subscribeAgentTyping: ReturnType; + subscribeConnectionEvents: ReturnType; disconnect: ReturnType; }> = []; @@ -43,7 +44,13 @@ vi.mock('./shared.js', () => ({ vi.mock('../companion/index.js', () => ({ CompanionRuntimeClient: class { - connect = vi.fn(async () => undefined); + private connectionHandlers: Array<(event: { status: string }) => void> = []; + connect = vi.fn(async () => { + for (const handler of this.connectionHandlers) { + handler({ status: 'connected' }); + } + return undefined; + }); registerNode = vi.fn(async ({ nodeId, role, capabilities }: { nodeId: string; role: string; capabilities: string[] }) => ({ registered: true, node: { id: nodeId, role }, @@ -53,9 +60,13 @@ vi.mock('../companion/index.js', () => ({ setNodeStatus = vi.fn(async () => ({ updated: true, node: { id: 'n', role: 'companion' } })); subscribeAgentStream = vi.fn(() => () => undefined); subscribeAgentTyping = vi.fn(() => () => undefined); + subscribeConnectionEvents = vi.fn((handler: (event: { status: string }) => void) => { + this.connectionHandlers.push(handler); + return () => undefined; + }); disconnect = vi.fn(() => undefined); - constructor(opts: { url: string; token?: string }) { + constructor(opts: { url: string; token?: string; autoReconnect?: boolean }) { mockRuntimeCtorArgs.push(opts); mockRuntimeInstances.push(this); } @@ -89,7 +100,7 @@ describe('companion command', () => { await program.parseAsync(['node', 'test', 'companion', '--once']); expect(mockGetConfigPath).toHaveBeenCalledOnce(); - expect(mockRuntimeCtorArgs).toEqual([{ url: 'ws://127.0.0.1:18888', token: 'config-token' }]); + expect(mockRuntimeCtorArgs).toEqual([{ url: 'ws://127.0.0.1:18888', token: 'config-token', autoReconnect: false }]); expect(mockRuntimeInstances[0]?.connect).toHaveBeenCalledOnce(); expect(mockRuntimeInstances[0]?.registerNode).toHaveBeenCalledOnce(); expect(mockRuntimeInstances[0]?.setNodeStatus).toHaveBeenCalledOnce(); @@ -124,7 +135,7 @@ describe('companion command', () => { 'node.push.register', ]); - expect(mockRuntimeCtorArgs).toEqual([{ url: 'ws://10.0.0.5:19000', token: 'override-token' }]); + expect(mockRuntimeCtorArgs).toEqual([{ url: 'ws://10.0.0.5:19000', token: 'override-token', autoReconnect: false }]); expect(mockRuntimeInstances[0]?.registerNode).toHaveBeenCalledWith(expect.objectContaining({ nodeId: 'test-node', capabilities: ['ui.canvas', 'node.push.register'], @@ -149,4 +160,3 @@ describe('companion command', () => { errSpy.mockRestore(); }); }); - diff --git a/src/cli/companion.ts b/src/cli/companion.ts index a72b310..d1bb3a1 100644 --- a/src/cli/companion.ts +++ b/src/cli/companion.ts @@ -96,10 +96,13 @@ export async function runCompanionSession(options: CompanionCommandOptions): Pro const runtime = new CompanionRuntimeClient({ url: gatewayUrl, token: gatewayToken, + autoReconnect: !options.once, }); const stopSignals: NodeJS.Signals[] = ['SIGINT', 'SIGTERM']; let heartbeatTimer: NodeJS.Timeout | null = null; + let registrationPromise: Promise | null = null; + let skipConnectRegistration = true; const cleanup = (): void => { if (heartbeatTimer) { @@ -109,6 +112,53 @@ export async function runCompanionSession(options: CompanionCommandOptions): Pro runtime.disconnect(1000, 'Companion shutting down'); }; + const startHeartbeat = (): void => { + if (options.once || heartbeatTimer) { + return; + } + heartbeatTimer = setInterval(() => { + void publishHeartbeat(runtime, platform).catch((error: unknown) => { + const message = error instanceof Error ? error.message : String(error); + console.error(`Heartbeat failed: ${message}`); + }); + }, heartbeatSeconds * 1000); + }; + + const stopHeartbeat = (): void => { + if (!heartbeatTimer) { + return; + } + clearInterval(heartbeatTimer); + heartbeatTimer = null; + }; + + const registerAndHeartbeat = async (label: 'connected' | 'reconnected'): Promise => { + if (registrationPromise) { + return registrationPromise; + } + registrationPromise = (async () => { + const register = await runtime.registerNode({ + nodeId, + role, + capabilities, + }); + + await publishHeartbeat(runtime, platform); + + const verb = label === 'connected' ? 'Connected' : 'Reconnected'; + console.log(`${verb} companion node ${register.node.id} (${platform}, role=${role})`); + console.log(`Gateway: ${gatewayUrl}`); + console.log(`Capabilities: ${capabilities.join(', ') || '(none)'}`); + + startHeartbeat(); + })(); + try { + await registrationPromise; + } finally { + registrationPromise = null; + } + }; + for (const signal of stopSignals) { process.once(signal, cleanup); } @@ -128,32 +178,39 @@ export async function runCompanionSession(options: CompanionCommandOptions): Pro console.log(`[agent.typing${session}] ${phase}`); }); + runtime.subscribeConnectionEvents((event) => { + if (event.status === 'connected') { + if (skipConnectRegistration) { + skipConnectRegistration = false; + return; + } + void registerAndHeartbeat('reconnected').catch((error: unknown) => { + const message = error instanceof Error ? error.message : String(error); + console.error(`Companion re-registration failed: ${message}`); + }); + return; + } + + if (event.status === 'reconnecting') { + console.log(`Gateway disconnected. Reconnecting in ${Math.ceil(event.delayMs / 1000)}s (attempt ${event.attempt})...`); + return; + } + + stopHeartbeat(); + const reason = event.reason ? ` (${event.reason})` : ''; + console.log(`Gateway disconnected${reason}.`); + }); + try { await runtime.connect(); - const register = await runtime.registerNode({ - nodeId, - role, - capabilities, - }); - - await publishHeartbeat(runtime, platform); - - console.log(`Connected companion node ${register.node.id} (${platform}, role=${role})`); - console.log(`Gateway: ${gatewayUrl}`); - console.log(`Capabilities: ${capabilities.join(', ') || '(none)'}`); + await registerAndHeartbeat('connected'); + skipConnectRegistration = false; if (options.once) { cleanup(); return; } - heartbeatTimer = setInterval(() => { - void publishHeartbeat(runtime, platform).catch((error: unknown) => { - const message = error instanceof Error ? error.message : String(error); - console.error(`Heartbeat failed: ${message}`); - }); - }, heartbeatSeconds * 1000); - await new Promise(() => { // Keep process alive until interrupted. }); @@ -186,4 +243,3 @@ export function registerCompanionCommand(program: Command): void { } }); } - diff --git a/src/companion/index.ts b/src/companion/index.ts index 5531f48..ecc84a7 100644 --- a/src/companion/index.ts +++ b/src/companion/index.ts @@ -22,6 +22,8 @@ export type { CompanionEventName, CompanionEventPredicate, CompanionEventEnvelope, + CompanionConnectionEvent, + CompanionConnectionHandler, RegisterNodeInput, ListNodesInput, SetNodeStatusInput, diff --git a/src/companion/runtimeClient.test.ts b/src/companion/runtimeClient.test.ts index 9b1efd0..b364085 100644 --- a/src/companion/runtimeClient.test.ts +++ b/src/companion/runtimeClient.test.ts @@ -108,6 +108,22 @@ describe('CompanionRuntimeClient', () => { }).toThrow('requestTimeoutMs must be a positive number'); }); + it('validates reconnect delay options', () => { + expect(() => { + new CompanionRuntimeClient({ + url: 'ws://127.0.0.1:1', + reconnectDelayMs: 0, + }); + }).toThrow('reconnectDelayMs must be a positive number'); + + expect(() => { + new CompanionRuntimeClient({ + url: 'ws://127.0.0.1:1', + reconnectMaxDelayMs: 0, + }); + }).toThrow('reconnectMaxDelayMs must be a positive number'); + }); + it('dispatches gateway events to subscribed handlers and supports unsubscribe', () => { const client = new CompanionRuntimeClient({ url: 'ws://127.0.0.1:1', @@ -845,6 +861,65 @@ describe('CompanionRuntimeClient', () => { }); }); + it('emits connection events and reconnects when enabled', async () => { + vi.useFakeTimers(); + const events: Array<{ status: string }> = []; + let created = 0; + + class FakeWebSocket extends EventEmitter { + readyState: number = WebSocket.CONNECTING; + + constructor() { + super(); + queueMicrotask(() => { + this.readyState = WebSocket.OPEN; + this.emit('open'); + }); + } + + send(_payload: string, callback?: (error?: Error) => void): void { + callback?.(); + } + + close(_code?: number, _reason?: string): void { + this.readyState = WebSocket.CLOSED; + this.emit('close', 1006, Buffer.from('drop')); + } + } + + const client = new CompanionRuntimeClient({ + url: 'ws://127.0.0.1:1', + autoReconnect: true, + reconnectDelayMs: 10, + reconnectMaxDelayMs: 10, + websocketFactory: () => { + created += 1; + return new FakeWebSocket() as unknown as WebSocket; + }, + }); + + client.subscribeConnectionEvents((event) => { + events.push({ status: event.status }); + }); + + await client.connect(); + expect(created).toBe(1); + expect(events.map((event) => event.status)).toEqual(['connected']); + + const ws = (client as unknown as { ws: WebSocket | null }).ws; + ws?.close(); + + expect(events.map((event) => event.status)).toEqual(['connected', 'disconnected', 'reconnecting']); + + await vi.advanceTimersByTimeAsync(10); + await Promise.resolve(); + await Promise.resolve(); + + expect(created).toBe(2); + expect(events.map((event) => event.status)).toEqual(['connected', 'disconnected', 'reconnecting', 'connected']); + vi.useRealTimers(); + }); + it('manual disconnect metadata is not overwritten by local close event', async () => { class FakeWebSocket extends EventEmitter { readyState: number = WebSocket.CONNECTING; diff --git a/src/companion/runtimeClient.ts b/src/companion/runtimeClient.ts index ba68ec7..6d0395e 100644 --- a/src/companion/runtimeClient.ts +++ b/src/companion/runtimeClient.ts @@ -38,6 +38,9 @@ export interface CompanionRuntimeClientOptions { token?: string; requestTimeoutMs?: number; autoConnect?: boolean; + autoReconnect?: boolean; + reconnectDelayMs?: number; + reconnectMaxDelayMs?: number; websocketFactory?: (url: string) => WebSocket; } @@ -83,6 +86,13 @@ export type CompanionEventEnvelope = { data: TData; }; +export type CompanionConnectionEvent = + | { status: 'connected' } + | { status: 'disconnected'; code?: number; reason?: string } + | { status: 'reconnecting'; attempt: number; delayMs: number }; + +export type CompanionConnectionHandler = (event: CompanionConnectionEvent) => void; + export const COMPANION_EVENT_NAMES = { agentStream: 'agent.stream', agentTyping: 'agent.typing', @@ -307,6 +317,9 @@ export class CompanionRuntimeClient { private readonly token?: string; private readonly requestTimeoutMs: number; private readonly autoConnect: boolean; + private readonly autoReconnect: boolean; + private readonly reconnectInitialDelayMs: number; + private readonly reconnectMaxDelayMs: number; private readonly websocketFactory: (url: string) => WebSocket; private ws: WebSocket | null = null; @@ -314,19 +327,36 @@ export class CompanionRuntimeClient { private nextId = 1; private pending = new Map(); private readonly eventHandlers = new Set(); + private readonly connectionHandlers = new Set(); private readonly pendingEventWaits = new Set<(error: Error) => void>(); private _lastDisconnectCode: number | undefined; private _lastDisconnectReason: string | undefined; + private reconnectDelayMs: number; + private reconnectTimer: NodeJS.Timeout | null = null; + private reconnectAttempt = 0; + private shouldReconnect = false; constructor(options: CompanionRuntimeClientOptions) { const requestTimeoutMs = options.requestTimeoutMs ?? 15_000; if (!Number.isFinite(requestTimeoutMs) || requestTimeoutMs <= 0) { throw new Error('requestTimeoutMs must be a positive number'); } + const reconnectDelayMs = options.reconnectDelayMs ?? 1_000; + if (!Number.isFinite(reconnectDelayMs) || reconnectDelayMs <= 0) { + throw new Error('reconnectDelayMs must be a positive number'); + } + const reconnectMaxDelayMs = options.reconnectMaxDelayMs ?? 30_000; + if (!Number.isFinite(reconnectMaxDelayMs) || reconnectMaxDelayMs <= 0) { + throw new Error('reconnectMaxDelayMs must be a positive number'); + } this.url = options.url; this.token = options.token; this.requestTimeoutMs = requestTimeoutMs; this.autoConnect = options.autoConnect ?? false; + this.autoReconnect = options.autoReconnect ?? false; + this.reconnectInitialDelayMs = reconnectDelayMs; + this.reconnectMaxDelayMs = Math.max(reconnectDelayMs, reconnectMaxDelayMs); + this.reconnectDelayMs = this.reconnectInitialDelayMs; this.websocketFactory = options.websocketFactory ?? ((url) => new WebSocket(url)); } @@ -400,6 +430,8 @@ export class CompanionRuntimeClient { return this.connectPromise; } + this.shouldReconnect = true; + this.clearReconnectTimer(); this.connectPromise = this.openConnection(); try { await this.connectPromise; @@ -418,6 +450,7 @@ export class CompanionRuntimeClient { cleanup(); settled = true; this.ws = ws; + this.resetReconnectDelay(); this._lastDisconnectCode = undefined; this._lastDisconnectReason = undefined; this.ws.on('message', (raw) => this.handleMessage(raw.toString())); @@ -429,11 +462,18 @@ export class CompanionRuntimeClient { this.ws = null; this.rejectAllPending(new Error('WebSocket closed')); this.rejectEventWaits(new Error('WebSocket closed')); + this.emitConnectionEvent({ + status: 'disconnected', + code, + reason: this._lastDisconnectReason, + }); + this.scheduleReconnect(); } }); this.ws.on('error', () => { // close event handles pending rejection }); + this.emitConnectionEvent({ status: 'connected' }); resolve(); }; @@ -463,11 +503,66 @@ export class CompanionRuntimeClient { }); } + private emitConnectionEvent(event: CompanionConnectionEvent): void { + for (const handler of this.connectionHandlers) { + try { + handler(event); + } catch { + // Connection handlers are userland callbacks; isolate failures. + } + } + } + + private resetReconnectDelay(): void { + this.reconnectDelayMs = this.reconnectInitialDelayMs; + this.reconnectAttempt = 0; + } + + private clearReconnectTimer(): void { + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + } + + private scheduleReconnect(): void { + if (!this.autoReconnect || !this.shouldReconnect) { + return; + } + if (this.reconnectTimer) { + return; + } + const delayMs = this.reconnectDelayMs; + this.reconnectAttempt += 1; + this.emitConnectionEvent({ + status: 'reconnecting', + attempt: this.reconnectAttempt, + delayMs, + }); + this.reconnectTimer = setTimeout(() => { + this.reconnectTimer = null; + if (!this.shouldReconnect) { + return; + } + this.connect() + .then(() => { + this.resetReconnectDelay(); + }) + .catch(() => { + this.reconnectDelayMs = Math.min(this.reconnectDelayMs * 2, this.reconnectMaxDelayMs); + this.scheduleReconnect(); + }); + }, delayMs); + } + disconnect(code?: number, reason?: string): void { this._lastDisconnectCode = code; this._lastDisconnectReason = reason; + this.shouldReconnect = false; + this.clearReconnectTimer(); if (!this.ws) { this.rejectEventWaits(new Error('Disconnected')); + this.emitConnectionEvent({ status: 'disconnected', code, reason }); return; } @@ -476,11 +571,13 @@ export class CompanionRuntimeClient { this.rejectAllPending(new Error('Disconnected')); this.rejectEventWaits(new Error('Disconnected')); ws.close(code, reason); + this.emitConnectionEvent({ status: 'disconnected', code, reason }); } dispose(code?: number, reason?: string): void { this.disconnect(code, reason); this.clearEventSubscriptions(); + this.connectionHandlers.clear(); } subscribeEvents(handler: CompanionEventHandler): () => void { @@ -490,6 +587,13 @@ export class CompanionRuntimeClient { }; } + subscribeConnectionEvents(handler: CompanionConnectionHandler): () => void { + this.connectionHandlers.add(handler); + return () => { + this.connectionHandlers.delete(handler); + }; + } + clearEventSubscriptions(): ClearEventSubscriptionsResult { const clearedSubscriptions = this.eventHandlers.size; this.eventHandlers.clear();