import type { ModelRouter, ModelTier } from '../../models/router.js';
import type { ChatRequest, Message, TokenUsage } from '../../models/types.js';
import type { Session } from '../../session/index.js';
import type { ToolRegistry } from '../../tools/registry.js';
import type { ToolExecutor } from '../../tools/executor.js';
import type { MemoryStore } from '../../memory/store.js';
import type { ToolPolicyContext } from '../../tools/policy.js';
import type { Attachment } from '../../channels/types.js';
import { NativeAgent } from './agent.js';
import type { ToolUseEvent } from './agent.js';
import type { OutboundAttachmentCollector } from './attachments.js';
import { estimateMessageTokens, shouldCompact } from '../../context/tokens.js';
import {
  compactHistory,
  type CompactionConfig,
  type CompactionResult,
  DEFAULT_COMPACTION_CONFIG,
} from '../../context/compaction.js';
import { estimateCost } from '../../models/costs.js';
import { auditLogger } from '../../audit/index.js';
import { buildAdaptiveMemoryContext, buildRecentMemoryContext } from '../../memory/adaptive.js';
import { buildUserMessage } from '../../models/media.js';

// ── Public types ──────────────────────────────────────────────────────

/** A single-turn, stateless request to a sub-agent at a specific tier. */
export interface SubAgentRequest {
  tier: ModelTier;
  systemPrompt: string;
  message: string;
  maxTokens?: number;
  /** When true, include tools from the toolRegistry in the request. */
  tools?: boolean;
}

/** Result returned from a sub-agent delegation call. */
export interface SubAgentResult {
  content: string;
  usage: TokenUsage;
  /** The tier that actually served the call (may be the 'default' fallback). */
  tier: ModelTier;
}

/** Maps each delegation task to the model tier that should handle it. */
export interface DelegationConfig {
  compaction: ModelTier;
  memory_extraction: ModelTier;
  classification: ModelTier;
  tool_summarisation: ModelTier;
  complex_reasoning: ModelTier;
}

/** Per-tier cumulative usage statistics. */
interface TierUsageStats {
  inputTokens: number;
  outputTokens: number;
  calls: number;
}

/** Full usage stats for an orchestrator session. */
export interface UsageReport {
  /** Primary agent (user-facing) usage. */
  primary: {
    inputTokens: number;
    outputTokens: number;
    calls: number;
  };
  /** Delegation (sub-agent) usage, broken down by tier. */
  delegation: Record<string, TierUsageStats>;
  /** Combined totals. */
  total: {
    inputTokens: number;
    outputTokens: number;
    calls: number;
    estimatedCost: number;
  };
}

/** Full configuration for the AgentOrchestrator. */
export interface OrchestratorConfig {
  modelRouter: ModelRouter;
  systemPrompt: string;
  session?: Session;
  toolRegistry?: ToolRegistry;
  toolExecutor?: ToolExecutor;
  maxIterations?: number;
  /** The tier used by the primary NativeAgent for user-facing conversation. */
  primaryTier: ModelTier;
  /** Which tier to use for each delegation task type. */
  delegation: DelegationConfig;
  /** Maximum nesting depth for delegation calls (safety guard). */
  maxDelegationDepth: number;
  onToolUse?: (event: ToolUseEvent) => void;
  /** Context compaction settings. When provided, enables automatic compaction. */
  compaction?: CompactionConfig;
  /** Model identifier for the primary model (used for context window lookup). */
  modelName?: string;
  /** Optional override for the context window size (in tokens). */
  contextWindow?: number;
  /** Optional memory store for injecting persistent memory into the system prompt. */
  memoryStore?: MemoryStore;
  /** Enable/disable automatic memory extraction during compaction. Defaults to true. */
  memoryAutoExtract?: boolean;
  /** Strategy for memory prompt injection. Defaults to 'all'. */
  memoryInjectionStrategy?: 'all' | 'recent' | 'adaptive';
  /** Maximum tokens allowed for injected memory context. Defaults to 2000. */
  memoryMaxInjectionTokens?: number;
  /** Policy context for tool filtering (agent tier, provider). */
  toolPolicyContext?: ToolPolicyContext;
  /** Collector for outbound attachments queued by tools (e.g. media.send). */
  attachmentCollector?: OutboundAttachmentCollector;
}

// ── AgentOrchestrator ─────────────────────────────────────────────────

/**
 * Wraps a primary NativeAgent and adds the ability to delegate
 * single-turn sub-tasks to different model tiers via the ModelRouter.
 *
 * The primary agent handles the main conversation loop (with tools),
 * while `delegate()` enables cheap, stateless calls for tasks like
 * compaction, classification, and memory extraction.
 */
