From 37be391a407088c8d0b68b84e57218f76311b7cb Mon Sep 17 00:00:00 2001 From: William Valentin Date: Tue, 24 Feb 2026 13:14:53 -0800 Subject: [PATCH] Unify TUI runtime commands with gateway and harden gateway restart --- Makefile | 10 +- README.md | 8 +- docs/api/PROTOCOL.md | 1 + docs/architecture/AGENT_DIAGRAM.md | 1 + .../GATEWAY_SESSIONS_AND_QUEUE.md | 3 + docs/plans/state.json | 54 ++- src/cli/tui.ts | 36 ++ src/cli/tuiRuntimeGateway.test.ts | 138 ++++++ src/cli/tuiRuntimeGateway.ts | 396 ++++++++++++++++++ src/commands/index.ts | 5 + src/commands/runtimeBackendMode.ts | 94 +++++ src/daemon/index.ts | 5 + src/daemon/routing.ts | 109 ++--- src/daemon/services.ts | 82 +++- src/daemon/startServices.test.ts | 139 ++++++ src/frontends/tui/commands.ts | 6 +- src/frontends/tui/components/App.tsx | 25 +- src/frontends/tui/fullscreen.ts | 2 + src/frontends/tui/minimal.test.ts | 56 ++- src/frontends/tui/minimal.ts | 25 +- src/gateway/handlers/agent.test.ts | 61 +++ src/gateway/handlers/agent.ts | 59 ++- src/gateway/server.test.ts | 30 ++ src/gateway/server.ts | 28 +- 24 files changed, 1253 insertions(+), 120 deletions(-) create mode 100644 src/cli/tuiRuntimeGateway.test.ts create mode 100644 src/cli/tuiRuntimeGateway.ts create mode 100644 src/commands/runtimeBackendMode.ts create mode 100644 src/daemon/startServices.test.ts diff --git a/Makefile b/Makefile index 844a9d3..d9da07a 100644 --- a/Makefile +++ b/Makefile @@ -17,17 +17,19 @@ start: ## Start production build (foreground) node dist/cli/index.js start # Systemd daemon management -daemon-start: ## Start the systemd service +daemon-start: ## Build and start the systemd service + pnpm build systemctl --user start flynn.service - @echo "Flynn daemon started" + @echo "Flynn daemon built and started" daemon-stop: ## Stop the systemd service systemctl --user stop flynn.service @echo "Flynn daemon stopped" -daemon-restart: ## Restart the systemd service +daemon-restart: ## Build and restart the systemd service + pnpm build systemctl --user restart flynn.service - @echo "Flynn daemon restarted" + @echo "Flynn daemon built and restarted" daemon-status: ## Check systemd service status systemctl --user status flynn.service diff --git a/README.md b/README.md index 6b65061..6d9aefc 100644 --- a/README.md +++ b/README.md @@ -376,6 +376,8 @@ Runtime backend mode can be controlled live (persisted in `~/.local/share/flynn/ This manual runtime mode control is the intended Pi activation/deactivation switch. +In TUI sessions, `/runtime ...` is forwarded through the gateway/daemon command path (same control-plane behavior as Telegram/WebChat). `flynn tui` now auto-attaches to gateway and auto-starts local daemon+gateway if needed. + To evaluate canary performance from audit logs, run: ```bash @@ -652,6 +654,7 @@ pnpm tui:fs | `/model ` | Switch active tier (`local`, `default`, `fast`, `complex`, or aliases `ollama`, `sonnet`, `haiku`, `opus`) | | `/model ` | Hot-swap a tier's provider and model at runtime | | `/backend [provider]` | TUI-local command: show or switch local model backend (`ollama`, `llamacpp`) | +| `/runtime ` | Forward runtime backend mode control to daemon/gateway command service | | `/login [provider]` | Authenticate with GitHub (OAuth device flow) | | `/reset` | Clear history | | `/status` | Show session info | @@ -673,7 +676,7 @@ pnpm tui:fs TUI keyboard controls: `Esc` cancels active prompt/running response. `Ctrl+C` clears the current input; press `Ctrl+C` twice quickly to exit. -Note: runtime backend mode control is a daemon/channel command (`/runtime ...`, `/backend ...` alias), not the TUI-local `/backend [provider]` switch. +`flynn tui` now attaches to the gateway command path at startup (and auto-starts local daemon+gateway when unavailable), so `/runtime ...` in TUI matches Telegram/WebChat behavior. `/backend [provider]` remains the TUI-local backend switch for local tier providers. #### Runtime Model Switching @@ -819,6 +822,9 @@ systemctl --user enable --now flynn # View logs journalctl --user -u flynn -f + +# From the repo root, rebuild + restart service after code changes +make restart ``` ## Hook Engine diff --git a/docs/api/PROTOCOL.md b/docs/api/PROTOCOL.md index 7b8a1fd..bc80c2f 100644 --- a/docs/api/PROTOCOL.md +++ b/docs/api/PROTOCOL.md @@ -38,6 +38,7 @@ The gateway serialises agent work **per session**, not per WebSocket connection: - Session-local overrides can be managed at runtime via `agent.send` commands: `/queue`, `/queue set ...`, `/queue reset`. - Backend selection for a turn is server-side (`native` by default, optional external backends per config: `claude_code`, `opencode`, `codex`, `gemini`, `pi_embedded`) and does not change JSON-RPC method signatures. - Runtime backend mode overrides are available via `agent.send` command fast-path: `/runtime status`, `/runtime activate pi`, `/runtime deactivate pi`, `/runtime use config` (`/backend ...` remains a compatibility alias). +- The gateway `agent.send` command path and channel-router path use the same runtime backend-mode command service; `flynn tui` forwards `/runtime ...` through this gateway path for parity. - Backend routing and fallback outcomes are emitted to audit logs (`backend.route`, `backend.success`, `backend.fallback`) for rollout evaluation; this telemetry is outside JSON-RPC response payloads. This is implemented via a per-lane queue (`LaneQueue`) in the gateway server, and used by `agent.send` and `agent.cancel`. diff --git a/docs/architecture/AGENT_DIAGRAM.md b/docs/architecture/AGENT_DIAGRAM.md index 09fc27c..b1b5062 100644 --- a/docs/architecture/AGENT_DIAGRAM.md +++ b/docs/architecture/AGENT_DIAGRAM.md @@ -100,6 +100,7 @@ ChannelAdapter -> ChannelRegistry | | | +----> Runtime backend mode overrides | (/runtime status|activate pi|deactivate pi|use config) + | (TUI `/runtime` is forwarded via gateway/daemon command path) | | | v | SessionManager diff --git a/docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md b/docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md index bfb1fd2..a2f208d 100644 --- a/docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md +++ b/docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md @@ -12,6 +12,7 @@ If you only want the protocol surface, see `docs/api/PROTOCOL.md`. - Sessions persist in SQLite via `SessionManager` even if clients disconnect. - Once dequeued, message routing may execute the native orchestrator path or an optional external backend path (`claude_code`, `opencode`, `codex`, `gemini`, `pi_embedded`) depending on agent/backend config. - Runtime backend mode can be overridden manually via `/runtime` command fast-path (`status`, `activate pi`, `deactivate pi`, `use config`) and is persisted in preferences (`/backend` remains a compatibility alias). +- `flynn tui` now attaches to this same gateway command path for `/runtime ...` and auto-starts/attaches daemon+gateway when needed. - Backend routing outcomes are auditable via `backend.route` / `backend.success` / `backend.fallback`, which enables offline canary evaluation without changing gateway protocol methods. ## Component Map @@ -24,6 +25,7 @@ flowchart LR end subgraph GW[Gateway Process] + TUI[TUI client\n(runtime command forwarding)] WS[WebSocket connection\n(connectionId)] GS[GatewayServer] LQ[LaneQueue\nper-session FIFO] @@ -37,6 +39,7 @@ flowchart LR AO[AgentOrchestrator / External Backends] end + TUI --> WS WS --> GS QP --> GS BM --> GS diff --git a/docs/plans/state.json b/docs/plans/state.json index f6c82b5..51bef95 100644 --- a/docs/plans/state.json +++ b/docs/plans/state.json @@ -53,6 +53,54 @@ ], "test_status": "pnpm test:run src/daemon/routing.test.ts + pnpm typecheck passing" }, + "openclaw-gateway-first-tui-runtime-unification": { + "status": "completed", + "date": "2026-02-24", + "updated": "2026-02-24", + "summary": "Unified runtime backend mode control across channel router, gateway, and TUI by extracting a shared backend-mode command service, wiring gateway `agent.send` to that shared service, and forwarding minimal/fullscreen TUI `/runtime` through a gateway bridge. `flynn tui` now auto-attaches to gateway and auto-starts local daemon+gateway when unavailable while preserving TUI-local `/backend [provider]` switching.", + "files_modified": [ + "src/commands/runtimeBackendMode.ts", + "src/commands/index.ts", + "src/daemon/routing.ts", + "src/daemon/index.ts", + "src/daemon/services.ts", + "src/gateway/server.ts", + "src/gateway/handlers/agent.ts", + "src/gateway/handlers/agent.test.ts", + "src/cli/tuiRuntimeGateway.ts", + "src/cli/tuiRuntimeGateway.test.ts", + "src/cli/tui.ts", + "src/frontends/tui/fullscreen.ts", + "src/frontends/tui/components/App.tsx", + "src/frontends/tui/minimal.ts", + "src/frontends/tui/minimal.test.ts", + "src/frontends/tui/commands.ts", + "README.md", + "docs/architecture/AGENT_DIAGRAM.md", + "docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md", + "docs/api/PROTOCOL.md", + "docs/plans/state.json" + ], + "test_status": "pnpm test:run src/cli/tuiRuntimeGateway.test.ts src/frontends/tui/commands.test.ts src/frontends/tui/minimal.test.ts src/gateway/handlers/agent.test.ts src/daemon/routing.test.ts src/commands/builtin/index.test.ts + pnpm typecheck passing" + }, + "gateway-startup-eaddrinuse-hardening": { + "status": "completed", + "date": "2026-02-24", + "updated": "2026-02-24", + "summary": "Hardened gateway startup bind-failure handling so port collisions (`EADDRINUSE`) are rejected deterministically instead of surfacing as an unhandled WebSocketServer error event. Added clearer daemon startup error text, reordered startup to bind gateway before channel adapters (avoiding noisy channel teardown on bind failures), added bounded bind-retry logic for short restart races, made TUI auto-start treat `EADDRINUSE` as an attach race (retry connect) rather than immediate failure, and updated `make daemon-start`/`make daemon-restart` to rebuild before service start/restart so runtime fixes are always reflected in `dist`.", + "files_modified": [ + "Makefile", + "README.md", + "src/gateway/server.ts", + "src/gateway/server.test.ts", + "src/daemon/services.ts", + "src/daemon/startServices.test.ts", + "src/cli/tuiRuntimeGateway.ts", + "src/cli/tuiRuntimeGateway.test.ts", + "docs/plans/state.json" + ], + "test_status": "pnpm test:run src/gateway/server.test.ts src/cli/tuiRuntimeGateway.test.ts src/daemon/startServices.test.ts src/daemon/services.test.ts src/daemon/routing.test.ts src/frontends/tui/minimal.test.ts + pnpm typecheck passing" + }, "pi-embedded-backend-canary-evaluation-phase": { "status": "completed", "date": "2026-02-24", @@ -6510,7 +6558,7 @@ } }, "overall_progress": { - "total_test_count": 1996, + "total_test_count": 2009, "all_tests_passing": true, "p0_completion": "3/3 (100%)", "p1_completion": "4/4 (100%)", @@ -6543,7 +6591,9 @@ "next_up": "Track OpenClaw evolution regularly for inspiration and feature ideas", "pi_embedded_canary_spike": "completed — added optional pi_embedded backend adapter, canary-safe no-tools routing guard, backend success/fallback latency audit events, and docs/diagram updates while native remains default", "pi_embedded_evaluation_phase": "completed — final decision rollback (applied in runtime config): Window A failed latency/fallback gates (p50 +259ms, p95 +5695ms, fallback 25%, categories: pi_module_interface/empty_assistant_text); Window B remained sample-insufficient; controlled probes verified guard coverage (pi_no_tools_mode/capability_query/attachments_present each hit once)", - "pi_embedded_manual_mode": "completed — added persisted runtime backend controls for manual Pi activation/deactivation (`/runtime` preferred, `/backend` alias; `status`, `activate pi`, `deactivate pi`, `use config`) while keeping config-driven default routing" + "pi_embedded_manual_mode": "completed — added persisted runtime backend controls for manual Pi activation/deactivation (`/runtime` preferred, `/backend` alias; `status`, `activate pi`, `deactivate pi`, `use config`) while keeping config-driven default routing", + "openclaw_gateway_first_tui_runtime_unification": "completed — shared `/runtime` backend-mode command service across channel router + gateway, plus TUI `/runtime` forwarding through a gateway bridge with daemon/gateway auto-start attach", + "gateway_startup_eaddrinuse_hardening": "completed — gateway bind collisions now fail deterministically with explicit error handling and TUI auto-start treats EADDRINUSE as attach race with connect retry" }, "soul_md_and_cron_create": { "date": "2026-02-11", diff --git a/src/cli/tui.ts b/src/cli/tui.ts index d987eb3..6f7cb71 100644 --- a/src/cli/tui.ts +++ b/src/cli/tui.ts @@ -6,6 +6,7 @@ import { existsSync, mkdirSync, readFileSync } from 'fs'; import { resolve } from 'path'; import { homedir } from 'os'; import { setLogLevel } from '../logger.js'; +import { GatewayRuntimeCommandClient, TuiRuntimeGatewayBridge } from './tuiRuntimeGateway.js'; // ANSI color codes for tool status display const toolColors = { @@ -101,6 +102,37 @@ export function registerTuiCommand(program: Command): void { // choice if they set log_level to something more verbose. const tuiLogLevel = config.log_level === 'debug' ? 'debug' : 'warn'; setLogLevel(tuiLogLevel); + + const gatewayRuntimeBridge = new TuiRuntimeGatewayBridge({ + client: new GatewayRuntimeCommandClient({ + url: `ws://127.0.0.1:${config.server.port}`, + token: config.server.token, + }), + startDaemon: async () => { + const { startDaemon } = await import('../daemon/index.js'); + const daemon = await startDaemon(config, { + configPath, + persistConfigPath: configPath, + }); + return { + shutdown: async () => { + await daemon.lifecycle.shutdown(); + }, + }; + }, + }); + + try { + await gatewayRuntimeBridge.ensureReady(); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + console.error( + `Failed to attach to Flynn gateway runtime for TUI command unification: ${message}\n` + + 'Tried auto-starting daemon/gateway. Check server.port/token config and run `flynn start` manually to verify.', + ); + process.exit(1); + } + const { MinimalTui, startFullscreenTui } = await import('../frontends/tui/index.js'); const { NativeAgent } = await import('../backends/index.js'); const { @@ -253,6 +285,7 @@ export function registerTuiCommand(program: Command): void { cleanupPromise = (async () => { await lifecycle.shutdown(); sessionStore.close(); + await gatewayRuntimeBridge.shutdown(); })(); } return cleanupPromise; @@ -393,6 +426,7 @@ export function registerTuiCommand(program: Command): void { onTools: listAvailableTools, onResearch: delegateToResearchAgent, onCouncil: runCouncilTask, + onRuntimeCommand: (input) => gatewayRuntimeBridge.executeRuntimeCommand(input), onExit: () => { void cleanup(); }, @@ -411,6 +445,7 @@ export function registerTuiCommand(program: Command): void { onTools: listAvailableTools, onResearch: delegateToResearchAgent, onCouncil: runCouncilTask, + onRuntimeCommand: (input) => gatewayRuntimeBridge.executeRuntimeCommand(input), localProviders: config.models.local_providers, modelProviderConfigs, contextThresholdPct: config.compaction.threshold_pct, @@ -445,6 +480,7 @@ export function registerTuiCommand(program: Command): void { onTools: listAvailableTools, onResearch: delegateToResearchAgent, onCouncil: runCouncilTask, + onRuntimeCommand: (input) => gatewayRuntimeBridge.executeRuntimeCommand(input), onExit: () => { void cleanup(); }, diff --git a/src/cli/tuiRuntimeGateway.test.ts b/src/cli/tuiRuntimeGateway.test.ts new file mode 100644 index 0000000..1ed7945 --- /dev/null +++ b/src/cli/tuiRuntimeGateway.test.ts @@ -0,0 +1,138 @@ +import { EventEmitter } from 'node:events'; +import { describe, it, expect, vi } from 'vitest'; +import { WebSocket } from 'ws'; +import { GatewayRuntimeCommandClient, TuiRuntimeGatewayBridge } from './tuiRuntimeGateway.js'; + +class FakeWebSocket extends EventEmitter { + readyState: number = WebSocket.CONNECTING; + sentPayloads: string[] = []; + + constructor() { + super(); + queueMicrotask(() => { + this.readyState = WebSocket.OPEN; + this.emit('open'); + }); + } + + send(data: string, cb?: (error?: Error) => void): void { + this.sentPayloads.push(data); + cb?.(); + const payload = JSON.parse(data) as { id: number }; + this.emit('message', Buffer.from(JSON.stringify({ + id: payload.id, + event: 'done', + data: { content: 'Backend mode: config_default' }, + }))); + } + + close(code?: number, reason?: string): void { + this.readyState = WebSocket.CLOSED; + this.emit('close', code ?? 1000, Buffer.from(reason ?? '')); + } +} + +describe('GatewayRuntimeCommandClient', () => { + it('sends /runtime command through agent.send stream and resolves done content', async () => { + let createdSocket: FakeWebSocket | null = null; + const client = new GatewayRuntimeCommandClient({ + url: 'ws://127.0.0.1:18800', + websocketFactory: () => { + const ws = new FakeWebSocket(); + createdSocket = ws; + return ws as unknown as WebSocket; + }, + }); + + const output = await client.executeRuntimeCommand('status'); + + expect(output).toBe('Backend mode: config_default'); + expect(createdSocket).not.toBeNull(); + const sent = JSON.parse(createdSocket!.sentPayloads[0]) as { + method: string; + params: { message: string; metadata: { command: string; commandArgs?: string } }; + }; + expect(sent.method).toBe('agent.send'); + expect(sent.params.message).toBe('/runtime status'); + expect(sent.params.metadata.command).toBe('runtime'); + expect(sent.params.metadata.commandArgs).toBe('status'); + }); +}); + +describe('TuiRuntimeGatewayBridge', () => { + it('auto-starts daemon runtime when initial gateway connect fails', async () => { + const runtime = { shutdown: vi.fn(async () => {}) }; + const client = { + connect: vi.fn() + .mockRejectedValueOnce(new Error('connect ECONNREFUSED')) + .mockResolvedValue(undefined), + disconnect: vi.fn(), + executeRuntimeCommand: vi.fn(async () => 'Backend mode: force_native'), + }; + const startDaemon = vi.fn(async () => runtime); + + const bridge = new TuiRuntimeGatewayBridge({ + client, + startDaemon, + startupTimeoutMs: 500, + retryIntervalMs: 1, + sleep: async () => new Promise((resolve) => setTimeout(resolve, 1)), + }); + + const output = await bridge.executeRuntimeCommand('status'); + + expect(startDaemon).toHaveBeenCalledOnce(); + expect(client.connect).toHaveBeenCalledTimes(2); + expect(client.executeRuntimeCommand).toHaveBeenCalledWith('status'); + expect(output).toContain('force_native'); + + await bridge.shutdown(); + expect(client.disconnect).toHaveBeenCalledOnce(); + expect(runtime.shutdown).toHaveBeenCalledOnce(); + }); + + it('returns actionable error when gateway stays unavailable after auto-start', async () => { + const client = { + connect: vi.fn().mockRejectedValue(new Error('connect ECONNREFUSED')), + disconnect: vi.fn(), + executeRuntimeCommand: vi.fn(async () => ''), + }; + const bridge = new TuiRuntimeGatewayBridge({ + client, + startDaemon: vi.fn(async () => ({ shutdown: vi.fn(async () => {}) })), + startupTimeoutMs: 10, + retryIntervalMs: 1, + sleep: async () => new Promise((resolve) => setTimeout(resolve, 1)), + }); + + await expect(bridge.ensureReady()).rejects.toThrow('Gateway did not become ready after auto-start'); + }); + + it('treats EADDRINUSE during auto-start as an attach race and retries connect', async () => { + const client = { + connect: vi.fn() + .mockRejectedValueOnce(new Error('connect ECONNREFUSED')) + .mockResolvedValue(undefined), + disconnect: vi.fn(), + executeRuntimeCommand: vi.fn(async () => 'Backend mode: config_default'), + }; + const startDaemon = vi.fn(async () => { + const error = new Error('listen EADDRINUSE: address already in use 127.0.0.1:18800') as Error & { code?: string }; + error.code = 'EADDRINUSE'; + throw error; + }); + const bridge = new TuiRuntimeGatewayBridge({ + client, + startDaemon, + startupTimeoutMs: 500, + retryIntervalMs: 1, + sleep: async () => new Promise((resolve) => setTimeout(resolve, 1)), + }); + + const output = await bridge.executeRuntimeCommand('status'); + + expect(startDaemon).toHaveBeenCalledOnce(); + expect(client.connect).toHaveBeenCalledTimes(2); + expect(output).toContain('config_default'); + }); +}); diff --git a/src/cli/tuiRuntimeGateway.ts b/src/cli/tuiRuntimeGateway.ts new file mode 100644 index 0000000..4203975 --- /dev/null +++ b/src/cli/tuiRuntimeGateway.ts @@ -0,0 +1,396 @@ +import { WebSocket } from 'ws'; + +interface PendingRuntimeCommand { + resolve: (value: string) => void; + reject: (error: Error) => void; + timeout: NodeJS.Timeout; +} + +interface StartedRuntime { + shutdown: () => Promise; +} + +export interface RuntimeCommandClient { + connect: () => Promise; + disconnect: () => void; + executeRuntimeCommand: (input?: string) => Promise; +} + +export interface GatewayRuntimeCommandClientOptions { + url: string; + token?: string; + requestTimeoutMs?: number; + websocketFactory?: (url: string) => WebSocket; +} + +export class GatewayRuntimeCommandClient implements RuntimeCommandClient { + private readonly url: string; + private readonly token?: string; + private readonly requestTimeoutMs: number; + private readonly websocketFactory: (url: string) => WebSocket; + + private ws: WebSocket | null = null; + private connectPromise: Promise | null = null; + private nextId = 1; + private pending = new Map(); + + constructor(options: GatewayRuntimeCommandClientOptions) { + const timeoutMs = options.requestTimeoutMs ?? 15_000; + if (!Number.isFinite(timeoutMs) || timeoutMs <= 0) { + throw new Error('requestTimeoutMs must be a positive number'); + } + this.url = options.url; + this.token = options.token; + this.requestTimeoutMs = timeoutMs; + this.websocketFactory = options.websocketFactory ?? ((url) => new WebSocket(url)); + } + + get connected(): boolean { + return this.ws?.readyState === WebSocket.OPEN; + } + + async connect(): Promise { + if (this.connected) { + return; + } + + if (this.connectPromise) { + return this.connectPromise; + } + + this.connectPromise = this.openConnection(); + try { + await this.connectPromise; + } finally { + this.connectPromise = null; + } + } + + disconnect(): void { + const ws = this.ws; + this.ws = null; + this.rejectAllPending(new Error('Disconnected from gateway runtime command service')); + if (ws) { + ws.close(1000, 'TUI runtime bridge shutting down'); + } + } + + async executeRuntimeCommand(input?: string): Promise { + if (!this.connected) { + await this.connect(); + } + + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { + throw new Error('Gateway connection is not open'); + } + + const id = this.nextId++; + const commandArgs = (input ?? '').trim(); + const message = commandArgs ? `/runtime ${commandArgs}` : '/runtime'; + + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + this.pending.delete(id); + reject(new Error('Timed out waiting for /runtime command response')); + }, this.requestTimeoutMs); + + this.pending.set(id, { resolve, reject, timeout }); + + const payload: Record = { + id, + method: 'agent.send', + params: { + message, + metadata: { + isCommand: true, + command: 'runtime', + ...(commandArgs ? { commandArgs } : {}), + }, + }, + }; + + this.ws?.send(JSON.stringify(payload), (error) => { + if (!error) { + return; + } + const pending = this.pending.get(id); + if (!pending) { + return; + } + clearTimeout(pending.timeout); + this.pending.delete(id); + pending.reject(error); + }); + }); + } + + private async openConnection(): Promise { + const ws = this.websocketFactory(withToken(this.url, this.token)); + + await new Promise((resolve, reject) => { + let settled = false; + + const onOpen = () => { + cleanup(); + settled = true; + this.ws = ws; + this.ws.on('message', (raw) => this.handleMessage(raw.toString())); + this.ws.on('close', () => { + if (this.ws === ws) { + this.ws = null; + this.rejectAllPending(new Error('Gateway connection closed')); + } + }); + this.ws.on('error', () => { + // close event handles pending rejection + }); + resolve(); + }; + + const onError = (error: Error) => { + cleanup(); + settled = true; + reject(error); + }; + + const onClose = (code: number, reason?: Buffer) => { + cleanup(); + if (settled) { + return; + } + settled = true; + const reasonText = reason?.toString().trim(); + if (code === 4003) { + reject(new Error('Gateway is locked by another client (code 4003)')); + return; + } + reject(new Error(reasonText ? `Gateway closed connection: ${reasonText}` : 'Gateway closed before connection was established')); + }; + + const cleanup = () => { + ws.off('open', onOpen); + ws.off('error', onError); + ws.off('close', onClose); + }; + + ws.once('open', onOpen); + ws.once('error', onError); + ws.once('close', onClose); + }); + } + + private handleMessage(raw: string): void { + let parsed: unknown; + try { + parsed = JSON.parse(raw); + } catch { + return; + } + + if (!parsed || typeof parsed !== 'object') { + return; + } + const message = parsed as { + id?: unknown; + event?: unknown; + data?: unknown; + error?: { message?: unknown }; + result?: unknown; + }; + + if (typeof message.id !== 'number') { + return; + } + + const pending = this.pending.get(message.id); + if (!pending) { + return; + } + + if (typeof message.event === 'string') { + if (message.event === 'done') { + this.resolvePending(message.id, getDoneContent(message.data)); + return; + } + if (message.event === 'error') { + this.rejectPending(message.id, new Error(getErrorMessage(message.data))); + } + return; + } + + if (message.error && typeof message.error.message === 'string') { + this.rejectPending(message.id, new Error(message.error.message)); + return; + } + + if (message.result !== undefined) { + const text = typeof message.result === 'string' + ? message.result + : JSON.stringify(message.result, null, 2); + this.resolvePending(message.id, text); + } + } + + private resolvePending(id: number, value: string): void { + const pending = this.pending.get(id); + if (!pending) { + return; + } + clearTimeout(pending.timeout); + this.pending.delete(id); + pending.resolve(value); + } + + private rejectPending(id: number, error: Error): void { + const pending = this.pending.get(id); + if (!pending) { + return; + } + clearTimeout(pending.timeout); + this.pending.delete(id); + pending.reject(error); + } + + private rejectAllPending(error: Error): void { + for (const [id, pending] of this.pending.entries()) { + clearTimeout(pending.timeout); + this.pending.delete(id); + pending.reject(error); + } + } +} + +export interface TuiRuntimeGatewayBridgeOptions { + client: RuntimeCommandClient; + startDaemon?: () => Promise; + startupTimeoutMs?: number; + retryIntervalMs?: number; + sleep?: (ms: number) => Promise; +} + +export class TuiRuntimeGatewayBridge { + private readonly client: RuntimeCommandClient; + private readonly startDaemon?: () => Promise; + private readonly startupTimeoutMs: number; + private readonly retryIntervalMs: number; + private readonly sleep: (ms: number) => Promise; + + private startedRuntime: StartedRuntime | null = null; + private ensurePromise: Promise | null = null; + + constructor(options: TuiRuntimeGatewayBridgeOptions) { + this.client = options.client; + this.startDaemon = options.startDaemon; + this.startupTimeoutMs = options.startupTimeoutMs ?? 10_000; + this.retryIntervalMs = options.retryIntervalMs ?? 250; + this.sleep = options.sleep ?? ((ms) => new Promise((resolve) => setTimeout(resolve, ms))); + } + + async ensureReady(): Promise { + if (this.ensurePromise) { + return this.ensurePromise; + } + this.ensurePromise = this.ensureReadyInner().finally(() => { + this.ensurePromise = null; + }); + return this.ensurePromise; + } + + async executeRuntimeCommand(input?: string): Promise { + await this.ensureReady(); + return this.client.executeRuntimeCommand(input); + } + + async shutdown(): Promise { + this.client.disconnect(); + if (this.startedRuntime) { + await this.startedRuntime.shutdown(); + this.startedRuntime = null; + } + } + + private async ensureReadyInner(): Promise { + try { + await this.client.connect(); + return; + } catch (error) { + const firstMessage = error instanceof Error ? error.message : String(error); + + if (!this.startDaemon) { + throw new Error(`Gateway connection failed: ${firstMessage}`); + } + + if (!this.startedRuntime) { + try { + this.startedRuntime = await this.startDaemon(); + } catch (startError) { + if (!isAddressInUseError(startError)) { + throw startError; + } + // Another process won the race to bind the gateway port. + // Treat as attach race and continue with connect retries. + } + } + + const deadline = Date.now() + this.startupTimeoutMs; + let lastMessage = firstMessage; + while (Date.now() <= deadline) { + try { + await this.client.connect(); + return; + } catch (retryError) { + lastMessage = retryError instanceof Error ? retryError.message : String(retryError); + await this.sleep(this.retryIntervalMs); + } + } + + throw new Error(`Gateway did not become ready after auto-start (${this.startupTimeoutMs}ms timeout). Last error: ${lastMessage}`); + } + } +} + +function withToken(url: string, token?: string): string { + if (!token) { + return url; + } + + const parsed = new URL(url); + parsed.searchParams.set('token', token); + return parsed.toString(); +} + +function getDoneContent(data: unknown): string { + if (typeof data === 'string') { + return data; + } + if (data && typeof data === 'object' && 'content' in data && typeof (data as { content?: unknown }).content === 'string') { + return (data as { content: string }).content; + } + if (data === undefined || data === null) { + return ''; + } + return JSON.stringify(data, null, 2); +} + +function getErrorMessage(data: unknown): string { + if (typeof data === 'string') { + return data; + } + if (data && typeof data === 'object' && 'message' in data && typeof (data as { message?: unknown }).message === 'string') { + return (data as { message: string }).message; + } + return 'Runtime command failed'; +} + +function isAddressInUseError(error: unknown): boolean { + if (!error || typeof error !== 'object') { + return false; + } + if ('code' in error && (error as { code?: unknown }).code === 'EADDRINUSE') { + return true; + } + if ('message' in error && typeof (error as { message?: unknown }).message === 'string') { + return (error as { message: string }).message.includes('EADDRINUSE'); + } + return false; +} diff --git a/src/commands/index.ts b/src/commands/index.ts index c5aa884..f4555a9 100644 --- a/src/commands/index.ts +++ b/src/commands/index.ts @@ -1,5 +1,10 @@ export { CommandRegistry } from './registry.js'; export type { CommandContext, CommandDefinition, CommandResult, CommandServices } from './types.js'; +export { + executeRuntimeBackendModeCommand, + formatRuntimeBackendStatusLine, +} from './runtimeBackendMode.js'; +export type { RuntimeBackendMode } from './runtimeBackendMode.js'; export { createHelpCommand, createStatusCommand, diff --git a/src/commands/runtimeBackendMode.ts b/src/commands/runtimeBackendMode.ts new file mode 100644 index 0000000..d8fe048 --- /dev/null +++ b/src/commands/runtimeBackendMode.ts @@ -0,0 +1,94 @@ +export type RuntimeBackendMode = 'config_default' | 'force_native' | 'force_pi_embedded'; + +export interface RuntimeBackendModeCommandContext { + getActiveTier: () => string; + getBackendMode: () => RuntimeBackendMode; + getConfiguredDefaultBackend: () => string; + getEffectiveDefaultBackend: () => string; + getAvailableExternalBackends: () => string[]; + setBackendMode?: (mode: RuntimeBackendMode) => void; +} + +function normalizeRuntimeInput(inputRaw: string): string { + let normalized = inputRaw.trim().toLowerCase(); + // Accept subcommand-only input and accidental full command input. + normalized = normalized.replace(/^(?:\/)?(?:runtime|backend)\b/, '').trim(); + normalized = normalized.replace(/^\//, '').trim(); + return normalized; +} + +export function formatRuntimeBackendStatusLine(ctx: RuntimeBackendModeCommandContext): string { + const availableExternal = [...ctx.getAvailableExternalBackends()].sort().join(', ') || 'none'; + return [ + `Flynn is running. Active model tier: ${ctx.getActiveTier()}. Backend: ${ctx.getEffectiveDefaultBackend()}`, + `Backend mode: ${ctx.getBackendMode()}`, + `Configured default: ${ctx.getConfiguredDefaultBackend()}`, + `Available external backends: ${availableExternal}`, + ].join('\n'); +} + +export function executeRuntimeBackendModeCommand( + inputRaw: string, + ctx: RuntimeBackendModeCommandContext, +): string { + const normalized = normalizeRuntimeInput(inputRaw); + + if (!normalized || normalized === 'status' || normalized === 'show') { + return formatRuntimeBackendStatusLine(ctx); + } + + if (!ctx.setBackendMode) { + return 'Backend mode control is not available in this runtime.'; + } + + if ( + normalized === 'activate pi' + || normalized === 'activate pi_embedded' + || normalized === 'activate pi-embedded' + ) { + ctx.setBackendMode('force_pi_embedded'); + return [ + 'Pi embedded backend activated globally.', + formatRuntimeBackendStatusLine(ctx), + ].join('\n\n'); + } + + if ( + normalized === 'deactivate pi' + || normalized === 'deactivate pi_embedded' + || normalized === 'deactivate pi-embedded' + ) { + ctx.setBackendMode('force_native'); + return [ + 'Pi embedded backend deactivated globally. Native is now forced for Pi-routed turns.', + formatRuntimeBackendStatusLine(ctx), + ].join('\n\n'); + } + + if ( + normalized === 'use config' + || normalized === 'reset' + || normalized === 'auto' + || normalized === 'config' + ) { + ctx.setBackendMode('config_default'); + return [ + 'Backend mode reset to config default.', + formatRuntimeBackendStatusLine(ctx), + ].join('\n\n'); + } + + return [ + 'Usage:', + '/runtime status', + '/runtime activate pi', + '/runtime deactivate pi', + '/runtime use config', + '', + 'Alias:', + '/backend status', + '/backend activate pi', + '/backend deactivate pi', + '/backend use config', + ].join('\n'); +} diff --git a/src/daemon/index.ts b/src/daemon/index.ts index 473948f..366eca3 100644 --- a/src/daemon/index.ts +++ b/src/daemon/index.ts @@ -249,6 +249,11 @@ export async function startDaemon(config: Config, options?: StartDaemonOptions): const gateway = createGateway({ config, configPath: options?.persistConfigPath ?? options?.configPath, sessionManager, modelRouter, systemPrompt, toolRegistry, toolExecutor, channelRegistry, pairingManager, lifecycle, memoryStore, + getBackendMode: () => backendMode, + setBackendMode: (mode) => { + backendMode = mode; + savePreference(dataDir, 'backendMode', mode); + }, getChannelAgents: () => channelAgents, commandRegistry, intentRegistry, routingPolicy, hookEngine, }); diff --git a/src/daemon/routing.ts b/src/daemon/routing.ts index e835e2a..7118aef 100644 --- a/src/daemon/routing.ts +++ b/src/daemon/routing.ts @@ -18,6 +18,11 @@ import { ToolRegistry, ToolExecutor } from '../tools/index.js'; import { SessionManager } from '../session/index.js'; import { AgentConfigRegistry, AgentRouter } from '../agents/index.js'; import type { CommandRegistry } from '../commands/index.js'; +import { + executeRuntimeBackendModeCommand, + formatRuntimeBackendStatusLine, + type RuntimeBackendMode, +} from '../commands/index.js'; import type { ComponentRegistry } from '../intents/index.js'; import type { RoutingPolicy } from '../routing/index.js'; import type { HookEngine } from '../hooks/index.js'; @@ -31,7 +36,7 @@ import { dirname, resolve } from 'path'; import { loadCouncilScaffoldSafe } from '../councils/scaffold.js'; import { buildCouncilPreflightReport, shouldRunCouncilPreflight } from '../councils/preflight.js'; -export type BackendRuntimeMode = 'config_default' | 'force_native' | 'force_pi_embedded'; +export type BackendRuntimeMode = RuntimeBackendMode; function buildProviderConfigMap(config: Config): Partial> { const providerConfigs: Partial> = {}; @@ -393,17 +398,29 @@ export function createMessageRouter(deps: { return requestedBackend; } - function formatBackendStatusLine(activeTier: string): string { - const mode = getBackendMode(); - const configuredDefault = getConfiguredOrFallbackDefaultBackend(); - const effectiveDefault = resolveRoutableBackend(getEffectiveDefaultBackend()); - const availableExternal = Object.keys(deps.externalBackends ?? {}).sort().join(', ') || 'none'; - return [ - `Flynn is running. Active model tier: ${activeTier}. Backend: ${effectiveDefault}`, - `Backend mode: ${mode}`, - `Configured default: ${configuredDefault}`, - `Available external backends: ${availableExternal}`, - ].join('\n'); + function listAvailableExternalBackends(): string[] { + return Object.keys(deps.externalBackends ?? {}); + } + + function formatBackendStatus(activeTier: string): string { + return formatRuntimeBackendStatusLine({ + getActiveTier: () => activeTier, + getBackendMode, + getConfiguredDefaultBackend: getConfiguredOrFallbackDefaultBackend, + getEffectiveDefaultBackend: () => resolveRoutableBackend(getEffectiveDefaultBackend()), + getAvailableExternalBackends: listAvailableExternalBackends, + }); + } + + function executeBackendCommand(inputRaw: string, activeTier: string): string { + return executeRuntimeBackendModeCommand(inputRaw, { + getActiveTier: () => activeTier, + getBackendMode, + setBackendMode: deps.setBackendMode, + getConfiguredDefaultBackend: getConfiguredOrFallbackDefaultBackend, + getEffectiveDefaultBackend: () => resolveRoutableBackend(getEffectiveDefaultBackend()), + getAvailableExternalBackends: listAvailableExternalBackends, + }); } async function maybeBuildTtsAttachment(responseText: string, channel: string) { @@ -823,7 +840,7 @@ export function createMessageRouter(deps: { rawInput: commandInput, services: { getStatus: () => { - return formatBackendStatusLine(agent.getModelTier()); + return formatBackendStatus(agent.getModelTier()); }, getTools: () => { const names = new Set(deps.toolRegistry.list().map((tool: Tool) => tool.name)); @@ -1203,71 +1220,7 @@ export function createMessageRouter(deps: { return `Session transferred to ${destinationLabel}`; }, - backendCommand: (inputRaw: string) => { - let normalized = inputRaw.trim().toLowerCase(); - // Accept both subcommand-only input ("status") and accidental full-command - // input ("/runtime status", "runtime status", "/backend status"). - normalized = normalized.replace(/^(?:\/)?(?:runtime|backend)\b/, '').trim(); - normalized = normalized.replace(/^\//, '').trim(); - if (!normalized || normalized === 'status' || normalized === 'show') { - return formatBackendStatusLine(agent.getModelTier()); - } - - if (!deps.setBackendMode) { - return 'Backend mode control is not available in this runtime.'; - } - - if ( - normalized === 'activate pi' - || normalized === 'activate pi_embedded' - || normalized === 'activate pi-embedded' - ) { - deps.setBackendMode('force_pi_embedded'); - return [ - 'Pi embedded backend activated globally.', - formatBackendStatusLine(agent.getModelTier()), - ].join('\n\n'); - } - - if ( - normalized === 'deactivate pi' - || normalized === 'deactivate pi_embedded' - || normalized === 'deactivate pi-embedded' - ) { - deps.setBackendMode('force_native'); - return [ - 'Pi embedded backend deactivated globally. Native is now forced for Pi-routed turns.', - formatBackendStatusLine(agent.getModelTier()), - ].join('\n\n'); - } - - if ( - normalized === 'use config' - || normalized === 'reset' - || normalized === 'auto' - || normalized === 'config' - ) { - deps.setBackendMode('config_default'); - return [ - 'Backend mode reset to config default.', - formatBackendStatusLine(agent.getModelTier()), - ].join('\n\n'); - } - - return [ - 'Usage:', - '/runtime status', - '/runtime activate pi', - '/runtime deactivate pi', - '/runtime use config', - '', - 'Alias:', - '/backend status', - '/backend activate pi', - '/backend deactivate pi', - '/backend use config', - ].join('\n'); - }, + backendCommand: (inputRaw: string) => executeBackendCommand(inputRaw, agent.getModelTier()), getApprovals: () => { if (!deps.hookEngine) { diff --git a/src/daemon/services.ts b/src/daemon/services.ts index e31005b..ef051c5 100644 --- a/src/daemon/services.ts +++ b/src/daemon/services.ts @@ -24,7 +24,7 @@ import { assembleSystemPrompt } from '../prompt/index.js'; import { join, relative, resolve, sep } from 'path'; import { homedir } from 'os'; import type { MemoryStore } from '../memory/store.js'; -import type { CommandRegistry } from '../commands/index.js'; +import type { CommandRegistry, RuntimeBackendMode } from '../commands/index.js'; import type { ComponentRegistry } from '../intents/index.js'; import type { RoutingPolicy } from '../routing/index.js'; import type { HookEngine } from '../hooks/index.js'; @@ -293,6 +293,8 @@ export interface GatewayDeps { getChannelAgents: () => Map | null; memoryStore?: MemoryStore; commandRegistry?: CommandRegistry; + getBackendMode?: () => RuntimeBackendMode; + setBackendMode?: (mode: RuntimeBackendMode) => void; intentRegistry?: ComponentRegistry; routingPolicy?: RoutingPolicy; hookEngine?: HookEngine; @@ -388,6 +390,8 @@ export function createGateway(deps: GatewayDeps): GatewayServer { channelRegistry, pairingManager, memoryStore: deps.memoryStore, + getBackendMode: deps.getBackendMode, + setBackendMode: deps.setBackendMode, restart: async () => { console.log('Restart requested via gateway'); await lifecycle.shutdown(); @@ -472,25 +476,31 @@ export async function startServices(deps: { memoryStore?: MemoryStore; memoryDir: string; dataDir: string; + gatewayStartRetry?: { + maxAttempts?: number; + retryDelayMs?: number; + sleep?: (ms: number) => Promise; + }; }): Promise { const { config, lifecycle, channelRegistry, gateway, modelRouter, memoryStore, memoryDir, dataDir } = deps; - // Register shutdown handler for channels - lifecycle.onShutdown(async () => { - await channelRegistry.stopAll(); - console.log('Channel adapters stopped'); - }); - - // Start all channel adapters - await channelRegistry.startAll(); - // Start gateway (HTTP + WS server) lifecycle.onShutdown(async () => { await gateway.stop(); console.log('Gateway server stopped'); }); - await gateway.start(); + const host = config.server.localhost ? '127.0.0.1' : '0.0.0.0'; + await startGatewayWithRetry(gateway, host, config.server.port, deps.gatewayStartRetry); + + // Register shutdown handler for channels + lifecycle.onShutdown(async () => { + await channelRegistry.stopAll(); + console.log('Channel adapters stopped'); + }); + + // Start all channel adapters after gateway bind succeeds. + await channelRegistry.startAll(); // Tailscale Serve if (config.server.tailscale?.serve) { @@ -589,3 +599,53 @@ export async function startServices(deps: { console.log('Flynn daemon started'); } + +function isAddressInUseError(error: unknown): error is NodeJS.ErrnoException { + return ( + typeof error === 'object' + && error !== null + && 'code' in error + && (error as NodeJS.ErrnoException).code === 'EADDRINUSE' + ); +} + +async function startGatewayWithRetry( + gateway: Pick, + host: string, + port: number, + retry?: { + maxAttempts?: number; + retryDelayMs?: number; + sleep?: (ms: number) => Promise; + }, +): Promise { + const maxAttempts = Math.max(1, retry?.maxAttempts ?? 10); + const retryDelayMs = Math.max(0, retry?.retryDelayMs ?? 500); + const sleep = retry?.sleep ?? ((ms: number) => new Promise((resolve) => setTimeout(resolve, ms))); + + for (let attempt = 1; attempt <= maxAttempts; attempt += 1) { + try { + await gateway.start(); + return; + } catch (error) { + if (!isAddressInUseError(error)) { + throw error; + } + + await gateway.stop().catch(() => {}); + + if (attempt === maxAttempts) { + throw new Error( + `Gateway bind failed: ${host}:${port} is already in use after ${maxAttempts} attempts. ` + + 'Another Flynn daemon or process is already listening on this port.', + ); + } + + console.warn( + `Gateway bind collision on ${host}:${port} (attempt ${attempt}/${maxAttempts}); ` + + `retrying in ${retryDelayMs}ms...`, + ); + await sleep(retryDelayMs); + } + } +} diff --git a/src/daemon/startServices.test.ts b/src/daemon/startServices.test.ts new file mode 100644 index 0000000..c204638 --- /dev/null +++ b/src/daemon/startServices.test.ts @@ -0,0 +1,139 @@ +import { afterEach, describe, expect, it, vi } from 'vitest'; +import { configSchema } from '../config/schema.js'; +import { Lifecycle } from './lifecycle.js'; +import { startServices } from './services.js'; + +vi.mock('../automation/index.js', () => { + return { + HeartbeatMonitor: class { + start(): void {} + stop(): void {} + }, + MinioSyncScheduler: class { + start(): void {} + stop(): void {} + }, + }; +}); + +function makeConfig(overrides: Record = {}) { + return configSchema.parse({ + telegram: { bot_token: 'test-token', allowed_chat_ids: [1] }, + models: { default: { provider: 'anthropic', model: 'claude-3' } }, + ...overrides, + }); +} + +describe('startServices startup ordering', () => { + afterEach(async () => { + vi.restoreAllMocks(); + }); + + it('fails after bounded retries on persistent gateway bind collision before starting channels', async () => { + const lifecycle = new Lifecycle(); + const config = makeConfig({ server: { localhost: true, port: 18800 } }); + const startError = new Error('listen EADDRINUSE: address already in use 127.0.0.1:18800') as Error & { code?: string }; + startError.code = 'EADDRINUSE'; + + const channelRegistry = { + startAll: vi.fn(async () => {}), + stopAll: vi.fn(async () => {}), + }; + const gateway = { + start: vi.fn(async () => { throw startError; }), + stop: vi.fn(async () => {}), + getMetrics: vi.fn(() => ({ getModelMetrics: () => [] })), + }; + + await expect(startServices({ + config, + lifecycle, + channelRegistry: channelRegistry as never, + gateway: gateway as never, + modelRouter: {} as never, + memoryDir: '/tmp', + dataDir: '/tmp', + gatewayStartRetry: { + maxAttempts: 3, + retryDelayMs: 0, + sleep: async () => {}, + }, + })).rejects.toThrow('Gateway bind failed'); + + expect(channelRegistry.startAll).not.toHaveBeenCalled(); + expect(channelRegistry.stopAll).not.toHaveBeenCalled(); + expect(gateway.start).toHaveBeenCalledTimes(3); + expect(gateway.stop).toHaveBeenCalledTimes(3); + }); + + it('retries gateway bind collisions and then starts channels on success', async () => { + const lifecycle = new Lifecycle(); + const config = makeConfig({ server: { localhost: true, port: 18800 } }); + const startError = new Error('listen EADDRINUSE: address already in use 127.0.0.1:18800') as Error & { code?: string }; + startError.code = 'EADDRINUSE'; + const order: string[] = []; + + const channelRegistry = { + startAll: vi.fn(async () => { order.push('channels.start'); }), + stopAll: vi.fn(async () => {}), + }; + const gateway = { + start: vi.fn(async () => { + order.push('gateway.start'); + if (gateway.start.mock.calls.length === 1) { + throw startError; + } + }), + stop: vi.fn(async () => { order.push('gateway.stop'); }), + getMetrics: vi.fn(() => ({ getModelMetrics: () => [] })), + }; + + await startServices({ + config, + lifecycle, + channelRegistry: channelRegistry as never, + gateway: gateway as never, + modelRouter: {} as never, + memoryDir: '/tmp', + dataDir: '/tmp', + gatewayStartRetry: { + maxAttempts: 3, + retryDelayMs: 0, + sleep: async () => {}, + }, + }); + + expect(order).toEqual(['gateway.start', 'gateway.stop', 'gateway.start', 'channels.start']); + expect(channelRegistry.startAll).toHaveBeenCalledOnce(); + await lifecycle.shutdown(); + }); + + it('starts gateway before channels when startup succeeds', async () => { + const lifecycle = new Lifecycle(); + const config = makeConfig({ server: { localhost: true, port: 18800 } }); + const order: string[] = []; + + const channelRegistry = { + startAll: vi.fn(async () => { order.push('channels.start'); }), + stopAll: vi.fn(async () => {}), + }; + const gateway = { + start: vi.fn(async () => { order.push('gateway.start'); }), + stop: vi.fn(async () => {}), + getMetrics: vi.fn(() => ({ getModelMetrics: () => [] })), + }; + + await startServices({ + config, + lifecycle, + channelRegistry: channelRegistry as never, + gateway: gateway as never, + modelRouter: {} as never, + memoryDir: '/tmp', + dataDir: '/tmp', + }); + + expect(order).toEqual(['gateway.start', 'channels.start']); + await lifecycle.shutdown(); + }); +}); diff --git a/src/frontends/tui/commands.ts b/src/frontends/tui/commands.ts index 599a026..bebe25c 100644 --- a/src/frontends/tui/commands.ts +++ b/src/frontends/tui/commands.ts @@ -155,7 +155,7 @@ export function parseCommand(input: string): Command | null { return { type: 'backend', provider }; } - // Runtime backend mode control (daemon/channel command; reserved in TUI) + // Runtime backend mode control (forwarded via gateway/daemon command service) if (trimmed === '/runtime') { return { type: 'runtime' }; } @@ -233,7 +233,7 @@ Commands: /model [name] Show or switch model tier (local, default, fast, complex) /model

