Unify TUI runtime commands with gateway and harden gateway restart

This commit is contained in:
William Valentin
2026-02-24 13:14:53 -08:00
parent db2f697741
commit 37be391a40
24 changed files with 1253 additions and 120 deletions
+5
View File
@@ -249,6 +249,11 @@ export async function startDaemon(config: Config, options?: StartDaemonOptions):
const gateway = createGateway({
config, configPath: options?.persistConfigPath ?? options?.configPath, sessionManager, modelRouter, systemPrompt, toolRegistry, toolExecutor,
channelRegistry, pairingManager, lifecycle, memoryStore,
getBackendMode: () => backendMode,
setBackendMode: (mode) => {
backendMode = mode;
savePreference(dataDir, 'backendMode', mode);
},
getChannelAgents: () => channelAgents, commandRegistry, intentRegistry, routingPolicy, hookEngine,
});
+31 -78
View File
@@ -18,6 +18,11 @@ import { ToolRegistry, ToolExecutor } from '../tools/index.js';
import { SessionManager } from '../session/index.js';
import { AgentConfigRegistry, AgentRouter } from '../agents/index.js';
import type { CommandRegistry } from '../commands/index.js';
import {
executeRuntimeBackendModeCommand,
formatRuntimeBackendStatusLine,
type RuntimeBackendMode,
} from '../commands/index.js';
import type { ComponentRegistry } from '../intents/index.js';
import type { RoutingPolicy } from '../routing/index.js';
import type { HookEngine } from '../hooks/index.js';
@@ -31,7 +36,7 @@ import { dirname, resolve } from 'path';
import { loadCouncilScaffoldSafe } from '../councils/scaffold.js';
import { buildCouncilPreflightReport, shouldRunCouncilPreflight } from '../councils/preflight.js';
export type BackendRuntimeMode = 'config_default' | 'force_native' | 'force_pi_embedded';
export type BackendRuntimeMode = RuntimeBackendMode;
function buildProviderConfigMap(config: Config): Partial<Record<ModelProvider, ModelConfig>> {
const providerConfigs: Partial<Record<ModelProvider, ModelConfig>> = {};
@@ -393,17 +398,29 @@ export function createMessageRouter(deps: {
return requestedBackend;
}
function formatBackendStatusLine(activeTier: string): string {
const mode = getBackendMode();
const configuredDefault = getConfiguredOrFallbackDefaultBackend();
const effectiveDefault = resolveRoutableBackend(getEffectiveDefaultBackend());
const availableExternal = Object.keys(deps.externalBackends ?? {}).sort().join(', ') || 'none';
return [
`Flynn is running. Active model tier: ${activeTier}. Backend: ${effectiveDefault}`,
`Backend mode: ${mode}`,
`Configured default: ${configuredDefault}`,
`Available external backends: ${availableExternal}`,
].join('\n');
function listAvailableExternalBackends(): string[] {
return Object.keys(deps.externalBackends ?? {});
}
function formatBackendStatus(activeTier: string): string {
return formatRuntimeBackendStatusLine({
getActiveTier: () => activeTier,
getBackendMode,
getConfiguredDefaultBackend: getConfiguredOrFallbackDefaultBackend,
getEffectiveDefaultBackend: () => resolveRoutableBackend(getEffectiveDefaultBackend()),
getAvailableExternalBackends: listAvailableExternalBackends,
});
}
function executeBackendCommand(inputRaw: string, activeTier: string): string {
return executeRuntimeBackendModeCommand(inputRaw, {
getActiveTier: () => activeTier,
getBackendMode,
setBackendMode: deps.setBackendMode,
getConfiguredDefaultBackend: getConfiguredOrFallbackDefaultBackend,
getEffectiveDefaultBackend: () => resolveRoutableBackend(getEffectiveDefaultBackend()),
getAvailableExternalBackends: listAvailableExternalBackends,
});
}
async function maybeBuildTtsAttachment(responseText: string, channel: string) {
@@ -823,7 +840,7 @@ export function createMessageRouter(deps: {
rawInput: commandInput,
services: {
getStatus: () => {
return formatBackendStatusLine(agent.getModelTier());
return formatBackendStatus(agent.getModelTier());
},
getTools: () => {
const names = new Set(deps.toolRegistry.list().map((tool: Tool) => tool.name));
@@ -1203,71 +1220,7 @@ export function createMessageRouter(deps: {
return `Session transferred to ${destinationLabel}`;
},
backendCommand: (inputRaw: string) => {
let normalized = inputRaw.trim().toLowerCase();
// Accept both subcommand-only input ("status") and accidental full-command
// input ("/runtime status", "runtime status", "/backend status").
normalized = normalized.replace(/^(?:\/)?(?:runtime|backend)\b/, '').trim();
normalized = normalized.replace(/^\//, '').trim();
if (!normalized || normalized === 'status' || normalized === 'show') {
return formatBackendStatusLine(agent.getModelTier());
}
if (!deps.setBackendMode) {
return 'Backend mode control is not available in this runtime.';
}
if (
normalized === 'activate pi'
|| normalized === 'activate pi_embedded'
|| normalized === 'activate pi-embedded'
) {
deps.setBackendMode('force_pi_embedded');
return [
'Pi embedded backend activated globally.',
formatBackendStatusLine(agent.getModelTier()),
].join('\n\n');
}
if (
normalized === 'deactivate pi'
|| normalized === 'deactivate pi_embedded'
|| normalized === 'deactivate pi-embedded'
) {
deps.setBackendMode('force_native');
return [
'Pi embedded backend deactivated globally. Native is now forced for Pi-routed turns.',
formatBackendStatusLine(agent.getModelTier()),
].join('\n\n');
}
if (
normalized === 'use config'
|| normalized === 'reset'
|| normalized === 'auto'
|| normalized === 'config'
) {
deps.setBackendMode('config_default');
return [
'Backend mode reset to config default.',
formatBackendStatusLine(agent.getModelTier()),
].join('\n\n');
}
return [
'Usage:',
'/runtime status',
'/runtime activate pi',
'/runtime deactivate pi',
'/runtime use config',
'',
'Alias:',
'/backend status',
'/backend activate pi',
'/backend deactivate pi',
'/backend use config',
].join('\n');
},
backendCommand: (inputRaw: string) => executeBackendCommand(inputRaw, agent.getModelTier()),
getApprovals: () => {
if (!deps.hookEngine) {
+71 -11
View File
@@ -24,7 +24,7 @@ import { assembleSystemPrompt } from '../prompt/index.js';
import { join, relative, resolve, sep } from 'path';
import { homedir } from 'os';
import type { MemoryStore } from '../memory/store.js';
import type { CommandRegistry } from '../commands/index.js';
import type { CommandRegistry, RuntimeBackendMode } from '../commands/index.js';
import type { ComponentRegistry } from '../intents/index.js';
import type { RoutingPolicy } from '../routing/index.js';
import type { HookEngine } from '../hooks/index.js';
@@ -293,6 +293,8 @@ export interface GatewayDeps {
getChannelAgents: () => Map<string, { orchestrator: AgentOrchestrator; collector: OutboundAttachmentCollector }> | null;
memoryStore?: MemoryStore;
commandRegistry?: CommandRegistry;
getBackendMode?: () => RuntimeBackendMode;
setBackendMode?: (mode: RuntimeBackendMode) => void;
intentRegistry?: ComponentRegistry;
routingPolicy?: RoutingPolicy;
hookEngine?: HookEngine;
@@ -388,6 +390,8 @@ export function createGateway(deps: GatewayDeps): GatewayServer {
channelRegistry,
pairingManager,
memoryStore: deps.memoryStore,
getBackendMode: deps.getBackendMode,
setBackendMode: deps.setBackendMode,
restart: async () => {
console.log('Restart requested via gateway');
await lifecycle.shutdown();
@@ -472,25 +476,31 @@ export async function startServices(deps: {
memoryStore?: MemoryStore;
memoryDir: string;
dataDir: string;
gatewayStartRetry?: {
maxAttempts?: number;
retryDelayMs?: number;
sleep?: (ms: number) => Promise<void>;
};
}): Promise<void> {
const { config, lifecycle, channelRegistry, gateway, modelRouter, memoryStore, memoryDir, dataDir } = deps;
// Register shutdown handler for channels
lifecycle.onShutdown(async () => {
await channelRegistry.stopAll();
console.log('Channel adapters stopped');
});
// Start all channel adapters
await channelRegistry.startAll();
// Start gateway (HTTP + WS server)
lifecycle.onShutdown(async () => {
await gateway.stop();
console.log('Gateway server stopped');
});
await gateway.start();
const host = config.server.localhost ? '127.0.0.1' : '0.0.0.0';
await startGatewayWithRetry(gateway, host, config.server.port, deps.gatewayStartRetry);
// Register shutdown handler for channels
lifecycle.onShutdown(async () => {
await channelRegistry.stopAll();
console.log('Channel adapters stopped');
});
// Start all channel adapters after gateway bind succeeds.
await channelRegistry.startAll();
// Tailscale Serve
if (config.server.tailscale?.serve) {
@@ -589,3 +599,53 @@ export async function startServices(deps: {
console.log('Flynn daemon started');
}
function isAddressInUseError(error: unknown): error is NodeJS.ErrnoException {
return (
typeof error === 'object'
&& error !== null
&& 'code' in error
&& (error as NodeJS.ErrnoException).code === 'EADDRINUSE'
);
}
async function startGatewayWithRetry(
gateway: Pick<GatewayServer, 'start' | 'stop'>,
host: string,
port: number,
retry?: {
maxAttempts?: number;
retryDelayMs?: number;
sleep?: (ms: number) => Promise<void>;
},
): Promise<void> {
const maxAttempts = Math.max(1, retry?.maxAttempts ?? 10);
const retryDelayMs = Math.max(0, retry?.retryDelayMs ?? 500);
const sleep = retry?.sleep ?? ((ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms)));
for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
try {
await gateway.start();
return;
} catch (error) {
if (!isAddressInUseError(error)) {
throw error;
}
await gateway.stop().catch(() => {});
if (attempt === maxAttempts) {
throw new Error(
`Gateway bind failed: ${host}:${port} is already in use after ${maxAttempts} attempts. `
+ 'Another Flynn daemon or process is already listening on this port.',
);
}
console.warn(
`Gateway bind collision on ${host}:${port} (attempt ${attempt}/${maxAttempts}); `
+ `retrying in ${retryDelayMs}ms...`,
);
await sleep(retryDelayMs);
}
}
}
+139
View File
@@ -0,0 +1,139 @@
import { afterEach, describe, expect, it, vi } from 'vitest';
import { configSchema } from '../config/schema.js';
import { Lifecycle } from './lifecycle.js';
import { startServices } from './services.js';
vi.mock('../automation/index.js', () => {
return {
HeartbeatMonitor: class {
start(): void {}
stop(): void {}
},
MinioSyncScheduler: class {
start(): void {}
stop(): void {}
},
};
});
function makeConfig(overrides: Record<string, unknown> = {}) {
return configSchema.parse({
telegram: { bot_token: 'test-token', allowed_chat_ids: [1] },
models: { default: { provider: 'anthropic', model: 'claude-3' } },
...overrides,
});
}
describe('startServices startup ordering', () => {
afterEach(async () => {
vi.restoreAllMocks();
});
it('fails after bounded retries on persistent gateway bind collision before starting channels', async () => {
const lifecycle = new Lifecycle();
const config = makeConfig({ server: { localhost: true, port: 18800 } });
const startError = new Error('listen EADDRINUSE: address already in use 127.0.0.1:18800') as Error & { code?: string };
startError.code = 'EADDRINUSE';
const channelRegistry = {
startAll: vi.fn(async () => {}),
stopAll: vi.fn(async () => {}),
};
const gateway = {
start: vi.fn(async () => { throw startError; }),
stop: vi.fn(async () => {}),
getMetrics: vi.fn(() => ({ getModelMetrics: () => [] })),
};
await expect(startServices({
config,
lifecycle,
channelRegistry: channelRegistry as never,
gateway: gateway as never,
modelRouter: {} as never,
memoryDir: '/tmp',
dataDir: '/tmp',
gatewayStartRetry: {
maxAttempts: 3,
retryDelayMs: 0,
sleep: async () => {},
},
})).rejects.toThrow('Gateway bind failed');
expect(channelRegistry.startAll).not.toHaveBeenCalled();
expect(channelRegistry.stopAll).not.toHaveBeenCalled();
expect(gateway.start).toHaveBeenCalledTimes(3);
expect(gateway.stop).toHaveBeenCalledTimes(3);
});
it('retries gateway bind collisions and then starts channels on success', async () => {
const lifecycle = new Lifecycle();
const config = makeConfig({ server: { localhost: true, port: 18800 } });
const startError = new Error('listen EADDRINUSE: address already in use 127.0.0.1:18800') as Error & { code?: string };
startError.code = 'EADDRINUSE';
const order: string[] = [];
const channelRegistry = {
startAll: vi.fn(async () => { order.push('channels.start'); }),
stopAll: vi.fn(async () => {}),
};
const gateway = {
start: vi.fn(async () => {
order.push('gateway.start');
if (gateway.start.mock.calls.length === 1) {
throw startError;
}
}),
stop: vi.fn(async () => { order.push('gateway.stop'); }),
getMetrics: vi.fn(() => ({ getModelMetrics: () => [] })),
};
await startServices({
config,
lifecycle,
channelRegistry: channelRegistry as never,
gateway: gateway as never,
modelRouter: {} as never,
memoryDir: '/tmp',
dataDir: '/tmp',
gatewayStartRetry: {
maxAttempts: 3,
retryDelayMs: 0,
sleep: async () => {},
},
});
expect(order).toEqual(['gateway.start', 'gateway.stop', 'gateway.start', 'channels.start']);
expect(channelRegistry.startAll).toHaveBeenCalledOnce();
await lifecycle.shutdown();
});
it('starts gateway before channels when startup succeeds', async () => {
const lifecycle = new Lifecycle();
const config = makeConfig({ server: { localhost: true, port: 18800 } });
const order: string[] = [];
const channelRegistry = {
startAll: vi.fn(async () => { order.push('channels.start'); }),
stopAll: vi.fn(async () => {}),
};
const gateway = {
start: vi.fn(async () => { order.push('gateway.start'); }),
stop: vi.fn(async () => {}),
getMetrics: vi.fn(() => ({ getModelMetrics: () => [] })),
};
await startServices({
config,
lifecycle,
channelRegistry: channelRegistry as never,
gateway: gateway as never,
modelRouter: {} as never,
memoryDir: '/tmp',
dataDir: '/tmp',
});
expect(order).toEqual(['gateway.start', 'channels.start']);
await lifecycle.shutdown();
});
});