Add companion runtime gateway client foundation

This commit is contained in:
William Valentin
2026-02-16 13:35:28 -08:00
parent 9b76c75e82
commit 542a8cb70f
8 changed files with 744 additions and 2 deletions
+25
View File
@@ -0,0 +1,25 @@
export {
CompanionRuntimeClient,
GatewayRpcError,
} from './runtimeClient.js';
export type {
CompanionRuntimeClientOptions,
RegisterNodeInput,
ListNodesInput,
SetNodeStatusInput,
SetNodeLocationInput,
SetNodePushTokenInput,
NodeRegisterResult,
NodeCapabilitiesResult,
NodeStatusSetResult,
NodeLocationSetResult,
NodeLocationGetResult,
NodePushTokenSetResult,
SystemCapabilitiesResult,
SystemNodesResult,
SystemNodeEntry,
NodeLocation,
NodeStatus,
NodePushSummary,
} from './runtimeClient.js';
+227
View File
@@ -0,0 +1,227 @@
import { describe, it, expect, beforeAll, afterAll, vi } from 'vitest';
import { resolve } from 'path';
import { createServer } from 'net';
import type { GatewayServerConfig } from '../gateway/server.js';
import { GatewayServer } from '../gateway/server.js';
import { CompanionRuntimeClient, GatewayRpcError } from './runtimeClient.js';
async function canListenOnLocalhost(): Promise<boolean> {
return new Promise((resolvePromise) => {
const s = createServer();
s.once('error', () => resolvePromise(false));
s.listen(0, '127.0.0.1', () => {
s.close(() => resolvePromise(true));
});
});
}
const mockSession = {
id: 'test',
addMessage: vi.fn(),
getHistory: vi.fn(() => []),
clear: vi.fn(),
setHistory: vi.fn(),
replaceHistory: vi.fn(),
};
const mockSessionManager = {
getSession: vi.fn(() => mockSession),
listSessions: vi.fn(() => ['ws:test']),
transferSession: vi.fn(),
closeSession: vi.fn(),
};
const mockModelClient = {
chat: vi.fn(async () => ({
content: 'Hello from Flynn!',
stopReason: 'end_turn',
usage: { inputTokens: 10, outputTokens: 5 },
})),
};
const mockToolRegistry = {
register: vi.fn(),
get: vi.fn(),
list: vi.fn(() => []),
filteredList: vi.fn(() => []),
toAnthropicFormat: vi.fn(() => []),
toOpenAIFormat: vi.fn(() => []),
filteredToAnthropicFormat: vi.fn(() => []),
filteredToOpenAIFormat: vi.fn(() => []),
};
const mockToolExecutor = {
execute: vi.fn(async () => ({ success: true, output: 'ok' })),
};
const TEST_PORT = 18911;
const TEST_TOKEN = 'runtime-client-token';
let LISTEN_ALLOWED = true;
let server: GatewayServer;
beforeAll(async () => {
LISTEN_ALLOWED = await canListenOnLocalhost();
if (!LISTEN_ALLOWED) {
return;
}
server = new GatewayServer({
port: TEST_PORT,
sessionManager: mockSessionManager as unknown as GatewayServerConfig['sessionManager'],
modelClient: mockModelClient,
systemPrompt: 'Test prompt',
toolRegistry: mockToolRegistry as unknown as GatewayServerConfig['toolRegistry'],
toolExecutor: mockToolExecutor as unknown as GatewayServerConfig['toolExecutor'],
version: '0.1.0-test',
uiDir: resolve(import.meta.dirname, '../gateway/ui'),
auth: { token: TEST_TOKEN },
nodes: {
enabled: true,
allowedRoles: ['companion'],
featureGates: { 'ui.canvas': true },
locationEnabled: true,
pushEnabled: true,
},
});
await server.start();
});
afterAll(async () => {
if (!LISTEN_ALLOWED) {
return;
}
await server.stop();
});
describe('CompanionRuntimeClient', () => {
it('connects and performs node registration + capability discovery', async () => {
if (!LISTEN_ALLOWED) {
return;
}
const client = new CompanionRuntimeClient({
url: `ws://127.0.0.1:${TEST_PORT}`,
token: TEST_TOKEN,
});
await client.connect();
expect(client.connected).toBe(true);
try {
const register = await client.registerNode({
nodeId: 'macos-companion',
role: 'companion',
capabilities: ['ui.canvas', 'node.location.read'],
});
expect(register.registered).toBe(true);
expect(register.node.id).toBe('macos-companion');
expect(register.protocol.serverVersion).toBe(1);
expect(register.capabilities.enabled).toContain('ui.canvas');
const nodeCaps = await client.getNodeCapabilities();
expect(nodeCaps.node.id).toBe('macos-companion');
expect(nodeCaps.capabilities.enabled).toContain('ui.canvas');
const systemCaps = await client.getSystemCapabilities();
expect(systemCaps.nodes.enabled).toBe(true);
expect(systemCaps.nodes.registered).toBe(true);
expect(systemCaps.nodes.nodeId).toBe('macos-companion');
} finally {
client.disconnect();
}
});
it('updates status/location/push and exposes them through system.nodes', async () => {
if (!LISTEN_ALLOWED) {
return;
}
const client = new CompanionRuntimeClient({
url: `ws://127.0.0.1:${TEST_PORT}`,
token: TEST_TOKEN,
});
await client.connect();
try {
await client.registerNode({
nodeId: 'ios-companion',
role: 'companion',
capabilities: ['node.location.write', 'node.push.register'],
});
const status = await client.setNodeStatus({
platform: 'ios',
appVersion: '1.2.3',
batteryPct: 77,
powerSource: 'battery',
});
expect(status.updated).toBe(true);
expect(status.status.platform).toBe('ios');
const location = await client.setNodeLocation({
latitude: 37.78,
longitude: -122.41,
source: 'gps',
});
expect(location.updated).toBe(true);
expect(location.location.source).toBe('gps');
const locationGet = await client.getNodeLocation();
expect(locationGet.location?.latitude).toBe(37.78);
const push = await client.setNodePushToken({
provider: 'apns',
token: 'abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890',
topic: 'dev.flynn.companion',
environment: 'production',
});
expect(push.updated).toBe(true);
expect(push.push.provider).toBe('apns');
expect(push.push.tokenPreview.startsWith('***')).toBe(true);
const nodes = await client.listSystemNodes({ role: 'companion', platform: 'ios' });
expect(nodes.summary.total).toBeGreaterThanOrEqual(1);
const current = nodes.nodes.find((entry) => entry.nodeId === 'ios-companion');
expect(current).toBeTruthy();
expect(current?.status?.platform).toBe('ios');
expect(current?.push?.provider).toBe('apns');
} finally {
client.disconnect();
}
});
it('surfaces gateway errors as GatewayRpcError', async () => {
if (!LISTEN_ALLOWED) {
return;
}
const client = new CompanionRuntimeClient({
url: `ws://127.0.0.1:${TEST_PORT}`,
token: TEST_TOKEN,
});
await client.connect();
try {
await expect(client.registerNode({
nodeId: 'observer-node',
role: 'observer',
capabilities: ['node.location.read'],
})).rejects.toBeInstanceOf(GatewayRpcError);
await expect(client.registerNode({
nodeId: 'observer-node',
role: 'observer',
capabilities: ['node.location.read'],
})).rejects.toMatchObject({
code: -5,
});
} finally {
client.disconnect();
}
});
});
+429
View File
@@ -0,0 +1,429 @@
import { WebSocket } from 'ws';
import type {
NodeLocationSetParams,
NodePushTokenSetParams,
NodeStatusSetParams,
} from '../gateway/protocol.js';
import { GATEWAY_PROTOCOL_VERSION } from '../gateway/protocol.js';
interface RpcSuccess {
id: number;
result: unknown;
}
interface RpcFailure {
id: number;
error: {
code: number;
message: string;
};
}
interface RpcEvent {
id: number;
event: string;
data: unknown;
}
type RpcMessage = RpcSuccess | RpcFailure | RpcEvent;
type PendingRequest = {
resolve: (value: unknown) => void;
reject: (error: Error) => void;
timeout: NodeJS.Timeout;
};
export interface CompanionRuntimeClientOptions {
url: string;
token?: string;
requestTimeoutMs?: number;
websocketFactory?: (url: string) => WebSocket;
}
export interface RegisterNodeInput {
nodeId: string;
role: string;
capabilities: string[];
protocolVersion?: number;
}
export interface NodeIdentitySummary {
id: string;
role: string;
}
export interface NodeRegisterResult {
registered: boolean;
node: NodeIdentitySummary;
protocol: {
serverVersion: number;
clientVersion: number;
negotiatedVersion: number;
};
capabilities: {
declared: string[];
enabled: string[];
};
}
export interface NodeCapabilitiesResult {
protocol: {
serverVersion: number;
nodeVersion: number;
negotiatedVersion: number;
};
node: {
id: string;
role: string;
registeredAt: number;
};
capabilities: {
declared: string[];
enabled: string[];
featureGates: Record<string, boolean>;
};
}
export interface NodeStatus {
platform: 'macos' | 'ios' | 'android' | 'linux' | 'windows' | 'unknown';
appVersion?: string;
deviceName?: string;
statusText?: string;
batteryPct?: number;
powerSource: 'ac' | 'battery' | 'unknown';
reportedAt: number;
}
export interface NodeLocation {
latitude: number;
longitude: number;
accuracyMeters?: number;
altitudeMeters?: number;
headingDegrees?: number;
speedMps?: number;
source: 'gps' | 'network' | 'manual' | 'unknown';
capturedAt: number;
receivedAt: number;
}
export interface NodePushSummary {
provider: 'apns' | 'fcm';
tokenPreview: string;
topic?: string;
environment?: 'sandbox' | 'production';
registeredAt: number;
}
export interface NodeStatusSetResult {
updated: boolean;
node: NodeIdentitySummary;
status: NodeStatus;
}
export interface NodeLocationSetResult {
updated: boolean;
node: NodeIdentitySummary;
location: NodeLocation;
}
export interface NodePushTokenSetResult {
updated: boolean;
node: NodeIdentitySummary;
push: NodePushSummary;
}
export interface NodeLocationGetResult {
node: NodeIdentitySummary;
location: NodeLocation | null;
}
export interface SystemCapabilitiesResult {
protocol: {
version: number;
};
nodes: {
enabled: boolean;
locationEnabled: boolean;
pushEnabled: boolean;
allowedRoles: string[];
registered: boolean;
role?: string;
nodeId?: string;
};
featureGates: Record<string, boolean>;
}
export interface SystemNodeEntry {
connectionId: string;
nodeId: string;
role: string;
identity?: string;
protocolVersion: number;
capabilities: string[];
registeredAt: number;
location?: NodeLocation;
status?: NodeStatus;
push?: NodePushSummary;
}
export interface SystemNodesResult {
nodes: SystemNodeEntry[];
summary: {
total: number;
};
}
export interface ListNodesInput {
role?: string;
platform?: string;
limit?: number;
}
export interface SetNodeStatusInput extends Omit<NodeStatusSetParams, 'connectionId'> {}
export interface SetNodeLocationInput extends Omit<NodeLocationSetParams, 'connectionId'> {}
export interface SetNodePushTokenInput extends Omit<NodePushTokenSetParams, 'connectionId'> {}
export class GatewayRpcError extends Error {
readonly code: number;
constructor(code: number, message: string) {
super(message);
this.name = 'GatewayRpcError';
this.code = code;
}
}
export class CompanionRuntimeClient {
private readonly url: string;
private readonly token?: string;
private readonly requestTimeoutMs: number;
private readonly websocketFactory: (url: string) => WebSocket;
private ws: WebSocket | null = null;
private nextId = 1;
private pending = new Map<number, PendingRequest>();
constructor(options: CompanionRuntimeClientOptions) {
this.url = options.url;
this.token = options.token;
this.requestTimeoutMs = options.requestTimeoutMs ?? 15_000;
this.websocketFactory = options.websocketFactory ?? ((url) => new WebSocket(url));
}
get connected(): boolean {
return this.ws?.readyState === WebSocket.OPEN;
}
async connect(): Promise<void> {
if (this.connected) {
return;
}
const ws = this.websocketFactory(withToken(this.url, this.token));
await new Promise<void>((resolve, reject) => {
let settled = false;
const onOpen = () => {
cleanup();
settled = true;
this.ws = ws;
this.ws.on('message', (raw) => this.handleMessage(raw.toString()));
this.ws.on('close', () => this.rejectAllPending(new Error('WebSocket closed')));
this.ws.on('error', () => {
// close event handles pending rejection
});
resolve();
};
const onError = (err: Error) => {
cleanup();
settled = true;
reject(err);
};
const onClose = () => {
cleanup();
if (!settled) {
settled = true;
reject(new Error('WebSocket closed before connection established'));
}
};
const cleanup = () => {
ws.off('open', onOpen);
ws.off('error', onError);
ws.off('close', onClose);
};
ws.once('open', onOpen);
ws.once('error', onError);
ws.once('close', onClose);
});
}
disconnect(code?: number, reason?: string): void {
if (!this.ws) {
return;
}
const ws = this.ws;
this.ws = null;
this.rejectAllPending(new Error('Disconnected'));
ws.close(code, reason);
}
async call<T>(method: string, params?: Record<string, unknown>): Promise<T> {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
throw new Error('WebSocket is not connected');
}
const id = this.nextId++;
return new Promise<T>((resolve, reject) => {
const timeout = setTimeout(() => {
this.pending.delete(id);
reject(new Error(`RPC timeout for method ${method}`));
}, this.requestTimeoutMs);
this.pending.set(id, {
resolve: (value) => {
clearTimeout(timeout);
resolve(value as T);
},
reject: (error) => {
clearTimeout(timeout);
reject(error);
},
timeout,
});
this.ws?.send(JSON.stringify({ id, method, params: params ?? {} }), (err) => {
if (!err) {
return;
}
const pending = this.pending.get(id);
if (!pending) {
return;
}
clearTimeout(pending.timeout);
this.pending.delete(id);
reject(err);
});
});
}
registerNode(input: RegisterNodeInput): Promise<NodeRegisterResult> {
return this.call<NodeRegisterResult>('node.register', {
nodeId: input.nodeId,
role: input.role,
protocolVersion: input.protocolVersion ?? GATEWAY_PROTOCOL_VERSION,
capabilities: input.capabilities,
});
}
getNodeCapabilities(): Promise<NodeCapabilitiesResult> {
return this.call<NodeCapabilitiesResult>('node.capabilities.get');
}
setNodeStatus(input: SetNodeStatusInput): Promise<NodeStatusSetResult> {
return this.call<NodeStatusSetResult>('node.status.set', {
platform: input.platform,
appVersion: input.appVersion,
deviceName: input.deviceName,
statusText: input.statusText,
batteryPct: input.batteryPct,
powerSource: input.powerSource,
});
}
setNodeLocation(input: SetNodeLocationInput): Promise<NodeLocationSetResult> {
return this.call<NodeLocationSetResult>('node.location.set', {
latitude: input.latitude,
longitude: input.longitude,
accuracyMeters: input.accuracyMeters,
altitudeMeters: input.altitudeMeters,
headingDegrees: input.headingDegrees,
speedMps: input.speedMps,
source: input.source,
capturedAt: input.capturedAt,
});
}
getNodeLocation(): Promise<NodeLocationGetResult> {
return this.call<NodeLocationGetResult>('node.location.get');
}
setNodePushToken(input: SetNodePushTokenInput): Promise<NodePushTokenSetResult> {
return this.call<NodePushTokenSetResult>('node.push_token.set', {
provider: input.provider,
token: input.token,
topic: input.topic,
environment: input.environment,
});
}
getSystemCapabilities(): Promise<SystemCapabilitiesResult> {
return this.call<SystemCapabilitiesResult>('system.capabilities');
}
listSystemNodes(input?: ListNodesInput): Promise<SystemNodesResult> {
return this.call<SystemNodesResult>('system.nodes', {
role: input?.role,
platform: input?.platform,
limit: input?.limit,
});
}
private handleMessage(raw: string): void {
let parsed: RpcMessage;
try {
parsed = JSON.parse(raw) as RpcMessage;
} catch {
return;
}
if (!('id' in parsed) || typeof parsed.id !== 'number') {
return;
}
if ('event' in parsed) {
return;
}
const pending = this.pending.get(parsed.id);
if (!pending) {
return;
}
this.pending.delete(parsed.id);
if ('error' in parsed) {
pending.reject(new GatewayRpcError(parsed.error.code, parsed.error.message));
return;
}
pending.resolve(parsed.result);
}
private rejectAllPending(error: Error): void {
for (const [, pending] of this.pending) {
clearTimeout(pending.timeout);
pending.reject(error);
}
this.pending.clear();
}
}
function withToken(url: string, token?: string): string {
if (!token) {
return url;
}
const parsed = new URL(url);
parsed.searchParams.set('token', token);
return parsed.toString();
}