export class AgentOrchestrator {
  private readonly _agent: NativeAgent;
  private readonly _modelRouter: ModelRouter;
  private readonly _delegation: DelegationConfig;
  // Stored from config; not consulted by any method in this class.
  private readonly _maxDelegationDepth: number;
  private readonly _toolRegistry?: ToolRegistry;
  private readonly _session?: Session;
  private readonly _compactionConfig?: CompactionConfig;
  private readonly _modelName?: string;
  private readonly _contextWindow?: number;
  private readonly _memoryStore?: MemoryStore;
  private readonly _memoryAutoExtract: boolean;
  private readonly _memoryInjectionStrategy: 'all' | 'recent' | 'adaptive';
  private readonly _memoryMaxInjectionTokens: number;
  /** The system prompt as configured, before any memory context is appended. */
  private readonly _systemPromptBase: string;
  /** Cumulative delegation usage, keyed by the tier that served each call. */
  private readonly _usageByTier: Map<ModelTier, TierUsageStats> = new Map();

  /**
   * Stores the configuration, applies defaults for the memory options, and
   * constructs the primary NativeAgent on `config.primaryTier`.
   */
  constructor(config: OrchestratorConfig) {
    this._modelRouter = config.modelRouter;
    this._delegation = config.delegation;
    this._maxDelegationDepth = config.maxDelegationDepth;
    this._toolRegistry = config.toolRegistry;
    this._session = config.session;
    this._compactionConfig = config.compaction;
    this._modelName = config.modelName;
    this._contextWindow = config.contextWindow;
    this._memoryStore = config.memoryStore;
    this._memoryAutoExtract = config.memoryAutoExtract ?? true;
    this._memoryInjectionStrategy = config.memoryInjectionStrategy ?? 'all';
    this._memoryMaxInjectionTokens = config.memoryMaxInjectionTokens ?? 2000;
    this._systemPromptBase = config.systemPrompt;

    // Create the primary NativeAgent for user-facing conversation
    this._agent = new NativeAgent({
      modelClient: config.modelRouter,
      systemPrompt: config.systemPrompt,
      session: config.session,
      toolRegistry: config.toolRegistry,
      toolExecutor: config.toolExecutor,
      maxIterations: config.maxIterations,
      onToolUse: config.onToolUse,
      toolPolicyContext: config.toolPolicyContext,
      attachmentCollector: config.attachmentCollector,
    });

    // Set the primary tier on the agent
    this._agent.setModelTier(config.primaryTier);
  }

  // ── Delegation ────────────────────────────────────────────────────

  /**
   * Perform a single-turn, stateless call to a model at the specified tier.
   *
   * This is used for internal sub-tasks (compaction, classification, etc.)
   * that don't need the full conversation history or tool loop.
   *
   * If the requested tier is not available on the router, falls back to
   * the 'default' tier with a warning.
   *
   * @param request - Tier, prompts, and options for the one-shot call.
   * @returns The model's content, the token usage, and the tier actually used.
   */
  async delegate(request: SubAgentRequest): Promise<SubAgentResult> {
    let tier = request.tier;

    // Check if the requested tier is available; fall back to 'default' if not
    const client = this._modelRouter.getClient(tier);
    if (!client) {
      console.warn(
        `[Flynn:delegate] Tier '${tier}' not available, falling back to 'default'`,
      );
      tier = 'default';
    }

    // Build the single-turn chat request
    const messages: Message[] = [{ role: 'user', content: request.message }];
    const chatRequest: ChatRequest = {
      messages,
      system: request.systemPrompt,
      maxTokens: request.maxTokens,
    };

    // Optionally include tools from the registry (filtered by policy)
    if (request.tools && this._toolRegistry) {
      const policyContext = this._agent.getToolPolicyContext();
      chatRequest.tools = this._toolRegistry.filteredToAnthropicFormat(policyContext);
    }

    const response = await this._modelRouter.chat(chatRequest, tier);

    // Track cumulative usage for this tier
    this._trackUsage(tier, response.usage);

    console.log(
      `[Flynn:delegate] tier=${tier} tokens=${response.usage.inputTokens}+${response.usage.outputTokens}`,
    );

    return {
      content: response.content,
      usage: response.usage,
      tier,
    };
  }

  // ── Primary agent proxies ─────────────────────────────────────────

