Files
flynn/src/companion/runtimeClient.test.ts
T

1306 lines
38 KiB
TypeScript

import { describe, it, expect, beforeAll, afterAll, vi } from 'vitest';
import { resolve } from 'path';
import { createServer } from 'net';
import { EventEmitter } from 'events';
import { WebSocket } from 'ws';
import type { GatewayServerConfig } from '../gateway/server.js';
import { GatewayServer } from '../gateway/server.js';
import { CompanionRuntimeClient, GatewayRpcError } from './runtimeClient.js';
async function canListenOnLocalhost(): Promise<boolean> {
return new Promise((resolvePromise) => {
const s = createServer();
s.once('error', () => resolvePromise(false));
s.listen(0, '127.0.0.1', () => {
s.close(() => resolvePromise(true));
});
});
}
const mockSession = {
id: 'test',
addMessage: vi.fn(),
getHistory: vi.fn(() => []),
clear: vi.fn(),
setHistory: vi.fn(),
replaceHistory: vi.fn(),
};
const mockSessionManager = {
getSession: vi.fn(() => mockSession),
getSessionConfig: vi.fn(() => undefined),
listSessions: vi.fn(() => ['ws:test']),
transferSession: vi.fn(),
closeSession: vi.fn(),
};
const mockModelClient = {
chat: vi.fn(async () => ({
content: 'Hello from Flynn!',
stopReason: 'end_turn',
usage: { inputTokens: 10, outputTokens: 5 },
})),
};
const mockToolRegistry = {
register: vi.fn(),
get: vi.fn(),
list: vi.fn(() => []),
filteredList: vi.fn(() => []),
toAnthropicFormat: vi.fn(() => []),
toOpenAIFormat: vi.fn(() => []),
filteredToAnthropicFormat: vi.fn(() => []),
filteredToOpenAIFormat: vi.fn(() => []),
};
const mockToolExecutor = {
execute: vi.fn(async () => ({ success: true, output: 'ok' })),
};
const TEST_PORT = 18911;
const TEST_TOKEN = 'runtime-client-token';
let LISTEN_ALLOWED = true;
let server: GatewayServer;
beforeAll(async () => {
LISTEN_ALLOWED = await canListenOnLocalhost();
if (!LISTEN_ALLOWED) {
return;
}
server = new GatewayServer({
port: TEST_PORT,
sessionManager: mockSessionManager as unknown as GatewayServerConfig['sessionManager'],
modelClient: mockModelClient,
systemPrompt: 'Test prompt',
toolRegistry: mockToolRegistry as unknown as GatewayServerConfig['toolRegistry'],
toolExecutor: mockToolExecutor as unknown as GatewayServerConfig['toolExecutor'],
version: '0.1.0-test',
uiDir: resolve(import.meta.dirname, '../gateway/ui'),
auth: { token: TEST_TOKEN },
nodes: {
enabled: true,
allowedRoles: ['companion'],
featureGates: { 'ui.canvas': true },
locationEnabled: true,
pushEnabled: true,
},
});
await server.start();
});
afterAll(async () => {
if (!LISTEN_ALLOWED) {
return;
}
await server.stop();
});
describe('CompanionRuntimeClient', () => {
it('validates requestTimeoutMs option', () => {
expect(() => {
new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
requestTimeoutMs: 0,
});
}).toThrow('requestTimeoutMs must be a positive number');
});
it('dispatches gateway events to subscribed handlers and supports unsubscribe', () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const handler = vi.fn();
expect(client.eventSubscriptionCount).toBe(0);
const unsubscribe = client.subscribeEvents(handler);
expect(client.eventSubscriptionCount).toBe(1);
(client as unknown as { handleMessage: (raw: string) => void }).handleMessage(
JSON.stringify({
id: 42,
event: 'agent.stream',
data: { token: 'hello' },
}),
);
expect(handler).toHaveBeenCalledWith('agent.stream', { token: 'hello' });
unsubscribe();
(client as unknown as { handleMessage: (raw: string) => void }).handleMessage(
JSON.stringify({
id: 43,
event: 'agent.stream',
data: { token: 'world' },
}),
);
expect(handler).toHaveBeenCalledTimes(1);
expect(client.eventSubscriptionCount).toBe(0);
});
it('isolates subscriber callback failures', () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const badHandler = vi.fn(() => {
throw new Error('subscriber failed');
});
const goodHandler = vi.fn();
client.subscribeEvents(badHandler);
client.subscribeEvents(goodHandler);
expect(() => {
(client as unknown as { handleMessage: (raw: string) => void }).handleMessage(
JSON.stringify({
id: 44,
event: 'agent.stream',
data: { token: 'safe' },
}),
);
}).not.toThrow();
expect(badHandler).toHaveBeenCalledOnce();
expect(goodHandler).toHaveBeenCalledWith('agent.stream', { token: 'safe' });
});
it('supports filtered subscribeEvent helper', () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const streamHandler = vi.fn();
const unsubscribe = client.subscribeEvent<{ token: string }>('agent.stream', streamHandler);
(client as unknown as { handleMessage: (raw: string) => void }).handleMessage(
JSON.stringify({
id: 45,
event: 'agent.stream',
data: { token: 'first' },
}),
);
(client as unknown as { handleMessage: (raw: string) => void }).handleMessage(
JSON.stringify({
id: 46,
event: 'agent.typing',
data: { active: true },
}),
);
expect(streamHandler).toHaveBeenCalledTimes(1);
expect(streamHandler).toHaveBeenCalledWith({ token: 'first' });
unsubscribe();
(client as unknown as { handleMessage: (raw: string) => void }).handleMessage(
JSON.stringify({
id: 47,
event: 'agent.stream',
data: { token: 'second' },
}),
);
expect(streamHandler).toHaveBeenCalledTimes(1);
});
it('supports subscribeAgentStream and subscribeAgentTyping helpers', () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const streamHandler = vi.fn();
const typingHandler = vi.fn();
client.subscribeAgentStream(streamHandler);
client.subscribeAgentTyping(typingHandler);
(client as unknown as { handleMessage: (raw: string) => void }).handleMessage(
JSON.stringify({
id: 52,
event: 'agent.stream',
data: { token: 'x' },
}),
);
(client as unknown as { handleMessage: (raw: string) => void }).handleMessage(
JSON.stringify({
id: 53,
event: 'agent.typing',
data: { active: true },
}),
);
expect(streamHandler).toHaveBeenCalledWith({ token: 'x' });
expect(typingHandler).toHaveBeenCalledWith({ active: true });
});
it('lists known companion event names', () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
expect(client.listKnownEventNames()).toEqual([
'agent.stream',
'agent.typing',
'context_warning',
]);
});
it('supports subscribeContextWarning helper', () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const warningHandler = vi.fn();
client.subscribeContextWarning(warningHandler);
(client as unknown as { handleMessage: (raw: string) => void }).handleMessage(
JSON.stringify({
id: 56,
event: 'context_warning',
data: { thresholdPct: 80, estimatedPct: 92 },
}),
);
expect(warningHandler).toHaveBeenCalledWith({ thresholdPct: 80, estimatedPct: 92 });
});
it('clears all event subscriptions', () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const handlerA = vi.fn();
const handlerB = vi.fn();
client.subscribeEvents(handlerA);
client.subscribeEvent('agent.stream', handlerB);
const clearResult = client.clearEventSubscriptions();
(client as unknown as { handleMessage: (raw: string) => void }).handleMessage(
JSON.stringify({
id: 50,
event: 'agent.stream',
data: { token: 'cleared' },
}),
);
expect(handlerA).not.toHaveBeenCalled();
expect(handlerB).not.toHaveBeenCalled();
expect(clearResult).toEqual({
clearedSubscriptions: 2,
cancelledWaits: 0,
});
});
it('dispose clears subscriptions and is safe to call repeatedly', () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const handler = vi.fn();
client.subscribeEvents(handler);
client.dispose();
client.dispose();
(client as unknown as { handleMessage: (raw: string) => void }).handleMessage(
JSON.stringify({
id: 51,
event: 'agent.stream',
data: { token: 'after-dispose' },
}),
);
expect(handler).not.toHaveBeenCalled();
});
it('waitForEvent resolves using optional predicate filter', async () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
expect(client.pendingEventWaitCount).toBe(0);
expect(client.hasPendingWork).toBe(false);
const awaited = client.waitForEvent<{ seq: number }>('agent.stream', {
timeoutMs: 2000,
predicate: (data) => data.seq === 2,
});
expect(client.pendingEventWaitCount).toBe(1);
expect(client.hasPendingWork).toBe(true);
(client as unknown as { handleMessage: (raw: string) => void }).handleMessage(
JSON.stringify({
id: 48,
event: 'agent.stream',
data: { seq: 1 },
}),
);
(client as unknown as { handleMessage: (raw: string) => void }).handleMessage(
JSON.stringify({
id: 49,
event: 'agent.stream',
data: { seq: 2 },
}),
);
await expect(awaited).resolves.toEqual({ seq: 2 });
expect(client.pendingEventWaitCount).toBe(0);
expect(client.hasPendingWork).toBe(false);
});
it('waitForEvent rejects on timeout', async () => {
vi.useFakeTimers();
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const awaited = expect(
client.waitForEvent('agent.stream', { timeoutMs: 100 }),
).rejects.toThrow('Timed out waiting for event agent.stream');
await vi.advanceTimersByTimeAsync(100);
await awaited;
vi.useRealTimers();
});
it('waitForEvent validates eventName input', () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
expect(() => client.waitForEvent('')).toThrow('eventName must be a non-empty string');
expect(() => client.waitForEvent(' ')).toThrow('eventName must be a non-empty string');
expect(() => client.waitForEvent(123 as unknown as string)).toThrow(
'eventName must be a non-empty string',
);
expect(() => client.waitForEvent('agent.stream', { timeoutMs: 0 })).toThrow(
'timeoutMs must be a positive number',
);
});
it('waitForEvent supports AbortSignal cancellation', async () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const controller = new AbortController();
const awaited = expect(
client.waitForEvent('agent.stream', { signal: controller.signal, timeoutMs: 10_000 }),
).rejects.toThrow('Aborted while waiting for event agent.stream');
controller.abort();
await awaited;
});
it('waitForEvent rejects immediately when event subscriptions are cleared', async () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const awaited = expect(
client.waitForEvent('agent.stream', { timeoutMs: 10_000 }),
).rejects.toThrow('Event subscriptions cleared');
expect(client.clearEventSubscriptions()).toEqual({
clearedSubscriptions: 1,
cancelledWaits: 1,
});
await awaited;
});
it('cancelPendingEventWaits rejects waiters without clearing subscriptions', async () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const handler = vi.fn();
client.subscribeEvents(handler);
const awaited = expect(
client.waitForEvent('agent.stream', { timeoutMs: 10_000 }),
).rejects.toThrow('manually cancelled');
expect(client.cancelPendingEventWaits('manually cancelled')).toBe(1);
await awaited;
(client as unknown as { handleMessage: (raw: string) => void }).handleMessage(
JSON.stringify({
id: 99,
event: 'agent.stream',
data: { token: 'still-subscribed' },
}),
);
expect(handler).toHaveBeenCalledWith('agent.stream', { token: 'still-subscribed' });
expect(client.cancelPendingEventWaits()).toBe(0);
});
it('cancelPendingEventWaits updates pending work snapshot immediately', async () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const pendingWait = client.waitForEvent('agent.stream', { timeoutMs: 10_000 }).catch(() => undefined);
expect(client.getPendingWorkSnapshot()).toEqual({
pendingRequestCount: 0,
pendingEventWaitCount: 1,
hasPendingWork: true,
});
expect(client.cancelPendingEventWaits('snapshot cancel')).toBe(1);
expect(client.getPendingWorkSnapshot()).toEqual({
pendingRequestCount: 0,
pendingEventWaitCount: 0,
hasPendingWork: false,
});
await pendingWait;
});
it('waitForEvent rejects immediately on disconnect', async () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const awaited = expect(
client.waitForEvent('agent.stream', { timeoutMs: 10_000 }),
).rejects.toThrow('Disconnected');
client.disconnect();
await awaited;
});
it('waitForEvent rejects when websocket closes unexpectedly', async () => {
class FakeWebSocket extends EventEmitter {
readyState: number = WebSocket.CONNECTING;
constructor() {
super();
queueMicrotask(() => {
this.readyState = WebSocket.OPEN;
this.emit('open');
});
}
send(_payload: string, callback?: (error?: Error) => void): void {
callback?.();
}
close(_code?: number, _reason?: string): void {
this.readyState = WebSocket.CLOSED;
this.emit('close');
}
}
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
websocketFactory: () => new FakeWebSocket() as unknown as WebSocket,
});
await client.connect();
const awaited = expect(
client.waitForEvent('agent.stream', { timeoutMs: 10_000 }),
).rejects.toThrow('WebSocket closed');
const ws = (client as unknown as { ws: WebSocket | null }).ws;
ws?.close();
await awaited;
expect(client.connected).toBe(false);
});
it('waitForAnyEvent rejects when websocket closes unexpectedly', async () => {
class FakeWebSocket extends EventEmitter {
readyState: number = WebSocket.CONNECTING;
constructor() {
super();
queueMicrotask(() => {
this.readyState = WebSocket.OPEN;
this.emit('open');
});
}
send(_payload: string, callback?: (error?: Error) => void): void {
callback?.();
}
close(_code?: number, _reason?: string): void {
this.readyState = WebSocket.CLOSED;
this.emit('close');
}
}
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
websocketFactory: () => new FakeWebSocket() as unknown as WebSocket,
});
await client.connect();
const awaited = expect(
client.waitForAnyEvent(['agent.stream', 'agent.typing'], { timeoutMs: 10_000 }),
).rejects.toThrow('WebSocket closed');
const ws = (client as unknown as { ws: WebSocket | null }).ws;
ws?.close();
await awaited;
expect(client.connected).toBe(false);
});
it('waitForAgentStream resolves on agent.stream events', async () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const awaited = client.waitForAgentStream<{ token: string }>({ timeoutMs: 2000 });
(client as unknown as { handleMessage: (raw: string) => void }).handleMessage(
JSON.stringify({
id: 54,
event: 'agent.stream',
data: { token: 'typed-stream' },
}),
);
await expect(awaited).resolves.toEqual({ token: 'typed-stream' });
});
it('waitForAgentTyping resolves on agent.typing events', async () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const awaited = client.waitForAgentTyping<{ active: boolean }>({ timeoutMs: 2000 });
(client as unknown as { handleMessage: (raw: string) => void }).handleMessage(
JSON.stringify({
id: 55,
event: 'agent.typing',
data: { active: true },
}),
);
await expect(awaited).resolves.toEqual({ active: true });
});
it('waitForContextWarning resolves on context_warning events', async () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const awaited = client.waitForContextWarning<{ thresholdPct: number; estimatedPct: number }>({
timeoutMs: 2000,
});
(client as unknown as { handleMessage: (raw: string) => void }).handleMessage(
JSON.stringify({
id: 57,
event: 'context_warning',
data: { thresholdPct: 75, estimatedPct: 88 },
}),
);
await expect(awaited).resolves.toEqual({ thresholdPct: 75, estimatedPct: 88 });
});
it('waitForAnyEvent resolves with event envelope for first matching event', async () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
expect(client.pendingEventWaitCount).toBe(0);
const awaited = client.waitForAnyEvent<{ active?: boolean; token?: string }>(
['agent.typing', 'agent.stream'],
{ timeoutMs: 2000 },
);
expect(client.pendingEventWaitCount).toBe(1);
(client as unknown as { handleMessage: (raw: string) => void }).handleMessage(
JSON.stringify({
id: 58,
event: 'agent.typing',
data: { active: true },
}),
);
await expect(awaited).resolves.toEqual({
event: 'agent.typing',
data: { active: true },
});
expect(client.pendingEventWaitCount).toBe(0);
});
it('waitForAnyEvent supports per-event predicate filtering', async () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const awaited = client.waitForAnyEvent<{ token?: string }>(
['agent.stream'],
{
timeoutMs: 2000,
predicate: (_event, data) => data.token === 'accept',
},
);
(client as unknown as { handleMessage: (raw: string) => void }).handleMessage(
JSON.stringify({
id: 59,
event: 'agent.stream',
data: { token: 'skip' },
}),
);
(client as unknown as { handleMessage: (raw: string) => void }).handleMessage(
JSON.stringify({
id: 60,
event: 'agent.stream',
data: { token: 'accept' },
}),
);
await expect(awaited).resolves.toEqual({
event: 'agent.stream',
data: { token: 'accept' },
});
});
it('waitForAnyEvent validates input event list', () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
expect(() => client.waitForAnyEvent([])).toThrow(
'eventNames must contain at least one event name',
);
expect(() => client.waitForAnyEvent(['agent.stream', ' '])).toThrow(
'eventNames must not contain empty values',
);
expect(() => client.waitForAnyEvent(['agent.stream', 123 as unknown as string])).toThrow(
'eventNames must not contain empty values',
);
expect(() => client.waitForAnyEvent(['agent.stream'], { timeoutMs: 0 })).toThrow(
'timeoutMs must be a positive number',
);
});
it('tracks pendingRequestCount for in-flight RPCs', async () => {
class FakeWebSocket extends EventEmitter {
readyState: number = WebSocket.CONNECTING;
constructor() {
super();
queueMicrotask(() => {
this.readyState = WebSocket.OPEN;
this.emit('open');
});
}
send(_payload: string, callback?: (error?: Error) => void): void {
callback?.();
}
close(_code?: number, _reason?: string): void {
this.readyState = WebSocket.CLOSED;
this.emit('close');
}
}
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
websocketFactory: () => new FakeWebSocket() as unknown as WebSocket,
requestTimeoutMs: 10_000,
});
await client.connect();
expect(client.pendingRequestCount).toBe(0);
expect(client.hasPendingWork).toBe(false);
expect(client.idle).toBe(true);
const pending = client.call('system.capabilities');
expect(client.pendingRequestCount).toBe(1);
expect(client.hasPendingWork).toBe(true);
expect(client.idle).toBe(false);
client.disconnect();
await expect(pending).rejects.toThrow('Disconnected');
expect(client.pendingRequestCount).toBe(0);
expect(client.hasPendingWork).toBe(false);
expect(client.idle).toBe(true);
});
it('returns pending work snapshot', async () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
expect(client.getPendingWorkSnapshot()).toEqual({
pendingRequestCount: 0,
pendingEventWaitCount: 0,
hasPendingWork: false,
});
const pendingWait = client.waitForEvent('agent.stream', { timeoutMs: 10_000 }).catch(() => undefined);
expect(client.getPendingWorkSnapshot()).toEqual({
pendingRequestCount: 0,
pendingEventWaitCount: 1,
hasPendingWork: true,
});
client.clearEventSubscriptions();
await pendingWait;
expect(client.getPendingWorkSnapshot()).toEqual({
pendingRequestCount: 0,
pendingEventWaitCount: 0,
hasPendingWork: false,
});
});
it('returns event surface snapshot', async () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const pendingWait = client.waitForEvent('agent.stream', { timeoutMs: 10_000 }).catch(() => undefined);
const unsubscribe = client.subscribeEvents(() => undefined);
expect(client.getEventSurfaceSnapshot()).toEqual({
knownEventNames: ['agent.stream', 'agent.typing', 'context_warning'],
eventSubscriptionCount: 2,
pendingEventWaitCount: 1,
});
unsubscribe();
client.clearEventSubscriptions();
await pendingWait;
});
it('returns connection snapshot', async () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const pendingWait = client.waitForEvent('agent.stream', { timeoutMs: 10_000 }).catch(() => undefined);
expect(client.getConnectionSnapshot()).toEqual({
connected: false,
eventSubscriptionCount: 1,
pendingRequestCount: 0,
pendingEventWaitCount: 1,
hasPendingWork: true,
idle: false,
lastDisconnectCode: undefined,
lastDisconnectReason: undefined,
});
client.clearEventSubscriptions();
await pendingWait;
expect(client.getConnectionSnapshot()).toEqual({
connected: false,
eventSubscriptionCount: 0,
pendingRequestCount: 0,
pendingEventWaitCount: 0,
hasPendingWork: false,
idle: true,
lastDisconnectCode: undefined,
lastDisconnectReason: undefined,
});
});
it('connection snapshot tracks manual disconnect code and reason', () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
client.disconnect(4100, 'manual stop');
expect(client.getConnectionSnapshot()).toEqual({
connected: false,
eventSubscriptionCount: 0,
pendingRequestCount: 0,
pendingEventWaitCount: 0,
hasPendingWork: false,
idle: true,
lastDisconnectCode: 4100,
lastDisconnectReason: 'manual stop',
});
expect(client.lastDisconnectCode).toBe(4100);
expect(client.lastDisconnectReason).toBe('manual stop');
});
it('connection snapshot tracks transport close code and reason', async () => {
class FakeWebSocket extends EventEmitter {
readyState: number = WebSocket.CONNECTING;
constructor() {
super();
queueMicrotask(() => {
this.readyState = WebSocket.OPEN;
this.emit('open');
});
}
send(_payload: string, callback?: (error?: Error) => void): void {
callback?.();
}
close(_code?: number, _reason?: string): void {
this.readyState = WebSocket.CLOSED;
this.emit('close', 4001, Buffer.from('transport closed'));
}
}
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
websocketFactory: () => new FakeWebSocket() as unknown as WebSocket,
});
await client.connect();
const ws = (client as unknown as { ws: WebSocket | null }).ws;
ws?.close();
expect(client.getConnectionSnapshot()).toEqual({
connected: false,
eventSubscriptionCount: 0,
pendingRequestCount: 0,
pendingEventWaitCount: 0,
hasPendingWork: false,
idle: true,
lastDisconnectCode: 4001,
lastDisconnectReason: 'transport closed',
});
});
it('manual disconnect metadata is not overwritten by local close event', async () => {
class FakeWebSocket extends EventEmitter {
readyState: number = WebSocket.CONNECTING;
constructor() {
super();
queueMicrotask(() => {
this.readyState = WebSocket.OPEN;
this.emit('open');
});
}
send(_payload: string, callback?: (error?: Error) => void): void {
callback?.();
}
close(): void {
this.readyState = WebSocket.CLOSED;
this.emit('close');
}
}
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
websocketFactory: () => new FakeWebSocket() as unknown as WebSocket,
});
await client.connect();
client.disconnect(4100, 'manual stop');
expect(client.getConnectionSnapshot()).toEqual({
connected: false,
eventSubscriptionCount: 0,
pendingRequestCount: 0,
pendingEventWaitCount: 0,
hasPendingWork: false,
idle: true,
lastDisconnectCode: 4100,
lastDisconnectReason: 'manual stop',
});
});
it('connect clears stale disconnect metadata from prior sessions', async () => {
class FakeWebSocket extends EventEmitter {
readyState: number = WebSocket.CONNECTING;
constructor() {
super();
queueMicrotask(() => {
this.readyState = WebSocket.OPEN;
this.emit('open');
});
}
send(_payload: string, callback?: (error?: Error) => void): void {
callback?.();
}
close(code?: number, reason?: string): void {
this.readyState = WebSocket.CLOSED;
this.emit('close', code ?? 1000, Buffer.from(reason ?? ''));
}
}
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
websocketFactory: () => new FakeWebSocket() as unknown as WebSocket,
});
await client.connect();
client.disconnect(4100, 'manual stop');
expect(client.lastDisconnectCode).toBe(4100);
expect(client.lastDisconnectReason).toBe('manual stop');
await client.connect();
expect(client.lastDisconnectCode).toBeUndefined();
expect(client.lastDisconnectReason).toBeUndefined();
});
it('waitForIdle resolves immediately when no work is pending', async () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
await expect(client.waitForIdle()).resolves.toBeUndefined();
});
it('waitForIdle validates pollIntervalMs option', () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
expect(() => client.waitForIdle({ pollIntervalMs: 0 })).toThrow(
'pollIntervalMs must be a positive number',
);
expect(() => client.waitForIdle({ pollIntervalMs: Number.NaN })).toThrow(
'pollIntervalMs must be a positive number',
);
expect(() => client.waitForIdle({ pollIntervalMs: Number.POSITIVE_INFINITY })).toThrow(
'pollIntervalMs must be a positive number',
);
});
it('waitForIdle validates timeoutMs option', () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
expect(() => client.waitForIdle({ timeoutMs: 0 })).toThrow(
'timeoutMs must be a positive number',
);
expect(() => client.waitForIdle({ timeoutMs: Number.NaN })).toThrow(
'timeoutMs must be a positive number',
);
expect(() => client.waitForIdle({ timeoutMs: Number.POSITIVE_INFINITY })).toThrow(
'timeoutMs must be a positive number',
);
});
it('waitForIdle resolves after pending event waiters are cleared', async () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const pendingWait = client.waitForEvent('agent.stream', { timeoutMs: 10_000 }).catch(() => undefined);
expect(client.pendingEventWaitCount).toBe(1);
const idle = client.waitForIdle({ timeoutMs: 1_000, pollIntervalMs: 5 });
setTimeout(() => {
client.clearEventSubscriptions();
}, 20);
await expect(idle).resolves.toBeUndefined();
await pendingWait;
expect(client.pendingEventWaitCount).toBe(0);
expect(client.hasPendingWork).toBe(false);
});
it('waitForIdle rejects on timeout while work remains pending', async () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const pendingWait = client.waitForEvent('agent.stream', { timeoutMs: 10_000 }).catch(() => undefined);
await expect(
client.waitForIdle({ timeoutMs: 40, pollIntervalMs: 5 }),
).rejects.toThrow('Timed out waiting for runtime idle state');
client.clearEventSubscriptions();
await pendingWait;
});
it('waitForIdle resolves after pending RPC requests are rejected', async () => {
class FakeWebSocket extends EventEmitter {
readyState: number = WebSocket.CONNECTING;
constructor() {
super();
queueMicrotask(() => {
this.readyState = WebSocket.OPEN;
this.emit('open');
});
}
send(_payload: string, callback?: (error?: Error) => void): void {
callback?.();
}
close(_code?: number, _reason?: string): void {
this.readyState = WebSocket.CLOSED;
this.emit('close');
}
}
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
websocketFactory: () => new FakeWebSocket() as unknown as WebSocket,
requestTimeoutMs: 10_000,
});
await client.connect();
const pendingCall = client.call('system.capabilities').catch(() => undefined);
expect(client.pendingRequestCount).toBe(1);
const idle = client.waitForIdle({ timeoutMs: 1_000, pollIntervalMs: 5 });
setTimeout(() => {
client.disconnect();
}, 20);
await expect(idle).resolves.toBeUndefined();
await pendingCall;
expect(client.pendingRequestCount).toBe(0);
expect(client.hasPendingWork).toBe(false);
});
it('waitForIdle supports AbortSignal cancellation', async () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const pendingWait = client.waitForEvent('agent.stream', { timeoutMs: 10_000 }).catch(() => undefined);
const controller = new AbortController();
const awaited = expect(
client.waitForIdle({ timeoutMs: 10_000, signal: controller.signal }),
).rejects.toThrow('Aborted while waiting for runtime idle state');
controller.abort();
await awaited;
client.clearEventSubscriptions();
await pendingWait;
});
it('waitForIdle rejects immediately when signal is already aborted', async () => {
const client = new CompanionRuntimeClient({
url: 'ws://127.0.0.1:1',
});
const pendingWait = client.waitForEvent('agent.stream', { timeoutMs: 10_000 }).catch(() => undefined);
const controller = new AbortController();
controller.abort();
await expect(
client.waitForIdle({ timeoutMs: 10_000, signal: controller.signal }),
).rejects.toThrow('Aborted while waiting for runtime idle state');
client.clearEventSubscriptions();
await pendingWait;
});
it('connects and performs node registration + capability discovery', async () => {
if (!LISTEN_ALLOWED) {
return;
}
const client = new CompanionRuntimeClient({
url: `ws://127.0.0.1:${TEST_PORT}`,
token: TEST_TOKEN,
});
await client.connect();
expect(client.connected).toBe(true);
try {
const register = await client.registerNode({
nodeId: 'macos-companion',
role: 'companion',
capabilities: ['ui.canvas', 'node.location.read'],
});
expect(register.registered).toBe(true);
expect(register.node.id).toBe('macos-companion');
expect(register.protocol.serverVersion).toBe(1);
expect(register.capabilities.enabled).toContain('ui.canvas');
const nodeCaps = await client.getNodeCapabilities();
expect(nodeCaps.node.id).toBe('macos-companion');
expect(nodeCaps.capabilities.enabled).toContain('ui.canvas');
const systemCaps = await client.getSystemCapabilities();
expect(systemCaps.nodes.enabled).toBe(true);
expect(systemCaps.nodes.registered).toBe(true);
expect(systemCaps.nodes.nodeId).toBe('macos-companion');
} finally {
client.disconnect();
}
});
it('bootstraps node registration and capabilities in one call', async () => {
if (!LISTEN_ALLOWED) {
return;
}
const client = new CompanionRuntimeClient({
url: `ws://127.0.0.1:${TEST_PORT}`,
token: TEST_TOKEN,
});
await client.connect();
try {
const boot = await client.bootstrapNode(
{
nodeId: 'bootstrap-runtime-node',
role: 'companion',
capabilities: ['ui.canvas'],
},
{ includeSystemCapabilities: true },
);
expect(boot.register.registered).toBe(true);
expect(boot.capabilities.node.id).toBe('bootstrap-runtime-node');
expect(boot.systemCapabilities?.nodes.enabled).toBe(true);
expect(boot.systemCapabilities?.nodes.registered).toBe(true);
} finally {
client.disconnect();
}
});
it('updates status/location/push and exposes them through system.nodes', async () => {
if (!LISTEN_ALLOWED) {
return;
}
const client = new CompanionRuntimeClient({
url: `ws://127.0.0.1:${TEST_PORT}`,
token: TEST_TOKEN,
});
await client.connect();
try {
await client.registerNode({
nodeId: 'ios-companion',
role: 'companion',
capabilities: ['node.location.write', 'node.push.register'],
});
const status = await client.setNodeStatus({
platform: 'ios',
appVersion: '1.2.3',
batteryPct: 77,
powerSource: 'battery',
});
expect(status.updated).toBe(true);
expect(status.status.platform).toBe('ios');
const location = await client.setNodeLocation({
latitude: 37.78,
longitude: -122.41,
source: 'gps',
});
expect(location.updated).toBe(true);
expect(location.location.source).toBe('gps');
const locationGet = await client.getNodeLocation();
expect(locationGet.location?.latitude).toBe(37.78);
const push = await client.setNodePushToken({
provider: 'apns',
token: 'abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890',
topic: 'dev.flynn.companion',
environment: 'production',
});
expect(push.updated).toBe(true);
expect(push.push.provider).toBe('apns');
expect(push.push.tokenPreview.startsWith('***')).toBe(true);
const nodes = await client.listSystemNodes({ role: 'companion', platform: 'ios' });
expect(nodes.summary.total).toBeGreaterThanOrEqual(1);
const current = nodes.nodes.find((entry) => entry.nodeId === 'ios-companion');
expect(current).toBeTruthy();
expect(current?.status?.platform).toBe('ios');
expect(current?.push?.provider).toBe('apns');
} finally {
client.disconnect();
}
});
it('surfaces gateway errors as GatewayRpcError', async () => {
if (!LISTEN_ALLOWED) {
return;
}
const client = new CompanionRuntimeClient({
url: `ws://127.0.0.1:${TEST_PORT}`,
token: TEST_TOKEN,
});
await client.connect();
try {
await expect(client.registerNode({
nodeId: 'observer-node',
role: 'observer',
capabilities: ['node.location.read'],
})).rejects.toBeInstanceOf(GatewayRpcError);
await expect(client.registerNode({
nodeId: 'observer-node',
role: 'observer',
capabilities: ['node.location.read'],
})).rejects.toMatchObject({
code: -5,
});
} finally {
client.disconnect();
}
});
it('supports canvas artifact lifecycle via typed helper methods', async () => {
if (!LISTEN_ALLOWED) {
return;
}
const client = new CompanionRuntimeClient({
url: `ws://127.0.0.1:${TEST_PORT}`,
token: TEST_TOKEN,
});
await client.connect();
try {
const sessionId = 'ws:companion-canvas';
const put = await client.putCanvasArtifact({
sessionId,
artifactId: 'artifact-1',
type: 'markdown',
title: 'Companion note',
content: { body: '# Hello' },
metadata: { source: 'runtime-client-test' },
});
expect(put.upserted).toBe(true);
expect(put.artifact.id).toBe('artifact-1');
expect(put.artifact.type).toBe('markdown');
const list = await client.listCanvasArtifacts(sessionId);
expect(list.artifacts.length).toBeGreaterThanOrEqual(1);
expect(list.artifacts.some((artifact) => artifact.id === 'artifact-1')).toBe(true);
const get = await client.getCanvasArtifact({ sessionId, artifactId: 'artifact-1' });
expect(get.artifact.id).toBe('artifact-1');
expect(get.artifact.title).toBe('Companion note');
const del = await client.deleteCanvasArtifact({ sessionId, artifactId: 'artifact-1' });
expect(del.deleted).toBe(true);
const clear = await client.clearCanvasArtifacts(sessionId);
expect(clear.cleared).toBe(0);
} finally {
client.disconnect();
}
});
it('supports autoConnect mode for one-shot RPC usage', async () => {
if (!LISTEN_ALLOWED) {
return;
}
const client = new CompanionRuntimeClient({
url: `ws://127.0.0.1:${TEST_PORT}`,
token: TEST_TOKEN,
autoConnect: true,
});
expect(client.connected).toBe(false);
try {
const register = await client.registerNode({
nodeId: 'auto-connect-node',
role: 'companion',
capabilities: ['ui.canvas'],
});
expect(register.registered).toBe(true);
expect(client.connected).toBe(true);
} finally {
client.disconnect();
}
});
});