Files
flynn/src/daemon/routing.ts
T
William Valentin 148219153e feat(audio): add tests, token estimation, and config override for native audio
- Add capabilities.test.ts (18 tests) for supportsAudioInput()
- Add 15 audio tests to media.test.ts (hasAudio, stripAudioParts, attachmentToAudioSource)
- Add estimateAudioTokens() to tokens.ts (base64→bytes→duration→tokens)
- Update estimateMessageTokens() to include audio content parts
- Add 5 audio token tests to tokens.test.ts
- Add supports_audio config override to model schema
- Wire supports_audio from tier config through routing to capability check

Total tests: 1369 (was 1331, +38 audio-related)
2026-02-11 18:27:19 -08:00

283 lines
13 KiB
TypeScript

import type { AudioTranscriptionConfig } from '../models/media.js';
import type { Attachment } from '../channels/types.js';
import { isSupportedAudio, transcribeAudio } from '../models/media.js';
import { supportsAudioInput } from '../models/capabilities.js';
import { AgentOrchestrator, type DelegationConfig } from '../backends/index.js';
import { OutboundAttachmentCollector } from '../backends/native/attachments.js';
import type { InboundMessage, OutboundMessage } from '../channels/index.js';
import { MemoryStore } from '../memory/index.js';
import type { Tool } from '../tools/types.js';
import { createMediaSendTool } from '../tools/index.js';
import { createSandboxedShellTool, createSandboxedProcessStartTool, SandboxManager } from '../sandbox/index.js';
import type { Config } from '../config/index.js';
import { ModelRouter, type ModelTier } from '../models/index.js';
import { ToolRegistry, ToolExecutor } from '../tools/index.js';
import { SessionManager } from '../session/index.js';
import { AgentConfigRegistry, AgentRouter } from '../agents/index.js';
/**
 * Create the unified message handler for the channel registry.
 *
 * Each channel+sender pair (further keyed by the resolved agent config name and
 * by any per-message model tier override) gets its own AgentOrchestrator backed
 * by the session returned from the SessionManager. Orchestrators are created
 * lazily on the first message and cached for subsequent ones.
 *
 * The handler:
 *  - serves the `reset`, `compact` and `usage` commands directly;
 *  - transcribes audio attachments to text when the active model has no native
 *    audio input (and transcription is configured), otherwise passes them on;
 *  - forwards the message to the orchestrator and replies with its response
 *    plus any attachments queued on the collector during the run.
 *
 * @param deps Shared services and configuration used to build each agent.
 * @returns The message handler function and the agents map (exposed for usage
 *          tracking).
 */