  /**
   * Process a user message through the primary NativeAgent.
   * This is the main entry point for user-facing conversation.
   *
   * When compaction is configured, checks whether the conversation history
   * exceeds the context window threshold and compacts it before processing.
   *
   * If the agent returns a tool-loop error string, the history is rolled back
   * (when a session is configured). If the error reveals a context window
   * size, the history is compacted/trimmed and the message retried once.
   * When recovery fails, a short provider-agnostic failure message is
   * returned and recorded instead of the raw error.
   *
   * @param userMessage - The user's text.
   * @param attachments - Optional attachments forwarded to the agent.
   * @returns The agent's reply, or the friendly failure message.
   */
  async process(userMessage: string, attachments?: Attachment[]): Promise<string> {
    this._injectMemoryContext(userMessage);
    await this.compactIfNeeded();

    // Snapshot history so we can rollback if the underlying tool loop returns an error message.
    // This avoids persisting low-level provider errors to the user-visible conversation state.
    // Copy the array: if getHistory() hands back the live array, an uncopied
    // snapshot would be mutated by the agent and the rollback would be a no-op.
    const before = [...this.getHistory()];

    const result = await this._agent.process(userMessage, attachments);

    // NativeAgent currently converts tool-loop exceptions into a user-visible error string.
    // Intercept a few common cases here to self-heal (context overflow) and/or degrade gracefully.
    if (!this._isToolLoopErrorMessage(result)) {
      return result;
    }

    // Roll back the user message + error message inserted by the agent.
    this._restoreHistory(before);

    const underlying = this._stripToolLoopErrorPrefix(result);
    const ctx = this._extractContextWindowFromError(underlying);
    if (ctx) {
      // Attempt: compact + hard-trim to fit the discovered context window, then retry once.
      await this._compactAndTrimToFit(ctx);
      const retry = await this._agent.process(userMessage, attachments);
      if (!this._isToolLoopErrorMessage(retry)) {
        return retry;
      }
      // If we still failed, roll back again so we don't persist the error string.
      this._restoreHistory(before);
    }

    // Persist a short, user-friendly failure message (without provider internals).
    const friendly = [
      'I ran into a model/provider error while processing that message.',
      '',
      'Try again. If it keeps happening:',
      '1. Run `/compact` or `/reset` to shrink the conversation context.',
      '2. Switch to a different model tier (e.g. `/model local`).',
    ].join('\n');

    // Re-add the user message so the conversation state matches what the user sent.
    this._appendUserAndAssistant(userMessage, attachments, friendly);
    return friendly;
  }

  /**
   * Force-compact the current conversation history regardless of threshold.
   *
   * Uses the configured compaction settings, or DEFAULT_COMPACTION_CONFIG when
   * none were provided. The compacted history is written back (and an audit
   * event emitted) only when a session is configured.
   *
   * @returns null when the history is empty; otherwise the compaction result
   *   (whose `compactedCount` is 0 when nothing was compacted).
   */
  async compact(): Promise<CompactionResult | null> {
    const config = this._compactionConfig ?? DEFAULT_COMPACTION_CONFIG;
    const messages = this.getHistory();

    if (messages.length === 0) {
      return null;
    }

    const result = await compactHistory({
      messages,
      orchestrator: this,
      config,
      memoryStore: this._memoryStore,
      autoExtract: this._memoryAutoExtract,
    });

    // If nothing was actually compacted, skip the replace
    if (result.compactedCount === 0) {
      return result;
    }

    // Persist the compacted history
    if (this._session) {
      this._session.replaceHistory(result.messages);
    }

    console.log(
      `[Flynn:compact] Compacted ${result.compactedCount} messages: ` +
        `${result.tokensBefore} → ${result.tokensAfter} tokens`,
    );

    if (this._session) {
      auditLogger?.sessionCompact({
        session_id: this._session.id,
        messages_before: messages.length,
        messages_after: result.messages.length,
        tokens_before: result.tokensBefore,
        tokens_after: result.tokensAfter,
      });
    }

    return result;
  }

  /** Reset the primary agent's conversation history and usage stats. */
  reset(): void {
    this._agent.reset();
    this._usageByTier.clear();
  }

  /** Get the primary agent's conversation history. */
  getHistory(): Message[] {
    return this._agent.getHistory();
  }

  /** Set the model tier on the primary agent. */
  setModelTier(tier: ModelTier): void {
    this._agent.setModelTier(tier);
  }

  /** Get the current model tier of the primary agent. */
  getModelTier(): ModelTier {
    return this._agent.getModelTier();
  }

  /** Set the tool-use callback on the primary agent. */
  setOnToolUse(callback: ((event: ToolUseEvent) => void) | undefined): void {
    this._agent.setOnToolUse(callback);
  }

