import { WebSocket } from 'ws'; import type { NodeLocationSetParams, NodePushTokenSetParams, NodeStatusSetParams, } from '../gateway/protocol.js'; import { GATEWAY_PROTOCOL_VERSION } from '../gateway/protocol.js'; interface RpcSuccess { id: number; result: unknown; } interface RpcFailure { id: number; error: { code: number; message: string; }; } interface RpcEvent { id: number; event: string; data: unknown; } type RpcMessage = RpcSuccess | RpcFailure | RpcEvent; type PendingRequest = { resolve: (value: unknown) => void; reject: (error: Error) => void; timeout: NodeJS.Timeout; }; export interface CompanionRuntimeClientOptions { url: string; token?: string; requestTimeoutMs?: number; autoConnect?: boolean; websocketFactory?: (url: string) => WebSocket; } export type CompanionEventHandler = (event: string, data: unknown) => void; export type CompanionTypedEventHandler = (data: TData) => void; export type CompanionEventPredicate = (data: TData) => boolean; export type CompanionEventEnvelope = { event: string; data: TData; }; export const COMPANION_EVENT_NAMES = { agentStream: 'agent.stream', agentTyping: 'agent.typing', contextWarning: 'context_warning', } as const; export type CompanionEventName = (typeof COMPANION_EVENT_NAMES)[keyof typeof COMPANION_EVENT_NAMES]; export interface RegisterNodeInput { nodeId: string; role: string; capabilities: string[]; protocolVersion?: number; } export interface NodeIdentitySummary { id: string; role: string; } export interface NodeRegisterResult { registered: boolean; node: NodeIdentitySummary; protocol: { serverVersion: number; clientVersion: number; negotiatedVersion: number; }; capabilities: { declared: string[]; enabled: string[]; }; } export interface NodeCapabilitiesResult { protocol: { serverVersion: number; nodeVersion: number; negotiatedVersion: number; }; node: { id: string; role: string; registeredAt: number; }; capabilities: { declared: string[]; enabled: string[]; featureGates: Record; }; } export interface NodeBootstrapResult { register: NodeRegisterResult; capabilities: NodeCapabilitiesResult; systemCapabilities?: SystemCapabilitiesResult; } export interface NodeStatus { platform: 'macos' | 'ios' | 'android' | 'linux' | 'windows' | 'unknown'; appVersion?: string; deviceName?: string; statusText?: string; batteryPct?: number; powerSource: 'ac' | 'battery' | 'unknown'; reportedAt: number; } export interface NodeLocation { latitude: number; longitude: number; accuracyMeters?: number; altitudeMeters?: number; headingDegrees?: number; speedMps?: number; source: 'gps' | 'network' | 'manual' | 'unknown'; capturedAt: number; receivedAt: number; } export interface NodePushSummary { provider: 'apns' | 'fcm'; tokenPreview: string; topic?: string; environment?: 'sandbox' | 'production'; registeredAt: number; } export interface NodeStatusSetResult { updated: boolean; node: NodeIdentitySummary; status: NodeStatus; } export interface NodeLocationSetResult { updated: boolean; node: NodeIdentitySummary; location: NodeLocation; } export interface NodePushTokenSetResult { updated: boolean; node: NodeIdentitySummary; push: NodePushSummary; } export interface NodeLocationGetResult { node: NodeIdentitySummary; location: NodeLocation | null; } export interface SystemCapabilitiesResult { protocol: { version: number; }; nodes: { enabled: boolean; locationEnabled: boolean; pushEnabled: boolean; allowedRoles: string[]; registered: boolean; role?: string; nodeId?: string; }; featureGates: Record; } export interface SystemNodeEntry { connectionId: string; nodeId: string; role: string; identity?: string; protocolVersion: number; capabilities: string[]; registeredAt: number; location?: NodeLocation; status?: NodeStatus; push?: NodePushSummary; } export interface SystemNodesResult { nodes: SystemNodeEntry[]; summary: { total: number; }; } export interface ListNodesInput { role?: string; platform?: string; limit?: number; } export interface SetNodeStatusInput extends Omit {} export interface SetNodeLocationInput extends Omit {} export interface SetNodePushTokenInput extends Omit {} export interface CanvasArtifact { id: string; type: string; title?: string; content: unknown; metadata?: Record; createdAt: number; updatedAt: number; } export interface PutCanvasArtifactInput { sessionId: string; artifactId?: string; type: string; title?: string; content: unknown; metadata?: Record; } export interface GetCanvasArtifactInput { sessionId: string; artifactId: string; } export interface DeleteCanvasArtifactInput { sessionId: string; artifactId: string; } export interface CanvasPutResult { artifact: CanvasArtifact; upserted: boolean; } export interface CanvasGetResult { artifact: CanvasArtifact; } export interface CanvasListResult { artifacts: CanvasArtifact[]; } export interface CanvasDeleteResult { deleted: boolean; } export interface CanvasClearResult { cleared: number; } export class GatewayRpcError extends Error { readonly code: number; constructor(code: number, message: string) { super(message); this.name = 'GatewayRpcError'; this.code = code; } } export class CompanionRuntimeClient { private readonly url: string; private readonly token?: string; private readonly requestTimeoutMs: number; private readonly autoConnect: boolean; private readonly websocketFactory: (url: string) => WebSocket; private ws: WebSocket | null = null; private connectPromise: Promise | null = null; private nextId = 1; private pending = new Map(); private readonly eventHandlers = new Set(); private readonly pendingEventWaits = new Set<(error: Error) => void>(); constructor(options: CompanionRuntimeClientOptions) { const requestTimeoutMs = options.requestTimeoutMs ?? 15_000; if (!Number.isFinite(requestTimeoutMs) || requestTimeoutMs <= 0) { throw new Error('requestTimeoutMs must be a positive number'); } this.url = options.url; this.token = options.token; this.requestTimeoutMs = requestTimeoutMs; this.autoConnect = options.autoConnect ?? false; this.websocketFactory = options.websocketFactory ?? ((url) => new WebSocket(url)); } get connected(): boolean { return this.ws?.readyState === WebSocket.OPEN; } get eventSubscriptionCount(): number { return this.eventHandlers.size; } async connect(): Promise { if (this.connected) { return; } if (this.connectPromise) { return this.connectPromise; } this.connectPromise = this.openConnection(); try { await this.connectPromise; } finally { this.connectPromise = null; } } private async openConnection(): Promise { const ws = this.websocketFactory(withToken(this.url, this.token)); await new Promise((resolve, reject) => { let settled = false; const onOpen = () => { cleanup(); settled = true; this.ws = ws; this.ws.on('message', (raw) => this.handleMessage(raw.toString())); this.ws.on('close', () => this.rejectAllPending(new Error('WebSocket closed'))); this.ws.on('error', () => { // close event handles pending rejection }); resolve(); }; const onError = (err: Error) => { cleanup(); settled = true; reject(err); }; const onClose = () => { cleanup(); if (!settled) { settled = true; reject(new Error('WebSocket closed before connection established')); } }; const cleanup = () => { ws.off('open', onOpen); ws.off('error', onError); ws.off('close', onClose); }; ws.once('open', onOpen); ws.once('error', onError); ws.once('close', onClose); }); } disconnect(code?: number, reason?: string): void { if (!this.ws) { this.rejectEventWaits(new Error('Disconnected')); return; } const ws = this.ws; this.ws = null; this.rejectAllPending(new Error('Disconnected')); this.rejectEventWaits(new Error('Disconnected')); ws.close(code, reason); } dispose(code?: number, reason?: string): void { this.disconnect(code, reason); this.clearEventSubscriptions(); } subscribeEvents(handler: CompanionEventHandler): () => void { this.eventHandlers.add(handler); return () => { this.eventHandlers.delete(handler); }; } clearEventSubscriptions(): void { this.eventHandlers.clear(); this.rejectEventWaits(new Error('Event subscriptions cleared')); } subscribeEvent( eventName: CompanionEventName | string, handler: CompanionTypedEventHandler, ): () => void { return this.subscribeEvents((event, data) => { if (event !== eventName) { return; } handler(data as TData); }); } subscribeAgentStream( handler: CompanionTypedEventHandler, ): () => void { return this.subscribeEvent(COMPANION_EVENT_NAMES.agentStream, handler); } subscribeAgentTyping( handler: CompanionTypedEventHandler, ): () => void { return this.subscribeEvent(COMPANION_EVENT_NAMES.agentTyping, handler); } subscribeContextWarning( handler: CompanionTypedEventHandler, ): () => void { return this.subscribeEvent(COMPANION_EVENT_NAMES.contextWarning, handler); } waitForEvent( eventName: CompanionEventName | string, options?: { timeoutMs?: number; predicate?: CompanionEventPredicate; signal?: AbortSignal; }, ): Promise { const timeoutMs = options?.timeoutMs ?? this.requestTimeoutMs; const predicate = options?.predicate; const signal = options?.signal; return new Promise((resolve, reject) => { let settled = false; let abortCleanup: (() => void) | null = null; const finish = (fn: () => void) => { if (settled) { return; } settled = true; clearTimeout(timeout); unsubscribe(); if (abortCleanup) { abortCleanup(); abortCleanup = null; } this.pendingEventWaits.delete(cancelWait); fn(); }; const cancelWait = (error: Error) => { finish(() => reject(error)); }; this.pendingEventWaits.add(cancelWait); const unsubscribe = this.subscribeEvent(eventName, (data) => { if (predicate && !predicate(data)) { return; } finish(() => resolve(data)); }); const timeout = setTimeout(() => { finish(() => reject(new Error(`Timed out waiting for event ${eventName}`))); }, timeoutMs); if (signal) { const onAbort = () => { cancelWait(new Error(`Aborted while waiting for event ${eventName}`)); }; signal.addEventListener('abort', onAbort, { once: true }); abortCleanup = () => { signal.removeEventListener('abort', onAbort); }; if (signal.aborted) { onAbort(); } } }); } waitForAnyEvent( eventNames: readonly (CompanionEventName | string)[], options?: { timeoutMs?: number; predicate?: (event: string, data: TData) => boolean; signal?: AbortSignal; }, ): Promise> { if (eventNames.length === 0) { throw new Error('eventNames must contain at least one event name'); } const eventNameSet = new Set(eventNames); const timeoutMs = options?.timeoutMs ?? this.requestTimeoutMs; const predicate = options?.predicate; const signal = options?.signal; return new Promise>((resolve, reject) => { let settled = false; let abortCleanup: (() => void) | null = null; const finish = (fn: () => void) => { if (settled) { return; } settled = true; clearTimeout(timeout); unsubscribe(); if (abortCleanup) { abortCleanup(); abortCleanup = null; } this.pendingEventWaits.delete(cancelWait); fn(); }; const cancelWait = (error: Error) => { finish(() => reject(error)); }; this.pendingEventWaits.add(cancelWait); const unsubscribe = this.subscribeEvents((event, data) => { if (!eventNameSet.has(event)) { return; } const castData = data as TData; if (predicate && !predicate(event, castData)) { return; } finish(() => resolve({ event, data: castData })); }); const timeout = setTimeout(() => { cancelWait(new Error(`Timed out waiting for any event in [${eventNames.join(', ')}]`)); }, timeoutMs); if (signal) { const onAbort = () => { cancelWait(new Error(`Aborted while waiting for events [${eventNames.join(', ')}]`)); }; signal.addEventListener('abort', onAbort, { once: true }); abortCleanup = () => { signal.removeEventListener('abort', onAbort); }; if (signal.aborted) { onAbort(); } } }); } waitForAgentStream(options?: { timeoutMs?: number; predicate?: CompanionEventPredicate; signal?: AbortSignal; }): Promise { return this.waitForEvent(COMPANION_EVENT_NAMES.agentStream, options); } waitForAgentTyping(options?: { timeoutMs?: number; predicate?: CompanionEventPredicate; signal?: AbortSignal; }): Promise { return this.waitForEvent(COMPANION_EVENT_NAMES.agentTyping, options); } waitForContextWarning(options?: { timeoutMs?: number; predicate?: CompanionEventPredicate; signal?: AbortSignal; }): Promise { return this.waitForEvent(COMPANION_EVENT_NAMES.contextWarning, options); } listKnownEventNames(): CompanionEventName[] { return Object.values(COMPANION_EVENT_NAMES); } async call(method: string, params?: Record): Promise { if (!this.connected) { if (!this.autoConnect) { throw new Error('WebSocket is not connected'); } await this.connect(); } if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { throw new Error('WebSocket is not connected'); } const id = this.nextId++; return new Promise((resolve, reject) => { const timeout = setTimeout(() => { this.pending.delete(id); reject(new Error(`RPC timeout for method ${method}`)); }, this.requestTimeoutMs); this.pending.set(id, { resolve: (value) => { clearTimeout(timeout); resolve(value as T); }, reject: (error) => { clearTimeout(timeout); reject(error); }, timeout, }); this.ws?.send(JSON.stringify({ id, method, params: params ?? {} }), (err) => { if (!err) { return; } const pending = this.pending.get(id); if (!pending) { return; } clearTimeout(pending.timeout); this.pending.delete(id); reject(err); }); }); } registerNode(input: RegisterNodeInput): Promise { return this.call('node.register', { nodeId: input.nodeId, role: input.role, protocolVersion: input.protocolVersion ?? GATEWAY_PROTOCOL_VERSION, capabilities: input.capabilities, }); } async bootstrapNode( input: RegisterNodeInput, options?: { includeSystemCapabilities?: boolean }, ): Promise { const register = await this.registerNode(input); const capabilities = await this.getNodeCapabilities(); if (options?.includeSystemCapabilities) { const systemCapabilities = await this.getSystemCapabilities(); return { register, capabilities, systemCapabilities, }; } return { register, capabilities, }; } getNodeCapabilities(): Promise { return this.call('node.capabilities.get'); } setNodeStatus(input: SetNodeStatusInput): Promise { return this.call('node.status.set', { platform: input.platform, appVersion: input.appVersion, deviceName: input.deviceName, statusText: input.statusText, batteryPct: input.batteryPct, powerSource: input.powerSource, }); } setNodeLocation(input: SetNodeLocationInput): Promise { return this.call('node.location.set', { latitude: input.latitude, longitude: input.longitude, accuracyMeters: input.accuracyMeters, altitudeMeters: input.altitudeMeters, headingDegrees: input.headingDegrees, speedMps: input.speedMps, source: input.source, capturedAt: input.capturedAt, }); } getNodeLocation(): Promise { return this.call('node.location.get'); } setNodePushToken(input: SetNodePushTokenInput): Promise { return this.call('node.push_token.set', { provider: input.provider, token: input.token, topic: input.topic, environment: input.environment, }); } getSystemCapabilities(): Promise { return this.call('system.capabilities'); } listSystemNodes(input?: ListNodesInput): Promise { return this.call('system.nodes', { role: input?.role, platform: input?.platform, limit: input?.limit, }); } putCanvasArtifact(input: PutCanvasArtifactInput): Promise { return this.call('canvas.put', { sessionId: input.sessionId, artifactId: input.artifactId, type: input.type, title: input.title, content: input.content, metadata: input.metadata, }); } getCanvasArtifact(input: GetCanvasArtifactInput): Promise { return this.call('canvas.get', { sessionId: input.sessionId, artifactId: input.artifactId, }); } listCanvasArtifacts(sessionId: string): Promise { return this.call('canvas.list', { sessionId }); } deleteCanvasArtifact(input: DeleteCanvasArtifactInput): Promise { return this.call('canvas.delete', { sessionId: input.sessionId, artifactId: input.artifactId, }); } clearCanvasArtifacts(sessionId: string): Promise { return this.call('canvas.clear', { sessionId }); } private handleMessage(raw: string): void { let parsed: RpcMessage; try { parsed = JSON.parse(raw) as RpcMessage; } catch { return; } if (!('id' in parsed) || typeof parsed.id !== 'number') { return; } if ('event' in parsed) { for (const handler of this.eventHandlers) { try { handler(parsed.event, parsed.data); } catch { // Event subscribers are userland callbacks; isolate failures. } } return; } const pending = this.pending.get(parsed.id); if (!pending) { return; } this.pending.delete(parsed.id); if ('error' in parsed) { pending.reject(new GatewayRpcError(parsed.error.code, parsed.error.message)); return; } pending.resolve(parsed.result); } private rejectAllPending(error: Error): void { for (const [, pending] of this.pending) { clearTimeout(pending.timeout); pending.reject(error); } this.pending.clear(); } private rejectEventWaits(error: Error): void { for (const cancel of this.pendingEventWaits) { cancel(error); } this.pendingEventWaits.clear(); } } function withToken(url: string, token?: string): string { if (!token) { return url; } const parsed = new URL(url); parsed.searchParams.set('token', token); return parsed.toString(); }