Files
flynn/src/daemon/services.ts
T
William Valentin a8a2c59313 feat: implement model persistence with per-session overrides
- Add session_config SQLite table for per-session settings
- Update routing to support session override → agent config → global default resolution chain
- Upgrade WebChat SessionBridge from NativeAgent to AgentOrchestrator
- Add /model, /local, /cloud commands to Telegram adapter
- Add /model command to WebChat gateway handlers
- Clear session overrides on /reset command
- Pass memoryStore and config through to SessionBridge
- Add comprehensive tests for all new functionality

Fixes model persistence bug where TUI model changes didn't affect WebChat/Telegram sessions. Now:
- TUI /model sets global default (persists across restarts, affects all new sessions)
- WebChat/Telegram /model sets session override (only that conversation, cleared on /reset)
- WebChat sessions gain AgentOrchestrator features (delegation, compaction, memory)
2026-02-11 21:51:38 -08:00

273 lines
9.1 KiB
TypeScript

import type { Config } from '../config/index.js';
import type { Lifecycle } from './lifecycle.js';
import type { ToolRegistry, ToolExecutor } from '../tools/index.js';
import type { AgentOrchestrator } from '../backends/index.js';
import type { OutboundAttachmentCollector } from '../backends/native/attachments.js';
import { ModelRouter } from '../models/index.js';
import { SessionManager } from '../session/index.js';
import { GatewayServer } from '../gateway/index.js';
import { ChannelRegistry, PairingManager, type PairingStore } from '../channels/index.js';
import { HeartbeatMonitor } from '../automation/index.js';
import { McpManager } from '../mcp/index.js';
import { SkillRegistry, SkillInstaller, loadAllSkills } from '../skills/index.js';
import { assembleSystemPrompt } from '../prompt/index.js';
import { resolve } from 'path';
import { homedir } from 'os';
import type { MemoryStore } from '../memory/store.js';
// ── Skills ──────────────────────────────────────────────────────
/** The pair of skill services returned by `initSkills`. */
export interface SkillsResult {
/** Registry in which `initSkills` registers every skill it loaded. */
skillRegistry: SkillRegistry;
/** Installer constructed with the managed skills directory. */
skillInstaller: SkillInstaller;
}
/**
 * Creates the skill registry and installer and registers all discovered skills.
 *
 * The bundled, managed and workspace directories from `config.skills` are
 * handed to `loadAllSkills`; every skill it returns is registered. When
 * `config.skills.managed_dir` is unset, `~/.flynn/workspace/skills` is used
 * for both the installer and the loader.
 *
 * @param config - Daemon configuration; only `config.skills` is read.
 * @returns The populated registry and the installer for the managed directory.
 */
export function initSkills(config: Config): SkillsResult {
  const fallbackManagedDir = resolve(homedir(), '.flynn/workspace/skills');
  const managedDir = config.skills.managed_dir ?? fallbackManagedDir;

  const registry = new SkillRegistry();
  const installer = new SkillInstaller(managedDir);
  const services: SkillsResult = { skillRegistry: registry, skillInstaller: installer };

  const loaded = loadAllSkills({
    bundledDir: config.skills.bundled_dir,
    managedDir,
    workspaceDir: config.skills.workspace_dir,
  });

  // Nothing found: skip registration and stay silent.
  if (!(loaded.length > 0)) {
    return services;
  }

  for (const entry of loaded) {
    registry.register(entry);
  }

  // "available" is whatever the registry reports after registration.
  const availableCount = registry.listAvailable().length;
  console.log(`Loaded ${loaded.length} skill(s) (${availableCount} available)`);

  return services;
}
// ── MCP ─────────────────────────────────────────────────────────
/**
 * Creates the MCP manager, starts the configured MCP servers and registers a
 * shutdown hook that stops them.
 *
 * @param config - Daemon configuration; `config.mcp.servers` lists the servers to start.
 * @param lifecycle - Receives the shutdown hook that calls `stopAll()`.
 * @param toolRegistry - Passed to the `McpManager` constructor.
 * @returns The manager, after `startAll` has resolved (when any servers are configured).
 */