export function createMessageRouter(deps: {
  sessionManager: SessionManager;
  modelRouter: ModelRouter;
  systemPrompt: string;
  toolRegistry: ToolRegistry;
  toolExecutor: ToolExecutor;
  config: Config;
  memoryStore?: MemoryStore;
  agentConfigRegistry?: AgentConfigRegistry;
  agentRouter?: AgentRouter;
  sandboxManager?: SandboxManager;
}): {
  handler: (msg: InboundMessage, reply: (response: OutboundMessage) => Promise<void>) => Promise<void>;
  agents: Map<string, { orchestrator: AgentOrchestrator; collector: OutboundAttachmentCollector }>;
} {
  // Cache agents by session ID + agent config name to avoid recreating on every message
  const agents = new Map<string, { orchestrator: AgentOrchestrator; collector: OutboundAttachmentCollector }>();

  /**
   * Return the cached orchestrator/collector pair for this sender, creating it
   * on first use.
   */
  function getOrCreateAgent(channel: string, senderId: string, metadata?: Record<string, unknown>): { orchestrator: AgentOrchestrator; collector: OutboundAttachmentCollector } {
    // Resolve agent config name via routing (sender → channel → default fallback)
    const agentConfigName = deps.agentRouter?.resolve(channel, senderId);
    const agentConfig = agentConfigName ? deps.agentConfigRegistry?.get(agentConfigName) : undefined;

    // Cron job tier wins over agent config tier
    const tierFromMetadata = metadata?.modelTier as ModelTier | undefined;

    // Include agent config name in cache key so different agents aren't shared
    const baseSid = agentConfigName
      ? `${channel}:${senderId}:${agentConfigName}`
      : `${channel}:${senderId}`;
    // A tier override gets its own cache entry so it does not reuse an
    // orchestrator that was built for a different primary tier.
    const sessionId = tierFromMetadata ? `${baseSid}:${tierFromMetadata}` : baseSid;

    let entry = agents.get(sessionId);
    if (!entry) {
      const session = deps.sessionManager.getSession(channel, senderId);

      // Use agent config overrides where available, falling back to global config
      const effectiveSystemPrompt = agentConfig?.systemPrompt ?? deps.systemPrompt;
      const effectiveTier = tierFromMetadata ?? agentConfig?.modelTier ?? deps.config.agents.primary_tier ?? 'default';
      // NOTE(review): provider, modelName and contextWindow below are always read
      // from models.default, even when effectiveTier names another tier — confirm
      // this is intended.
      const effectiveProvider = deps.config.models.default.provider;

      const delegationConfig: DelegationConfig = {
        compaction: deps.config.agents.delegation.compaction ?? 'fast',
        memory_extraction: deps.config.agents.delegation.memory_extraction ?? 'fast',
        classification: deps.config.agents.delegation.classification ?? 'fast',
        tool_summarisation: deps.config.agents.delegation.tool_summarisation ?? 'fast',
        complex_reasoning: deps.config.agents.delegation.complex_reasoning ?? 'complex',
      };

      // Clone the tool registry and replace shell tools with sandboxed versions if configured
      let effectiveToolRegistry = deps.toolRegistry;
      if (agentConfig?.sandbox && deps.sandboxManager && deps.config.sandbox.enabled) {
        effectiveToolRegistry = deps.toolRegistry.clone();

        // Lazy sandbox: create the sandboxed tools with a deferred sandbox reference
        // The sandbox is created on first use via SandboxManager.getOrCreate()
        const sandboxSessionId = sessionId;
        const sandboxManager = deps.sandboxManager;

        // Create a proxy sandbox that lazily initializes
        const lazySandboxShell: Tool = {
          name: 'shell.exec',
          description: 'Execute a shell command inside a sandboxed container and return stdout/stderr.',
          inputSchema: {
            type: 'object',
            properties: {
              command: { type: 'string', description: 'The shell command to execute' },
              cwd: { type: 'string', description: 'Working directory inside the container (optional)' },
              timeout: { type: 'number', description: 'Timeout in milliseconds (default 30000)' },
            },
            required: ['command'],
          },
          execute: async (rawArgs: unknown) => {
            const sandbox = await sandboxManager.getOrCreate(sandboxSessionId);
            const tool = createSandboxedShellTool(sandbox);
            return tool.execute(rawArgs);
          },
        };

        const lazySandboxProcess: Tool = {
          name: 'process.start',
          description: 'Start a command in the background inside a sandboxed container.',
          inputSchema: {
            type: 'object',
            properties: {
              command: { type: 'string', description: 'The shell command to run in the background' },
              cwd: { type: 'string', description: 'Working directory inside the container (optional)' },
            },
            required: ['command'],
          },
          execute: async (rawArgs: unknown) => {
            const sandbox = await sandboxManager.getOrCreate(sandboxSessionId);
            const tool = createSandboxedProcessStartTool(sandbox);
            return tool.execute(rawArgs);
          },
        };

        effectiveToolRegistry.replace(lazySandboxShell);
        effectiveToolRegistry.replace(lazySandboxProcess);
      }

      // Create an attachment collector for this agent session
      const collector = new OutboundAttachmentCollector();

      // Clone the tool registry to register the media.send tool bound to this collector
      effectiveToolRegistry = effectiveToolRegistry.clone();
      effectiveToolRegistry.register(createMediaSendTool(collector));

      const orchestrator = new AgentOrchestrator({
        modelRouter: deps.modelRouter,
        systemPrompt: effectiveSystemPrompt,
        session,
        toolRegistry: effectiveToolRegistry,
        toolExecutor: deps.toolExecutor,
        primaryTier: effectiveTier,
        delegation: delegationConfig,
        maxDelegationDepth: deps.config.agents.max_delegation_depth ?? 3,
        maxIterations: deps.config.agents.max_iterations,
        compaction: deps.config.compaction.enabled ? {
          thresholdPct: deps.config.compaction.threshold_pct,
          keepTurns: deps.config.compaction.keep_turns,
          summaryMaxTokens: deps.config.compaction.summary_max_tokens,
        } : undefined,
        modelName: deps.config.models.default.model,
        contextWindow: deps.config.models.default.context_window,
        memoryStore: deps.memoryStore,
        toolPolicyContext: {
          agent: effectiveTier,
          provider: effectiveProvider,
        },
        attachmentCollector: collector,
      });

      entry = { orchestrator, collector };
      agents.set(sessionId, entry);
    }
    return entry;
  }

  /**
   * Process one inbound message: serve built-in commands, otherwise run the
   * agent and send its response through `reply`.
   */
  const handler = async (msg: InboundMessage, reply: (response: OutboundMessage) => Promise<void>): Promise<void> => {
    const { orchestrator: agent, collector } = getOrCreateAgent(msg.channel, msg.senderId, msg.metadata);

    // Handle special commands
    if (msg.metadata?.isCommand) {
      if (msg.metadata.command === 'reset') {
        // No reply is sent from here for a reset.
        agent.reset();
        return;
      }

      if (msg.metadata.command === 'compact') {
        const result = await agent.compact();
        if (result && result.compactedCount > 0) {
          await reply({
            // Separate the before/after counts so they do not run together
            // into a single number.
            text: `Compacted ${result.compactedCount} messages: ${result.tokensBefore} → ${result.tokensAfter} tokens`,
            replyTo: msg.id,
          });
        } else {
          await reply({
            text: 'Nothing to compact.',
            replyTo: msg.id,
          });
        }
        return;
      }

      if (msg.metadata.command === 'usage') {
        const usage = agent.getUsage();
        const lines = [
          '**Token Usage**',
          '',
          `Primary: ${usage.primary.inputTokens.toLocaleString()} in / ${usage.primary.outputTokens.toLocaleString()} out (${usage.primary.calls} calls)`,
        ];
        const delegationEntries = Object.entries(usage.delegation);
        if (delegationEntries.length > 0) {
          lines.push('');
          lines.push('Delegation:');
          for (const [tier, stats] of delegationEntries) {
            lines.push(` ${tier}: ${stats.inputTokens.toLocaleString()} in / ${stats.outputTokens.toLocaleString()} out (${stats.calls} calls)`);
          }
        }
        lines.push('');
        lines.push(`**Total:** ${usage.total.inputTokens.toLocaleString()} in / ${usage.total.outputTokens.toLocaleString()} out (${usage.total.calls} calls)`);
        if (usage.total.estimatedCost > 0) {
          lines.push(`**Estimated cost:** $${usage.total.estimatedCost.toFixed(4)}`);
        }
        await reply({ text: lines.join('\n'), replyTo: msg.id });
        return;
      }
      // Any other command falls through and is processed as a normal message.
    }

    try {
      // Determine if the active model supports native audio input
      // NOTE(review): this repeats the tier resolution in getOrCreateAgent, but
      // with truthiness checks instead of `??` — the two can disagree on
      // empty-string tiers.
      let effectiveTier: string = deps.config.agents.primary_tier ?? 'default';
      if (msg.metadata?.modelTier) {
        effectiveTier = msg.metadata.modelTier as string;
      } else if (deps.agentRouter && deps.agentConfigRegistry) {
        const agentName = deps.agentRouter.resolve(msg.channel, msg.senderId);
        if (agentName) {
          const agentCfg = deps.agentConfigRegistry.get(agentName);
          if (agentCfg?.modelTier) {
            effectiveTier = agentCfg.modelTier;
          }
        }
      }

      // Look up provider/model for the effective tier
      const modelsConfig = deps.config.models as Record<string, { provider?: string; model?: string } | undefined>;
      const tierConfig = modelsConfig[effectiveTier] ?? deps.config.models.default;
      const modelProvider = tierConfig?.provider ?? deps.config.models.default.provider;
      const modelName = tierConfig?.model ?? deps.config.models.default.model;
      // Optional per-tier override passed through to the capability check.
      const supportsAudioOverride = (tierConfig as Record<string, unknown> | undefined)?.supports_audio as boolean | undefined;
      const nativeAudioSupported = supportsAudioInput(modelProvider, modelName, supportsAudioOverride);

      let messageText = msg.text;
      let attachments = msg.attachments;
      const audioAttachments = (msg.attachments ?? []).filter((a: Attachment) => isSupportedAudio(a));

      if (audioAttachments.length > 0 && !nativeAudioSupported) {
        // Model doesn't support native audio — transcribe via Whisper and strip audio attachments
        const audioConfig: AudioTranscriptionConfig | undefined = deps.config.audio?.enabled && deps.config.audio.provider
          ? {
              endpoint: deps.config.audio.provider.endpoint,
              apiKey: deps.config.audio.provider.api_key,
              model: deps.config.audio.provider.model,
            }
          : undefined;

        if (audioConfig?.endpoint) {
          // Collect transcripts in attachment order and prepend them once.
          // Prepending inside the loop would put the last voice message first.
          const transcripts: string[] = [];
          for (const att of audioAttachments) {
            const transcript = await transcribeAudio(att, audioConfig);
            transcripts.push(`[Voice message]: ${transcript}`);
          }
          messageText = `${transcripts.join('\n\n')}\n\n${messageText}`;
        } else {
          // Without transcription the audio cannot reach the model at all;
          // log it so the drop is not silent.
          console.warn(`Audio attachment(s) from ${msg.channel}:${msg.senderId} dropped: model lacks native audio support and audio transcription is not configured.`);
        }

        // Remove audio attachments so buildUserMessage doesn't create audio content parts
        attachments = (msg.attachments ?? []).filter((a: Attachment) => !isSupportedAudio(a));
        if (attachments.length === 0) { attachments = undefined; }
      }
      // If native audio IS supported, we pass attachments through unchanged —
      // buildUserMessage() in the agent will create native audio content parts

      const response = await agent.process(messageText, attachments);
      const outboundAttachments = collector.drain();
      await reply({
        text: response,
        replyTo: msg.id,
        attachments: outboundAttachments.length > 0 ? outboundAttachments : undefined,
      });
    } catch (error) {
      console.error(`Error processing message from ${msg.channel}:${msg.senderId}:`, error);
      // Discard anything queued on the collector during the failed run so it is
      // not attached to the next successful reply.
      // NOTE(review): assumes drain() empties the collector, as the success path implies.
      collector.drain();
      await reply({
        text: 'Sorry, an error occurred while processing your message.',
        replyTo: msg.id,
      });
    }
  };

  return { handler, agents };
}