From 08f5b6b8e70984db44324f8420cbabafe1708aea Mon Sep 17 00:00:00 2001 From: William Valentin Date: Mon, 9 Feb 2026 20:09:28 -0800 Subject: [PATCH] feat(01-02): extract message routing into src/daemon/routing.ts - Move createMessageRouter function (~220 lines) to dedicated routing module - Add import from ./routing.js in daemon/index.ts - routing.test.ts passes without modification - Zero type errors --- src/daemon/index.ts | 237 +---------------------------------------- src/daemon/routing.ts | 239 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 244 insertions(+), 232 deletions(-) create mode 100644 src/daemon/routing.ts diff --git a/src/daemon/index.ts b/src/daemon/index.ts index cf84114..1deabf6 100644 --- a/src/daemon/index.ts +++ b/src/daemon/index.ts @@ -1,16 +1,14 @@ import { Lifecycle } from './lifecycle.js'; -import { createClientFromConfig, anthropicToGitHubModel, createAutoFallbackClient, createModelRouter } from './models.js'; +import { createModelRouter } from './models.js'; +import { createMessageRouter } from './routing.js'; import type { Config } from '../config/index.js'; import type { AudioTranscriptionConfig } from '../models/media.js'; -import type { Attachment } from '../channels/types.js'; -import { isSupportedAudio, transcribeAudio } from '../models/media.js'; import { ModelRouter } from '../models/index.js'; -import { AgentOrchestrator, type DelegationConfig } from '../backends/index.js'; +import { AgentOrchestrator } from '../backends/index.js'; import { OutboundAttachmentCollector } from '../backends/native/attachments.js'; import { SessionStore, SessionManager, parseDuration } from '../session/index.js'; import { HookEngine } from '../hooks/index.js'; -import { ToolRegistry, ToolExecutor, ToolPolicy, allBuiltinTools, createWebSearchTools, createProcessTools, ProcessManager, BrowserManager, createBrowserTools, createMediaSendTool, createSessionTools, createAgentsListTool, createMessageSendTool, createCronTools } from '../tools/index.js'; -import type { Tool } from '../tools/types.js'; +import { ToolRegistry, ToolExecutor, ToolPolicy, allBuiltinTools, createWebSearchTools, createProcessTools, ProcessManager, BrowserManager, createBrowserTools, createSessionTools, createAgentsListTool, createMessageSendTool, createCronTools } from '../tools/index.js'; import { MemoryStore } from '../memory/index.js'; import { VectorStore, HybridSearch, createEmbeddingProvider, chunkText, contentHash } from '../memory/index.js'; import type { EmbeddingProvider as EmbeddingProviderInterface } from '../memory/index.js'; @@ -18,12 +16,11 @@ import { createMemoryTools } from '../tools/builtin/index.js'; import { GatewayServer } from '../gateway/index.js'; import { ChannelRegistry, TelegramAdapter, WebChatAdapter, DiscordAdapter, SlackAdapter, WhatsAppAdapter, PairingManager } from '../channels/index.js'; import { CronScheduler, WebhookHandler, HeartbeatMonitor, GmailWatcher } from '../automation/index.js'; -import type { InboundMessage, OutboundMessage } from '../channels/index.js'; import { McpManager } from '../mcp/index.js'; import { SkillRegistry, SkillInstaller, loadAllSkills } from '../skills/index.js'; import { assembleSystemPrompt } from '../prompt/index.js'; import { AgentConfigRegistry, AgentRouter } from '../agents/index.js'; -import { DockerSandbox, SandboxManager, createSandboxedShellTool, createSandboxedProcessStartTool } from '../sandbox/index.js'; +import { DockerSandbox, SandboxManager } from '../sandbox/index.js'; import { resolve } from 'path'; import { homedir } from 'os'; import { mkdirSync } from 'fs'; @@ -67,230 +64,6 @@ function loadSystemPrompt(config: Config): string { return result.prompt; } -/** - * Create the unified message handler for the channel registry. - * Each channel+sender pair gets its own AgentOrchestrator backed by a persistent session. - * The orchestrator wraps a NativeAgent and adds delegation to different model tiers. - * - * Returns both the message handler function and the agents map for usage tracking. - */ -function createMessageRouter(deps: { - sessionManager: SessionManager; - modelRouter: ModelRouter; - systemPrompt: string; - toolRegistry: ToolRegistry; - toolExecutor: ToolExecutor; - config: Config; - memoryStore?: MemoryStore; - agentConfigRegistry?: AgentConfigRegistry; - agentRouter?: AgentRouter; - sandboxManager?: SandboxManager; - audioConfig?: AudioTranscriptionConfig; -}): { - handler: (msg: InboundMessage, reply: (response: OutboundMessage) => Promise) => Promise; - agents: Map; -} { - // Cache agents by session ID + agent config name to avoid recreating on every message - const agents = new Map(); - - function getOrCreateAgent(channel: string, senderId: string): { orchestrator: AgentOrchestrator; collector: OutboundAttachmentCollector } { - // Resolve agent config name via routing (sender → channel → default fallback) - const agentConfigName = deps.agentRouter?.resolve(channel, senderId); - const agentConfig = agentConfigName ? deps.agentConfigRegistry?.get(agentConfigName) : undefined; - - // Include agent config name in cache key so different agents aren't shared - const sessionId = agentConfigName - ? `${channel}:${senderId}:${agentConfigName}` - : `${channel}:${senderId}`; - - let entry = agents.get(sessionId); - if (!entry) { - const session = deps.sessionManager.getSession(channel, senderId); - - // Use agent config overrides where available, falling back to global config - const effectiveSystemPrompt = agentConfig?.systemPrompt ?? deps.systemPrompt; - const effectiveTier = agentConfig?.modelTier ?? deps.config.agents.primary_tier ?? 'default'; - const effectiveProvider = deps.config.models.default.provider; - - const delegationConfig: DelegationConfig = { - compaction: deps.config.agents.delegation.compaction ?? 'fast', - memory_extraction: deps.config.agents.delegation.memory_extraction ?? 'fast', - classification: deps.config.agents.delegation.classification ?? 'fast', - tool_summarisation: deps.config.agents.delegation.tool_summarisation ?? 'fast', - complex_reasoning: deps.config.agents.delegation.complex_reasoning ?? 'complex', - }; - - // Clone the tool registry and replace shell tools with sandboxed versions if configured - let effectiveToolRegistry = deps.toolRegistry; - if (agentConfig?.sandbox && deps.sandboxManager && deps.config.sandbox.enabled) { - effectiveToolRegistry = deps.toolRegistry.clone(); - // Lazy sandbox: create the sandboxed tools with a deferred sandbox reference - // The sandbox is created on first use via SandboxManager.getOrCreate() - const sandboxSessionId = sessionId; - const sandboxManager = deps.sandboxManager; - - // Create a proxy sandbox that lazily initializes - const lazySandboxShell: Tool = { - name: 'shell.exec', - description: 'Execute a shell command inside a sandboxed container and return stdout/stderr.', - inputSchema: { - type: 'object', - properties: { - command: { type: 'string', description: 'The shell command to execute' }, - cwd: { type: 'string', description: 'Working directory inside the container (optional)' }, - timeout: { type: 'number', description: 'Timeout in milliseconds (default 30000)' }, - }, - required: ['command'], - }, - execute: async (rawArgs: unknown) => { - const sandbox = await sandboxManager.getOrCreate(sandboxSessionId); - const tool = createSandboxedShellTool(sandbox); - return tool.execute(rawArgs); - }, - }; - - const lazySandboxProcess: Tool = { - name: 'process.start', - description: 'Start a command in the background inside a sandboxed container.', - inputSchema: { - type: 'object', - properties: { - command: { type: 'string', description: 'The shell command to run in the background' }, - cwd: { type: 'string', description: 'Working directory inside the container (optional)' }, - }, - required: ['command'], - }, - execute: async (rawArgs: unknown) => { - const sandbox = await sandboxManager.getOrCreate(sandboxSessionId); - const tool = createSandboxedProcessStartTool(sandbox); - return tool.execute(rawArgs); - }, - }; - - effectiveToolRegistry.replace(lazySandboxShell); - effectiveToolRegistry.replace(lazySandboxProcess); - } - - // Create an attachment collector for this agent session - const collector = new OutboundAttachmentCollector(); - - // Clone the tool registry to register the media.send tool bound to this collector - effectiveToolRegistry = effectiveToolRegistry.clone(); - effectiveToolRegistry.register(createMediaSendTool(collector)); - - const orchestrator = new AgentOrchestrator({ - modelRouter: deps.modelRouter, - systemPrompt: effectiveSystemPrompt, - session, - toolRegistry: effectiveToolRegistry, - toolExecutor: deps.toolExecutor, - primaryTier: effectiveTier, - delegation: delegationConfig, - maxDelegationDepth: deps.config.agents.max_delegation_depth ?? 3, - compaction: deps.config.compaction.enabled ? { - thresholdPct: deps.config.compaction.threshold_pct, - keepTurns: deps.config.compaction.keep_turns, - summaryMaxTokens: deps.config.compaction.summary_max_tokens, - } : undefined, - modelName: deps.config.models.default.model, - contextWindow: deps.config.models.default.context_window, - memoryStore: deps.memoryStore, - toolPolicyContext: { - agent: effectiveTier, - provider: effectiveProvider, - }, - attachmentCollector: collector, - }); - entry = { orchestrator, collector }; - agents.set(sessionId, entry); - } - return entry; - } - - const handler = async (msg: InboundMessage, reply: (response: OutboundMessage) => Promise): Promise => { - const { orchestrator: agent, collector } = getOrCreateAgent(msg.channel, msg.senderId); - - // Handle special commands - if (msg.metadata?.isCommand) { - if (msg.metadata.command === 'reset') { - agent.reset(); - return; - } - if (msg.metadata.command === 'compact') { - const result = await agent.compact(); - if (result && result.compactedCount > 0) { - await reply({ - text: `Compacted ${result.compactedCount} messages: ${result.tokensBefore} → ${result.tokensAfter} tokens`, - replyTo: msg.id, - }); - } else { - await reply({ - text: 'Nothing to compact.', - replyTo: msg.id, - }); - } - return; - } - if (msg.metadata.command === 'usage') { - const usage = agent.getUsage(); - const lines = [ - '**Token Usage**', - '', - `Primary: ${usage.primary.inputTokens.toLocaleString()} in / ${usage.primary.outputTokens.toLocaleString()} out (${usage.primary.calls} calls)`, - ]; - - const delegationEntries = Object.entries(usage.delegation); - if (delegationEntries.length > 0) { - lines.push(''); - lines.push('Delegation:'); - for (const [tier, stats] of delegationEntries) { - lines.push(` ${tier}: ${stats.inputTokens.toLocaleString()} in / ${stats.outputTokens.toLocaleString()} out (${stats.calls} calls)`); - } - } - - lines.push(''); - lines.push(`**Total:** ${usage.total.inputTokens.toLocaleString()} in / ${usage.total.outputTokens.toLocaleString()} out (${usage.total.calls} calls)`); - - if (usage.total.estimatedCost > 0) { - lines.push(`**Estimated cost:** $${usage.total.estimatedCost.toFixed(4)}`); - } - - await reply({ text: lines.join('\n'), replyTo: msg.id }); - return; - } - } - - try { - // Transcribe audio attachments before processing - let messageText = msg.text; - const audioAttachments = (msg.attachments ?? []).filter((a: Attachment) => isSupportedAudio(a)); - - if (audioAttachments.length > 0 && deps.audioConfig) { - for (const att of audioAttachments) { - const transcript = await transcribeAudio(att, deps.audioConfig); - messageText = `[Voice message]: ${transcript}\n\n${messageText}`; - } - } - - const response = await agent.process(messageText, msg.attachments); - const outboundAttachments = collector.drain(); - await reply({ - text: response, - replyTo: msg.id, - attachments: outboundAttachments.length > 0 ? outboundAttachments : undefined, - }); - } catch (error) { - console.error(`Error processing message from ${msg.channel}:${msg.senderId}:`, error); - await reply({ - text: 'Sorry, an error occurred while processing your message.', - replyTo: msg.id, - }); - } - }; - - return { handler, agents }; -} - export async function startDaemon(config: Config): Promise { const lifecycle = new Lifecycle(); diff --git a/src/daemon/routing.ts b/src/daemon/routing.ts new file mode 100644 index 0000000..30bf3b0 --- /dev/null +++ b/src/daemon/routing.ts @@ -0,0 +1,239 @@ +import type { AudioTranscriptionConfig } from '../models/media.js'; +import type { Attachment } from '../channels/types.js'; +import { isSupportedAudio, transcribeAudio } from '../models/media.js'; +import { AgentOrchestrator, type DelegationConfig } from '../backends/index.js'; +import { OutboundAttachmentCollector } from '../backends/native/attachments.js'; +import type { InboundMessage, OutboundMessage } from '../channels/index.js'; +import { MemoryStore } from '../memory/index.js'; +import type { Tool } from '../tools/types.js'; +import { createMediaSendTool } from '../tools/index.js'; +import { createSandboxedShellTool, createSandboxedProcessStartTool, SandboxManager } from '../sandbox/index.js'; +import type { Config } from '../config/index.js'; +import { ModelRouter } from '../models/index.js'; +import { ToolRegistry, ToolExecutor } from '../tools/index.js'; +import { SessionManager } from '../session/index.js'; +import { AgentConfigRegistry, AgentRouter } from '../agents/index.js'; + +/** + * Create the unified message handler for the channel registry. + * Each channel+sender pair gets its own AgentOrchestrator backed by a persistent session. + * The orchestrator wraps a NativeAgent and adds delegation to different model tiers. + * + * Returns both the message handler function and the agents map for usage tracking. + */ +export function createMessageRouter(deps: { + sessionManager: SessionManager; + modelRouter: ModelRouter; + systemPrompt: string; + toolRegistry: ToolRegistry; + toolExecutor: ToolExecutor; + config: Config; + memoryStore?: MemoryStore; + agentConfigRegistry?: AgentConfigRegistry; + agentRouter?: AgentRouter; + sandboxManager?: SandboxManager; + audioConfig?: AudioTranscriptionConfig; +}): { + handler: (msg: InboundMessage, reply: (response: OutboundMessage) => Promise) => Promise; + agents: Map; +} { + // Cache agents by session ID + agent config name to avoid recreating on every message + const agents = new Map(); + + function getOrCreateAgent(channel: string, senderId: string): { orchestrator: AgentOrchestrator; collector: OutboundAttachmentCollector } { + // Resolve agent config name via routing (sender → channel → default fallback) + const agentConfigName = deps.agentRouter?.resolve(channel, senderId); + const agentConfig = agentConfigName ? deps.agentConfigRegistry?.get(agentConfigName) : undefined; + + // Include agent config name in cache key so different agents aren't shared + const sessionId = agentConfigName + ? `${channel}:${senderId}:${agentConfigName}` + : `${channel}:${senderId}`; + + let entry = agents.get(sessionId); + if (!entry) { + const session = deps.sessionManager.getSession(channel, senderId); + + // Use agent config overrides where available, falling back to global config + const effectiveSystemPrompt = agentConfig?.systemPrompt ?? deps.systemPrompt; + const effectiveTier = agentConfig?.modelTier ?? deps.config.agents.primary_tier ?? 'default'; + const effectiveProvider = deps.config.models.default.provider; + + const delegationConfig: DelegationConfig = { + compaction: deps.config.agents.delegation.compaction ?? 'fast', + memory_extraction: deps.config.agents.delegation.memory_extraction ?? 'fast', + classification: deps.config.agents.delegation.classification ?? 'fast', + tool_summarisation: deps.config.agents.delegation.tool_summarisation ?? 'fast', + complex_reasoning: deps.config.agents.delegation.complex_reasoning ?? 'complex', + }; + + // Clone the tool registry and replace shell tools with sandboxed versions if configured + let effectiveToolRegistry = deps.toolRegistry; + if (agentConfig?.sandbox && deps.sandboxManager && deps.config.sandbox.enabled) { + effectiveToolRegistry = deps.toolRegistry.clone(); + // Lazy sandbox: create the sandboxed tools with a deferred sandbox reference + // The sandbox is created on first use via SandboxManager.getOrCreate() + const sandboxSessionId = sessionId; + const sandboxManager = deps.sandboxManager; + + // Create a proxy sandbox that lazily initializes + const lazySandboxShell: Tool = { + name: 'shell.exec', + description: 'Execute a shell command inside a sandboxed container and return stdout/stderr.', + inputSchema: { + type: 'object', + properties: { + command: { type: 'string', description: 'The shell command to execute' }, + cwd: { type: 'string', description: 'Working directory inside the container (optional)' }, + timeout: { type: 'number', description: 'Timeout in milliseconds (default 30000)' }, + }, + required: ['command'], + }, + execute: async (rawArgs: unknown) => { + const sandbox = await sandboxManager.getOrCreate(sandboxSessionId); + const tool = createSandboxedShellTool(sandbox); + return tool.execute(rawArgs); + }, + }; + + const lazySandboxProcess: Tool = { + name: 'process.start', + description: 'Start a command in the background inside a sandboxed container.', + inputSchema: { + type: 'object', + properties: { + command: { type: 'string', description: 'The shell command to run in the background' }, + cwd: { type: 'string', description: 'Working directory inside the container (optional)' }, + }, + required: ['command'], + }, + execute: async (rawArgs: unknown) => { + const sandbox = await sandboxManager.getOrCreate(sandboxSessionId); + const tool = createSandboxedProcessStartTool(sandbox); + return tool.execute(rawArgs); + }, + }; + + effectiveToolRegistry.replace(lazySandboxShell); + effectiveToolRegistry.replace(lazySandboxProcess); + } + + // Create an attachment collector for this agent session + const collector = new OutboundAttachmentCollector(); + + // Clone the tool registry to register the media.send tool bound to this collector + effectiveToolRegistry = effectiveToolRegistry.clone(); + effectiveToolRegistry.register(createMediaSendTool(collector)); + + const orchestrator = new AgentOrchestrator({ + modelRouter: deps.modelRouter, + systemPrompt: effectiveSystemPrompt, + session, + toolRegistry: effectiveToolRegistry, + toolExecutor: deps.toolExecutor, + primaryTier: effectiveTier, + delegation: delegationConfig, + maxDelegationDepth: deps.config.agents.max_delegation_depth ?? 3, + compaction: deps.config.compaction.enabled ? { + thresholdPct: deps.config.compaction.threshold_pct, + keepTurns: deps.config.compaction.keep_turns, + summaryMaxTokens: deps.config.compaction.summary_max_tokens, + } : undefined, + modelName: deps.config.models.default.model, + contextWindow: deps.config.models.default.context_window, + memoryStore: deps.memoryStore, + toolPolicyContext: { + agent: effectiveTier, + provider: effectiveProvider, + }, + attachmentCollector: collector, + }); + entry = { orchestrator, collector }; + agents.set(sessionId, entry); + } + return entry; + } + + const handler = async (msg: InboundMessage, reply: (response: OutboundMessage) => Promise): Promise => { + const { orchestrator: agent, collector } = getOrCreateAgent(msg.channel, msg.senderId); + + // Handle special commands + if (msg.metadata?.isCommand) { + if (msg.metadata.command === 'reset') { + agent.reset(); + return; + } + if (msg.metadata.command === 'compact') { + const result = await agent.compact(); + if (result && result.compactedCount > 0) { + await reply({ + text: `Compacted ${result.compactedCount} messages: ${result.tokensBefore} → ${result.tokensAfter} tokens`, + replyTo: msg.id, + }); + } else { + await reply({ + text: 'Nothing to compact.', + replyTo: msg.id, + }); + } + return; + } + if (msg.metadata.command === 'usage') { + const usage = agent.getUsage(); + const lines = [ + '**Token Usage**', + '', + `Primary: ${usage.primary.inputTokens.toLocaleString()} in / ${usage.primary.outputTokens.toLocaleString()} out (${usage.primary.calls} calls)`, + ]; + + const delegationEntries = Object.entries(usage.delegation); + if (delegationEntries.length > 0) { + lines.push(''); + lines.push('Delegation:'); + for (const [tier, stats] of delegationEntries) { + lines.push(` ${tier}: ${stats.inputTokens.toLocaleString()} in / ${stats.outputTokens.toLocaleString()} out (${stats.calls} calls)`); + } + } + + lines.push(''); + lines.push(`**Total:** ${usage.total.inputTokens.toLocaleString()} in / ${usage.total.outputTokens.toLocaleString()} out (${usage.total.calls} calls)`); + + if (usage.total.estimatedCost > 0) { + lines.push(`**Estimated cost:** $${usage.total.estimatedCost.toFixed(4)}`); + } + + await reply({ text: lines.join('\n'), replyTo: msg.id }); + return; + } + } + + try { + // Transcribe audio attachments before processing + let messageText = msg.text; + const audioAttachments = (msg.attachments ?? []).filter((a: Attachment) => isSupportedAudio(a)); + + if (audioAttachments.length > 0 && deps.audioConfig) { + for (const att of audioAttachments) { + const transcript = await transcribeAudio(att, deps.audioConfig); + messageText = `[Voice message]: ${transcript}\n\n${messageText}`; + } + } + + const response = await agent.process(messageText, msg.attachments); + const outboundAttachments = collector.drain(); + await reply({ + text: response, + replyTo: msg.id, + attachments: outboundAttachments.length > 0 ? outboundAttachments : undefined, + }); + } catch (error) { + console.error(`Error processing message from ${msg.channel}:${msg.senderId}:`, error); + await reply({ + text: 'Sorry, an error occurred while processing your message.', + replyTo: msg.id, + }); + } + }; + + return { handler, agents }; +}