export async function initMcp(config: Config, lifecycle: Lifecycle, toolRegistry: ToolRegistry): Promise<McpManager> {
  const manager = new McpManager(toolRegistry);
  const servers = config.mcp.servers;

  // Only announce and start when at least one server is configured.
  if (servers.length > 0) {
    console.log(`Starting ${servers.length} MCP server(s)...`);
    await manager.startAll(servers);
  }

  const stopMcpServers = async (): Promise<void> => {
    await manager.stopAll();
    console.log('MCP servers stopped');
  };
  lifecycle.onShutdown(stopMcpServers);

  return manager;
}
// ── System Prompt ───────────────────────────────────────────────
/**
 * Assembles the system prompt and appends the skill-provided additions.
 *
 * Template search order: the current working directory, the directory two
 * levels above this module, then any `config.prompt.search_dirs`.
 *
 * @param config - Daemon configuration; `config.prompt` supplies search dirs and extra sections.
 * @param skillRegistry - Source of the optional "Available Skills" section.
 * @returns The assembled prompt, with a `# Available Skills` section appended
 *          when the registry returns a truthy additions string.
 */
export function loadSystemPrompt(config: Config, skillRegistry: SkillRegistry): string {
  const assembled = assembleSystemPrompt({
    searchDirs: [
      process.cwd(),
      resolve(import.meta.dirname, '../..'),
      ...(config.prompt.search_dirs ?? []),
    ],
    extraSections: config.prompt.extra_sections,
  });

  if (assembled.loadedFiles.length > 0) {
    // Log only the last '/'-separated path segment of each template.
    const templateNames = assembled.loadedFiles.map((filePath) => filePath.split('/').pop());
    console.log(`Loaded prompt templates: ${templateNames.join(', ')}`);
  }

  const basePrompt = assembled.prompt;
  const skillSection = skillRegistry.getSystemPromptAdditions();
  return skillSection
    ? `${basePrompt}\n\n# Available Skills\n\n${skillSection}`
    : basePrompt;
}
// ── Pairing Manager ─────────────────────────────────────────────
/**
 * Parses a pairing TTL such as `"30s"`, `"5m"` or `"1h"` into milliseconds.
 *
 * @returns The TTL in milliseconds, or `undefined` when `raw` is not
 *          `<digits><s|m|h>`.
 */
function parsePairingTtlMs(raw: string): number | undefined {
  const unitMs = { s: 1000, m: 60_000, h: 3_600_000 } as const;
  const match = raw.match(/^(\d+)(s|m|h)$/);
  if (!match) {return undefined;}
  return Number(match[1]) * unitMs[match[2] as keyof typeof unitMs];
}

/**
 * Creates the pairing manager when pairing is enabled in the configuration.
 *
 * `config.pairing.code_ttl` must look like `30s`, `5m` or `1h`. A value that
 * does not match falls back to five minutes; in that case a warning is
 * emitted and the startup log reports the TTL actually in effect rather than
 * echoing the invalid configured string.
 *
 * @param config - Daemon configuration; only `config.pairing` is read.
 * @param store - Optional pairing store, passed through to `PairingManager`.
 * @returns The manager, or `undefined` when `config.pairing.enabled` is falsy.
 */
