import type { AudioTranscriptionConfig } from '../models/media.js';
import type { Attachment } from '../channels/types.js';
import { isSupportedAudio, transcribeAudio } from '../models/media.js';
import { supportsAudioInput } from '../models/capabilities.js';
import { AgentOrchestrator, type DelegationConfig } from '../backends/index.js';
import { OutboundAttachmentCollector } from '../backends/native/attachments.js';
import type { InboundMessage, OutboundMessage } from '../channels/index.js';
import { MemoryStore } from '../memory/index.js';
import type { Tool } from '../tools/types.js';
import { createMediaSendTool } from '../tools/index.js';
import { createSandboxedShellTool, createSandboxedProcessStartTool, SandboxManager } from '../sandbox/index.js';
import { MODEL_PROVIDERS, type Config, type ModelConfig, type ModelProvider } from '../config/index.js';
import { ModelRouter, type ModelTier } from '../models/index.js';
import { ToolRegistry, ToolExecutor } from '../tools/index.js';
import { SessionManager } from '../session/index.js';
import { AgentConfigRegistry, AgentRouter } from '../agents/index.js';
import type { CommandRegistry } from '../commands/index.js';
import type { ComponentRegistry } from '../intents/index.js';
import type { RoutingPolicy } from '../routing/index.js';
import { createClientFromConfig } from './models.js';
import type { SkillRegistry } from '../skills/index.js';
import { auditLogger } from '../audit/index.js';
import { randomUUID } from 'crypto';

/** Cached agent for one conversation: the orchestrator plus the collector its `media.send` tool is bound to. */
type AgentEntry = { orchestrator: AgentOrchestrator; collector: OutboundAttachmentCollector };

/** Usage text for the `/model` command. */
const MODEL_USAGE = 'Usage: /model <tier> OR /model <tier> <provider/model> OR /model <tier> reset';

/** Usage fragment for enabling elevation; the first argument is the duration, the rest is the reason. */
const ELEVATE_ENABLE_USAGE = '/elevate <duration> [reason] --yes';

/** Milliseconds per duration suffix accepted by `/elevate`. */
const ELEVATION_UNIT_MS: Readonly<Record<string, number>> = {
  s: 1_000,
  m: 60_000,
  h: 3_600_000,
  d: 86_400_000,
};

/**
 * Build a provider → model-config lookup from every model configured in `config.models`
 * (default, fast, complex, local, `local_providers`) and their fallbacks.
 *
 * Entries are plain assignments in iteration order, so when several configs share a
 * provider the last one visited wins.
 *
 * @param config - Application configuration.
 * @returns A map with at most one config per provider; providers that are not configured are absent.
 */
function buildProviderConfigMap(config: Config): Partial<Record<ModelProvider, ModelConfig>> {
  const providerConfigs: Partial<Record<ModelProvider, ModelConfig>> = {};
  const modelConfigs: ModelConfig[] = [
    config.models.default,
    ...(config.models.fast ? [config.models.fast] : []),
    ...(config.models.complex ? [config.models.complex] : []),
    ...(config.models.local ? [config.models.local] : []),
    ...Object.values(config.models.local_providers ?? {}),
  ];

  for (const modelConfig of modelConfigs) {
    providerConfigs[modelConfig.provider] = modelConfig;
    if (modelConfig.fallback) {
      providerConfigs[modelConfig.fallback.provider] = modelConfig.fallback;
    }
  }

  return providerConfigs;
}

/**
 * Parse an `/elevate` duration such as `30s`, `10m`, `1h` or `1d` (suffix is case-insensitive).
 *
 * @param duration - Raw duration token.
 * @returns The duration in milliseconds, or `null` when the token is malformed or not positive.
 */
function parseElevationTtlMs(duration: string): number | null {
  const match = /^(\d+)([smhd])$/i.exec(duration);
  if (!match) {
    return null;
  }
  const amount = Number.parseInt(match[1] ?? '', 10);
  const unitMs = ELEVATION_UNIT_MS[(match[2] ?? '').toLowerCase()];
  if (!Number.isFinite(amount) || amount <= 0 || unitMs === undefined) {
    return null;
  }
  return amount * unitMs;
}