  /** Request cancellation for the current primary-agent operation. */
  cancel(): void {
    this._agent.cancel();
  }

  /** Whether the primary agent currently has an in-flight operation. */
  isCancellable(): boolean {
    return this._agent.isCancellable();
  }

  // ── Usage & config accessors ──────────────────────────────────────

  /**
   * Returns cumulative delegation usage stats per tier.
   * Useful for cost tracking and visibility into sub-agent calls.
   * Each entry is a copy, so callers cannot mutate the internal counters.
   */
  getDelegationUsage(): Record<string, TierUsageStats> {
    const result: Record<string, TierUsageStats> = {};
    for (const [tier, stats] of this._usageByTier) {
      result[tier] = { ...stats };
    }
    return result;
  }

  /**
   * Returns comprehensive usage stats combining primary agent and delegation usage.
   * Includes estimated cost based on the primary model's pricing.
   */
  getUsage(): UsageReport {
    const primary = this._agent.getUsage();
    const delegation = this.getDelegationUsage();

    let totalInput = primary.inputTokens;
    let totalOutput = primary.outputTokens;
    let totalCalls = primary.calls;

    for (const stats of Object.values(delegation)) {
      totalInput += stats.inputTokens;
      totalOutput += stats.outputTokens;
      totalCalls += stats.calls;
    }

    return {
      primary,
      delegation,
      total: {
        inputTokens: totalInput,
        outputTokens: totalOutput,
        calls: totalCalls,
        estimatedCost: estimateCost(totalInput, totalOutput, this._modelName),
      },
    };
  }

  /**
   * Look up which model tier is configured for a given delegation task.
   * Convenience method so callers don't need to access the config directly.
   */
  getDelegationTier(task: keyof DelegationConfig): ModelTier {
    return this._delegation[task];
  }

  // ── Private helpers ───────────────────────────────────────────────

  /**
   * Inject persistent memory context into the primary agent's system prompt.
   *
   * Builds the context according to the configured injection strategy, clips
   * it to the configured budget, and appends it to the base system prompt.
   * If the resulting context is empty, the base prompt is restored. If no
   * memory store is configured, the agent's prompt is left untouched.
   *
   * Memory injection is best-effort: a failing store never prevents the
   * message from being processed.
   */
  private _injectMemoryContext(userMessage: string): void {
    if (!this._memoryStore) {
      return;
    }

    let memoryContext = '';
    try {
      if (this._memoryInjectionStrategy === 'recent') {
        memoryContext = buildRecentMemoryContext(this._memoryStore, this._memoryMaxInjectionTokens);
      } else if (this._memoryInjectionStrategy === 'adaptive') {
        memoryContext = buildAdaptiveMemoryContext({
          store: this._memoryStore,
          userMessage,
          recentMessages: this.getHistory(),
          config: {
            maxTokens: this._memoryMaxInjectionTokens,
          },
        });
      } else {
        memoryContext = this._memoryStore.getContextForPrompt();
      }
    } catch (error: unknown) {
      // Name the strategy that actually failed; this path is not adaptive-only.
      console.warn(
        `[Flynn:memory] Memory injection (strategy '${this._memoryInjectionStrategy}') failed, falling back to default context:`,
        error,
      );
      memoryContext = this._defaultMemoryContextOrEmpty(this._memoryStore);
    }

    memoryContext = this._clipMemoryContext(memoryContext);
    if (!memoryContext) {
      this._agent.setSystemPrompt(this._systemPromptBase);
      return;
    }

    const enrichedPrompt = `${this._systemPromptBase}\n\n# Memory Context\n\nThe following is your persistent memory. Use it to maintain continuity across sessions.\n\n${memoryContext}`;
    this._agent.setSystemPrompt(enrichedPrompt);
  }

  /**
   * Read the store's default prompt context, returning '' if that also throws
   * so a broken store cannot abort `process()`.
   */
  private _defaultMemoryContextOrEmpty(store: MemoryStore): string {
    try {
      return store.getContextForPrompt();
    } catch (error: unknown) {
      console.warn('[Flynn:memory] Default memory context unavailable, continuing without memory:', error);
      return '';
    }
  }

  /**
   * Truncate the memory context to the injection budget, approximating one
   * token as four characters.
   */
  private _clipMemoryContext(context: string): string {
    if (!context) {
      return context;
    }
    const maxChars = this._memoryMaxInjectionTokens * 4;
    if (context.length <= maxChars) {
      return context;
    }
    return context.slice(0, maxChars);
  }

