import type { AudioTranscriptionConfig } from '../models/media.js'; import type { Attachment } from '../channels/types.js'; import { isSupportedAudio, transcribeAudio } from '../models/media.js'; import { AgentOrchestrator, type DelegationConfig } from '../backends/index.js'; import { OutboundAttachmentCollector } from '../backends/native/attachments.js'; import type { InboundMessage, OutboundMessage } from '../channels/index.js'; import { MemoryStore } from '../memory/index.js'; import type { Tool } from '../tools/types.js'; import { createMediaSendTool } from '../tools/index.js'; import { createSandboxedShellTool, createSandboxedProcessStartTool, SandboxManager } from '../sandbox/index.js'; import type { Config } from '../config/index.js'; import { ModelRouter, type ModelTier } from '../models/index.js'; import { ToolRegistry, ToolExecutor } from '../tools/index.js'; import { SessionManager } from '../session/index.js'; import { AgentConfigRegistry, AgentRouter } from '../agents/index.js'; /** * Create the unified message handler for the channel registry. * Each channel+sender pair gets its own AgentOrchestrator backed by a persistent session. * The orchestrator wraps a NativeAgent and adds delegation to different model tiers. * * Returns both the message handler function and the agents map for usage tracking. */ export function createMessageRouter(deps: { sessionManager: SessionManager; modelRouter: ModelRouter; systemPrompt: string; toolRegistry: ToolRegistry; toolExecutor: ToolExecutor; config: Config; memoryStore?: MemoryStore; agentConfigRegistry?: AgentConfigRegistry; agentRouter?: AgentRouter; sandboxManager?: SandboxManager; audioConfig?: AudioTranscriptionConfig; }): { handler: (msg: InboundMessage, reply: (response: OutboundMessage) => Promise) => Promise; agents: Map; } { // Cache agents by session ID + agent config name to avoid recreating on every message const agents = new Map(); function getOrCreateAgent(channel: string, senderId: string, metadata?: Record): { orchestrator: AgentOrchestrator; collector: OutboundAttachmentCollector } { // Resolve agent config name via routing (sender → channel → default fallback) const agentConfigName = deps.agentRouter?.resolve(channel, senderId); const agentConfig = agentConfigName ? deps.agentConfigRegistry?.get(agentConfigName) : undefined; // Cron job tier wins over agent config tier const tierFromMetadata = metadata?.modelTier as ModelTier | undefined; // Include agent config name in cache key so different agents aren't shared const baseSid = agentConfigName ? `${channel}:${senderId}:${agentConfigName}` : `${channel}:${senderId}`; const sessionId = tierFromMetadata ? `${baseSid}:${tierFromMetadata}` : baseSid; let entry = agents.get(sessionId); if (!entry) { const session = deps.sessionManager.getSession(channel, senderId); // Use agent config overrides where available, falling back to global config const effectiveSystemPrompt = agentConfig?.systemPrompt ?? deps.systemPrompt; const effectiveTier = tierFromMetadata ?? agentConfig?.modelTier ?? deps.config.agents.primary_tier ?? 'default'; const effectiveProvider = deps.config.models.default.provider; const delegationConfig: DelegationConfig = { compaction: deps.config.agents.delegation.compaction ?? 'fast', memory_extraction: deps.config.agents.delegation.memory_extraction ?? 'fast', classification: deps.config.agents.delegation.classification ?? 'fast', tool_summarisation: deps.config.agents.delegation.tool_summarisation ?? 'fast', complex_reasoning: deps.config.agents.delegation.complex_reasoning ?? 'complex', }; // Clone the tool registry and replace shell tools with sandboxed versions if configured let effectiveToolRegistry = deps.toolRegistry; if (agentConfig?.sandbox && deps.sandboxManager && deps.config.sandbox.enabled) { effectiveToolRegistry = deps.toolRegistry.clone(); // Lazy sandbox: create the sandboxed tools with a deferred sandbox reference // The sandbox is created on first use via SandboxManager.getOrCreate() const sandboxSessionId = sessionId; const sandboxManager = deps.sandboxManager; // Create a proxy sandbox that lazily initializes const lazySandboxShell: Tool = { name: 'shell.exec', description: 'Execute a shell command inside a sandboxed container and return stdout/stderr.', inputSchema: { type: 'object', properties: { command: { type: 'string', description: 'The shell command to execute' }, cwd: { type: 'string', description: 'Working directory inside the container (optional)' }, timeout: { type: 'number', description: 'Timeout in milliseconds (default 30000)' }, }, required: ['command'], }, execute: async (rawArgs: unknown) => { const sandbox = await sandboxManager.getOrCreate(sandboxSessionId); const tool = createSandboxedShellTool(sandbox); return tool.execute(rawArgs); }, }; const lazySandboxProcess: Tool = { name: 'process.start', description: 'Start a command in the background inside a sandboxed container.', inputSchema: { type: 'object', properties: { command: { type: 'string', description: 'The shell command to run in the background' }, cwd: { type: 'string', description: 'Working directory inside the container (optional)' }, }, required: ['command'], }, execute: async (rawArgs: unknown) => { const sandbox = await sandboxManager.getOrCreate(sandboxSessionId); const tool = createSandboxedProcessStartTool(sandbox); return tool.execute(rawArgs); }, }; effectiveToolRegistry.replace(lazySandboxShell); effectiveToolRegistry.replace(lazySandboxProcess); } // Create an attachment collector for this agent session const collector = new OutboundAttachmentCollector(); // Clone the tool registry to register the media.send tool bound to this collector effectiveToolRegistry = effectiveToolRegistry.clone(); effectiveToolRegistry.register(createMediaSendTool(collector)); const orchestrator = new AgentOrchestrator({ modelRouter: deps.modelRouter, systemPrompt: effectiveSystemPrompt, session, toolRegistry: effectiveToolRegistry, toolExecutor: deps.toolExecutor, primaryTier: effectiveTier, delegation: delegationConfig, maxDelegationDepth: deps.config.agents.max_delegation_depth ?? 3, maxIterations: deps.config.agents.max_iterations, compaction: deps.config.compaction.enabled ? { thresholdPct: deps.config.compaction.threshold_pct, keepTurns: deps.config.compaction.keep_turns, summaryMaxTokens: deps.config.compaction.summary_max_tokens, } : undefined, modelName: deps.config.models.default.model, contextWindow: deps.config.models.default.context_window, memoryStore: deps.memoryStore, toolPolicyContext: { agent: effectiveTier, provider: effectiveProvider, }, attachmentCollector: collector, }); entry = { orchestrator, collector }; agents.set(sessionId, entry); } return entry; } const handler = async (msg: InboundMessage, reply: (response: OutboundMessage) => Promise): Promise => { const { orchestrator: agent, collector } = getOrCreateAgent(msg.channel, msg.senderId, msg.metadata); // Handle special commands if (msg.metadata?.isCommand) { if (msg.metadata.command === 'reset') { agent.reset(); return; } if (msg.metadata.command === 'compact') { const result = await agent.compact(); if (result && result.compactedCount > 0) { await reply({ text: `Compacted ${result.compactedCount} messages: ${result.tokensBefore} → ${result.tokensAfter} tokens`, replyTo: msg.id, }); } else { await reply({ text: 'Nothing to compact.', replyTo: msg.id, }); } return; } if (msg.metadata.command === 'usage') { const usage = agent.getUsage(); const lines = [ '**Token Usage**', '', `Primary: ${usage.primary.inputTokens.toLocaleString()} in / ${usage.primary.outputTokens.toLocaleString()} out (${usage.primary.calls} calls)`, ]; const delegationEntries = Object.entries(usage.delegation); if (delegationEntries.length > 0) { lines.push(''); lines.push('Delegation:'); for (const [tier, stats] of delegationEntries) { lines.push(` ${tier}: ${stats.inputTokens.toLocaleString()} in / ${stats.outputTokens.toLocaleString()} out (${stats.calls} calls)`); } } lines.push(''); lines.push(`**Total:** ${usage.total.inputTokens.toLocaleString()} in / ${usage.total.outputTokens.toLocaleString()} out (${usage.total.calls} calls)`); if (usage.total.estimatedCost > 0) { lines.push(`**Estimated cost:** $${usage.total.estimatedCost.toFixed(4)}`); } await reply({ text: lines.join('\n'), replyTo: msg.id }); return; } } try { // Transcribe audio attachments before processing let messageText = msg.text; const audioAttachments = (msg.attachments ?? []).filter((a: Attachment) => isSupportedAudio(a)); if (audioAttachments.length > 0 && deps.audioConfig) { for (const att of audioAttachments) { const transcript = await transcribeAudio(att, deps.audioConfig); messageText = `[Voice message]: ${transcript}\n\n${messageText}`; } } const response = await agent.process(messageText, msg.attachments); const outboundAttachments = collector.drain(); await reply({ text: response, replyTo: msg.id, attachments: outboundAttachments.length > 0 ? outboundAttachments : undefined, }); } catch (error) { console.error(`Error processing message from ${msg.channel}:${msg.senderId}:`, error); await reply({ text: 'Sorry, an error occurred while processing your message.', replyTo: msg.id, }); } }; return { handler, agents }; }