export function initPairingManager(config: Config, store?: PairingStore): PairingManager | undefined {
  if (!config.pairing.enabled) {return undefined;}

  const defaultTtlMs = 5 * 60_000;
  const configuredTtl = config.pairing.code_ttl;
  const parsedTtlMs = parsePairingTtlMs(configuredTtl);

  if (parsedTtlMs === undefined) {
    console.warn(`Invalid pairing.code_ttl "${configuredTtl}" (expected e.g. "30s", "5m", "1h"); falling back to 5m`);
  }

  const manager = new PairingManager({
    enabled: true,
    codeTtl: parsedTtlMs ?? defaultTtlMs,
    codeLength: config.pairing.code_length,
  }, store);

  // Report the TTL that is actually applied, not an unparseable config value.
  const effectiveTtl = parsedTtlMs === undefined ? '5m' : configuredTtl;
  console.log(`Pairing codes enabled (TTL: ${effectiveTtl}, length: ${config.pairing.code_length})`);
  return manager;
}
// ── Gateway ─────────────────────────────────────────────────────
/** Dependencies consumed by `createGateway` to construct the `GatewayServer`. */
export interface GatewayDeps {
/** Daemon configuration; `config.server` supplies port, host binding and auth settings, and the whole object is also passed to the server. */
config: Config;
/** Session manager passed through to the gateway server. */
sessionManager: SessionManager;
/** Model router, handed to the gateway server as its `modelClient`. */
modelRouter: ModelRouter;
/** System prompt string passed through to the gateway server. */
systemPrompt: string;
/** Tool registry passed through to the gateway server. */
toolRegistry: ToolRegistry;
/** Tool executor passed through to the gateway server. */
toolExecutor: ToolExecutor;
/** Channel registry passed through to the gateway server. */
channelRegistry: ChannelRegistry;
/** Optional pairing manager passed through to the gateway server. */
pairingManager?: PairingManager;
/** Lifecycle whose `shutdown()` is awaited by the gateway's restart callback before the process exits. */
lifecycle: Lifecycle;
/**
 * Returns the current channel agents, or `null` when there are none to report.
 * `createGateway` treats each map key as a session id and includes each
 * orchestrator's usage in the token-usage report.
 */
getChannelAgents: () => Map<string, { orchestrator: AgentOrchestrator; collector: OutboundAttachmentCollector }> | null;
/** Optional memory store passed through to the gateway server. */
memoryStore?: MemoryStore;
}
/**
 * Builds the gateway server from the supplied dependencies.
 *
 * Besides forwarding the dependencies, this wires two callbacks into the server:
 * - `restart`: shuts the lifecycle down, then exits the process with code 75.
 * - `getTokenUsage`: merges usage from the gateway's session bridge with usage
 *   from every channel agent returned by `deps.getChannelAgents()`.
 *
 * The server is only constructed here, not started.
 *
 * @param deps - Services and configuration the gateway depends on.
 * @returns The constructed `GatewayServer`.
 */
export function createGateway(deps: GatewayDeps): GatewayServer {
  type TokenCounts = { inputTokens: number; outputTokens: number; calls: number };
  type SessionUsage = {
    sessionId: string;
    primary: TokenCounts;
    delegation: Record<string, TokenCounts>;
    total: TokenCounts & { estimatedCost: number };
  };

  const { config, lifecycle, getChannelAgents } = deps;
  const serverConfig = config.server;

  // Invoked lazily by the server, by which time `gateway` below is initialised.
  const collectTokenUsage = (): SessionUsage[] => {
    // Session-bridge entries come first, in the order the bridge yields them.
    const usageReport: SessionUsage[] = [...gateway.getSessionBridge().getAllUsage()];

    const agents = getChannelAgents();
    if (!agents) {
      return usageReport;
    }
    for (const [sessionId, { orchestrator }] of agents) {
      const { primary, delegation, total } = orchestrator.getUsage();
      usageReport.push({ sessionId, primary, delegation, total });
    }
    return usageReport;
  };

  const gateway = new GatewayServer({
    port: serverConfig.port,
    // Loopback only when `server.localhost` is set; otherwise all IPv4 interfaces.
    host: serverConfig.localhost ? '127.0.0.1' : '0.0.0.0',
    sessionManager: deps.sessionManager,
    modelClient: deps.modelRouter,
    systemPrompt: deps.systemPrompt,
    toolRegistry: deps.toolRegistry,
    toolExecutor: deps.toolExecutor,
    auth: {
      token: serverConfig.token,
      tailscaleIdentity: serverConfig.tailscale_identity,
    },
    authHttp: serverConfig.auth_http,
    lock: serverConfig.lock,
    uiDir: resolve(import.meta.dirname, '../gateway/ui'),
    config,
    channelRegistry: deps.channelRegistry,
    pairingManager: deps.pairingManager,
    memoryStore: deps.memoryStore,
    restart: async () => {
      console.log('Restart requested via gateway');
      await lifecycle.shutdown();
      // NOTE(review): exit code 75 is presumably read by a supervising process as "relaunch" — confirm.
      process.exit(75);
    },
    getTokenUsage: collectTokenUsage,
  });

  if (serverConfig.token) {
    const identitySuffix = serverConfig.tailscale_identity ? ' + Tailscale identity' : '';
    console.log(`Gateway auth: token required${identitySuffix}`);
  }

  return gateway;
}
// ── Service Startup ─────────────────────────────────────────────
/**
 * Starts the daemon's runtime services and wires their shutdown hooks.
 *
 * In order: channel adapters, the gateway server, Tailscale Serve (only when
 * `config.server.tailscale.serve` is set), the heartbeat monitor, and finally
 * SIGINT/SIGTERM handlers that run the lifecycle shutdown and exit.
 *
 * A Tailscale Serve failure is non-fatal: it is logged with its cause and
 * startup continues. A failing shutdown triggered by a signal is logged and
 * the process exits with code 1 instead of leaving an unhandled rejection.
 *
 * @param deps - Services to start and the configuration that drives them.
 * @param deps.config - Daemon configuration (server, automation and memory sections are read).
 * @param deps.lifecycle - Receives shutdown hooks; its `shutdown()` runs on SIGINT/SIGTERM.
 * @param deps.channelRegistry - Channel adapters to start; also given to the heartbeat monitor.
 * @param deps.gateway - Gateway server to start.
 * @param deps.modelRouter - Passed to the heartbeat monitor.
 * @param deps.memoryDir - Passed to the heartbeat monitor only when `config.memory.enabled` is truthy.
 * @param deps.dataDir - Passed to the heartbeat monitor.
 * @returns Resolves once all services are started and signal handlers are installed.
 */
