feat(companion): add reconnect resilience

This commit is contained in:
William Valentin
2026-02-25 11:12:21 -08:00
parent 7b170cff4d
commit ac60fa5be3
9 changed files with 297 additions and 27 deletions
+16 -6
View File
@@ -7,13 +7,14 @@ const {
mockRuntimeCtorArgs,
mockRuntimeInstances,
} = vi.hoisted(() => {
const runtimeCtorArgs: Array<{ url: string; token?: string }> = [];
const runtimeCtorArgs: Array<{ url: string; token?: string; autoReconnect?: boolean }> = [];
const runtimeInstances: Array<{
connect: ReturnType<typeof vi.fn>;
registerNode: ReturnType<typeof vi.fn>;
setNodeStatus: ReturnType<typeof vi.fn>;
subscribeAgentStream: ReturnType<typeof vi.fn>;
subscribeAgentTyping: ReturnType<typeof vi.fn>;
subscribeConnectionEvents: ReturnType<typeof vi.fn>;
disconnect: ReturnType<typeof vi.fn>;
}> = [];
@@ -43,7 +44,13 @@ vi.mock('./shared.js', () => ({
vi.mock('../companion/index.js', () => ({
CompanionRuntimeClient: class {
connect = vi.fn(async () => undefined);
private connectionHandlers: Array<(event: { status: string }) => void> = [];
connect = vi.fn(async () => {
for (const handler of this.connectionHandlers) {
handler({ status: 'connected' });
}
return undefined;
});
registerNode = vi.fn(async ({ nodeId, role, capabilities }: { nodeId: string; role: string; capabilities: string[] }) => ({
registered: true,
node: { id: nodeId, role },
@@ -53,9 +60,13 @@ vi.mock('../companion/index.js', () => ({
setNodeStatus = vi.fn(async () => ({ updated: true, node: { id: 'n', role: 'companion' } }));
subscribeAgentStream = vi.fn(() => () => undefined);
subscribeAgentTyping = vi.fn(() => () => undefined);
subscribeConnectionEvents = vi.fn((handler: (event: { status: string }) => void) => {
this.connectionHandlers.push(handler);
return () => undefined;
});
disconnect = vi.fn(() => undefined);
constructor(opts: { url: string; token?: string }) {
constructor(opts: { url: string; token?: string; autoReconnect?: boolean }) {
mockRuntimeCtorArgs.push(opts);
mockRuntimeInstances.push(this);
}
@@ -89,7 +100,7 @@ describe('companion command', () => {
await program.parseAsync(['node', 'test', 'companion', '--once']);
expect(mockGetConfigPath).toHaveBeenCalledOnce();
expect(mockRuntimeCtorArgs).toEqual([{ url: 'ws://127.0.0.1:18888', token: 'config-token' }]);
expect(mockRuntimeCtorArgs).toEqual([{ url: 'ws://127.0.0.1:18888', token: 'config-token', autoReconnect: false }]);
expect(mockRuntimeInstances[0]?.connect).toHaveBeenCalledOnce();
expect(mockRuntimeInstances[0]?.registerNode).toHaveBeenCalledOnce();
expect(mockRuntimeInstances[0]?.setNodeStatus).toHaveBeenCalledOnce();
@@ -124,7 +135,7 @@ describe('companion command', () => {
'node.push.register',
]);
expect(mockRuntimeCtorArgs).toEqual([{ url: 'ws://10.0.0.5:19000', token: 'override-token' }]);
expect(mockRuntimeCtorArgs).toEqual([{ url: 'ws://10.0.0.5:19000', token: 'override-token', autoReconnect: false }]);
expect(mockRuntimeInstances[0]?.registerNode).toHaveBeenCalledWith(expect.objectContaining({
nodeId: 'test-node',
capabilities: ['ui.canvas', 'node.push.register'],
@@ -149,4 +160,3 @@ describe('companion command', () => {
errSpy.mockRestore();
});
});
+75 -19
View File
@@ -96,10 +96,13 @@ export async function runCompanionSession(options: CompanionCommandOptions): Pro
const runtime = new CompanionRuntimeClient({
url: gatewayUrl,
token: gatewayToken,
autoReconnect: !options.once,
});
const stopSignals: NodeJS.Signals[] = ['SIGINT', 'SIGTERM'];
let heartbeatTimer: NodeJS.Timeout | null = null;
let registrationPromise: Promise<void> | null = null;
let skipConnectRegistration = true;
const cleanup = (): void => {
if (heartbeatTimer) {
@@ -109,6 +112,53 @@ export async function runCompanionSession(options: CompanionCommandOptions): Pro
runtime.disconnect(1000, 'Companion shutting down');
};
/**
 * Starts the periodic heartbeat publisher.
 *
 * Does nothing for `--once` sessions or when a heartbeat timer already exists.
 * Publish failures are logged and never thrown, so one bad tick cannot stop the interval.
 */
const startHeartbeat = (): void => {
  if (options.once || heartbeatTimer) return;

  const intervalMs = heartbeatSeconds * 1000;
  const publishTick = (): void => {
    void publishHeartbeat(runtime, platform).catch((error: unknown) => {
      const detail = error instanceof Error ? error.message : String(error);
      console.error(`Heartbeat failed: ${detail}`);
    });
  };

  heartbeatTimer = setInterval(publishTick, intervalMs);
};
/** Cancels the heartbeat interval, if one is running, and forgets the timer handle. */
const stopHeartbeat = (): void => {
  if (heartbeatTimer) {
    clearInterval(heartbeatTimer);
    heartbeatTimer = null;
  }
};
/**
 * Registers this node with the gateway, publishes an immediate heartbeat,
 * logs a status banner, and starts the periodic heartbeat.
 *
 * Calls made while a registration is already in flight share that promise
 * instead of issuing a second registration.
 *
 * @param label Chooses the banner verb: 'connected' for the first
 *   registration, 'reconnected' for registrations after a reconnect.
 */
const registerAndHeartbeat = async (label: 'connected' | 'reconnected'): Promise<void> => {
  if (registrationPromise !== null) {
    return registrationPromise;
  }

  const performRegistration = async (): Promise<void> => {
    const registration = await runtime.registerNode({ nodeId, role, capabilities });
    await publishHeartbeat(runtime, platform);

    let verb = 'Reconnected';
    if (label === 'connected') {
      verb = 'Connected';
    }
    console.log(`${verb} companion node ${registration.node.id} (${platform}, role=${role})`);
    console.log(`Gateway: ${gatewayUrl}`);
    console.log(`Capabilities: ${capabilities.join(', ') || '(none)'}`);
    startHeartbeat();
  };

  const inFlight = performRegistration();
  registrationPromise = inFlight;
  try {
    await inFlight;
  } finally {
    // Always release the slot so a later reconnect can register again.
    registrationPromise = null;
  }
};
for (const signal of stopSignals) {
process.once(signal, cleanup);
}
@@ -128,32 +178,39 @@ export async function runCompanionSession(options: CompanionCommandOptions): Pro
console.log(`[agent.typing${session}] ${phase}`);
});
// React to connection lifecycle changes: re-register after a reconnect,
// report scheduled reconnect attempts, and pause heartbeats while offline.
runtime.subscribeConnectionEvents((event) => {
  switch (event.status) {
    case 'connected': {
      // The first 'connected' event belongs to the initial connect, which
      // performs its own registration; only later ones trigger a re-register.
      if (skipConnectRegistration) {
        skipConnectRegistration = false;
        return;
      }
      void registerAndHeartbeat('reconnected').catch((error: unknown) => {
        const detail = error instanceof Error ? error.message : String(error);
        console.error(`Companion re-registration failed: ${detail}`);
      });
      return;
    }
    case 'reconnecting': {
      console.log(`Gateway disconnected. Reconnecting in ${Math.ceil(event.delayMs / 1000)}s (attempt ${event.attempt})...`);
      return;
    }
    default: {
      // Remaining status is 'disconnected': stop heartbeats until we are back.
      stopHeartbeat();
      const suffix = event.reason ? ` (${event.reason})` : '';
      console.log(`Gateway disconnected${suffix}.`);
    }
  }
});
try {
await runtime.connect();
const register = await runtime.registerNode({
nodeId,
role,
capabilities,
});
await publishHeartbeat(runtime, platform);
console.log(`Connected companion node ${register.node.id} (${platform}, role=${role})`);
console.log(`Gateway: ${gatewayUrl}`);
console.log(`Capabilities: ${capabilities.join(', ') || '(none)'}`);
await registerAndHeartbeat('connected');
skipConnectRegistration = false;
if (options.once) {
cleanup();
return;
}
heartbeatTimer = setInterval(() => {
void publishHeartbeat(runtime, platform).catch((error: unknown) => {
const message = error instanceof Error ? error.message : String(error);
console.error(`Heartbeat failed: ${message}`);
});
}, heartbeatSeconds * 1000);
await new Promise<void>(() => {
// Keep process alive until interrupted.
});
@@ -186,4 +243,3 @@ export function registerCompanionCommand(program: Command): void {
}
});
}
+2
View File
@@ -22,6 +22,8 @@ export type {
CompanionEventName,
CompanionEventPredicate,
CompanionEventEnvelope,
CompanionConnectionEvent,
CompanionConnectionHandler,
RegisterNodeInput,
ListNodesInput,
SetNodeStatusInput,
+75
View File
@@ -108,6 +108,22 @@ describe('CompanionRuntimeClient', () => {
}).toThrow('requestTimeoutMs must be a positive number');
});
// The constructor must reject non-positive values for both reconnect timing options.
it('validates reconnect delay options', () => {
  const construct = (overrides: { reconnectDelayMs?: number; reconnectMaxDelayMs?: number }) => (): void => {
    new CompanionRuntimeClient({
      url: 'ws://127.0.0.1:1',
      ...overrides,
    });
  };

  expect(construct({ reconnectDelayMs: 0 })).toThrow('reconnectDelayMs must be a positive number');
  expect(construct({ reconnectMaxDelayMs: 0 })).toThrow('reconnectMaxDelayMs must be a positive number');
});
it('dispatches gateway events to subscribed handlers and supports unsubscribe', () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
@@ -845,6 +861,65 @@ describe('CompanionRuntimeClient', () => {
});
});
// Drives a full drop-and-reconnect cycle against a fake socket and checks the
// exact sequence of connection events plus the number of sockets created.
it('emits connection events and reconnects when enabled', async () => {
  vi.useFakeTimers();
  // Restore real timers in `finally`: if any assertion below throws, fake
  // timers would otherwise stay installed and break unrelated later tests.
  try {
    const events: Array<{ status: string }> = [];
    let created = 0;

    // Minimal socket double: opens on the next microtask, acknowledges sends
    // immediately, and reports every close as an abnormal drop (1006).
    class FakeWebSocket extends EventEmitter {
      readyState: number = WebSocket.CONNECTING;
      constructor() {
        super();
        queueMicrotask(() => {
          this.readyState = WebSocket.OPEN;
          this.emit('open');
        });
      }
      send(_payload: string, callback?: (error?: Error) => void): void {
        callback?.();
      }
      close(_code?: number, _reason?: string): void {
        this.readyState = WebSocket.CLOSED;
        this.emit('close', 1006, Buffer.from('drop'));
      }
    }

    const client = new CompanionRuntimeClient({
      url: 'ws://127.0.0.1:1',
      autoReconnect: true,
      reconnectDelayMs: 10,
      reconnectMaxDelayMs: 10,
      websocketFactory: () => {
        created += 1;
        return new FakeWebSocket() as unknown as WebSocket;
      },
    });
    client.subscribeConnectionEvents((event) => {
      events.push({ status: event.status });
    });

    await client.connect();
    expect(created).toBe(1);
    expect(events.map((event) => event.status)).toEqual(['connected']);

    // Simulate a dropped connection by closing the underlying socket directly.
    const ws = (client as unknown as { ws: WebSocket | null }).ws;
    ws?.close();
    expect(events.map((event) => event.status)).toEqual(['connected', 'disconnected', 'reconnecting']);

    // Fire the reconnect timer, then let the fake socket's open microtask settle.
    await vi.advanceTimersByTimeAsync(10);
    await Promise.resolve();
    await Promise.resolve();
    expect(created).toBe(2);
    expect(events.map((event) => event.status)).toEqual(['connected', 'disconnected', 'reconnecting', 'connected']);
  } finally {
    vi.useRealTimers();
  }
});
it('manual disconnect metadata is not overwritten by local close event', async () => {
class FakeWebSocket extends EventEmitter {
readyState: number = WebSocket.CONNECTING;
+104
View File
@@ -38,6 +38,9 @@ export interface CompanionRuntimeClientOptions {
token?: string;
requestTimeoutMs?: number;
autoConnect?: boolean;
autoReconnect?: boolean;
reconnectDelayMs?: number;
reconnectMaxDelayMs?: number;
websocketFactory?: (url: string) => WebSocket;
}
@@ -83,6 +86,13 @@ export type CompanionEventEnvelope<TData = unknown> = {
data: TData;
};
/**
 * Connection lifecycle notification delivered to connection-event subscribers.
 *
 * - `connected`: the socket has opened.
 * - `disconnected`: the socket closed or `disconnect()` was called; `code`
 *   and `reason` are included when they are known.
 * - `reconnecting`: an automatic reconnect has been scheduled. `attempt` is the
 *   1-based attempt count since the last successful open, and `delayMs` is the
 *   wait before that attempt runs.
 */
export type CompanionConnectionEvent =
| { status: 'connected' }
| { status: 'disconnected'; code?: number; reason?: string }
| { status: 'reconnecting'; attempt: number; delayMs: number };
/** Callback invoked with each connection lifecycle event; its return value is ignored. */
export type CompanionConnectionHandler = (event: CompanionConnectionEvent) => void;
export const COMPANION_EVENT_NAMES = {
agentStream: 'agent.stream',
agentTyping: 'agent.typing',
@@ -307,6 +317,9 @@ export class CompanionRuntimeClient {
private readonly token?: string;
private readonly requestTimeoutMs: number;
private readonly autoConnect: boolean;
private readonly autoReconnect: boolean;
private readonly reconnectInitialDelayMs: number;
private readonly reconnectMaxDelayMs: number;
private readonly websocketFactory: (url: string) => WebSocket;
private ws: WebSocket | null = null;
@@ -314,19 +327,36 @@ export class CompanionRuntimeClient {
private nextId = 1;
private pending = new Map<number, PendingRequest>();
private readonly eventHandlers = new Set<CompanionEventHandler>();
private readonly connectionHandlers = new Set<CompanionConnectionHandler>();
private readonly pendingEventWaits = new Set<(error: Error) => void>();
private _lastDisconnectCode: number | undefined;
private _lastDisconnectReason: string | undefined;
private reconnectDelayMs: number;
private reconnectTimer: NodeJS.Timeout | null = null;
private reconnectAttempt = 0;
private shouldReconnect = false;
/**
 * Builds a client from the supplied options, applying defaults for anything omitted.
 *
 * Defaults: 15s request timeout, 1s initial reconnect delay, 30s maximum
 * reconnect delay, `autoConnect` and `autoReconnect` off. The effective maximum
 * delay is never lower than the initial delay.
 *
 * @throws Error when `requestTimeoutMs`, `reconnectDelayMs`, or
 *   `reconnectMaxDelayMs` is not a finite number greater than zero.
 */
constructor(options: CompanionRuntimeClientOptions) {
  const requirePositive = (value: number, failureMessage: string): number => {
    if (Number.isFinite(value) && value > 0) {
      return value;
    }
    throw new Error(failureMessage);
  };

  // Validate in a fixed order so the first offending option is the one reported.
  const requestTimeoutMs = requirePositive(
    options.requestTimeoutMs ?? 15_000,
    'requestTimeoutMs must be a positive number',
  );
  const initialDelayMs = requirePositive(
    options.reconnectDelayMs ?? 1_000,
    'reconnectDelayMs must be a positive number',
  );
  const maxDelayMs = requirePositive(
    options.reconnectMaxDelayMs ?? 30_000,
    'reconnectMaxDelayMs must be a positive number',
  );

  this.url = options.url;
  this.token = options.token;
  this.requestTimeoutMs = requestTimeoutMs;
  this.autoConnect = options.autoConnect ?? false;
  this.autoReconnect = options.autoReconnect ?? false;
  this.reconnectInitialDelayMs = initialDelayMs;
  this.reconnectMaxDelayMs = Math.max(initialDelayMs, maxDelayMs);
  this.reconnectDelayMs = this.reconnectInitialDelayMs;
  this.websocketFactory = options.websocketFactory ?? ((url) => new WebSocket(url));
}
@@ -400,6 +430,8 @@ export class CompanionRuntimeClient {
return this.connectPromise;
}
this.shouldReconnect = true;
this.clearReconnectTimer();
this.connectPromise = this.openConnection();
try {
await this.connectPromise;
@@ -418,6 +450,7 @@ export class CompanionRuntimeClient {
cleanup();
settled = true;
this.ws = ws;
this.resetReconnectDelay();
this._lastDisconnectCode = undefined;
this._lastDisconnectReason = undefined;
this.ws.on('message', (raw) => this.handleMessage(raw.toString()));
@@ -429,11 +462,18 @@ export class CompanionRuntimeClient {
this.ws = null;
this.rejectAllPending(new Error('WebSocket closed'));
this.rejectEventWaits(new Error('WebSocket closed'));
this.emitConnectionEvent({
status: 'disconnected',
code,
reason: this._lastDisconnectReason,
});
this.scheduleReconnect();
}
});
this.ws.on('error', () => {
// close event handles pending rejection
});
this.emitConnectionEvent({ status: 'connected' });
resolve();
};
@@ -463,11 +503,66 @@ export class CompanionRuntimeClient {
});
}
/**
 * Delivers a connection event to every registered connection handler.
 * An exception thrown by one handler is swallowed so the remaining handlers still run.
 */
private emitConnectionEvent(event: CompanionConnectionEvent): void {
  this.connectionHandlers.forEach((notify) => {
    try {
      notify(event);
    } catch {
      // Handlers are caller-supplied; a faulty one must not break the client.
    }
  });
}
/** Restores the backoff state: zero attempts and the initial reconnect delay. */
private resetReconnectDelay(): void {
  this.reconnectAttempt = 0;
  this.reconnectDelayMs = this.reconnectInitialDelayMs;
}
/** Cancels a pending reconnect attempt, if one is scheduled. */
private clearReconnectTimer(): void {
  if (!this.reconnectTimer) {
    return;
  }
  clearTimeout(this.reconnectTimer);
  this.reconnectTimer = null;
}
/**
 * Queues one reconnect attempt after the current backoff delay.
 *
 * Does nothing when auto-reconnect is disabled, when reconnecting is not
 * currently wanted (`shouldReconnect` is false), or when an attempt is already
 * queued. Emits a `reconnecting` event before arming the timer. A failed
 * attempt doubles the delay, capped at the configured maximum, and queues
 * another attempt; a successful one resets the backoff.
 */
private scheduleReconnect(): void {
  const reconnectAllowed = this.autoReconnect && this.shouldReconnect;
  if (!reconnectAllowed || this.reconnectTimer) {
    return;
  }

  this.reconnectAttempt += 1;
  const waitMs = this.reconnectDelayMs;
  this.emitConnectionEvent({
    status: 'reconnecting',
    attempt: this.reconnectAttempt,
    delayMs: waitMs,
  });

  const attemptReconnect = async (): Promise<void> => {
    try {
      await this.connect();
      this.resetReconnectDelay();
    } catch {
      this.reconnectDelayMs = Math.min(this.reconnectDelayMs * 2, this.reconnectMaxDelayMs);
      this.scheduleReconnect();
    }
  };

  this.reconnectTimer = setTimeout(() => {
    this.reconnectTimer = null;
    // disconnect() may have been called while the timer was pending.
    if (this.shouldReconnect) {
      void attemptReconnect();
    }
  }, waitMs);
}
disconnect(code?: number, reason?: string): void {
this._lastDisconnectCode = code;
this._lastDisconnectReason = reason;
this.shouldReconnect = false;
this.clearReconnectTimer();
if (!this.ws) {
this.rejectEventWaits(new Error('Disconnected'));
this.emitConnectionEvent({ status: 'disconnected', code, reason });
return;
}
@@ -476,11 +571,13 @@ export class CompanionRuntimeClient {
this.rejectAllPending(new Error('Disconnected'));
this.rejectEventWaits(new Error('Disconnected'));
ws.close(code, reason);
this.emitConnectionEvent({ status: 'disconnected', code, reason });
}
/**
 * Disconnects the client and then removes all event and connection subscriptions.
 *
 * @param code Optional close code forwarded to `disconnect`.
 * @param reason Optional close reason forwarded to `disconnect`.
 */
dispose(code?: number, reason?: string): void {
// Disconnect first: connection handlers are still registered at this point,
// so they receive the final 'disconnected' event before being cleared.
this.disconnect(code, reason);
this.clearEventSubscriptions();
this.connectionHandlers.clear();
}
subscribeEvents(handler: CompanionEventHandler): () => void {
@@ -490,6 +587,13 @@ export class CompanionRuntimeClient {
};
}
/**
 * Registers a handler for connection lifecycle events.
 *
 * @param handler Callback invoked with each connection event.
 * @returns A function that removes this handler when called.
 */
subscribeConnectionEvents(handler: CompanionConnectionHandler): () => void {
  const registry = this.connectionHandlers;
  registry.add(handler);
  const unsubscribe = (): void => {
    registry.delete(handler);
  };
  return unsubscribe;
}
clearEventSubscriptions(): ClearEventSubscriptionsResult {
const clearedSubscriptions = this.eventHandlers.size;
this.eventHandlers.clear();