import { Lifecycle } from './lifecycle.js'; import type { Config, ModelConfig } from '../config/index.js'; import { AnthropicClient, OpenAIClient, OllamaClient, LlamaCppClient, GeminiClient, BedrockClient, ModelRouter, DEFAULT_RETRY_CONFIG } from '../models/index.js'; import type { ModelClient, RetryConfig } from '../models/index.js'; import { AgentOrchestrator, type DelegationConfig } from '../backends/index.js'; import { SessionStore, SessionManager } from '../session/index.js'; import { HookEngine } from '../hooks/index.js'; import { ToolRegistry, ToolExecutor, ToolPolicy, allBuiltinTools, createWebSearchTools, createProcessTools, ProcessManager, BrowserManager, createBrowserTools } from '../tools/index.js'; import type { Tool } from '../tools/types.js'; import { MemoryStore } from '../memory/index.js'; import { createMemoryTools } from '../tools/builtin/index.js'; import { GatewayServer } from '../gateway/index.js'; import { ChannelRegistry, TelegramAdapter, WebChatAdapter, DiscordAdapter, SlackAdapter, WhatsAppAdapter } from '../channels/index.js'; import { CronScheduler } from '../automation/index.js'; import type { InboundMessage, OutboundMessage } from '../channels/index.js'; import { McpManager } from '../mcp/index.js'; import { SkillRegistry, SkillInstaller, loadAllSkills } from '../skills/index.js'; import { assembleSystemPrompt } from '../prompt/index.js'; import { AgentConfigRegistry, AgentRouter } from '../agents/index.js'; import { DockerSandbox, SandboxManager, createSandboxedShellTool, createSandboxedProcessStartTool } from '../sandbox/index.js'; import { resolve } from 'path'; import { homedir } from 'os'; import { mkdirSync } from 'fs'; export interface DaemonContext { config: Config; lifecycle: Lifecycle; sessionStore: SessionStore; sessionManager: SessionManager; hookEngine: HookEngine; modelRouter: ModelRouter; toolRegistry: ToolRegistry; toolExecutor: ToolExecutor; gateway: GatewayServer; channelRegistry: ChannelRegistry; mcpManager: McpManager; skillRegistry: SkillRegistry; skillInstaller: SkillInstaller; agentConfigRegistry: AgentConfigRegistry; agentRouter: AgentRouter; sandboxManager?: SandboxManager; browserManager?: BrowserManager; } function loadSystemPrompt(config: Config): string { const searchDirs = [ process.cwd(), resolve(import.meta.dirname, '../..'), ...(config.prompt.search_dirs ?? []), ]; const result = assembleSystemPrompt({ searchDirs, extraSections: config.prompt.extra_sections, }); if (result.loadedFiles.length > 0) { console.log(`Loaded prompt templates: ${result.loadedFiles.map(f => f.split('/').pop()).join(', ')}`); } return result.prompt; } /** * Create a ModelClient from a provider config entry. * Dispatches on the `provider` field so all tiers and fallback entries * use the correct client implementation. */ export function createClientFromConfig(cfg: ModelConfig): ModelClient { switch (cfg.provider) { case 'anthropic': return new AnthropicClient({ model: cfg.model, apiKey: cfg.api_key, authToken: cfg.auth_token, }); case 'openai': return new OpenAIClient({ model: cfg.model, apiKey: cfg.api_key, }); case 'ollama': return new OllamaClient({ model: cfg.model, host: cfg.endpoint, numGpu: cfg.num_gpu, }); case 'llamacpp': return new LlamaCppClient({ endpoint: cfg.endpoint ?? 'http://localhost:8080', model: cfg.model, authToken: cfg.auth_token, }); case 'gemini': return new GeminiClient({ model: cfg.model, apiKey: cfg.api_key, }); case 'openrouter': return new OpenAIClient({ model: cfg.model, apiKey: cfg.api_key ?? process.env.OPENROUTER_API_KEY, baseURL: cfg.endpoint ?? 'https://openrouter.ai/api/v1', }); case 'bedrock': return new BedrockClient({ model: cfg.model, region: cfg.endpoint, accessKeyId: cfg.api_key, secretAccessKey: cfg.auth_token, }); default: throw new Error(`Unknown model provider: ${(cfg as Record).provider}`); } } function createModelRouter(config: Config): ModelRouter { const models = config.models; const defaultClient = createClientFromConfig(models.default); const fastClient = models.fast ? createClientFromConfig(models.fast) : undefined; const complexClient = models.complex ? createClientFromConfig(models.complex) : undefined; const localClient = models.local ? createClientFromConfig(models.local) : undefined; // Build fallback chain — each entry references a tier name or 'local' const fallbackChain: ModelClient[] = []; for (const providerName of models.fallback_chain) { if (providerName === 'local' && localClient) { fallbackChain.push(localClient); } else if (providerName === 'default') { // Allows re-trying the default provider in the chain fallbackChain.push(defaultClient); } else if (providerName === 'fast' && fastClient) { fallbackChain.push(fastClient); } else if (providerName === 'complex' && complexClient) { fallbackChain.push(complexClient); } else if (models.local_providers?.[providerName]) { // Named provider from local_providers map fallbackChain.push(createClientFromConfig(models.local_providers[providerName])); } else { console.warn(`Fallback chain entry "${providerName}" not found — skipping`); } } console.log(`Model router: default=${models.default.provider}/${models.default.model}, ` + `fallback=[${models.fallback_chain.join(', ')}]`); // Build retry config if enabled const retryConfig: RetryConfig | undefined = config.retry.enabled ? { maxRetries: config.retry.max_retries, initialDelayMs: config.retry.initial_delay_ms, backoffMultiplier: config.retry.backoff_multiplier, maxDelayMs: config.retry.max_delay_ms, nonRetryablePatterns: DEFAULT_RETRY_CONFIG.nonRetryablePatterns, } : undefined; if (retryConfig) { console.log(`Retry policy: max_retries=${retryConfig.maxRetries}, initial_delay=${retryConfig.initialDelayMs}ms`); } return new ModelRouter({ default: defaultClient, fast: fastClient, complex: complexClient, local: localClient, fallbackChain, retryConfig, }); } /** * Create the unified message handler for the channel registry. * Each channel+sender pair gets its own AgentOrchestrator backed by a persistent session. * The orchestrator wraps a NativeAgent and adds delegation to different model tiers. */ function createMessageRouter(deps: { sessionManager: SessionManager; modelRouter: ModelRouter; systemPrompt: string; toolRegistry: ToolRegistry; toolExecutor: ToolExecutor; config: Config; memoryStore?: MemoryStore; agentConfigRegistry?: AgentConfigRegistry; agentRouter?: AgentRouter; sandboxManager?: SandboxManager; }) { // Cache agents by session ID + agent config name to avoid recreating on every message const agents = new Map(); function getOrCreateAgent(channel: string, senderId: string): AgentOrchestrator { // Resolve agent config name via routing (sender → channel → default fallback) const agentConfigName = deps.agentRouter?.resolve(channel, senderId); const agentConfig = agentConfigName ? deps.agentConfigRegistry?.get(agentConfigName) : undefined; // Include agent config name in cache key so different agents aren't shared const sessionId = agentConfigName ? `${channel}:${senderId}:${agentConfigName}` : `${channel}:${senderId}`; let agent = agents.get(sessionId); if (!agent) { const session = deps.sessionManager.getSession(channel, senderId); // Use agent config overrides where available, falling back to global config const effectiveSystemPrompt = agentConfig?.systemPrompt ?? deps.systemPrompt; const effectiveTier = agentConfig?.modelTier ?? deps.config.agents.primary_tier ?? 'default'; const effectiveProvider = deps.config.models.default.provider; const delegationConfig: DelegationConfig = { compaction: deps.config.agents.delegation.compaction ?? 'fast', memory_extraction: deps.config.agents.delegation.memory_extraction ?? 'fast', classification: deps.config.agents.delegation.classification ?? 'fast', tool_summarisation: deps.config.agents.delegation.tool_summarisation ?? 'fast', complex_reasoning: deps.config.agents.delegation.complex_reasoning ?? 'complex', }; // Clone the tool registry and replace shell tools with sandboxed versions if configured let effectiveToolRegistry = deps.toolRegistry; if (agentConfig?.sandbox && deps.sandboxManager && deps.config.sandbox.enabled) { effectiveToolRegistry = deps.toolRegistry.clone(); // Lazy sandbox: create the sandboxed tools with a deferred sandbox reference // The sandbox is created on first use via SandboxManager.getOrCreate() const sandboxSessionId = sessionId; const sandboxManager = deps.sandboxManager; // Create a proxy sandbox that lazily initializes const lazySandboxShell: Tool = { name: 'shell.exec', description: 'Execute a shell command inside a sandboxed container and return stdout/stderr.', inputSchema: { type: 'object', properties: { command: { type: 'string', description: 'The shell command to execute' }, cwd: { type: 'string', description: 'Working directory inside the container (optional)' }, timeout: { type: 'number', description: 'Timeout in milliseconds (default 30000)' }, }, required: ['command'], }, execute: async (rawArgs: unknown) => { const sandbox = await sandboxManager.getOrCreate(sandboxSessionId); const tool = createSandboxedShellTool(sandbox); return tool.execute(rawArgs); }, }; const lazySandboxProcess: Tool = { name: 'process.start', description: 'Start a command in the background inside a sandboxed container.', inputSchema: { type: 'object', properties: { command: { type: 'string', description: 'The shell command to run in the background' }, cwd: { type: 'string', description: 'Working directory inside the container (optional)' }, }, required: ['command'], }, execute: async (rawArgs: unknown) => { const sandbox = await sandboxManager.getOrCreate(sandboxSessionId); const tool = createSandboxedProcessStartTool(sandbox); return tool.execute(rawArgs); }, }; effectiveToolRegistry.replace(lazySandboxShell); effectiveToolRegistry.replace(lazySandboxProcess); } agent = new AgentOrchestrator({ modelRouter: deps.modelRouter, systemPrompt: effectiveSystemPrompt, session, toolRegistry: effectiveToolRegistry, toolExecutor: deps.toolExecutor, primaryTier: effectiveTier, delegation: delegationConfig, maxDelegationDepth: deps.config.agents.max_delegation_depth ?? 3, compaction: deps.config.compaction.enabled ? { thresholdPct: deps.config.compaction.threshold_pct, keepTurns: deps.config.compaction.keep_turns, summaryMaxTokens: deps.config.compaction.summary_max_tokens, } : undefined, modelName: deps.config.models.default.model, contextWindow: deps.config.models.default.context_window, memoryStore: deps.memoryStore, toolPolicyContext: { agent: effectiveTier, provider: effectiveProvider, }, }); agents.set(sessionId, agent); } return agent; } return async (msg: InboundMessage, reply: (response: OutboundMessage) => Promise): Promise => { const agent = getOrCreateAgent(msg.channel, msg.senderId); // Handle special commands if (msg.metadata?.isCommand) { if (msg.metadata.command === 'reset') { agent.reset(); return; } if (msg.metadata.command === 'compact') { const result = await agent.compact(); if (result && result.compactedCount > 0) { await reply({ text: `Compacted ${result.compactedCount} messages: ${result.tokensBefore} → ${result.tokensAfter} tokens`, replyTo: msg.id, }); } else { await reply({ text: 'Nothing to compact.', replyTo: msg.id, }); } return; } if (msg.metadata.command === 'usage') { const usage = agent.getUsage(); const lines = [ '**Token Usage**', '', `Primary: ${usage.primary.inputTokens.toLocaleString()} in / ${usage.primary.outputTokens.toLocaleString()} out (${usage.primary.calls} calls)`, ]; const delegationEntries = Object.entries(usage.delegation); if (delegationEntries.length > 0) { lines.push(''); lines.push('Delegation:'); for (const [tier, stats] of delegationEntries) { lines.push(` ${tier}: ${stats.inputTokens.toLocaleString()} in / ${stats.outputTokens.toLocaleString()} out (${stats.calls} calls)`); } } lines.push(''); lines.push(`**Total:** ${usage.total.inputTokens.toLocaleString()} in / ${usage.total.outputTokens.toLocaleString()} out (${usage.total.calls} calls)`); if (usage.total.estimatedCost > 0) { lines.push(`**Estimated cost:** $${usage.total.estimatedCost.toFixed(4)}`); } await reply({ text: lines.join('\n'), replyTo: msg.id }); return; } } try { const response = await agent.process(msg.text); await reply({ text: response, replyTo: msg.id }); } catch (error) { console.error(`Error processing message from ${msg.channel}:${msg.senderId}:`, error); await reply({ text: 'Sorry, an error occurred while processing your message.', replyTo: msg.id, }); } }; } export async function startDaemon(config: Config): Promise { const lifecycle = new Lifecycle(); // Ensure data directory exists const dataDir = resolve(homedir(), '.local/share/flynn'); mkdirSync(dataDir, { recursive: true }); // Initialize memory store const memoryDir = config.memory.dir ?? resolve(dataDir, 'memory'); mkdirSync(memoryDir, { recursive: true }); const memoryStore = config.memory.enabled ? new MemoryStore({ dir: memoryDir, maxContextTokens: config.memory.max_context_tokens }) : undefined; // Initialize session store and manager const sessionStore = new SessionStore(resolve(dataDir, 'sessions.db')); const sessionManager = new SessionManager(sessionStore); lifecycle.onShutdown(async () => { sessionStore.close(); console.log('Session store closed'); }); // Initialize hook engine const hookEngine = new HookEngine(config.hooks); // Initialize tool registry and executor const toolRegistry = new ToolRegistry(); for (const tool of allBuiltinTools) { toolRegistry.register(tool); } // Register memory tools if memory is enabled if (memoryStore) { for (const tool of createMemoryTools(memoryStore)) { toolRegistry.register(tool); } } // Register web search tool if configured with credentials if (config.web_search.api_key || config.web_search.endpoint) { for (const tool of createWebSearchTools({ provider: config.web_search.provider, apiKey: config.web_search.api_key, endpoint: config.web_search.endpoint, maxResults: config.web_search.max_results, })) { toolRegistry.register(tool); } } // Initialize process manager and register process tools const processManager = new ProcessManager({ maxConcurrent: config.process.max_concurrent, maxRuntimeMinutes: config.process.max_runtime_minutes, bufferSize: config.process.buffer_size, }); for (const tool of createProcessTools(processManager)) { toolRegistry.register(tool); } lifecycle.onShutdown(async () => { await processManager.shutdown(); console.log('Process manager stopped'); }); // Initialize browser manager and register browser tools (if enabled) let browserManager: BrowserManager | undefined; if (config.browser?.enabled) { browserManager = new BrowserManager({ executablePath: config.browser.executable_path, wsEndpoint: config.browser.ws_endpoint, headless: config.browser.headless, maxPages: config.browser.max_pages, defaultTimeout: config.browser.default_timeout, }); for (const tool of createBrowserTools(browserManager)) { toolRegistry.register(tool); } console.log(`Browser tools enabled (headless=${config.browser.headless})`); lifecycle.onShutdown(async () => { await browserManager!.shutdown(); console.log('Browser manager stopped'); }); } const toolExecutor = new ToolExecutor(toolRegistry, hookEngine); // Initialize tool policy from config const toolPolicy = new ToolPolicy(config.tools); toolRegistry.setPolicy(toolPolicy); const effectiveProfile = toolPolicy.getEffectiveProfile(); if (effectiveProfile !== 'full') { console.log(`Tool policy: profile=${effectiveProfile}, deny=[${config.tools.deny.join(', ')}]`); } // Initialize MCP manager and start configured servers const mcpManager = new McpManager(toolRegistry); if (config.mcp.servers.length > 0) { console.log(`Starting ${config.mcp.servers.length} MCP server(s)...`); await mcpManager.startAll(config.mcp.servers); } lifecycle.onShutdown(async () => { await mcpManager.stopAll(); console.log('MCP servers stopped'); }); // Initialize skills system const defaultManagedDir = resolve(homedir(), '.flynn/workspace/skills'); const skillRegistry = new SkillRegistry(); const skillInstaller = new SkillInstaller(config.skills.managed_dir ?? defaultManagedDir); const skills = loadAllSkills({ bundledDir: config.skills.bundled_dir, managedDir: config.skills.managed_dir ?? defaultManagedDir, workspaceDir: config.skills.workspace_dir, }); for (const skill of skills) { skillRegistry.register(skill); } if (skills.length > 0) { const available = skillRegistry.listAvailable().length; console.log(`Loaded ${skills.length} skill(s) (${available} available)`); } // Initialize agent config registry and router const agentConfigRegistry = new AgentConfigRegistry(); if (config.agent_configs && Object.keys(config.agent_configs).length > 0) { agentConfigRegistry.loadFromConfig(config.agent_configs); console.log(`Loaded ${Object.keys(config.agent_configs).length} agent config(s)`); } const agentRouter = new AgentRouter(config.routing); // Initialize sandbox manager if Docker is available let sandboxManager: SandboxManager | undefined; if (config.sandbox.enabled) { const dockerAvailable = await DockerSandbox.isAvailable(); if (dockerAvailable) { sandboxManager = new SandboxManager(config.sandbox); console.log(`Docker sandbox enabled (image=${config.sandbox.image}, network=${config.sandbox.network})`); } else { console.warn('Docker sandbox enabled but Docker not available — falling back to host execution'); } } if (sandboxManager) { lifecycle.onShutdown(async () => { await sandboxManager!.destroyAll(); console.log('Docker sandboxes destroyed'); }); } // Initialize model router const modelRouter = createModelRouter(config); // Load system prompt and append skill instructions let systemPrompt = loadSystemPrompt(config); const skillAdditions = skillRegistry.getSystemPromptAdditions(); if (skillAdditions) { systemPrompt = `${systemPrompt}\n\n# Available Skills\n\n${skillAdditions}`; } // Initialize gateway WebSocket server const gateway = new GatewayServer({ port: config.server.port, host: config.server.localhost ? '127.0.0.1' : '0.0.0.0', sessionManager, modelClient: modelRouter, systemPrompt, toolRegistry, toolExecutor, auth: { token: config.server.token, tailscaleIdentity: config.server.tailscale_identity, }, authHttp: config.server.auth_http, uiDir: resolve(import.meta.dirname, '../gateway/ui'), config, restart: async () => { console.log('Restart requested via gateway'); await lifecycle.shutdown(); // Exit with code 75 (EX_TEMPFAIL) — process supervisor should restart process.exit(75); }, }); if (config.server.token) { console.log(`Gateway auth: token required${config.server.tailscale_identity ? ' + Tailscale identity' : ''}`); } // ── Channel Registry ────────────────────────────────────────── const channelRegistry = new ChannelRegistry(); // Set up the unified message handler channelRegistry.setMessageHandler(createMessageRouter({ sessionManager, modelRouter, systemPrompt, toolRegistry, toolExecutor, config, memoryStore, agentConfigRegistry, agentRouter, sandboxManager, })); // Register Telegram adapter const telegramAdapter = new TelegramAdapter({ botToken: config.telegram.bot_token, allowedChatIds: config.telegram.allowed_chat_ids, requireMention: config.telegram.require_mention, hookEngine, }); channelRegistry.register(telegramAdapter); // Register Discord adapter (if configured) if (config.discord) { const discordAdapter = new DiscordAdapter({ botToken: config.discord.bot_token, allowedGuildIds: config.discord.allowed_guild_ids.length > 0 ? config.discord.allowed_guild_ids : undefined, allowedChannelIds: config.discord.allowed_channel_ids.length > 0 ? config.discord.allowed_channel_ids : undefined, requireMention: config.discord.require_mention, }); channelRegistry.register(discordAdapter); } // Register Slack adapter (if configured) if (config.slack) { const slackAdapter = new SlackAdapter({ botToken: config.slack.bot_token, appToken: config.slack.app_token, signingSecret: config.slack.signing_secret, allowedChannelIds: config.slack.allowed_channel_ids.length > 0 ? config.slack.allowed_channel_ids : undefined, requireMention: config.slack.require_mention, }); channelRegistry.register(slackAdapter); } // Register WhatsApp adapter (if configured) if (config.whatsapp) { const whatsappAdapter = new WhatsAppAdapter({ allowedNumbers: config.whatsapp.allowed_numbers.length > 0 ? config.whatsapp.allowed_numbers : undefined, allowedGroupIds: config.whatsapp.allowed_group_ids.length > 0 ? config.whatsapp.allowed_group_ids : undefined, requireMention: config.whatsapp.require_mention, dataDir: config.whatsapp.data_dir, }); channelRegistry.register(whatsappAdapter); } // Register WebChat adapter (wraps the gateway) const webChatAdapter = new WebChatAdapter({ gateway }); channelRegistry.register(webChatAdapter); // Register cron scheduler adapter (if any cron jobs configured) if (config.automation.cron.length > 0) { const cronScheduler = new CronScheduler(config.automation.cron, channelRegistry); channelRegistry.register(cronScheduler); console.log(`Registered ${config.automation.cron.length} cron job(s)`); } // ── Signal Handlers ─────────────────────────────────────────── const signalHandler = () => { lifecycle.shutdown().then(() => process.exit(0)); }; process.on('SIGINT', signalHandler); process.on('SIGTERM', signalHandler); lifecycle.onShutdown(async () => { process.off('SIGINT', signalHandler); process.off('SIGTERM', signalHandler); }); // ── Start Services ──────────────────────────────────────────── // Register shutdown handler for channels (stops Telegram bot etc.) lifecycle.onShutdown(async () => { await channelRegistry.stopAll(); console.log('Channel adapters stopped'); }); // Start all channel adapters (Telegram long polling, WebChat status) await channelRegistry.startAll(); // Start gateway (HTTP + WS server) lifecycle.onShutdown(async () => { await gateway.stop(); console.log('Gateway server stopped'); }); await gateway.start(); console.log('Flynn daemon started'); return { config, lifecycle, sessionStore, sessionManager, hookEngine, modelRouter, toolRegistry, toolExecutor, gateway, channelRegistry, mcpManager, skillRegistry, skillInstaller, agentConfigRegistry, agentRouter, sandboxManager, browserManager, }; } export { Lifecycle } from './lifecycle.js';