export async function startServices(deps: {
  config: Config;
  lifecycle: Lifecycle;
  channelRegistry: ChannelRegistry;
  gateway: GatewayServer;
  modelRouter: ModelRouter;
  memoryDir: string;
  dataDir: string;
}): Promise<void> {
  const { config, lifecycle, channelRegistry, gateway, modelRouter, memoryDir, dataDir } = deps;

  // Register shutdown handler for channels before starting them, so the hook
  // exists even if startup fails part-way.
  lifecycle.onShutdown(async () => {
    await channelRegistry.stopAll();
    console.log('Channel adapters stopped');
  });

  // Start all channel adapters
  await channelRegistry.startAll();

  // Start gateway (HTTP + WS server); hook is registered first, as above.
  lifecycle.onShutdown(async () => {
    await gateway.stop();
    console.log('Gateway server stopped');
  });
  await gateway.start();

  // Tailscale Serve
  if (config.server.tailscale?.serve) {
    const { startTailscaleServe, stopTailscaleServe } = await import('../gateway/tailscale.js');
    const tsConfig = {
      localPort: config.server.port,
      servePort: config.server.tailscale.port,
      hostname: config.server.tailscale.hostname,
    };
    try {
      await startTailscaleServe(tsConfig);
      // Only register the stop hook once Serve actually started.
      lifecycle.onShutdown(async () => {
        await stopTailscaleServe(tsConfig);
      });
    } catch (err: unknown) {
      // Best-effort feature: keep going, but surface why it failed.
      const reason = err instanceof Error ? err.message : String(err);
      console.warn(`Tailscale Serve failed to start — gateway still accessible on local port (${reason})`);
    }
  }

  // Heartbeat Monitor
  const heartbeatMonitor = new HeartbeatMonitor({
    config: config.automation.heartbeat,
    getGatewayPort: () => config.server.port,
    modelRouter,
    channelLister: channelRegistry,
    memoryDir: config.memory.enabled ? memoryDir : undefined,
    dataDir,
    channelLookup: channelRegistry,
  });
  heartbeatMonitor.start();
  lifecycle.onShutdown(async () => {
    heartbeatMonitor.stop();
    console.log('Heartbeat monitor stopped');
  });

  // Signal handlers
  const signalHandler = (): void => {
    // Handle both outcomes so a failed shutdown cannot become an unhandled
    // rejection; exit non-zero to signal the failure.
    void lifecycle.shutdown().then(
      () => process.exit(0),
      (err: unknown) => {
        console.error('Shutdown failed:', err);
        process.exit(1);
      },
    );
  };
  process.on('SIGINT', signalHandler);
  process.on('SIGTERM', signalHandler);
  // Detach the handlers as part of shutdown.
  lifecycle.onShutdown(async () => {
    process.off('SIGINT', signalHandler);
    process.off('SIGTERM', signalHandler);
  });

  console.log('Flynn daemon started');
}