From 184dc2c6889e62f8838767a24a4d7eaf58a6b8f1 Mon Sep 17 00:00:00 2001 From: William Valentin Date: Thu, 26 Feb 2026 17:01:16 -0800 Subject: [PATCH] Add companion reconnect state recovery and handoff helper --- src/cli/companion.test.ts | 29 +++ src/cli/companion.ts | 23 +++ src/companion/index.ts | 2 + .../platformClients.integration.test.ts | 101 ++++++++- src/companion/platformClients.test.ts | 49 +++++ src/companion/platformClients.ts | 18 ++ src/companion/runtimeClient.test.ts | 193 ++++++++++++++++++ src/companion/runtimeClient.ts | 191 ++++++++++++++++- 8 files changed, 602 insertions(+), 4 deletions(-) diff --git a/src/cli/companion.test.ts b/src/cli/companion.test.ts index 3842cd4..f00bc13 100644 --- a/src/cli/companion.test.ts +++ b/src/cli/companion.test.ts @@ -12,6 +12,7 @@ const { connect: ReturnType; registerNode: ReturnType; setNodeStatus: ReturnType; + sendAgentMessage: ReturnType; subscribeAgentStream: ReturnType; subscribeAgentTyping: ReturnType; subscribeConnectionEvents: ReturnType; @@ -58,6 +59,7 @@ vi.mock('../companion/index.js', () => ({ capabilities: { declared: capabilities, enabled: capabilities }, })); setNodeStatus = vi.fn(async () => ({ updated: true, node: { id: 'n', role: 'companion' } })); + sendAgentMessage = vi.fn(async () => ({ content: 'handoff response' })); subscribeAgentStream = vi.fn(() => () => undefined); subscribeAgentTyping = vi.fn(() => () => undefined); subscribeConnectionEvents = vi.fn((handler: (event: { status: string }) => void) => { @@ -146,6 +148,33 @@ describe('companion command', () => { errSpy.mockRestore(); }); + it('executes optional message handoff after registration', async () => { + const logSpy = vi.spyOn(console, 'log').mockImplementation(() => undefined); + const errSpy = vi.spyOn(console, 'error').mockImplementation(() => undefined); + const program = new Command(); + const { registerCompanionCommand } = await import('./companion.js'); + registerCompanionCommand(program); + + await program.parseAsync([ + 'node', + 'test', + 'companion', + '--once', + '--handoff', + 'status update?', + '--handoff-timeout', + '5000', + ]); + + expect(mockRuntimeInstances[0]?.sendAgentMessage).toHaveBeenCalledWith({ + message: 'status update?', + timeoutMs: 5000, + }); + expect(errSpy).not.toHaveBeenCalled(); + logSpy.mockRestore(); + errSpy.mockRestore(); + }); + it('sets process exit code when options are invalid', async () => { const errSpy = vi.spyOn(console, 'error').mockImplementation(() => undefined); const program = new Command(); diff --git a/src/cli/companion.ts b/src/cli/companion.ts index d1bb3a1..28f7f64 100644 --- a/src/cli/companion.ts +++ b/src/cli/companion.ts @@ -16,6 +16,8 @@ interface CompanionCommandOptions { platform?: CompanionPlatform; capability?: string[]; heartbeat?: string; + handoff?: string; + handoffTimeout?: string; once?: boolean; } @@ -72,6 +74,15 @@ function parseHeartbeatSeconds(value: string | undefined): number { return parsed; } +function parseHandoffTimeoutMs(value: string | undefined): number { + const raw = value ?? '120000'; + const parsed = Number.parseInt(raw, 10); + if (!Number.isFinite(parsed) || parsed < 1000 || parsed > 600000) { + throw new Error('handoff-timeout must be an integer between 1000 and 600000 milliseconds'); + } + return parsed; +} + async function publishHeartbeat( runtime: CompanionRuntimeClient, platform: CompanionPlatform, @@ -92,6 +103,8 @@ export async function runCompanionSession(options: CompanionCommandOptions): Pro const nodeId = resolveNodeId(options, platform); const capabilities = resolveCapabilities(platform, options.capability); const heartbeatSeconds = parseHeartbeatSeconds(options.heartbeat); + const handoffMessage = options.handoff?.trim(); + const handoffTimeoutMs = parseHandoffTimeoutMs(options.handoffTimeout); const runtime = new CompanionRuntimeClient({ url: gatewayUrl, @@ -206,6 +219,14 @@ export async function runCompanionSession(options: CompanionCommandOptions): Pro await registerAndHeartbeat('connected'); skipConnectRegistration = false; + if (handoffMessage) { + const handoff = await runtime.sendAgentMessage({ + message: handoffMessage, + timeoutMs: handoffTimeoutMs, + }); + console.log(`[handoff.done] ${handoff.content}`); + } + if (options.once) { cleanup(); return; @@ -232,6 +253,8 @@ export function registerCompanionCommand(program: Command): void { .option('--platform ', 'Node platform (macos|ios|android|linux|windows|unknown)', 'macos') .option('--capability ', 'Capability list override') .option('--heartbeat ', 'Heartbeat interval in seconds', '30') + .option('--handoff ', 'Optional one-shot agent message handoff after registration') + .option('--handoff-timeout ', 'Handoff timeout in milliseconds', '120000') .option('--once', 'Connect, register, publish one heartbeat, then exit', false) .action(async (opts: CompanionCommandOptions) => { try { diff --git a/src/companion/index.ts b/src/companion/index.ts index ecc84a7..1e37c1a 100644 --- a/src/companion/index.ts +++ b/src/companion/index.ts @@ -29,6 +29,8 @@ export type { SetNodeStatusInput, SetNodeLocationInput, SetNodePushTokenInput, + SendAgentMessageInput, + AgentSendResult, PutCanvasArtifactInput, GetCanvasArtifactInput, DeleteCanvasArtifactInput, diff --git a/src/companion/platformClients.integration.test.ts b/src/companion/platformClients.integration.test.ts index 1b50493..61bdaaf 100644 --- a/src/companion/platformClients.integration.test.ts +++ b/src/companion/platformClients.integration.test.ts @@ -101,10 +101,11 @@ afterAll(async () => { await server.stop(); }); -function createRuntime(): CompanionRuntimeClient { +function createRuntime(overrides: Partial[0]> = {}): CompanionRuntimeClient { return new CompanionRuntimeClient({ url: `ws://127.0.0.1:${TEST_PORT}`, token: TEST_TOKEN, + ...overrides, }); } @@ -478,4 +479,102 @@ describe('platform clients integration', () => { client.disconnect(); } }); + + it('supports message handoff through companion wrapper via agent.send done events', async () => { + if (!LISTEN_ALLOWED) { + return; + } + + const runtime = createRuntime(); + const client = new MacOSCompanionClient({ runtime, nodeId: 'macos-handoff-e2e' }); + await client.connect(); + + try { + await client.register(); + const handoff = await client.sendMessageHandoff({ + message: 'hello from companion handoff', + timeoutMs: 10_000, + }); + expect(handoff.content).toContain('Hello from Flynn!'); + } finally { + client.disconnect(); + } + }); + + it('replays iOS registration/status/push after reconnect and supports token refresh', async () => { + if (!LISTEN_ALLOWED) { + return; + } + + const runtime = createRuntime({ + autoReconnect: true, + reconnectDelayMs: 25, + reconnectMaxDelayMs: 100, + }); + const client = new IOSCompanionClient({ runtime, nodeId: 'ios-reconnect-e2e' }); + await client.connect(); + + try { + await client.register(); + await client.setStatus({ + statusText: 'background-awake', + powerSource: 'battery', + batteryPct: 61, + }); + await client.registerPushToken({ + token: '1'.repeat(64), + topic: 'dev.flynn.ios', + environment: 'sandbox', + }); + + const reconnected = new Promise((resolve, reject) => { + let connectedCount = 0; + let timer: NodeJS.Timeout | undefined; + const unsubscribe = runtime.subscribeConnectionEvents((event) => { + if (event.status !== 'connected') { + return; + } + connectedCount += 1; + if (connectedCount >= 2) { + if (timer) { + clearTimeout(timer); + } + unsubscribe(); + resolve(); + } + }); + timer = setTimeout(() => { + unsubscribe(); + reject(new Error('timed out waiting for reconnect')); + }, 10_000); + }); + + const ws = (runtime as unknown as { + ws?: { close: (code?: number, reason?: string) => void } | null; + }).ws; + ws?.close(4001, 'transport reset'); + + await reconnected; + + const beforeRefresh = await client.listNodes(); + const entry = beforeRefresh.nodes.find((node) => node.nodeId === 'ios-reconnect-e2e'); + expect(entry?.status?.platform).toBe('ios'); + expect(entry?.status?.statusText).toBe('background-awake'); + expect(entry?.push?.provider).toBe('apns'); + const firstPreview = entry?.push?.tokenPreview; + + await client.registerPushToken({ + token: '2'.repeat(64), + topic: 'dev.flynn.ios', + environment: 'sandbox', + }); + + const afterRefresh = await client.listNodes(); + const refreshed = afterRefresh.nodes.find((node) => node.nodeId === 'ios-reconnect-e2e'); + expect(refreshed?.push?.provider).toBe('apns'); + expect(refreshed?.push?.tokenPreview).not.toBe(firstPreview); + } finally { + client.disconnect(); + } + }); }); diff --git a/src/companion/platformClients.test.ts b/src/companion/platformClients.test.ts index af72ac0..015a7bb 100644 --- a/src/companion/platformClients.test.ts +++ b/src/companion/platformClients.test.ts @@ -18,6 +18,8 @@ function createRuntimeMock(): { setNodeLocation: ReturnType; getNodeLocation: ReturnType; setNodePushToken: ReturnType; + sendAgentMessage: ReturnType; + enableNodeStateRecovery: ReturnType; getSystemCapabilities: ReturnType; listSystemNodes: ReturnType; putCanvasArtifact: ReturnType; @@ -68,6 +70,8 @@ function createRuntimeMock(): { const setNodeLocation = vi.fn(async () => ({ updated: true })); const getNodeLocation = vi.fn(async () => ({ node: { id: 'n1', role: 'companion' }, location: null })); const setNodePushToken = vi.fn(async () => ({ updated: true })); + const sendAgentMessage = vi.fn(async () => ({ content: 'handoff complete' })); + const enableNodeStateRecovery = vi.fn(() => undefined); const getSystemCapabilities = vi.fn(async () => ({ protocol: { version: 1 }, nodes: { enabled: true, locationEnabled: true, pushEnabled: true, allowedRoles: ['companion'], registered: true }, featureGates: {} })); const listSystemNodes = vi.fn(async () => ({ nodes: [], summary: { total: 0 } })); const putCanvasArtifact = vi.fn(async () => ({ upserted: true, artifact: { id: 'a1' } })); @@ -129,6 +133,8 @@ function createRuntimeMock(): { setNodeLocation, getNodeLocation, setNodePushToken, + sendAgentMessage, + enableNodeStateRecovery, getSystemCapabilities, listSystemNodes, putCanvasArtifact, @@ -191,6 +197,8 @@ function createRuntimeMock(): { setNodeLocation, getNodeLocation, setNodePushToken, + sendAgentMessage, + enableNodeStateRecovery, getSystemCapabilities, listSystemNodes, putCanvasArtifact, @@ -227,6 +235,31 @@ function createRuntimeMock(): { } describe('platform companion clients', () => { + it('enables node-state recovery by default for platform wrappers', () => { + const mac = createRuntimeMock(); + const ios = createRuntimeMock(); + const android = createRuntimeMock(); + + new MacOSCompanionClient({ runtime: mac.runtime, nodeId: 'mac-node' }); + new IOSCompanionClient({ runtime: ios.runtime, nodeId: 'ios-node' }); + new AndroidCompanionClient({ runtime: android.runtime, nodeId: 'android-node' }); + + expect(mac.enableNodeStateRecovery).toHaveBeenCalledWith(true); + expect(ios.enableNodeStateRecovery).toHaveBeenCalledWith(true); + expect(android.enableNodeStateRecovery).toHaveBeenCalledWith(true); + }); + + it('can disable node-state recovery per platform wrapper', () => { + const mock = createRuntimeMock(); + new IOSCompanionClient({ + runtime: mock.runtime, + nodeId: 'ios-node', + recoverNodeStateOnReconnect: false, + }); + + expect(mock.enableNodeStateRecovery).toHaveBeenCalledWith(false); + }); + it('macOS client uses macos platform status and APNs push', async () => { const mock = createRuntimeMock(); const client = new MacOSCompanionClient({ runtime: mock.runtime, nodeId: 'mac-node' }); @@ -341,6 +374,22 @@ describe('platform companion clients', () => { unsubscribeEvent(); }); + it('platform handoff helper forwards to runtime sendAgentMessage', async () => { + const mock = createRuntimeMock(); + const client = new IOSCompanionClient({ runtime: mock.runtime, nodeId: 'ios-node' }); + + const result = await client.sendMessageHandoff({ + message: 'handoff this message', + timeoutMs: 1500, + }); + + expect(mock.sendAgentMessage).toHaveBeenCalledWith({ + message: 'handoff this message', + timeoutMs: 1500, + }); + expect(result).toEqual({ content: 'handoff complete' }); + }); + it('platform clearEventSubscriptions forwards to runtime client', async () => { const mock = createRuntimeMock(); const client = new IOSCompanionClient({ runtime: mock.runtime, nodeId: 'ios-node' }); diff --git a/src/companion/platformClients.ts b/src/companion/platformClients.ts index 30fd9fa..8d04884 100644 --- a/src/companion/platformClients.ts +++ b/src/companion/platformClients.ts @@ -24,6 +24,8 @@ import type { PendingWorkSnapshot, NodePushTokenSetResult, SetNodeLocationInput, + SendAgentMessageInput, + AgentSendResult, SystemCapabilitiesResult, SystemNodesResult, WaitForIdleOptions, @@ -42,6 +44,7 @@ export interface PlatformClientOptions { capabilities?: string[]; protocolVersion?: number; defaultSessionId?: string; + recoverNodeStateOnReconnect?: boolean; } export interface RegisterPushTokenInput { @@ -100,6 +103,7 @@ export class MacOSCompanionClient { this.capabilities = options.capabilities ?? ['ui.canvas', 'node.location.write', 'node.push.register']; this.protocolVersion = options.protocolVersion; this.defaultSessionId = options.defaultSessionId; + this.runtime.enableNodeStateRecovery(options.recoverNodeStateOnReconnect ?? true); } connect(): Promise { @@ -362,6 +366,10 @@ export class MacOSCompanionClient { return this.runtime.waitForIdle(options); } + sendMessageHandoff(input: SendAgentMessageInput): Promise { + return this.runtime.sendAgentMessage(input); + } + private resolveSessionId(sessionId?: string): string { const resolved = sessionId ?? this.defaultSessionId; if (!resolved) { @@ -386,6 +394,7 @@ export class IOSCompanionClient { this.capabilities = options.capabilities ?? ['node.location.write', 'node.push.register']; this.protocolVersion = options.protocolVersion; this.defaultSessionId = options.defaultSessionId; + this.runtime.enableNodeStateRecovery(options.recoverNodeStateOnReconnect ?? true); } connect(): Promise { @@ -648,6 +657,10 @@ export class IOSCompanionClient { return this.runtime.waitForIdle(options); } + sendMessageHandoff(input: SendAgentMessageInput): Promise { + return this.runtime.sendAgentMessage(input); + } + private resolveSessionId(sessionId?: string): string { const resolved = sessionId ?? this.defaultSessionId; if (!resolved) { @@ -672,6 +685,7 @@ export class AndroidCompanionClient { this.capabilities = options.capabilities ?? ['node.location.write', 'node.push.register']; this.protocolVersion = options.protocolVersion; this.defaultSessionId = options.defaultSessionId; + this.runtime.enableNodeStateRecovery(options.recoverNodeStateOnReconnect ?? true); } connect(): Promise { @@ -932,6 +946,10 @@ export class AndroidCompanionClient { return this.runtime.waitForIdle(options); } + sendMessageHandoff(input: SendAgentMessageInput): Promise { + return this.runtime.sendAgentMessage(input); + } + private resolveSessionId(sessionId?: string): string { const resolved = sessionId ?? this.defaultSessionId; if (!resolved) { diff --git a/src/companion/runtimeClient.test.ts b/src/companion/runtimeClient.test.ts index b364085..589ce4e 100644 --- a/src/companion/runtimeClient.test.ts +++ b/src/companion/runtimeClient.test.ts @@ -920,6 +920,199 @@ describe('CompanionRuntimeClient', () => { vi.useRealTimers(); }); + it('sendAgentMessage resolves when done event arrives', async () => { + class FakeWebSocket extends EventEmitter { + readyState: number = WebSocket.CONNECTING; + + constructor() { + super(); + queueMicrotask(() => { + this.readyState = WebSocket.OPEN; + this.emit('open'); + }); + } + + send(payload: string, callback?: (error?: Error) => void): void { + const req = JSON.parse(payload) as { id: number; method: string }; + if (req.method === 'agent.send') { + queueMicrotask(() => { + this.emit('message', JSON.stringify({ + id: req.id, + event: 'run_state', + data: { state: 'start', timestamp: Date.now() }, + })); + this.emit('message', JSON.stringify({ + id: req.id, + event: 'done', + data: { content: 'handoff complete' }, + })); + }); + } + callback?.(); + } + + close(_code?: number, _reason?: string): void { + this.readyState = WebSocket.CLOSED; + this.emit('close', 1000, Buffer.from('')); + } + } + + const client = new CompanionRuntimeClient({ + url: 'ws://127.0.0.1:1', + autoConnect: true, + websocketFactory: () => new FakeWebSocket() as unknown as WebSocket, + }); + + await expect(client.sendAgentMessage({ + message: 'send this to assistant', + timeoutMs: 2000, + })).resolves.toEqual({ + content: 'handoff complete', + }); + }); + + it('sendAgentMessage rejects on error event payloads', async () => { + class FakeWebSocket extends EventEmitter { + readyState: number = WebSocket.CONNECTING; + + constructor() { + super(); + queueMicrotask(() => { + this.readyState = WebSocket.OPEN; + this.emit('open'); + }); + } + + send(payload: string, callback?: (error?: Error) => void): void { + const req = JSON.parse(payload) as { id: number; method: string }; + if (req.method === 'agent.send') { + queueMicrotask(() => { + this.emit('message', JSON.stringify({ + id: req.id, + event: 'error', + data: { + code: -2, + message: 'invalid handoff', + }, + })); + }); + } + callback?.(); + } + + close(_code?: number, _reason?: string): void { + this.readyState = WebSocket.CLOSED; + this.emit('close', 1000, Buffer.from('')); + } + } + + const client = new CompanionRuntimeClient({ + url: 'ws://127.0.0.1:1', + autoConnect: true, + websocketFactory: () => new FakeWebSocket() as unknown as WebSocket, + }); + + await expect(client.sendAgentMessage({ + message: 'fail this handoff', + timeoutMs: 2000, + })).rejects.toBeInstanceOf(GatewayRpcError); + }); + + it('replays node registration/state after reconnect when recovery is enabled', async () => { + vi.useFakeTimers(); + const sockets: Array<{ methods: string[]; ws: EventEmitter & { readyState: number; close: () => void } }> = []; + + class FakeWebSocket extends EventEmitter { + readyState: number = WebSocket.CONNECTING; + methods: string[] = []; + + constructor() { + super(); + queueMicrotask(() => { + this.readyState = WebSocket.OPEN; + this.emit('open'); + }); + } + + send(payload: string, callback?: (error?: Error) => void): void { + const req = JSON.parse(payload) as { id: number; method: string; params?: Record }; + this.methods.push(req.method); + queueMicrotask(() => { + this.emit('message', JSON.stringify({ + id: req.id, + result: req.method === 'node.register' + ? { + registered: true, + node: { id: String(req.params?.nodeId ?? 'n'), role: String(req.params?.role ?? 'companion') }, + protocol: { serverVersion: 1, clientVersion: 1, negotiatedVersion: 1 }, + capabilities: { declared: [], enabled: [] }, + } + : { updated: true }, + })); + }); + callback?.(); + } + + close(): void { + this.readyState = WebSocket.CLOSED; + this.emit('close', 1006, Buffer.from('drop')); + } + } + + const client = new CompanionRuntimeClient({ + url: 'ws://127.0.0.1:1', + autoReconnect: true, + reconnectDelayMs: 10, + reconnectMaxDelayMs: 10, + websocketFactory: () => { + const ws = new FakeWebSocket() as unknown as EventEmitter & { readyState: number; close: () => void; methods: string[] }; + sockets.push({ methods: ws.methods, ws }); + return ws as unknown as WebSocket; + }, + }); + client.enableNodeStateRecovery(true); + + await client.connect(); + await client.registerNode({ + nodeId: 'recover-node', + role: 'companion', + capabilities: ['ui.canvas'], + }); + await client.setNodeStatus({ + platform: 'ios', + statusText: 'background-awake', + powerSource: 'battery', + }); + await client.setNodePushToken({ + provider: 'apns', + token: 'a'.repeat(64), + topic: 'dev.flynn.ios', + environment: 'sandbox', + }); + + expect(sockets).toHaveLength(1); + expect(sockets[0]?.methods).toEqual([ + 'node.register', + 'node.status.set', + 'node.push_token.set', + ]); + + sockets[0]?.ws.close(); + + await vi.advanceTimersByTimeAsync(10); + await Promise.resolve(); + await Promise.resolve(); + await Promise.resolve(); + + expect(sockets).toHaveLength(2); + expect(sockets[1]?.methods).toEqual([ + 'node.register', + 'node.status.set', + 'node.push_token.set', + ]); + vi.useRealTimers(); + }); + it('manual disconnect metadata is not overwritten by local close event', async () => { class FakeWebSocket extends EventEmitter { readyState: number = WebSocket.CONNECTING; diff --git a/src/companion/runtimeClient.ts b/src/companion/runtimeClient.ts index 6d0395e..5ec37a9 100644 --- a/src/companion/runtimeClient.ts +++ b/src/companion/runtimeClient.ts @@ -3,6 +3,7 @@ import type { NodeLocationSetParams, NodePushTokenSetParams, NodeStatusSetParams, + GatewayAttachment, } from '../gateway/protocol.js'; import { GATEWAY_PROTOCOL_VERSION } from '../gateway/protocol.js'; @@ -33,6 +34,12 @@ type PendingRequest = { timeout: NodeJS.Timeout; }; +type PendingAgentSend = { + resolve: (value: AgentSendResult) => void; + reject: (error: Error) => void; + timeout: NodeJS.Timeout; +}; + export interface CompanionRuntimeClientOptions { url: string; token?: string; @@ -252,6 +259,21 @@ export interface SetNodeLocationInput extends Omit {} +export interface SendAgentMessageInput { + message?: string; + attachments?: GatewayAttachment[]; + metadata?: { + isCommand?: boolean; + command?: string; + commandArgs?: string; + }; + timeoutMs?: number; +} + +export interface AgentSendResult { + content: string; +} + export interface CanvasArtifact { id: string; type: string; @@ -326,6 +348,7 @@ export class CompanionRuntimeClient { private connectPromise: Promise | null = null; private nextId = 1; private pending = new Map(); + private readonly pendingAgentSends = new Map(); private readonly eventHandlers = new Set(); private readonly connectionHandlers = new Set(); private readonly pendingEventWaits = new Set<(error: Error) => void>(); @@ -335,6 +358,11 @@ export class CompanionRuntimeClient { private reconnectTimer: NodeJS.Timeout | null = null; private reconnectAttempt = 0; private shouldReconnect = false; + private recoverNodeStateOnReconnect = false; + private lastNodeRegistrationInput?: RegisterNodeInput; + private lastNodeStatusInput?: SetNodeStatusInput; + private lastNodeLocationInput?: SetNodeLocationInput; + private lastNodePushTokenInput?: SetNodePushTokenInput; constructor(options: CompanionRuntimeClientOptions) { const requestTimeoutMs = options.requestTimeoutMs ?? 15_000; @@ -369,7 +397,7 @@ export class CompanionRuntimeClient { } get pendingRequestCount(): number { - return this.pending.size; + return this.pending.size + this.pendingAgentSends.size; } get pendingEventWaitCount(): number { @@ -392,6 +420,14 @@ export class CompanionRuntimeClient { return this._lastDisconnectReason; } + get nodeStateRecoveryEnabled(): boolean { + return this.recoverNodeStateOnReconnect; + } + + enableNodeStateRecovery(enabled = true): void { + this.recoverNodeStateOnReconnect = enabled; + } + getPendingWorkSnapshot(): PendingWorkSnapshot { return { pendingRequestCount: this.pendingRequestCount, @@ -450,6 +486,7 @@ export class CompanionRuntimeClient { cleanup(); settled = true; this.ws = ws; + const shouldRecover = this.recoverNodeStateOnReconnect && this.reconnectAttempt > 0; this.resetReconnectDelay(); this._lastDisconnectCode = undefined; this._lastDisconnectReason = undefined; @@ -473,8 +510,15 @@ export class CompanionRuntimeClient { this.ws.on('error', () => { // close event handles pending rejection }); - this.emitConnectionEvent({ status: 'connected' }); - resolve(); + const finalize = () => { + this.emitConnectionEvent({ status: 'connected' }); + resolve(); + }; + if (shouldRecover) { + void this.restoreNodeStateAfterReconnect().finally(finalize); + } else { + finalize(); + } }; const onError = (err: Error) => { @@ -885,6 +929,62 @@ export class CompanionRuntimeClient { return Object.values(COMPANION_EVENT_NAMES); } + async sendAgentMessage(input: SendAgentMessageInput): Promise { + const hasMessage = typeof input.message === 'string' && input.message.trim().length > 0; + const hasAttachments = Array.isArray(input.attachments) && input.attachments.length > 0; + const isCommand = Boolean(input.metadata?.isCommand); + if (!hasMessage && !hasAttachments && !isCommand) { + throw new Error('sendAgentMessage requires message or attachments (or command metadata)'); + } + + if (!this.connected) { + if (!this.autoConnect) { + throw new Error('WebSocket is not connected'); + } + await this.connect(); + } + + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { + throw new Error('WebSocket is not connected'); + } + + const id = this.nextId++; + const timeoutMs = input.timeoutMs ?? this.requestTimeoutMs; + if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) { + throw new Error('timeoutMs must be a positive number'); + } + + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + this.pendingAgentSends.delete(id); + reject(new Error('Timed out waiting for agent response')); + }, timeoutMs); + + this.pendingAgentSends.set(id, { resolve, reject, timeout }); + + this.ws?.send(JSON.stringify({ + id, + method: 'agent.send', + params: { + ...(hasMessage ? { message: input.message } : {}), + ...(hasAttachments ? { attachments: input.attachments } : {}), + ...(isCommand ? { metadata: input.metadata } : {}), + }, + }), (err) => { + if (!err) { + return; + } + const pending = this.pendingAgentSends.get(id); + if (!pending) { + return; + } + clearTimeout(pending.timeout); + this.pendingAgentSends.delete(id); + reject(err); + }); + }); + } + async call(method: string, params?: Record): Promise { if (!this.connected) { if (!this.autoConnect) { @@ -939,6 +1039,14 @@ export class CompanionRuntimeClient { role: input.role, protocolVersion: input.protocolVersion ?? GATEWAY_PROTOCOL_VERSION, capabilities: input.capabilities, + }).then((result) => { + this.lastNodeRegistrationInput = { + nodeId: input.nodeId, + role: input.role, + protocolVersion: input.protocolVersion ?? GATEWAY_PROTOCOL_VERSION, + capabilities: [...input.capabilities], + }; + return result; }); } @@ -974,6 +1082,9 @@ export class CompanionRuntimeClient { statusText: input.statusText, batteryPct: input.batteryPct, powerSource: input.powerSource, + }).then((result) => { + this.lastNodeStatusInput = { ...input }; + return result; }); } @@ -987,6 +1098,9 @@ export class CompanionRuntimeClient { speedMps: input.speedMps, source: input.source, capturedAt: input.capturedAt, + }).then((result) => { + this.lastNodeLocationInput = { ...input }; + return result; }); } @@ -1000,6 +1114,9 @@ export class CompanionRuntimeClient { token: input.token, topic: input.topic, environment: input.environment, + }).then((result) => { + this.lastNodePushTokenInput = { ...input }; + return result; }); } @@ -1061,6 +1178,30 @@ export class CompanionRuntimeClient { } if ('event' in parsed) { + const pendingAgentSend = this.pendingAgentSends.get(parsed.id); + if (pendingAgentSend) { + if (parsed.event === 'done') { + clearTimeout(pendingAgentSend.timeout); + this.pendingAgentSends.delete(parsed.id); + const content = (parsed.data as { content?: unknown })?.content; + pendingAgentSend.resolve({ + content: typeof content === 'string' ? content : '', + }); + } else if (parsed.event === 'error') { + clearTimeout(pendingAgentSend.timeout); + this.pendingAgentSends.delete(parsed.id); + const err = parsed.data as { code?: number; message?: unknown } | undefined; + if (typeof err?.message === 'string') { + pendingAgentSend.reject(new GatewayRpcError( + typeof err.code === 'number' ? err.code : -1, + err.message, + )); + } else { + pendingAgentSend.reject(new Error('Agent request failed')); + } + } + } + for (const handler of this.eventHandlers) { try { handler(parsed.event, parsed.data); @@ -1071,6 +1212,24 @@ export class CompanionRuntimeClient { return; } + const pendingAgentSend = this.pendingAgentSends.get(parsed.id); + if (pendingAgentSend) { + clearTimeout(pendingAgentSend.timeout); + this.pendingAgentSends.delete(parsed.id); + if ('error' in parsed) { + pendingAgentSend.reject(new GatewayRpcError(parsed.error.code, parsed.error.message)); + } else { + const content = (parsed.result as { response?: unknown; content?: unknown } | undefined); + const responseText = typeof content?.response === 'string' + ? content.response + : typeof content?.content === 'string' + ? content.content + : ''; + pendingAgentSend.resolve({ content: responseText }); + } + return; + } + const pending = this.pending.get(parsed.id); if (!pending) { return; @@ -1092,6 +1251,12 @@ export class CompanionRuntimeClient { pending.reject(error); } this.pending.clear(); + + for (const [, pending] of this.pendingAgentSends) { + clearTimeout(pending.timeout); + pending.reject(error); + } + this.pendingAgentSends.clear(); } private rejectEventWaits(error: Error): number { @@ -1102,6 +1267,26 @@ export class CompanionRuntimeClient { this.pendingEventWaits.clear(); return cancelled; } + + private async restoreNodeStateAfterReconnect(): Promise { + if (!this.lastNodeRegistrationInput) { + return; + } + try { + await this.registerNode(this.lastNodeRegistrationInput); + if (this.lastNodeStatusInput) { + await this.setNodeStatus(this.lastNodeStatusInput); + } + if (this.lastNodeLocationInput) { + await this.setNodeLocation(this.lastNodeLocationInput); + } + if (this.lastNodePushTokenInput) { + await this.setNodePushToken(this.lastNodePushTokenInput); + } + } catch { + // Best-effort replay. Callers still receive connected status and can re-register manually. + } + } } function withToken(url: string, token?: string): string {