/**
 * Create the unified message handler for the channel registry.
 * Each channel+sender pair gets its own AgentOrchestrator backed by a persistent session.
 * The orchestrator wraps a NativeAgent and adds delegation to different model tiers.
 *
 * Orchestrators are cached by channel, sender, agent config name, active skill and model
 * tier, so a change in any of those yields a separate orchestrator.
 *
 * @param deps - Collaborators and configuration. The optional members switch features on:
 *   agent routing, sandboxed tools, slash commands, intent routing and skills.
 * @returns Both the message handler function and the agents map for usage tracking.
 */
export function createMessageRouter(deps: {
  sessionManager: SessionManager;
  modelRouter: ModelRouter;
  systemPrompt: string;
  toolRegistry: ToolRegistry;
  toolExecutor: ToolExecutor;
  config: Config;
  memoryStore?: MemoryStore;
  agentConfigRegistry?: AgentConfigRegistry;
  agentRouter?: AgentRouter;
  sandboxManager?: SandboxManager;
  commandRegistry?: CommandRegistry;
  intentRegistry?: ComponentRegistry;
  routingPolicy?: RoutingPolicy;
  skillRegistry?: SkillRegistry;
}): {
  handler: (msg: InboundMessage, reply: (response: OutboundMessage) => Promise<void>) => Promise<void>;
  agents: Map<string, { orchestrator: AgentOrchestrator; collector: OutboundAttachmentCollector }>;
} {
  // Cache agents by session ID + agent config name to avoid recreating on every message
  const agents = new Map<string, AgentEntry>();

  /**
   * Resolve the agent config, session and model tier for a conversation.
   * Shared by agent creation and audio-capability detection so both see the same tier.
   */
  function resolveRouting(
    channel: string,
    senderId: string,
    metadata?: Record<string, unknown>,
    agentOverride?: string,
  ) {
    // Resolve agent config name via routing (sender → channel → default fallback)
    const agentConfigName = agentOverride ?? deps.agentRouter?.resolve(channel, senderId);
    const agentConfig = agentConfigName ? deps.agentConfigRegistry?.get(agentConfigName) : undefined;
    const session = deps.sessionManager.getSession(channel, senderId);

    // Cron job tier wins over agent config tier
    const tierFromMetadata = metadata?.modelTier as ModelTier | undefined;
    // Read per-session model tier override (persisted in SQLite)
    const sessionTierOverride = session.getConfig('modelTier') as ModelTier | undefined;

    // Resolution chain: metadata (cron) → session override → agent config → global default
    const effectiveTier =
      tierFromMetadata ??
      sessionTierOverride ??
      agentConfig?.modelTier ??
      deps.config.agents.primary_tier ??
      'default';

    return { agentConfigName, agentConfig, session, effectiveTier };
  }

  /** Look up the model config for a tier, falling back to the default tier when the tier has none. */
  function resolveTierModel(tier: string) {
    // NOTE(review): the value type of this cast is reconstructed — confirm against the Config type.
    const modelsByTier = deps.config.models as Record<string, ModelConfig | undefined>;
    const fallback = deps.config.models.default;
    const tierConfig = modelsByTier[tier] ?? fallback;
    return {
      tierConfig,
      provider: tierConfig.provider ?? fallback.provider,
      model: tierConfig.model ?? fallback.model,
      contextWindow: tierConfig.context_window ?? fallback.context_window,
    };
  }

  /** Return the cached agent for this conversation, building and caching it on first use. */
  function getOrCreateAgent(
    channel: string,
    senderId: string,
    metadata?: Record<string, unknown>,
    agentOverride?: string,
  ): AgentEntry {
    const { agentConfigName, agentConfig, session, effectiveTier } = resolveRouting(
      channel,
      senderId,
      metadata,
      agentOverride,
    );

    // Ignore a requested skill that the registry does not know or reports as unavailable.
    let skillOverride = metadata?.skillOverride as string | undefined;
    if (skillOverride && deps.skillRegistry) {
      const requestedSkill = deps.skillRegistry.get(skillOverride);
      if (!requestedSkill || !requestedSkill.available) {
        skillOverride = undefined;
      }
    }

    // Include agent config name in cache key so different agents aren't shared
    const baseSid =
      agentConfigName || skillOverride
        ? `${channel}:${senderId}:${agentConfigName ?? 'default'}:${skillOverride ?? 'none'}`
        : `${channel}:${senderId}`;

    // Cache agents by tier too so switching tiers updates context-window heuristics.
    const sessionId = `${baseSid}:${effectiveTier}`;

    const cached = agents.get(sessionId);
    if (cached) {
      return cached;
    }

    // If an active skill is specified, annotate the system prompt for clarity.
    const activeSkillName = skillOverride;
    const activeSkill = activeSkillName ? deps.skillRegistry?.get(activeSkillName) : undefined;
    // Use agent config overrides where available, falling back to global config
    const basePrompt = agentConfig?.systemPrompt ?? deps.systemPrompt;
    const effectiveSystemPrompt = activeSkillName
      ? `${basePrompt}\n\n[Active skill: ${activeSkillName}. Tool access is capability-restricted and may be sandboxed.]`
      : basePrompt;

    const {
      provider: effectiveProvider,
      model: effectiveModelName,
      contextWindow: effectiveContextWindow,
    } = resolveTierModel(effectiveTier);

    const delegationConfig: DelegationConfig = {
      compaction: deps.config.agents.delegation.compaction ?? 'fast',
      memory_extraction: deps.config.agents.delegation.memory_extraction ?? 'fast',
      classification: deps.config.agents.delegation.classification ?? 'fast',
      tool_summarisation: deps.config.agents.delegation.tool_summarisation ?? 'fast',
      complex_reasoning: deps.config.agents.delegation.complex_reasoning ?? 'complex',
    };

    // A skill is sandboxed unless its manifest asks for the host. Without a skill, the agent
    // config's `sandbox` flag decides; it was previously ignored because the environment was
    // forced to 'host' whenever no skill was active.
    const skillEnvPreference = activeSkill?.manifest.permissions?.execution_environment;
    const sandboxManager = deps.config.sandbox.enabled ? deps.sandboxManager : undefined;
    const sandboxRequested = skillOverride ? skillEnvPreference !== 'host' : Boolean(agentConfig?.sandbox);
    const executionEnvironment: 'host' | 'sandbox' = sandboxRequested && sandboxManager ? 'sandbox' : 'host';

    // Work on a private clone: tools are replaced/registered per agent below.
    const effectiveToolRegistry = deps.toolRegistry.clone();

    if (executionEnvironment === 'sandbox' && sandboxManager) {
      // Lazy sandbox: create the sandboxed tools with a deferred sandbox reference
      // The sandbox is created on first use via SandboxManager.getOrCreate()
      const sandboxSessionId = sessionId;

      const lazySandboxShell: Tool = {
        name: 'shell.exec',
        description: 'Execute a shell command inside a sandboxed container and return stdout/stderr.',
        inputSchema: {
          type: 'object',
          properties: {
            command: { type: 'string', description: 'The shell command to execute' },
            cwd: { type: 'string', description: 'Working directory inside the container (optional)' },
            timeout: { type: 'number', description: 'Timeout in milliseconds (default 30000)' },
          },
          required: ['command'],
        },
        execute: async (rawArgs: unknown) => {
          const sandbox = await sandboxManager.getOrCreate(sandboxSessionId);
          return createSandboxedShellTool(sandbox).execute(rawArgs);
        },
      };

      const lazySandboxProcess: Tool = {
        name: 'process.start',
        description: 'Start a command in the background inside a sandboxed container.',
        inputSchema: {
          type: 'object',
          properties: {
            command: { type: 'string', description: 'The shell command to run in the background' },
            cwd: { type: 'string', description: 'Working directory inside the container (optional)' },
          },
          required: ['command'],
        },
        execute: async (rawArgs: unknown) => {
          const sandbox = await sandboxManager.getOrCreate(sandboxSessionId);
          return createSandboxedProcessStartTool(sandbox).execute(rawArgs);
        },
      };

      effectiveToolRegistry.replace(lazySandboxShell);
      effectiveToolRegistry.replace(lazySandboxProcess);
    }

    // Create an attachment collector for this agent session and bind media.send to it
    const collector = new OutboundAttachmentCollector();
    effectiveToolRegistry.register(createMediaSendTool(collector));

    const orchestrator = new AgentOrchestrator({
      modelRouter: deps.modelRouter,
      systemPrompt: effectiveSystemPrompt,
      session,
      toolRegistry: effectiveToolRegistry,
      toolExecutor: deps.toolExecutor,
      primaryTier: effectiveTier,
      delegation: delegationConfig,
      maxDelegationDepth: deps.config.agents.max_delegation_depth ?? 3,
      maxIterations: deps.config.agents.max_iterations,
      compaction: deps.config.compaction.enabled
        ? {
            thresholdPct: deps.config.compaction.threshold_pct,
            keepTurns: deps.config.compaction.keep_turns,
            summaryMaxTokens: deps.config.compaction.summary_max_tokens,
            importanceThreshold: deps.config.compaction.importance_threshold,
          }
        : undefined,
      modelName: effectiveModelName,
      contextWindow: effectiveContextWindow,
      memoryStore: deps.memoryStore,
      memoryAutoExtract: deps.config.memory?.auto_extract,
      memoryInjectionStrategy: deps.config.memory?.injection_strategy,
      memoryMaxInjectionTokens: deps.config.memory?.max_injection_tokens,
      toolPolicyContext: {
        // NOTE(review): `agent` carries the tier, not the agent config name — confirm this is intended.
        agent: effectiveTier,
        provider: effectiveProvider,
        sessionId: session.id,
        channel,
        sender: senderId,
        tier: effectiveTier,
        autonomyLevel: deps.config.agents.autonomy_level ?? 'standard',
        skillName: activeSkillName,
        skillPermissions: activeSkill?.manifest.permissions,
        allowedSecretScopes: activeSkill?.manifest.permissions?.secrets,
        executionEnvironment,
      },
      attachmentCollector: collector,
    });

    const entry: AgentEntry = { orchestrator, collector };
    agents.set(sessionId, entry);
    return entry;
  }

  /**
   * Handle one inbound message: apply intent routing, run a slash command if the input is
   * one, otherwise pass the (possibly transcribed) message to the agent and send its reply.
   */
  const handler = async (
    msg: InboundMessage,
    reply: (response: OutboundMessage) => Promise<void>,
  ): Promise<void> => {
    let intentAgentOverride: string | undefined;
    let intentSkillOverride: string | undefined;

    if (deps.config.intents?.enabled && deps.intentRegistry) {
      const intentMatch = deps.intentRegistry.match(msg.text);
      const target = intentMatch?.rule.target;

      if (intentMatch && target && (target.type === 'agent' || target.type === 'skill')) {
        let confidence = intentMatch.score;

        // Only agent targets get the history boost: a sufficiently similar earlier message
        // in this conversation raises the confidence, capped at 1.
        if (target.type === 'agent' && deps.config.history_index?.enabled) {
          const historyHits = deps.sessionManager.searchHistory(msg.text, {
            sessionId: `${msg.channel}:${msg.senderId}`,
            limit: 1,
          });
          const topHit = historyHits[0];
          if (topHit && topHit.score >= (deps.config.history_index.min_score ?? 0.15)) {
            confidence = Math.min(1, confidence + (deps.config.history_index.routing_boost ?? 0.05));
          }
        }

        // Without a routing policy every match takes the fast path.
        const decision = deps.routingPolicy
          ? deps.routingPolicy.decide({ confidence })
          : { path: 'fast' as const, reason: 'high_confidence' as const };
        console.log(`[routing] intent=${intentMatch.rule.name} confidence=${confidence.toFixed(3)} path=${decision.path} reason=${decision.reason}`);

        if (decision.path === 'fast') {
          if (target.type === 'agent') {
            intentAgentOverride = target.name;
          } else {
            intentSkillOverride = target.name;
          }
        }
      }
    }

    const effectiveMetadata = {
      ...(msg.metadata ?? {}),
      ...(intentSkillOverride ? { skillOverride: intentSkillOverride } : {}),
    };

    const { orchestrator: agent, collector } = getOrCreateAgent(
      msg.channel,
      msg.senderId,
      effectiveMetadata,
      intentAgentOverride,
    );

    // Channels may deliver commands as structured metadata; rebuild the "/name args" form.
    const commandInput =
      msg.metadata?.isCommand && typeof msg.metadata.command === 'string'
        ? `/${msg.metadata.command}${msg.metadata.commandArgs ? ` ${msg.metadata.commandArgs}` : ''}`
        : msg.text;

    if (deps.commandRegistry && deps.commandRegistry.isCommand(commandInput)) {
      const session = deps.sessionManager.getSession(msg.channel, msg.senderId);

      /** Remove every persisted elevation key from the session. */
      const clearElevation = (): void => {
        session.deleteConfig('elevation.until_ms');
        session.deleteConfig('elevation.reason');
        session.deleteConfig('elevation.id');
      };

      const commandResult = await deps.commandRegistry.execute(commandInput, {
        channel: msg.channel,
        senderId: msg.senderId,
        sessionId: session.id,
        rawInput: commandInput,
        services: {
          getStatus: () => `Flynn is running. Active model tier: ${agent.getModelTier()}`,

          getUsage: () => {
            const usage = agent.getUsage();
            const lines = [
              '**Token Usage**',
              '',
              `Primary: ${usage.primary.inputTokens.toLocaleString()} in / ${usage.primary.outputTokens.toLocaleString()} out (${usage.primary.calls} calls)`,
            ];
            const delegationEntries = Object.entries(usage.delegation);
            if (delegationEntries.length > 0) {
              lines.push('', 'Delegation:');
              for (const [tier, stats] of delegationEntries) {
                lines.push(` ${tier}: ${stats.inputTokens.toLocaleString()} in / ${stats.outputTokens.toLocaleString()} out (${stats.calls} calls)`);
              }
            }
            lines.push(
              '',
              `**Total:** ${usage.total.inputTokens.toLocaleString()} in / ${usage.total.outputTokens.toLocaleString()} out (${usage.total.calls} calls)`,
            );
            if (usage.total.estimatedCost > 0) {
              lines.push(`**Estimated cost:** $${usage.total.estimatedCost.toFixed(4)}`);
            }
            return lines.join('\n');
          },

          getModel: () => {
            const currentTier = agent.getModelTier();
            const sessionOverride = session.getConfig('modelTier');
            const available = deps.modelRouter.getAvailableTiers();
            const labels = deps.modelRouter.getAllLabels();
            const lines = [`Active tier: ${currentTier}${sessionOverride ? ' (session override)' : ''}`];
            for (const tier of available) {
              const label = labels[tier] ?? 'unknown';
              const marker = tier === currentTier ? ' ←' : '';
              lines.push(` ${tier}: ${label}${marker}`);
            }
            return lines.join('\n');
          },

          setModel: (tier) => {
            const raw = tier.trim();
            if (!raw) {
              return MODEL_USAGE;
            }
            const parts = raw.split(/\s+/);
            const [requestedTier = '', arg2] = parts;

            const validTiers = deps.modelRouter.getAvailableTiers();
            if (!validTiers.includes(requestedTier as ModelTier)) {
              return `Model tier not available: ${requestedTier}`;
            }
            const modelTier = requestedTier as ModelTier;

            // /model <tier> — switch this session to the tier
            if (arg2 === undefined) {
              session.setConfig('modelTier', modelTier);
              agent.setModelTier(modelTier);
              const label = deps.modelRouter.getLabel(modelTier);
              return `Switched to model: ${modelTier} (${label})`;
            }

            // /model <tier> reset — restore configured provider/model and re-enable fallbacks
            if (arg2.toLowerCase() === 'reset') {
              const configured: ModelConfig | undefined =
                modelTier === 'default' ? deps.config.models.default
                  : modelTier === 'fast' ? deps.config.models.fast
                    : modelTier === 'complex' ? deps.config.models.complex
                      : modelTier === 'local' ? deps.config.models.local
                        : undefined;
              if (!configured) {
                return `No configured model for tier: ${modelTier}`;
              }
              const client = createClientFromConfig(configured);
              const label = `${configured.provider}/${configured.model}`;
              deps.modelRouter.setClient(modelTier, client, label);
              deps.modelRouter.setTierStrict(modelTier, false);
              session.setConfig('modelTier', modelTier);
              agent.setModelTier(modelTier);
              return `Reset ${modelTier} to: ${label}`;
            }

            // /model <tier> <provider/model> — point the tier at an explicit provider/model
            const providerModel = arg2;
            const slashIdx = providerModel.indexOf('/');
            // Both halves must be non-empty; only the first slash separates provider from model.
            if (slashIdx <= 0 || slashIdx === providerModel.length - 1) {
              return 'Invalid format. Use: /model <tier> <provider/model> (e.g. /model default github/gpt-5-mini)';
            }
            const provider = providerModel.slice(0, slashIdx);
            const model = providerModel.slice(slashIdx + 1);
            if (!MODEL_PROVIDERS.includes(provider as ModelProvider)) {
              return `Unknown provider "${provider}". Known providers: ${MODEL_PROVIDERS.join(', ')}`;
            }
            const providerType = provider as ModelProvider;

            // Reuse an already configured entry for this provider as a template when one exists.
            const template = buildProviderConfigMap(deps.config)[providerType];
            try {
              const client = createClientFromConfig(
                template
                  ? { ...template, provider: providerType, model }
                  : { provider: providerType, model },
              );
              deps.modelRouter.setClient(modelTier, client, providerModel);
              deps.modelRouter.setTierStrict(modelTier, true);
              session.setConfig('modelTier', modelTier);
              agent.setModelTier(modelTier);
              const lines = [
                `Set ${modelTier} to: ${providerModel}`,
                `Fallbacks for ${modelTier} disabled (strict tier mode).`,
              ];
              if (parts.length > 2) {
                lines.push(`Note: ignored extra args: ${parts.slice(2).join(' ')}`);
              }
              return lines.join('\n');
            } catch (error) {
              const message = error instanceof Error ? error.message : String(error);
              return `Failed to switch ${modelTier} to ${providerModel}: ${message}`;
            }
          },

          compact: async () => {
            const result = await agent.compact();
            if (result && result.compactedCount > 0) {
              return `Compacted ${result.compactedCount} messages: ${result.tokensBefore} → ${result.tokensAfter} tokens`;
            }
            return 'Nothing to compact.';
          },

          reset: () => {
            agent.reset();
            session.deleteConfig('modelTier');
            return '';
          },

          getElevation: () => {
            const untilRaw = session.getConfig('elevation.until_ms');
            const reason = session.getConfig('elevation.reason') ?? '';
            const id = session.getConfig('elevation.id') ?? '';
            if (!untilRaw || !id) {
              return 'Elevated mode: off';
            }
            const untilMs = Number.parseInt(untilRaw, 10);
            if (!Number.isFinite(untilMs)) {
              return 'Elevated mode: off';
            }

            const now = Date.now();
            if (untilMs <= now) {
              // Expiry is detected lazily here: clear the stored keys and audit it.
              clearElevation();
              auditLogger?.securityElevationExpired({
                session_id: session.id,
                channel: msg.channel,
                sender: msg.senderId,
                elevation_id: id,
                until_ms: untilMs,
                reason: reason || undefined,
              });
              return 'Elevated mode: off (expired)';
            }

            const remainingSec = Math.ceil((untilMs - now) / 1000);
            return `Elevated mode: on (${remainingSec}s remaining)${reason ? ` — ${reason}` : ''}`;
          },

          setElevation: (input: string) => {
            // Drop empty tokens: ''.split(/\s+/) yields [''], which used to bypass the usage message.
            const parts = input.trim().split(/\s+/).filter((p) => p.length > 0);
            const isConfirmFlag = (p: string): boolean => p === '--yes' || p === '--confirm';
            const hasYes = parts.some(isConfirmFlag);
            const filtered = parts.filter((p) => !isConfirmFlag(p));
            const [first, ...rest] = filtered;

            if (first === undefined) {
              return `Usage: ${ELEVATE_ENABLE_USAGE} | /elevate off --yes`;
            }

            if (first === 'off') {
              if (!hasYes) {
                return 'Refusing to disable elevation without explicit confirmation. Use: /elevate off --yes';
              }
              const existingId = session.getConfig('elevation.id') ?? randomUUID();
              const existingUntil = session.getConfig('elevation.until_ms');
              const existingReason = session.getConfig('elevation.reason') ?? '';
              const existingUntilMs = existingUntil ? Number.parseInt(existingUntil, 10) : Number.NaN;
              clearElevation();
              auditLogger?.securityElevationDisabled({
                session_id: session.id,
                channel: msg.channel,
                sender: msg.senderId,
                elevation_id: existingId,
                // Never report NaN when the stored value is not a number.
                until_ms: Number.isFinite(existingUntilMs) ? existingUntilMs : undefined,
                reason: existingReason || undefined,
              });
              return 'Elevated mode: off';
            }

            if (!hasYes) {
              return `Refusing to enable elevation without explicit confirmation. Use: ${ELEVATE_ENABLE_USAGE}`;
            }

            const reason = rest.join(' ').trim();
            const ttlMs = parseElevationTtlMs(first);
            if (ttlMs === null) {
              return 'Invalid duration. Use one of: 30s, 10m, 1h, 1d';
            }

            const untilMs = Date.now() + ttlMs;
            const id = randomUUID();
            session.setConfig('elevation.until_ms', String(untilMs));
            session.setConfig('elevation.id', id);
            if (reason) {
              session.setConfig('elevation.reason', reason);
            } else {
              session.deleteConfig('elevation.reason');
            }
            auditLogger?.securityElevationEnabled({
              session_id: session.id,
              channel: msg.channel,
              sender: msg.senderId,
              elevation_id: id,
              until_ms: untilMs,
              ttl_ms: ttlMs,
              reason: reason || undefined,
            });
            return `Elevated mode: on until ${new Date(untilMs).toISOString()}`;
          },
        },
      });

      if (commandResult.handled) {
        if (commandResult.text.trim()) {
          await reply({ text: commandResult.text, replyTo: msg.id });
        }
        return;
      }
    }

    try {
      // Determine if the active model supports native audio input. The tier is resolved the
      // same way as for the agent itself (including any intent agent override).
      const { effectiveTier } = resolveRouting(
        msg.channel,
        msg.senderId,
        effectiveMetadata,
        intentAgentOverride,
      );

      // Look up provider/model for the effective tier
      const { tierConfig, provider: modelProvider, model: modelName } = resolveTierModel(effectiveTier);
      const supportsAudioOverride = (tierConfig as Record<string, unknown>).supports_audio as boolean | undefined;
      const nativeAudioSupported = supportsAudioInput(modelProvider, modelName, supportsAudioOverride);

      let messageText = msg.text;
      let attachments = msg.attachments;
      const audioAttachments = (msg.attachments ?? []).filter((a: Attachment) => isSupportedAudio(a));

      if (audioAttachments.length > 0 && !nativeAudioSupported) {
        // Model doesn't support native audio — transcribe via Whisper and strip audio attachments
        const audioConfig: AudioTranscriptionConfig | undefined =
          deps.config.audio?.enabled && deps.config.audio.provider
            ? {
                endpoint: deps.config.audio.provider.endpoint,
                apiKey: deps.config.audio.provider.api_key,
                model: deps.config.audio.provider.model,
              }
            : undefined;

        if (!audioConfig?.endpoint) {
          // Without transcription, we cannot safely send audio to a non-audio-capable model.
          // Fast-path a deterministic, user-friendly reply instead of invoking the agent loop.
          await reply({
            text: [
              'I received your voice message, but I cannot transcribe it yet because audio transcription is not configured.',
              '',
              'To enable voice messages, set `audio.enabled: true` and configure an `audio.provider` in `config.yaml` (OpenAI/Groq/custom Whisper-compatible `/v1/audio/transcriptions`).',
              '',
              'Workarounds:',
              '1. Paste the transcription text.',
              '2. Upload the audio file somewhere and send me a direct URL.',
            ].join('\n'),
            replyTo: msg.id,
          });
          return;
        }

        // Transcribe one at a time and keep attachment order; prepending inside the loop
        // used to reverse the transcripts when a message carried several recordings.
        const transcripts: string[] = [];
        for (const att of audioAttachments) {
          const transcript = await transcribeAudio(att, audioConfig);
          transcripts.push(`[Voice message]: ${transcript}`);
        }
        messageText = [...transcripts, messageText].join('\n\n');

        // Remove audio attachments so buildUserMessage doesn't create audio content parts
        const nonAudio = (msg.attachments ?? []).filter((a: Attachment) => !isSupportedAudio(a));
        attachments = nonAudio.length > 0 ? nonAudio : undefined;
      }

      // If native audio IS supported, we pass attachments through unchanged —
      // buildUserMessage() in the agent will create native audio content parts
      const response = await agent.process(messageText, attachments);
      const outboundAttachments = collector.drain();
      await reply({
        text: response,
        replyTo: msg.id,
        attachments: outboundAttachments.length > 0 ? outboundAttachments : undefined,
      });
    } catch (error) {
      // The collector outlives this message; discard anything gathered before the failure
      // so it is not attached to the next successful reply.
      collector.drain();
      console.error(`Error processing message from ${msg.channel}:${msg.senderId}:`, error);
      await reply({
        text: 'Sorry, an error occurred while processing your message.',
        replyTo: msg.id,
      });
    }
  };

  return { handler, agents };
}