From 717e5d60e5e5624033be24dbea93b7bc9a8e1d3a Mon Sep 17 00:00:00 2001 From: William Valentin Date: Mon, 16 Feb 2026 19:27:25 -0800 Subject: [PATCH] feat(companion): add waitForAnyEvent runtime helper --- README.md | 2 +- docs/plans/state.json | 14 ++++++ src/companion/index.ts | 1 + src/companion/runtimeClient.test.ts | 66 +++++++++++++++++++++++++ src/companion/runtimeClient.ts | 74 +++++++++++++++++++++++++++++ 5 files changed, 156 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index b3ff015..8ffb3ab 100644 --- a/README.md +++ b/README.md @@ -1190,7 +1190,7 @@ Methods: - `system.capabilities` returns gateway protocol and node policy snapshot. Companion runtime helper: -- `src/companion/runtimeClient.ts` provides a typed Node/WebSocket client for companion runtimes (macOS/iOS/Android workers) with wrappers for `node.register`, `node.capabilities.get`, `node.location.set/get`, `node.status.set`, `node.push_token.set`, `system.capabilities`, `system.nodes`, and canvas artifact RPCs (`canvas.put/get/list/delete/clear`), plus convenience helpers (`bootstrapNode`, optional `autoConnect`, `dispose()`) and event helpers (`subscribeEvents()`, `subscribeEvent()`, `subscribeAgentStream()`, `subscribeAgentTyping()`, `subscribeContextWarning()`, `waitForEvent()` with timeout/predicate/abort support and deterministic teardown cancellation, `waitForAgentStream()`, `waitForAgentTyping()`, `waitForContextWarning()`, `clearEventSubscriptions()`). +- `src/companion/runtimeClient.ts` provides a typed Node/WebSocket client for companion runtimes (macOS/iOS/Android workers) with wrappers for `node.register`, `node.capabilities.get`, `node.location.set/get`, `node.status.set`, `node.push_token.set`, `system.capabilities`, `system.nodes`, and canvas artifact RPCs (`canvas.put/get/list/delete/clear`), plus convenience helpers (`bootstrapNode`, optional `autoConnect`, `dispose()`) and event helpers (`subscribeEvents()`, `subscribeEvent()`, `subscribeAgentStream()`, `subscribeAgentTyping()`, `subscribeContextWarning()`, `waitForEvent()` with timeout/predicate/abort support and deterministic teardown cancellation, `waitForAnyEvent()`, `waitForAgentStream()`, `waitForAgentTyping()`, `waitForContextWarning()`, `clearEventSubscriptions()`). - `src/companion/platformClients.ts` provides platform-focused wrappers: - `MacOSCompanionClient` (`platform: "macos"`, APNs push registration) - `IOSCompanionClient` (`platform: "ios"`, APNs push registration) diff --git a/docs/plans/state.json b/docs/plans/state.json index fd7aab6..adafbce 100644 --- a/docs/plans/state.json +++ b/docs/plans/state.json @@ -593,6 +593,20 @@ ], "test_status": "pnpm test:run src/companion/runtimeClient.test.ts src/companion/platformClients.test.ts src/companion/heartbeatLoop.test.ts src/companion/platformClients.integration.test.ts + pnpm typecheck passing" }, + "companion-runtime-wait-for-any-event-helper": { + "status": "completed", + "date": "2026-02-17", + "updated": "2026-02-17", + "summary": "Added `waitForAnyEvent()` on `CompanionRuntimeClient` to await the first matching event from a set of event names with timeout/predicate/abort support and typed event envelopes.", + "files_modified": [ + "src/companion/runtimeClient.ts", + "src/companion/runtimeClient.test.ts", + "src/companion/index.ts", + "README.md", + "docs/plans/state.json" + ], + "test_status": "pnpm test:run src/companion/runtimeClient.test.ts src/companion/platformClients.test.ts src/companion/heartbeatLoop.test.ts src/companion/platformClients.integration.test.ts + pnpm typecheck passing" + }, "browser-tools-activation-clarity": { "status": "completed", "date": "2026-02-17", diff --git a/src/companion/index.ts b/src/companion/index.ts index a61722d..17bd81f 100644 --- a/src/companion/index.ts +++ b/src/companion/index.ts @@ -15,6 +15,7 @@ export type { CompanionEventHandler, CompanionTypedEventHandler, CompanionEventPredicate, + CompanionEventEnvelope, RegisterNodeInput, ListNodesInput, SetNodeStatusInput, diff --git a/src/companion/runtimeClient.test.ts b/src/companion/runtimeClient.test.ts index 5bb6e48..de7ad60 100644 --- a/src/companion/runtimeClient.test.ts +++ b/src/companion/runtimeClient.test.ts @@ -416,6 +416,72 @@ describe('CompanionRuntimeClient', () => { await expect(awaited).resolves.toEqual({ thresholdPct: 75, estimatedPct: 88 }); }); + it('waitForAnyEvent resolves with event envelope for first matching event', async () => { + const client = new CompanionRuntimeClient({ + url: 'ws://127.0.0.1:1', + }); + const awaited = client.waitForAnyEvent<{ active?: boolean; token?: string }>( + ['agent.typing', 'agent.stream'], + { timeoutMs: 2000 }, + ); + + (client as unknown as { handleMessage: (raw: string) => void }).handleMessage( + JSON.stringify({ + id: 58, + event: 'agent.typing', + data: { active: true }, + }), + ); + + await expect(awaited).resolves.toEqual({ + event: 'agent.typing', + data: { active: true }, + }); + }); + + it('waitForAnyEvent supports per-event predicate filtering', async () => { + const client = new CompanionRuntimeClient({ + url: 'ws://127.0.0.1:1', + }); + const awaited = client.waitForAnyEvent<{ token?: string }>( + ['agent.stream'], + { + timeoutMs: 2000, + predicate: (_event, data) => data.token === 'accept', + }, + ); + + (client as unknown as { handleMessage: (raw: string) => void }).handleMessage( + JSON.stringify({ + id: 59, + event: 'agent.stream', + data: { token: 'skip' }, + }), + ); + (client as unknown as { handleMessage: (raw: string) => void }).handleMessage( + JSON.stringify({ + id: 60, + event: 'agent.stream', + data: { token: 'accept' }, + }), + ); + + await expect(awaited).resolves.toEqual({ + event: 'agent.stream', + data: { token: 'accept' }, + }); + }); + + it('waitForAnyEvent validates input event list', () => { + const client = new CompanionRuntimeClient({ + url: 'ws://127.0.0.1:1', + }); + + expect(() => client.waitForAnyEvent([])).toThrow( + 'eventNames must contain at least one event name', + ); + }); + it('connects and performs node registration + capability discovery', async () => { if (!LISTEN_ALLOWED) { return; diff --git a/src/companion/runtimeClient.ts b/src/companion/runtimeClient.ts index 2831842..8d92a22 100644 --- a/src/companion/runtimeClient.ts +++ b/src/companion/runtimeClient.ts @@ -44,6 +44,10 @@ export interface CompanionRuntimeClientOptions { export type CompanionEventHandler = (event: string, data: unknown) => void; export type CompanionTypedEventHandler = (data: TData) => void; export type CompanionEventPredicate = (data: TData) => boolean; +export type CompanionEventEnvelope = { + event: string; + data: TData; +}; export const COMPANION_EVENT_NAMES = { agentStream: 'agent.stream', @@ -475,6 +479,76 @@ export class CompanionRuntimeClient { }); } + waitForAnyEvent( + eventNames: readonly string[], + options?: { + timeoutMs?: number; + predicate?: (event: string, data: TData) => boolean; + signal?: AbortSignal; + }, + ): Promise> { + if (eventNames.length === 0) { + throw new Error('eventNames must contain at least one event name'); + } + const eventNameSet = new Set(eventNames); + const timeoutMs = options?.timeoutMs ?? this.requestTimeoutMs; + const predicate = options?.predicate; + const signal = options?.signal; + + return new Promise>((resolve, reject) => { + let settled = false; + let abortCleanup: (() => void) | null = null; + + const finish = (fn: () => void) => { + if (settled) { + return; + } + settled = true; + clearTimeout(timeout); + unsubscribe(); + if (abortCleanup) { + abortCleanup(); + abortCleanup = null; + } + this.pendingEventWaits.delete(cancelWait); + fn(); + }; + + const cancelWait = (error: Error) => { + finish(() => reject(error)); + }; + this.pendingEventWaits.add(cancelWait); + + const unsubscribe = this.subscribeEvents((event, data) => { + if (!eventNameSet.has(event)) { + return; + } + const castData = data as TData; + if (predicate && !predicate(event, castData)) { + return; + } + finish(() => resolve({ event, data: castData })); + }); + + const timeout = setTimeout(() => { + cancelWait(new Error(`Timed out waiting for any event in [${eventNames.join(', ')}]`)); + }, timeoutMs); + + if (signal) { + const onAbort = () => { + cancelWait(new Error(`Aborted while waiting for events [${eventNames.join(', ')}]`)); + }; + signal.addEventListener('abort', onAbort, { once: true }); + abortCleanup = () => { + signal.removeEventListener('abort', onAbort); + }; + if (signal.aborted) { + onAbort(); + } + } + }); + } + waitForAgentStream(options?: { timeoutMs?: number; predicate?: CompanionEventPredicate;