import { Lifecycle } from './lifecycle.js'; import type { Config } from '../config/index.js'; import { AnthropicClient, OpenAIClient, OllamaClient, LlamaCppClient, ModelRouter } from '../models/index.js'; import { NativeAgent } from '../backends/index.js'; import { SessionStore, SessionManager } from '../session/index.js'; import { HookEngine } from '../hooks/index.js'; import { ToolRegistry, ToolExecutor, allBuiltinTools } from '../tools/index.js'; import { GatewayServer } from '../gateway/index.js'; import { ChannelRegistry, TelegramAdapter, WebChatAdapter } from '../channels/index.js'; import type { InboundMessage, OutboundMessage } from '../channels/index.js'; import { McpManager } from '../mcp/index.js'; import { SkillRegistry, SkillInstaller, loadAllSkills } from '../skills/index.js'; import { resolve } from 'path'; import { homedir } from 'os'; import { mkdirSync, readFileSync, existsSync } from 'fs'; export interface DaemonContext { config: Config; lifecycle: Lifecycle; sessionStore: SessionStore; sessionManager: SessionManager; hookEngine: HookEngine; modelRouter: ModelRouter; toolRegistry: ToolRegistry; toolExecutor: ToolExecutor; gateway: GatewayServer; channelRegistry: ChannelRegistry; mcpManager: McpManager; skillRegistry: SkillRegistry; skillInstaller: SkillInstaller; } function loadSystemPrompt(): string { // Try to load SOUL.md from working directory first, then from project root const paths = [ resolve(process.cwd(), 'SOUL.md'), resolve(import.meta.dirname, '../../SOUL.md'), ]; for (const soulPath of paths) { if (existsSync(soulPath)) { return readFileSync(soulPath, 'utf-8'); } } // Fallback if SOUL.md not found return 'You are Flynn, a helpful personal AI assistant. Be direct, concise, and helpful. Use markdown when it improves readability.'; } function createModelRouter(config: Config): ModelRouter { const models = config.models; const defaultClient = new AnthropicClient({ model: models.default.model, apiKey: models.default.api_key, authToken: models.default.auth_token, }); let fastClient; let complexClient; let localClient; if (models.fast) { fastClient = new AnthropicClient({ model: models.fast.model, apiKey: models.fast.api_key, authToken: models.fast.auth_token, }); } if (models.complex) { complexClient = new AnthropicClient({ model: models.complex.model, apiKey: models.complex.api_key, authToken: models.complex.auth_token, }); } if (models.local) { if (models.local.provider === 'ollama') { localClient = new OllamaClient({ model: models.local.model, host: models.local.endpoint, numGpu: models.local.num_gpu, }); } else if (models.local.provider === 'llamacpp') { localClient = new LlamaCppClient({ endpoint: models.local.endpoint ?? 'http://localhost:8080', model: models.local.model, authToken: models.local.auth_token, }); } } const fallbackChain = []; for (const providerName of models.fallback_chain) { if (providerName === 'openai') { fallbackChain.push(new OpenAIClient({ model: 'gpt-4o' })); } else if (providerName === 'local' && localClient) { fallbackChain.push(localClient); } } return new ModelRouter({ default: defaultClient, fast: fastClient, complex: complexClient, local: localClient, fallbackChain, }); } /** * Create the unified message handler for the channel registry. * Each channel+sender pair gets its own NativeAgent backed by a persistent session. */ function createMessageRouter(deps: { sessionManager: SessionManager; modelRouter: ModelRouter; systemPrompt: string; toolRegistry: ToolRegistry; toolExecutor: ToolExecutor; }) { // Cache agents by session ID to avoid recreating on every message const agents = new Map(); function getOrCreateAgent(channel: string, senderId: string): NativeAgent { const sessionId = `${channel}:${senderId}`; let agent = agents.get(sessionId); if (!agent) { const session = deps.sessionManager.getSession(channel, senderId); agent = new NativeAgent({ modelClient: deps.modelRouter, systemPrompt: deps.systemPrompt, session, toolRegistry: deps.toolRegistry, toolExecutor: deps.toolExecutor, }); agents.set(sessionId, agent); } return agent; } return async (msg: InboundMessage, reply: (response: OutboundMessage) => Promise): Promise => { const agent = getOrCreateAgent(msg.channel, msg.senderId); // Handle special commands if (msg.metadata?.isCommand && msg.metadata.command === 'reset') { agent.reset(); return; } try { const response = await agent.process(msg.text); await reply({ text: response, replyTo: msg.id }); } catch (error) { console.error(`Error processing message from ${msg.channel}:${msg.senderId}:`, error); await reply({ text: 'Sorry, an error occurred while processing your message.', replyTo: msg.id, }); } }; } export async function startDaemon(config: Config): Promise { const lifecycle = new Lifecycle(); // Ensure data directory exists const dataDir = resolve(homedir(), '.local/share/flynn'); mkdirSync(dataDir, { recursive: true }); // Initialize session store and manager const sessionStore = new SessionStore(resolve(dataDir, 'sessions.db')); const sessionManager = new SessionManager(sessionStore); lifecycle.onShutdown(async () => { sessionStore.close(); console.log('Session store closed'); }); // Initialize hook engine const hookEngine = new HookEngine(config.hooks); // Initialize tool registry and executor const toolRegistry = new ToolRegistry(); for (const tool of allBuiltinTools) { toolRegistry.register(tool); } const toolExecutor = new ToolExecutor(toolRegistry, hookEngine); // Initialize MCP manager and start configured servers const mcpManager = new McpManager(toolRegistry); if (config.mcp.servers.length > 0) { console.log(`Starting ${config.mcp.servers.length} MCP server(s)...`); await mcpManager.startAll(config.mcp.servers); } lifecycle.onShutdown(async () => { await mcpManager.stopAll(); console.log('MCP servers stopped'); }); // Initialize skills system const defaultManagedDir = resolve(homedir(), '.flynn/workspace/skills'); const skillRegistry = new SkillRegistry(); const skillInstaller = new SkillInstaller(config.skills.managed_dir ?? defaultManagedDir); const skills = loadAllSkills({ bundledDir: config.skills.bundled_dir, managedDir: config.skills.managed_dir ?? defaultManagedDir, workspaceDir: config.skills.workspace_dir, }); for (const skill of skills) { skillRegistry.register(skill); } if (skills.length > 0) { const available = skillRegistry.listAvailable().length; console.log(`Loaded ${skills.length} skill(s) (${available} available)`); } // Initialize model router const modelRouter = createModelRouter(config); // Load system prompt and append skill instructions let systemPrompt = loadSystemPrompt(); const skillAdditions = skillRegistry.getSystemPromptAdditions(); if (skillAdditions) { systemPrompt = `${systemPrompt}\n\n# Available Skills\n\n${skillAdditions}`; } // Initialize gateway WebSocket server const gateway = new GatewayServer({ port: config.server.port, host: config.server.localhost ? '127.0.0.1' : '0.0.0.0', sessionManager, modelClient: modelRouter, systemPrompt, toolRegistry, toolExecutor, uiDir: resolve(import.meta.dirname, '../gateway/ui'), config, restart: async () => { console.log('Restart requested via gateway'); await lifecycle.shutdown(); // Exit with code 75 (EX_TEMPFAIL) — process supervisor should restart process.exit(75); }, }); // ── Channel Registry ────────────────────────────────────────── const channelRegistry = new ChannelRegistry(); // Set up the unified message handler channelRegistry.setMessageHandler(createMessageRouter({ sessionManager, modelRouter, systemPrompt, toolRegistry, toolExecutor, })); // Register Telegram adapter const telegramAdapter = new TelegramAdapter({ botToken: config.telegram.bot_token, allowedChatIds: config.telegram.allowed_chat_ids, hookEngine, }); channelRegistry.register(telegramAdapter); // Register WebChat adapter (wraps the gateway) const webChatAdapter = new WebChatAdapter({ gateway }); channelRegistry.register(webChatAdapter); // ── Signal Handlers ─────────────────────────────────────────── const signalHandler = () => { lifecycle.shutdown().then(() => process.exit(0)); }; process.on('SIGINT', signalHandler); process.on('SIGTERM', signalHandler); lifecycle.onShutdown(async () => { process.off('SIGINT', signalHandler); process.off('SIGTERM', signalHandler); }); // ── Start Services ──────────────────────────────────────────── // Register shutdown handler for channels (stops Telegram bot etc.) lifecycle.onShutdown(async () => { await channelRegistry.stopAll(); console.log('Channel adapters stopped'); }); // Start all channel adapters (Telegram long polling, WebChat status) await channelRegistry.startAll(); // Start gateway (HTTP + WS server) lifecycle.onShutdown(async () => { await gateway.stop(); console.log('Gateway server stopped'); }); await gateway.start(); console.log('Flynn daemon started'); return { config, lifecycle, sessionStore, sessionManager, hookEngine, modelRouter, toolRegistry, toolExecutor, gateway, channelRegistry, mcpManager, skillRegistry, skillInstaller, }; } export { Lifecycle } from './lifecycle.js';