Change tier's provider/model (e.g. /model default anthropic/claude-sonnet-4) /backend [provider] Show or switch local backend (ollama, llamacpp) - /runtime [args] Runtime backend mode control (daemon/channel sessions) + /runtime [args] Runtime backend mode control (forwarded via gateway/daemon command service) /research Delegate a task to the configured research agent /council Run the councils pipeline for a task /council preflight Check council tier routing, endpoint/auth mode, and probe latency @@ -305,7 +305,7 @@ export const COMMAND_TOOLTIPS: Record = { '/tools': 'Show authoritative runtime tool list for this session', '/model': 'Show or switch model (local, default, fast, complex)', '/backend': 'Show or switch local backend (ollama, llamacpp)', - '/runtime': 'Runtime backend mode control (daemon/channel command; not local TUI backend switch)', + '/runtime': 'Runtime backend mode control via gateway/daemon command service (not local /backend provider switch)', '/research': 'Delegate a task to the configured research agent', '/council': 'Run the councils pipeline for a task; use "/council preflight" for route/auth checks', '/reset': 'Clear conversation history', diff --git a/src/frontends/tui/components/App.tsx b/src/frontends/tui/components/App.tsx index bbe6de4..b804303 100644 --- a/src/frontends/tui/components/App.tsx +++ b/src/frontends/tui/components/App.tsx @@ -62,6 +62,7 @@ export interface AppProps { onTools?: () => string; onResearch?: (task: string) => Promise | string; onCouncil?: (task: string) => Promise | string; + onRuntimeCommand?: (input?: string) => Promise | string; onExit?: () => void; } @@ -82,6 +83,7 @@ export function App({ onTools, onResearch, onCouncil, + onRuntimeCommand, onExit, }: AppProps): React.ReactElement { const ensureTimestamp = useCallback((message: Message): Message => ({ @@ -564,14 +566,20 @@ export function App({ } case 'runtime': { - pushAssistantMessage( - 'Runtime backend mode command is not available in fullscreen TUI mode.\n' - + 'Use it in daemon/channel sessions:\n' - + '/runtime status\n' - + '/runtime activate pi\n' - + '/runtime deactivate pi\n' - + '/runtime use config', - ); + if (!onRuntimeCommand) { + pushAssistantMessage( + 'Runtime backend mode command service is unavailable in this TUI session.\n' + + 'Use `flynn start` and reconnect.', + ); + return; + } + try { + const response = await onRuntimeCommand(command.input); + pushAssistantMessage(response); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + pushAssistantMessage(`Runtime command failed: ${message}`); + } return; } @@ -841,6 +849,7 @@ export function App({ pairingManager, modelProviderConfigs, onTransfer, + onRuntimeCommand, ]); return ( diff --git a/src/frontends/tui/fullscreen.ts b/src/frontends/tui/fullscreen.ts index c0c8560..afd7052 100644 --- a/src/frontends/tui/fullscreen.ts +++ b/src/frontends/tui/fullscreen.ts @@ -26,6 +26,7 @@ export interface FullscreenTuiConfig { onTools?: () => string; onResearch?: (task: string) => Promise | string; onCouncil?: (task: string) => Promise | string; + onRuntimeCommand?: (input?: string) => Promise | string; onExit?: () => void; } @@ -57,6 +58,7 @@ export async function startFullscreenTui(config: FullscreenTuiConfig): Promise { } }); - it('prints guidance when /runtime is invoked in TUI mode', async () => { + it('forwards /runtime command through runtime command callback', async () => { + const mockSession = { + id: 'test', + getHistory: () => [], + addMessage: vi.fn(), + clear: vi.fn(), + replaceHistory: vi.fn(), + }; + const onRuntimeCommand = vi.fn(async () => 'Backend mode: config_default'); + const logSpy = vi.spyOn(console, 'log').mockImplementation(() => {}); + try { + const tui = new MinimalTui({ + session: asSession(mockSession), + modelClient: asModelClient({}), + systemPrompt: 'test', + onRuntimeCommand, + }); + + await minimalTuiPrivates(tui).handleCommand({ type: 'runtime', input: 'status' }); + expect(onRuntimeCommand).toHaveBeenCalledWith('status'); + expect(logSpy).toHaveBeenCalledWith('Backend mode: config_default\n'); + } finally { + logSpy.mockRestore(); + } + }); + + it('prints guidance when runtime command service is unavailable', async () => { const mockSession = { id: 'test', getHistory: () => [], @@ -420,7 +446,33 @@ describe('MinimalTui backend command', () => { }); await minimalTuiPrivates(tui).handleCommand({ type: 'runtime', input: 'status' }); - expect(logSpy).toHaveBeenCalledWith(expect.stringContaining('Runtime backend mode command is not available in this TUI mode.')); + expect(logSpy).toHaveBeenCalledWith(expect.stringContaining('Runtime backend mode command service is unavailable in this TUI session.')); + } finally { + logSpy.mockRestore(); + } + }); + + it('keeps /backend status local-only and does not invoke runtime command callback', async () => { + const mockSession = { + id: 'test', + getHistory: () => [], + addMessage: vi.fn(), + clear: vi.fn(), + replaceHistory: vi.fn(), + }; + const onRuntimeCommand = vi.fn(async () => 'should not be called'); + const logSpy = vi.spyOn(console, 'log').mockImplementation(() => {}); + try { + const tui = new MinimalTui({ + session: asSession(mockSession), + modelClient: asModelClient({}), + systemPrompt: 'test', + onRuntimeCommand, + }); + + await minimalTuiPrivates(tui).handleCommand({ type: 'backend', provider: 'status' }); + expect(onRuntimeCommand).not.toHaveBeenCalled(); + expect(logSpy).toHaveBeenCalledWith(expect.stringContaining('Backend switching not available.')); } finally { logSpy.mockRestore(); } diff --git a/src/frontends/tui/minimal.ts b/src/frontends/tui/minimal.ts index c757aae..bf8bb45 100644 --- a/src/frontends/tui/minimal.ts +++ b/src/frontends/tui/minimal.ts @@ -71,6 +71,7 @@ export interface MinimalTuiConfig { onTools?: () => string; onResearch?: (task: string) => Promise | string; onCouncil?: (task: string) => Promise | string; + onRuntimeCommand?: (input?: string) => Promise | string; localProviders?: Record; modelProviderConfigs?: Partial>; currentLocalProvider?: string; @@ -525,7 +526,7 @@ export class MinimalTui { break; case 'runtime': - this.handleRuntimeCommand(command.input); + await this.handleRuntimeCommand(command.input); break; case 'login': @@ -899,14 +900,20 @@ export class MinimalTui { console.log(`${colors.gray}Switched to backend: ${provider}${colors.reset}\n`); } - private handleRuntimeCommand(_input?: string): void { - console.log(`${colors.gray}Runtime backend mode command is not available in this TUI mode.${colors.reset}`); - console.log(`${colors.gray}Use it in daemon/channel sessions:${colors.reset}`); - console.log(' /runtime status'); - console.log(' /runtime activate pi'); - console.log(' /runtime deactivate pi'); - console.log(' /runtime use config'); - console.log(''); + private async handleRuntimeCommand(input?: string): Promise { + if (!this.config.onRuntimeCommand) { + console.log(`${colors.gray}Runtime backend mode command service is unavailable in this TUI session.${colors.reset}`); + console.log(`${colors.gray}Use 'flynn start' and reconnect.${colors.reset}\n`); + return; + } + + try { + const output = await this.config.onRuntimeCommand(input); + console.log(`${output}\n`); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + console.log(`${colors.gray}Runtime command failed:${colors.reset} ${message}\n`); + } } private async stopBackend(provider: string): Promise { diff --git a/src/gateway/handlers/agent.test.ts b/src/gateway/handlers/agent.test.ts index 0548c70..699da78 100644 --- a/src/gateway/handlers/agent.test.ts +++ b/src/gateway/handlers/agent.test.ts @@ -241,6 +241,67 @@ describe('createAgentHandlers command fast-path', () => { expect(((sent[0] as GatewayEvent).data as { content: string }).content).toContain('Cancellation requested'); }); + it('handles /runtime status via command fast-path without calling agent.process', async () => { + const sent: OutboundMessage[] = []; + const send = vi.fn((msg: OutboundMessage) => sent.push(msg)); + const req: GatewayRequest = { + id: 13, + method: 'agent.send', + params: { + message: '/runtime status', + connectionId: 'conn-1', + metadata: { isCommand: true, command: 'runtime', commandArgs: 'status' }, + }, + }; + + await handlers['agent.send'](req, send); + + expect(mockAgent.process).not.toHaveBeenCalled(); + expect((sent[0] as GatewayEvent).event).toBe('done'); + expect(((sent[0] as GatewayEvent).data as { content: string }).content).toContain('Backend mode:'); + }); + + it('handles /runtime deactivate pi via shared backend mode service callbacks', async () => { + const sent: OutboundMessage[] = []; + const send = vi.fn((msg: OutboundMessage) => sent.push(msg)); + let backendMode: 'config_default' | 'force_native' | 'force_pi_embedded' = 'force_pi_embedded'; + const handlersWithBackendMode = createAgentHandlers({ + sessionBridge: sessionBridge as unknown as AgentHandlerDeps['sessionBridge'], + laneQueue: new LaneQueue(), + sessionManager: sessionManager as unknown as AgentHandlerDeps['sessionManager'], + commandRegistry, + runtimeConfig: { + backends: { + default: 'pi_embedded', + claude_code: { enabled: false }, + opencode: { enabled: false }, + codex: { enabled: false }, + gemini: { enabled: false }, + pi_embedded: { enabled: true }, + }, + } as unknown as AgentHandlerDeps['runtimeConfig'], + getBackendMode: () => backendMode, + setBackendMode: (mode) => { + backendMode = mode; + }, + }); + const req: GatewayRequest = { + id: 14, + method: 'agent.send', + params: { + message: '/runtime deactivate pi', + connectionId: 'conn-1', + metadata: { isCommand: true, command: 'runtime', commandArgs: 'deactivate pi' }, + }, + }; + + await handlersWithBackendMode['agent.send'](req, send); + + expect(mockAgent.process).not.toHaveBeenCalled(); + expect(backendMode).toBe('force_native'); + expect(((sent[0] as GatewayEvent).data as { content: string }).content).toContain('deactivated globally'); + }); + it('falls through to agent.process for unknown commands', async () => { const sent: OutboundMessage[] = []; const send = vi.fn((msg: OutboundMessage) => sent.push(msg)); diff --git a/src/gateway/handlers/agent.ts b/src/gateway/handlers/agent.ts index f4936b5..8b067f7 100644 --- a/src/gateway/handlers/agent.ts +++ b/src/gateway/handlers/agent.ts @@ -10,7 +10,11 @@ import type { Attachment } from '../../channels/types.js'; import type { SessionManager } from '../../session/manager.js'; import type { ModelTier } from '../../models/router.js'; import type { ModelRouter } from '../../models/router.js'; -import type { CommandRegistry } from '../../commands/index.js'; +import { + executeRuntimeBackendModeCommand, + type CommandRegistry, + type RuntimeBackendMode, +} from '../../commands/index.js'; import type { Config, ModelConfig, ModelProvider } from '../../config/index.js'; import { MODEL_PROVIDERS } from '../../config/index.js'; import { createClientFromConfig } from '../../daemon/models.js'; @@ -33,6 +37,8 @@ export interface AgentHandlerDeps { modelRouter?: ModelRouter; runtimeConfig?: Config; hookEngine?: HookEngine; + getBackendMode?: () => RuntimeBackendMode; + setBackendMode?: (mode: RuntimeBackendMode) => void; } function buildProviderConfigMap(config: Config): Partial> { @@ -55,6 +61,30 @@ function buildProviderConfigMap(config: Config): Partial => { @@ -437,6 +467,33 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { }); }, + backendCommand: (inputRaw: string) => { + const availableExternalBackends = listEnabledExternalBackends(deps.runtimeConfig); + const mode = deps.getBackendMode?.() ?? 'config_default'; + const configuredDefault = deps.runtimeConfig?.backends.default ?? 'native'; + const getEffectiveDefaultBackend = (): string => { + if (mode === 'force_native') { + return 'native'; + } + if (mode === 'force_pi_embedded') { + return availableExternalBackends.includes('pi_embedded') ? 'pi_embedded' : 'native'; + } + if (configuredDefault === 'native') { + return 'native'; + } + return availableExternalBackends.includes(configuredDefault) ? configuredDefault : 'native'; + }; + + return executeRuntimeBackendModeCommand(inputRaw, { + getActiveTier: () => agent.getModelTier(), + getBackendMode: () => deps.getBackendMode?.() ?? 'config_default', + setBackendMode: deps.setBackendMode, + getConfiguredDefaultBackend: () => configuredDefault, + getEffectiveDefaultBackend, + getAvailableExternalBackends: () => availableExternalBackends, + }); + }, + getQueue: () => { const mode = resolvedPolicy?.mode ?? 'collect'; const cap = resolvedPolicy?.cap ?? 50; diff --git a/src/gateway/server.test.ts b/src/gateway/server.test.ts index 6ab2234..c34909c 100644 --- a/src/gateway/server.test.ts +++ b/src/gateway/server.test.ts @@ -179,6 +179,36 @@ describe('GatewayServer integration', () => { } }); + it('rejects startup with EADDRINUSE when port is already bound', async () => { + if (!LISTEN_ALLOWED) { + return; + } + + const blockingServer = createServer(); + await new Promise((resolve, reject) => { + blockingServer.once('error', reject); + blockingServer.listen(18893, '127.0.0.1', () => resolve()); + }); + + const conflicting = new GatewayServer({ + port: 18893, + sessionManager: mockSessionManager as unknown as GatewayServerConfig['sessionManager'], + modelClient: mockModelClient, + systemPrompt: 'Test prompt', + toolRegistry: mockToolRegistry as unknown as GatewayServerConfig['toolRegistry'], + toolExecutor: mockToolExecutor as unknown as GatewayServerConfig['toolExecutor'], + version: '0.1.0-test', + uiDir: resolve(import.meta.dirname, 'ui'), + }); + + try { + await expect(conflicting.start()).rejects.toMatchObject({ code: 'EADDRINUSE' }); + } finally { + await new Promise((resolveClose) => blockingServer.close(() => resolveClose())); + await conflicting.stop(); + } + }); + it('lists tools via tools.list', async () => { if (!LISTEN_ALLOWED) { return; diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 9ae5b76..9782bba 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -48,6 +48,7 @@ import type { GmailWatcher } from '../automation/gmail.js'; import type { PairingManager } from '../channels/pairing.js'; import type { MemoryStore } from '../memory/store.js'; import type { CommandRegistry } from '../commands/index.js'; +import type { RuntimeBackendMode } from '../commands/index.js'; import type { HookEngine } from '../hooks/index.js'; import type { ComponentRegistry } from '../intents/index.js'; import type { RoutingPolicy } from '../routing/index.js'; @@ -116,6 +117,8 @@ export interface GatewayServerConfig { pairingManager?: PairingManager; memoryStore?: MemoryStore; commandRegistry?: CommandRegistry; + getBackendMode?: () => RuntimeBackendMode; + setBackendMode?: (mode: RuntimeBackendMode) => void; hookEngine?: HookEngine; intentRegistry?: ComponentRegistry; routingPolicy?: RoutingPolicy; @@ -438,6 +441,8 @@ export class GatewayServer { hookEngine: this.config.hookEngine, modelRouter: 'setClient' in this.config.modelClient ? this.config.modelClient : undefined, runtimeConfig: this.config.config, + getBackendMode: this.config.getBackendMode, + setBackendMode: this.config.setBackendMode, }); const intentHandlers = createIntentHandlers({ @@ -561,7 +566,8 @@ export class GatewayServer { const host = this.config.host ?? '127.0.0.1'; const { port } = this.config; - return new Promise((resolve) => { + return new Promise((resolve, reject) => { + let settled = false; this.observabilityCollector?.start(); // Create HTTP server first — handles static file requests this.httpServer = createServer((req: IncomingMessage, res: ServerResponse) => { @@ -570,6 +576,22 @@ export class GatewayServer { // Attach WebSocket server to the shared HTTP server (no separate port) this.wss = new WebSocketServer({ server: this.httpServer }); + this.wss.on('error', (error: Error) => { + if (!settled) { + settled = true; + reject(error); + return; + } + console.error(`Gateway WebSocket server error: ${error.message}`); + }); + this.httpServer.on('error', (error: Error) => { + if (!settled) { + settled = true; + reject(error); + return; + } + console.error(`Gateway HTTP server error: ${error.message}`); + }); this.wss.on('connection', (ws: WebSocket, req: IncomingMessage) => { // Auth check on upgrade — only WS connections require auth @@ -591,6 +613,10 @@ export class GatewayServer { }); this.httpServer.listen(port, host, () => { + if (settled) { + return; + } + settled = true; console.log(`Gateway server listening on ${host}:${port}`); void this.startDiscovery(host, port).finally(() => { resolve();