Add companion reconnect state recovery and handoff helper

This commit is contained in:
William Valentin
2026-02-26 17:01:16 -08:00
parent e9873ad22b
commit 184dc2c688
8 changed files with 602 additions and 4 deletions
+29
View File
@@ -12,6 +12,7 @@ const {
connect: ReturnType<typeof vi.fn>;
registerNode: ReturnType<typeof vi.fn>;
setNodeStatus: ReturnType<typeof vi.fn>;
sendAgentMessage: ReturnType<typeof vi.fn>;
subscribeAgentStream: ReturnType<typeof vi.fn>;
subscribeAgentTyping: ReturnType<typeof vi.fn>;
subscribeConnectionEvents: ReturnType<typeof vi.fn>;
@@ -58,6 +59,7 @@ vi.mock('../companion/index.js', () => ({
capabilities: { declared: capabilities, enabled: capabilities },
}));
setNodeStatus = vi.fn(async () => ({ updated: true, node: { id: 'n', role: 'companion' } }));
sendAgentMessage = vi.fn(async () => ({ content: 'handoff response' }));
subscribeAgentStream = vi.fn(() => () => undefined);
subscribeAgentTyping = vi.fn(() => () => undefined);
subscribeConnectionEvents = vi.fn((handler: (event: { status: string }) => void) => {
@@ -146,6 +148,33 @@ describe('companion command', () => {
errSpy.mockRestore();
});
it('executes optional message handoff after registration', async () => {
const logSpy = vi.spyOn(console, 'log').mockImplementation(() => undefined);
const errSpy = vi.spyOn(console, 'error').mockImplementation(() => undefined);
const program = new Command();
const { registerCompanionCommand } = await import('./companion.js');
registerCompanionCommand(program);
await program.parseAsync([
'node',
'test',
'companion',
'--once',
'--handoff',
'status update?',
'--handoff-timeout',
'5000',
]);
expect(mockRuntimeInstances[0]?.sendAgentMessage).toHaveBeenCalledWith({
message: 'status update?',
timeoutMs: 5000,
});
expect(errSpy).not.toHaveBeenCalled();
logSpy.mockRestore();
errSpy.mockRestore();
});
it('sets process exit code when options are invalid', async () => {
const errSpy = vi.spyOn(console, 'error').mockImplementation(() => undefined);
const program = new Command();
+23
View File
@@ -16,6 +16,8 @@ interface CompanionCommandOptions {
platform?: CompanionPlatform;
capability?: string[];
heartbeat?: string;
handoff?: string;
handoffTimeout?: string;
once?: boolean;
}
@@ -72,6 +74,15 @@ function parseHeartbeatSeconds(value: string | undefined): number {
return parsed;
}
function parseHandoffTimeoutMs(value: string | undefined): number {
const raw = value ?? '120000';
const parsed = Number.parseInt(raw, 10);
if (!Number.isFinite(parsed) || parsed < 1000 || parsed > 600000) {
throw new Error('handoff-timeout must be an integer between 1000 and 600000 milliseconds');
}
return parsed;
}
async function publishHeartbeat(
runtime: CompanionRuntimeClient,
platform: CompanionPlatform,
@@ -92,6 +103,8 @@ export async function runCompanionSession(options: CompanionCommandOptions): Pro
const nodeId = resolveNodeId(options, platform);
const capabilities = resolveCapabilities(platform, options.capability);
const heartbeatSeconds = parseHeartbeatSeconds(options.heartbeat);
const handoffMessage = options.handoff?.trim();
const handoffTimeoutMs = parseHandoffTimeoutMs(options.handoffTimeout);
const runtime = new CompanionRuntimeClient({
url: gatewayUrl,
@@ -206,6 +219,14 @@ export async function runCompanionSession(options: CompanionCommandOptions): Pro
await registerAndHeartbeat('connected');
skipConnectRegistration = false;
if (handoffMessage) {
const handoff = await runtime.sendAgentMessage({
message: handoffMessage,
timeoutMs: handoffTimeoutMs,
});
console.log(`[handoff.done] ${handoff.content}`);
}
if (options.once) {
cleanup();
return;
@@ -232,6 +253,8 @@ export function registerCompanionCommand(program: Command): void {
.option('--platform <platform>', 'Node platform (macos|ios|android|linux|windows|unknown)', 'macos')
.option('--capability <name...>', 'Capability list override')
.option('--heartbeat <seconds>', 'Heartbeat interval in seconds', '30')
.option('--handoff <message>', 'Optional one-shot agent message handoff after registration')
.option('--handoff-timeout <ms>', 'Handoff timeout in milliseconds', '120000')
.option('--once', 'Connect, register, publish one heartbeat, then exit', false)
.action(async (opts: CompanionCommandOptions) => {
try {
+2
View File
@@ -29,6 +29,8 @@ export type {
SetNodeStatusInput,
SetNodeLocationInput,
SetNodePushTokenInput,
SendAgentMessageInput,
AgentSendResult,
PutCanvasArtifactInput,
GetCanvasArtifactInput,
DeleteCanvasArtifactInput,
@@ -101,10 +101,11 @@ afterAll(async () => {
await server.stop();
});
function createRuntime(): CompanionRuntimeClient {
function createRuntime(overrides: Partial<ConstructorParameters<typeof CompanionRuntimeClient>[0]> = {}): CompanionRuntimeClient {
return new CompanionRuntimeClient({
url: `ws://127.0.0.1:${TEST_PORT}`,
token: TEST_TOKEN,
...overrides,
});
}
@@ -478,4 +479,102 @@ describe('platform clients integration', () => {
client.disconnect();
}
});
it('supports message handoff through companion wrapper via agent.send done events', async () => {
if (!LISTEN_ALLOWED) {
return;
}
const runtime = createRuntime();
const client = new MacOSCompanionClient({ runtime, nodeId: 'macos-handoff-e2e' });
await client.connect();
try {
await client.register();
const handoff = await client.sendMessageHandoff({
message: 'hello from companion handoff',
timeoutMs: 10_000,
});
expect(handoff.content).toContain('Hello from Flynn!');
} finally {
client.disconnect();
}
});
it('replays iOS registration/status/push after reconnect and supports token refresh', async () => {
if (!LISTEN_ALLOWED) {
return;
}
const runtime = createRuntime({
autoReconnect: true,
reconnectDelayMs: 25,
reconnectMaxDelayMs: 100,
});
const client = new IOSCompanionClient({ runtime, nodeId: 'ios-reconnect-e2e' });
await client.connect();
try {
await client.register();
await client.setStatus({
statusText: 'background-awake',
powerSource: 'battery',
batteryPct: 61,
});
await client.registerPushToken({
token: '1'.repeat(64),
topic: 'dev.flynn.ios',
environment: 'sandbox',
});
const reconnected = new Promise<void>((resolve, reject) => {
let connectedCount = 0;
let timer: NodeJS.Timeout | undefined;
const unsubscribe = runtime.subscribeConnectionEvents((event) => {
if (event.status !== 'connected') {
return;
}
connectedCount += 1;
if (connectedCount >= 2) {
if (timer) {
clearTimeout(timer);
}
unsubscribe();
resolve();
}
});
timer = setTimeout(() => {
unsubscribe();
reject(new Error('timed out waiting for reconnect'));
}, 10_000);
});
const ws = (runtime as unknown as {
ws?: { close: (code?: number, reason?: string) => void } | null;
}).ws;
ws?.close(4001, 'transport reset');
await reconnected;
const beforeRefresh = await client.listNodes();
const entry = beforeRefresh.nodes.find((node) => node.nodeId === 'ios-reconnect-e2e');
expect(entry?.status?.platform).toBe('ios');
expect(entry?.status?.statusText).toBe('background-awake');
expect(entry?.push?.provider).toBe('apns');
const firstPreview = entry?.push?.tokenPreview;
await client.registerPushToken({
token: '2'.repeat(64),
topic: 'dev.flynn.ios',
environment: 'sandbox',
});
const afterRefresh = await client.listNodes();
const refreshed = afterRefresh.nodes.find((node) => node.nodeId === 'ios-reconnect-e2e');
expect(refreshed?.push?.provider).toBe('apns');
expect(refreshed?.push?.tokenPreview).not.toBe(firstPreview);
} finally {
client.disconnect();
}
});
});
+49
View File
@@ -18,6 +18,8 @@ function createRuntimeMock(): {
setNodeLocation: ReturnType<typeof vi.fn>;
getNodeLocation: ReturnType<typeof vi.fn>;
setNodePushToken: ReturnType<typeof vi.fn>;
sendAgentMessage: ReturnType<typeof vi.fn>;
enableNodeStateRecovery: ReturnType<typeof vi.fn>;
getSystemCapabilities: ReturnType<typeof vi.fn>;
listSystemNodes: ReturnType<typeof vi.fn>;
putCanvasArtifact: ReturnType<typeof vi.fn>;
@@ -68,6 +70,8 @@ function createRuntimeMock(): {
const setNodeLocation = vi.fn(async () => ({ updated: true }));
const getNodeLocation = vi.fn(async () => ({ node: { id: 'n1', role: 'companion' }, location: null }));
const setNodePushToken = vi.fn(async () => ({ updated: true }));
const sendAgentMessage = vi.fn(async () => ({ content: 'handoff complete' }));
const enableNodeStateRecovery = vi.fn(() => undefined);
const getSystemCapabilities = vi.fn(async () => ({ protocol: { version: 1 }, nodes: { enabled: true, locationEnabled: true, pushEnabled: true, allowedRoles: ['companion'], registered: true }, featureGates: {} }));
const listSystemNodes = vi.fn(async () => ({ nodes: [], summary: { total: 0 } }));
const putCanvasArtifact = vi.fn(async () => ({ upserted: true, artifact: { id: 'a1' } }));
@@ -129,6 +133,8 @@ function createRuntimeMock(): {
setNodeLocation,
getNodeLocation,
setNodePushToken,
sendAgentMessage,
enableNodeStateRecovery,
getSystemCapabilities,
listSystemNodes,
putCanvasArtifact,
@@ -191,6 +197,8 @@ function createRuntimeMock(): {
setNodeLocation,
getNodeLocation,
setNodePushToken,
sendAgentMessage,
enableNodeStateRecovery,
getSystemCapabilities,
listSystemNodes,
putCanvasArtifact,
@@ -227,6 +235,31 @@ function createRuntimeMock(): {
}
describe('platform companion clients', () => {
it('enables node-state recovery by default for platform wrappers', () => {
const mac = createRuntimeMock();
const ios = createRuntimeMock();
const android = createRuntimeMock();
new MacOSCompanionClient({ runtime: mac.runtime, nodeId: 'mac-node' });
new IOSCompanionClient({ runtime: ios.runtime, nodeId: 'ios-node' });
new AndroidCompanionClient({ runtime: android.runtime, nodeId: 'android-node' });
expect(mac.enableNodeStateRecovery).toHaveBeenCalledWith(true);
expect(ios.enableNodeStateRecovery).toHaveBeenCalledWith(true);
expect(android.enableNodeStateRecovery).toHaveBeenCalledWith(true);
});
it('can disable node-state recovery per platform wrapper', () => {
const mock = createRuntimeMock();
new IOSCompanionClient({
runtime: mock.runtime,
nodeId: 'ios-node',
recoverNodeStateOnReconnect: false,
});
expect(mock.enableNodeStateRecovery).toHaveBeenCalledWith(false);
});
it('macOS client uses macos platform status and APNs push', async () => {
const mock = createRuntimeMock();
const client = new MacOSCompanionClient({ runtime: mock.runtime, nodeId: 'mac-node' });
@@ -341,6 +374,22 @@ describe('platform companion clients', () => {
unsubscribeEvent();
});
it('platform handoff helper forwards to runtime sendAgentMessage', async () => {
const mock = createRuntimeMock();
const client = new IOSCompanionClient({ runtime: mock.runtime, nodeId: 'ios-node' });
const result = await client.sendMessageHandoff({
message: 'handoff this message',
timeoutMs: 1500,
});
expect(mock.sendAgentMessage).toHaveBeenCalledWith({
message: 'handoff this message',
timeoutMs: 1500,
});
expect(result).toEqual({ content: 'handoff complete' });
});
it('platform clearEventSubscriptions forwards to runtime client', async () => {
const mock = createRuntimeMock();
const client = new IOSCompanionClient({ runtime: mock.runtime, nodeId: 'ios-node' });
+18
View File
@@ -24,6 +24,8 @@ import type {
PendingWorkSnapshot,
NodePushTokenSetResult,
SetNodeLocationInput,
SendAgentMessageInput,
AgentSendResult,
SystemCapabilitiesResult,
SystemNodesResult,
WaitForIdleOptions,
@@ -42,6 +44,7 @@ export interface PlatformClientOptions {
capabilities?: string[];
protocolVersion?: number;
defaultSessionId?: string;
recoverNodeStateOnReconnect?: boolean;
}
export interface RegisterPushTokenInput {
@@ -100,6 +103,7 @@ export class MacOSCompanionClient {
this.capabilities = options.capabilities ?? ['ui.canvas', 'node.location.write', 'node.push.register'];
this.protocolVersion = options.protocolVersion;
this.defaultSessionId = options.defaultSessionId;
this.runtime.enableNodeStateRecovery(options.recoverNodeStateOnReconnect ?? true);
}
connect(): Promise<void> {
@@ -362,6 +366,10 @@ export class MacOSCompanionClient {
return this.runtime.waitForIdle(options);
}
sendMessageHandoff(input: SendAgentMessageInput): Promise<AgentSendResult> {
return this.runtime.sendAgentMessage(input);
}
private resolveSessionId(sessionId?: string): string {
const resolved = sessionId ?? this.defaultSessionId;
if (!resolved) {
@@ -386,6 +394,7 @@ export class IOSCompanionClient {
this.capabilities = options.capabilities ?? ['node.location.write', 'node.push.register'];
this.protocolVersion = options.protocolVersion;
this.defaultSessionId = options.defaultSessionId;
this.runtime.enableNodeStateRecovery(options.recoverNodeStateOnReconnect ?? true);
}
connect(): Promise<void> {
@@ -648,6 +657,10 @@ export class IOSCompanionClient {
return this.runtime.waitForIdle(options);
}
sendMessageHandoff(input: SendAgentMessageInput): Promise<AgentSendResult> {
return this.runtime.sendAgentMessage(input);
}
private resolveSessionId(sessionId?: string): string {
const resolved = sessionId ?? this.defaultSessionId;
if (!resolved) {
@@ -672,6 +685,7 @@ export class AndroidCompanionClient {
this.capabilities = options.capabilities ?? ['node.location.write', 'node.push.register'];
this.protocolVersion = options.protocolVersion;
this.defaultSessionId = options.defaultSessionId;
this.runtime.enableNodeStateRecovery(options.recoverNodeStateOnReconnect ?? true);
}
connect(): Promise<void> {
@@ -932,6 +946,10 @@ export class AndroidCompanionClient {
return this.runtime.waitForIdle(options);
}
sendMessageHandoff(input: SendAgentMessageInput): Promise<AgentSendResult> {
return this.runtime.sendAgentMessage(input);
}
private resolveSessionId(sessionId?: string): string {
const resolved = sessionId ?? this.defaultSessionId;
if (!resolved) {
+193
View File
@@ -920,6 +920,199 @@ describe('CompanionRuntimeClient', () => {
vi.useRealTimers();
});
it('sendAgentMessage resolves when done event arrives', async () => {
class FakeWebSocket extends EventEmitter {
readyState: number = WebSocket.CONNECTING;
constructor() {
super();
queueMicrotask(() => {
this.readyState = WebSocket.OPEN;
this.emit('open');
});
}
send(payload: string, callback?: (error?: Error) => void): void {
const req = JSON.parse(payload) as { id: number; method: string };
if (req.method === 'agent.send') {
queueMicrotask(() => {
this.emit('message', JSON.stringify({
id: req.id,
event: 'run_state',
data: { state: 'start', timestamp: Date.now() },
}));
this.emit('message', JSON.stringify({
id: req.id,
event: 'done',
data: { content: 'handoff complete' },
}));
});
}
callback?.();
}
close(_code?: number, _reason?: string): void {
this.readyState = WebSocket.CLOSED;
this.emit('close', 1000, Buffer.from(''));
}
}
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
autoConnect: true,
websocketFactory: () => new FakeWebSocket() as unknown as WebSocket,
});
await expect(client.sendAgentMessage({
message: 'send this to assistant',
timeoutMs: 2000,
})).resolves.toEqual({
content: 'handoff complete',
});
});
it('sendAgentMessage rejects on error event payloads', async () => {
class FakeWebSocket extends EventEmitter {
readyState: number = WebSocket.CONNECTING;
constructor() {
super();
queueMicrotask(() => {
this.readyState = WebSocket.OPEN;
this.emit('open');
});
}
send(payload: string, callback?: (error?: Error) => void): void {
const req = JSON.parse(payload) as { id: number; method: string };
if (req.method === 'agent.send') {
queueMicrotask(() => {
this.emit('message', JSON.stringify({
id: req.id,
event: 'error',
data: {
code: -2,
message: 'invalid handoff',
},
}));
});
}
callback?.();
}
close(_code?: number, _reason?: string): void {
this.readyState = WebSocket.CLOSED;
this.emit('close', 1000, Buffer.from(''));
}
}
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
autoConnect: true,
websocketFactory: () => new FakeWebSocket() as unknown as WebSocket,
});
await expect(client.sendAgentMessage({
message: 'fail this handoff',
timeoutMs: 2000,
})).rejects.toBeInstanceOf(GatewayRpcError);
});
it('replays node registration/state after reconnect when recovery is enabled', async () => {
vi.useFakeTimers();
const sockets: Array<{ methods: string[]; ws: EventEmitter & { readyState: number; close: () => void } }> = [];
class FakeWebSocket extends EventEmitter {
readyState: number = WebSocket.CONNECTING;
methods: string[] = [];
constructor() {
super();
queueMicrotask(() => {
this.readyState = WebSocket.OPEN;
this.emit('open');
});
}
send(payload: string, callback?: (error?: Error) => void): void {
const req = JSON.parse(payload) as { id: number; method: string; params?: Record<string, unknown> };
this.methods.push(req.method);
queueMicrotask(() => {
this.emit('message', JSON.stringify({
id: req.id,
result: req.method === 'node.register'
? {
registered: true,
node: { id: String(req.params?.nodeId ?? 'n'), role: String(req.params?.role ?? 'companion') },
protocol: { serverVersion: 1, clientVersion: 1, negotiatedVersion: 1 },
capabilities: { declared: [], enabled: [] },
}
: { updated: true },
}));
});
callback?.();
}
close(): void {
this.readyState = WebSocket.CLOSED;
this.emit('close', 1006, Buffer.from('drop'));
}
}
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
autoReconnect: true,
reconnectDelayMs: 10,
reconnectMaxDelayMs: 10,
websocketFactory: () => {
const ws = new FakeWebSocket() as unknown as EventEmitter & { readyState: number; close: () => void; methods: string[] };
sockets.push({ methods: ws.methods, ws });
return ws as unknown as WebSocket;
},
});
client.enableNodeStateRecovery(true);
await client.connect();
await client.registerNode({
nodeId: 'recover-node',
role: 'companion',
capabilities: ['ui.canvas'],
});
await client.setNodeStatus({
platform: 'ios',
statusText: 'background-awake',
powerSource: 'battery',
});
await client.setNodePushToken({
provider: 'apns',
token: 'a'.repeat(64),
topic: 'dev.flynn.ios',
environment: 'sandbox',
});
expect(sockets).toHaveLength(1);
expect(sockets[0]?.methods).toEqual([
'node.register',
'node.status.set',
'node.push_token.set',
]);
sockets[0]?.ws.close();
await vi.advanceTimersByTimeAsync(10);
await Promise.resolve();
await Promise.resolve();
await Promise.resolve();
expect(sockets).toHaveLength(2);
expect(sockets[1]?.methods).toEqual([
'node.register',
'node.status.set',
'node.push_token.set',
]);
vi.useRealTimers();
});
it('manual disconnect metadata is not overwritten by local close event', async () => {
class FakeWebSocket extends EventEmitter {
readyState: number = WebSocket.CONNECTING;
+188 -3
View File
@@ -3,6 +3,7 @@ import type {
NodeLocationSetParams,
NodePushTokenSetParams,
NodeStatusSetParams,
GatewayAttachment,
} from '../gateway/protocol.js';
import { GATEWAY_PROTOCOL_VERSION } from '../gateway/protocol.js';
@@ -33,6 +34,12 @@ type PendingRequest = {
timeout: NodeJS.Timeout;
};
type PendingAgentSend = {
resolve: (value: AgentSendResult) => void;
reject: (error: Error) => void;
timeout: NodeJS.Timeout;
};
export interface CompanionRuntimeClientOptions {
url: string;
token?: string;
@@ -252,6 +259,21 @@ export interface SetNodeLocationInput extends Omit<NodeLocationSetParams, 'conne
export interface SetNodePushTokenInput extends Omit<NodePushTokenSetParams, 'connectionId'> {}
export interface SendAgentMessageInput {
message?: string;
attachments?: GatewayAttachment[];
metadata?: {
isCommand?: boolean;
command?: string;
commandArgs?: string;
};
timeoutMs?: number;
}
export interface AgentSendResult {
content: string;
}
export interface CanvasArtifact {
id: string;
type: string;
@@ -326,6 +348,7 @@ export class CompanionRuntimeClient {
private connectPromise: Promise<void> | null = null;
private nextId = 1;
private pending = new Map<number, PendingRequest>();
private readonly pendingAgentSends = new Map<number, PendingAgentSend>();
private readonly eventHandlers = new Set<CompanionEventHandler>();
private readonly connectionHandlers = new Set<CompanionConnectionHandler>();
private readonly pendingEventWaits = new Set<(error: Error) => void>();
@@ -335,6 +358,11 @@ export class CompanionRuntimeClient {
private reconnectTimer: NodeJS.Timeout | null = null;
private reconnectAttempt = 0;
private shouldReconnect = false;
private recoverNodeStateOnReconnect = false;
private lastNodeRegistrationInput?: RegisterNodeInput;
private lastNodeStatusInput?: SetNodeStatusInput;
private lastNodeLocationInput?: SetNodeLocationInput;
private lastNodePushTokenInput?: SetNodePushTokenInput;
constructor(options: CompanionRuntimeClientOptions) {
const requestTimeoutMs = options.requestTimeoutMs ?? 15_000;
@@ -369,7 +397,7 @@ export class CompanionRuntimeClient {
}
get pendingRequestCount(): number {
return this.pending.size;
return this.pending.size + this.pendingAgentSends.size;
}
get pendingEventWaitCount(): number {
@@ -392,6 +420,14 @@ export class CompanionRuntimeClient {
return this._lastDisconnectReason;
}
get nodeStateRecoveryEnabled(): boolean {
return this.recoverNodeStateOnReconnect;
}
enableNodeStateRecovery(enabled = true): void {
this.recoverNodeStateOnReconnect = enabled;
}
getPendingWorkSnapshot(): PendingWorkSnapshot {
return {
pendingRequestCount: this.pendingRequestCount,
@@ -450,6 +486,7 @@ export class CompanionRuntimeClient {
cleanup();
settled = true;
this.ws = ws;
const shouldRecover = this.recoverNodeStateOnReconnect && this.reconnectAttempt > 0;
this.resetReconnectDelay();
this._lastDisconnectCode = undefined;
this._lastDisconnectReason = undefined;
@@ -473,8 +510,15 @@ export class CompanionRuntimeClient {
this.ws.on('error', () => {
// close event handles pending rejection
});
this.emitConnectionEvent({ status: 'connected' });
resolve();
const finalize = () => {
this.emitConnectionEvent({ status: 'connected' });
resolve();
};
if (shouldRecover) {
void this.restoreNodeStateAfterReconnect().finally(finalize);
} else {
finalize();
}
};
const onError = (err: Error) => {
@@ -885,6 +929,62 @@ export class CompanionRuntimeClient {
return Object.values(COMPANION_EVENT_NAMES);
}
async sendAgentMessage(input: SendAgentMessageInput): Promise<AgentSendResult> {
const hasMessage = typeof input.message === 'string' && input.message.trim().length > 0;
const hasAttachments = Array.isArray(input.attachments) && input.attachments.length > 0;
const isCommand = Boolean(input.metadata?.isCommand);
if (!hasMessage && !hasAttachments && !isCommand) {
throw new Error('sendAgentMessage requires message or attachments (or command metadata)');
}
if (!this.connected) {
if (!this.autoConnect) {
throw new Error('WebSocket is not connected');
}
await this.connect();
}
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
throw new Error('WebSocket is not connected');
}
const id = this.nextId++;
const timeoutMs = input.timeoutMs ?? this.requestTimeoutMs;
if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) {
throw new Error('timeoutMs must be a positive number');
}
return new Promise<AgentSendResult>((resolve, reject) => {
const timeout = setTimeout(() => {
this.pendingAgentSends.delete(id);
reject(new Error('Timed out waiting for agent response'));
}, timeoutMs);
this.pendingAgentSends.set(id, { resolve, reject, timeout });
this.ws?.send(JSON.stringify({
id,
method: 'agent.send',
params: {
...(hasMessage ? { message: input.message } : {}),
...(hasAttachments ? { attachments: input.attachments } : {}),
...(isCommand ? { metadata: input.metadata } : {}),
},
}), (err) => {
if (!err) {
return;
}
const pending = this.pendingAgentSends.get(id);
if (!pending) {
return;
}
clearTimeout(pending.timeout);
this.pendingAgentSends.delete(id);
reject(err);
});
});
}
async call<T>(method: string, params?: Record<string, unknown>): Promise<T> {
if (!this.connected) {
if (!this.autoConnect) {
@@ -939,6 +1039,14 @@ export class CompanionRuntimeClient {
role: input.role,
protocolVersion: input.protocolVersion ?? GATEWAY_PROTOCOL_VERSION,
capabilities: input.capabilities,
}).then((result) => {
this.lastNodeRegistrationInput = {
nodeId: input.nodeId,
role: input.role,
protocolVersion: input.protocolVersion ?? GATEWAY_PROTOCOL_VERSION,
capabilities: [...input.capabilities],
};
return result;
});
}
@@ -974,6 +1082,9 @@ export class CompanionRuntimeClient {
statusText: input.statusText,
batteryPct: input.batteryPct,
powerSource: input.powerSource,
}).then((result) => {
this.lastNodeStatusInput = { ...input };
return result;
});
}
@@ -987,6 +1098,9 @@ export class CompanionRuntimeClient {
speedMps: input.speedMps,
source: input.source,
capturedAt: input.capturedAt,
}).then((result) => {
this.lastNodeLocationInput = { ...input };
return result;
});
}
@@ -1000,6 +1114,9 @@ export class CompanionRuntimeClient {
token: input.token,
topic: input.topic,
environment: input.environment,
}).then((result) => {
this.lastNodePushTokenInput = { ...input };
return result;
});
}
@@ -1061,6 +1178,30 @@ export class CompanionRuntimeClient {
}
if ('event' in parsed) {
const pendingAgentSend = this.pendingAgentSends.get(parsed.id);
if (pendingAgentSend) {
if (parsed.event === 'done') {
clearTimeout(pendingAgentSend.timeout);
this.pendingAgentSends.delete(parsed.id);
const content = (parsed.data as { content?: unknown })?.content;
pendingAgentSend.resolve({
content: typeof content === 'string' ? content : '',
});
} else if (parsed.event === 'error') {
clearTimeout(pendingAgentSend.timeout);
this.pendingAgentSends.delete(parsed.id);
const err = parsed.data as { code?: number; message?: unknown } | undefined;
if (typeof err?.message === 'string') {
pendingAgentSend.reject(new GatewayRpcError(
typeof err.code === 'number' ? err.code : -1,
err.message,
));
} else {
pendingAgentSend.reject(new Error('Agent request failed'));
}
}
}
for (const handler of this.eventHandlers) {
try {
handler(parsed.event, parsed.data);
@@ -1071,6 +1212,24 @@ export class CompanionRuntimeClient {
return;
}
const pendingAgentSend = this.pendingAgentSends.get(parsed.id);
if (pendingAgentSend) {
clearTimeout(pendingAgentSend.timeout);
this.pendingAgentSends.delete(parsed.id);
if ('error' in parsed) {
pendingAgentSend.reject(new GatewayRpcError(parsed.error.code, parsed.error.message));
} else {
const content = (parsed.result as { response?: unknown; content?: unknown } | undefined);
const responseText = typeof content?.response === 'string'
? content.response
: typeof content?.content === 'string'
? content.content
: '';
pendingAgentSend.resolve({ content: responseText });
}
return;
}
const pending = this.pending.get(parsed.id);
if (!pending) {
return;
@@ -1092,6 +1251,12 @@ export class CompanionRuntimeClient {
pending.reject(error);
}
this.pending.clear();
for (const [, pending] of this.pendingAgentSends) {
clearTimeout(pending.timeout);
pending.reject(error);
}
this.pendingAgentSends.clear();
}
private rejectEventWaits(error: Error): number {
@@ -1102,6 +1267,26 @@ export class CompanionRuntimeClient {
this.pendingEventWaits.clear();
return cancelled;
}
private async restoreNodeStateAfterReconnect(): Promise<void> {
if (!this.lastNodeRegistrationInput) {
return;
}
try {
await this.registerNode(this.lastNodeRegistrationInput);
if (this.lastNodeStatusInput) {
await this.setNodeStatus(this.lastNodeStatusInput);
}
if (this.lastNodeLocationInput) {
await this.setNodeLocation(this.lastNodeLocationInput);
}
if (this.lastNodePushTokenInput) {
await this.setNodePushToken(this.lastNodePushTokenInput);
}
} catch {
// Best-effort replay. Callers still receive connected status and can re-register manually.
}
}
}
function withToken(url: string, token?: string): string {