  /**
   * Check whether automatic compaction should run, and if so, compact.
   * Called before each `process()` call; does nothing unless compaction
   * is configured and the history is non-empty.
   */
  private async compactIfNeeded(): Promise<void> {
    if (!this._compactionConfig) {
      return;
    }

    const messages = this.getHistory();
    if (messages.length === 0) {
      return;
    }

    const model = this._modelName ?? 'unknown';
    const needs = shouldCompact({
      messages,
      model,
      contextWindow: this._contextWindow,
      thresholdPct: this._compactionConfig.thresholdPct,
    });
    if (!needs) {
      return;
    }

    await this.compact();
  }

  /** Whether `text` is the error string NativeAgent emits for a failed tool loop. */
  private _isToolLoopErrorMessage(text: string): boolean {
    return text.startsWith('Error in tool loop (iteration ');
  }

  /** Remove the "Error in tool loop (iteration N):" prefix, if present. */
  private _stripToolLoopErrorPrefix(text: string): string {
    const m = text.match(/^Error in tool loop \(iteration \d+\):\s*(.*)$/s);
    return m?.[1] ?? text;
  }

  /** Replace the session history with `messages`; a no-op without a session. */
  private _restoreHistory(messages: Message[]): void {
    if (this._session) {
      this._session.replaceHistory(messages);
    }
    // No session available; nothing safe to do here.
  }

  /**
   * Append the user's message followed by an assistant reply to the session.
   * A no-op without a session.
   */
  private _appendUserAndAssistant(
    userMessage: string,
    attachments: Attachment[] | undefined,
    assistantText: string,
  ): void {
    if (!this._session) {
      return;
    }
    const userMsg = buildUserMessage(userMessage, attachments);
    this._session.addMessage(userMsg);
    this._session.addMessage({ role: 'assistant', content: assistantText });
  }

  /**
   * Parse a context window size (in tokens) out of a provider error message.
   *
   * @returns The smallest value >= 256 found by any known pattern, or
   *   undefined when no pattern matches.
   */
  private _extractContextWindowFromError(errorText: string): number | undefined {
    // Try a few common patterns and pick the smallest plausible context window.
    // Example llama.cpp error:
    //   exceeds the available context size (4096 tokens) ... "nctx":4096
    const candidates: number[] = [];

    const jsonNctx = errorText.match(/"nctx"\s*:\s*(\d{3,7})/);
    if (jsonNctx) {
      candidates.push(Number(jsonNctx[1]));
    }

    const paren = errorText.match(/context size \((\d{3,7}) tokens\)/);
    if (paren) {
      candidates.push(Number(paren[1]));
    }

    const maxContext = errorText.match(/maximum context length is (\d{3,7}) tokens/i);
    if (maxContext) {
      candidates.push(Number(maxContext[1]));
    }

    const valid = candidates.filter(n => Number.isFinite(n) && n >= 256);
    if (valid.length === 0) {
      return undefined;
    }
    return Math.min(...valid);
  }

  /**
   * Emergency recovery after a provider context overflow: compact, then drop
   * the oldest messages until the estimated token count fits within the
   * threshold share (default 80%) of `contextWindow`. Trimming requires a
   * session; without one only the compaction attempt runs.
   */
  private async _compactAndTrimToFit(contextWindow: number): Promise<void> {
    // Compaction is best-effort; if it fails (e.g., providers down), fall back to a hard trim.
    try {
      await this.compact();
    } catch (error: unknown) {
      console.warn('[Flynn:compact] Emergency compaction failed:', error);
    }

    if (!this._session) {
      return;
    }

    const threshold = Math.floor((this._compactionConfig?.thresholdPct ?? 80) / 100 * contextWindow);
    let messages = this.getHistory();
    let estimated = estimateMessageTokens(messages);

    // Drop oldest messages until we're under budget.
    // This is intentionally blunt; it only triggers after a real provider context overflow.
    while (messages.length > 1 && estimated > threshold) {
      messages = messages.slice(1);
      estimated = estimateMessageTokens(messages);
    }

    this._session.replaceHistory(messages);
  }

  /** Accumulate usage stats for a given tier. */
  private _trackUsage(tier: ModelTier, usage: TokenUsage): void {
    const existing = this._usageByTier.get(tier);
    if (existing) {
      existing.inputTokens += usage.inputTokens;
      existing.outputTokens += usage.outputTokens;
      existing.calls += 1;
    } else {
      this._usageByTier.set(tier, {
        inputTokens: usage.inputTokens,
        outputTokens: usage.outputTokens,
        calls: 1,
      });
    }
  }
}