feat(compaction): add proactive context budget and checkpointing

This commit is contained in:
William Valentin
2026-02-16 15:44:00 -08:00
parent 65efda3533
commit 8758ea8f1c
7 changed files with 478 additions and 1 deletions
+171
View File
@@ -871,4 +871,175 @@ describe('AgentOrchestrator', () => {
expect(orchestrator.getModelTier()).toBe('default');
});
});
describe('context budget and proactive maintenance', () => {
it('reports context budget with estimated usage and threshold', async () => {
  // A 1000-token window with an 80% threshold keeps the derived numbers easy to verify.
  const orchestrator = new AgentOrchestrator({
    modelRouter: mockRouter,
    systemPrompt: 'You are helpful.',
    primaryTier: 'default',
    delegation: {
      compaction: 'fast',
      memory_extraction: 'default',
      classification: 'complex',
      tool_summarisation: 'default',
      complex_reasoning: 'complex',
    },
    maxDelegationDepth: 10,
    contextWindow: 1000,
    compaction: {
      thresholdPct: 80,
      keepTurns: 2,
      summaryMaxTokens: 256,
      importanceThreshold: 1,
    },
  });

  // One processed turn guarantees the history is non-empty before the budget is read.
  await orchestrator.process('hello world');

  const {
    contextWindow,
    thresholdPct,
    thresholdTokens,
    estimatedTokens,
    remainingTokens,
  } = orchestrator.getContextBudget();

  expect(contextWindow).toBe(1000);
  expect(thresholdPct).toBe(80);
  expect(thresholdTokens).toBe(800);
  expect(estimatedTokens).toBeGreaterThan(0);
  expect(remainingTokens).toBeLessThanOrEqual(1000);
});
it('creates proactive checkpoint alerts and writes checkpoint to memory', async () => {
  const tempDir = mkdtempSync(join(tmpdir(), 'flynn-orchestrator-checkpoint-'));
  // Everything that can throw (including failed expectations) runs inside the
  // try block so the temporary memory directory is removed on failure too.
  try {
    const memoryStore = new MemoryStore({ dir: tempDir, maxContextTokens: 2000 });

    // Pre-seed the session with two large messages so usage is already past the
    // configured checkpoint threshold (50%) when the next turn is processed.
    const large = 'x'.repeat(2400);
    const history: Message[] = [
      { role: 'user', content: large },
      { role: 'assistant', content: large },
    ];

    // Minimal in-memory Session backed by the `history` array above.
    const session: Session = {
      id: 'ws:context-test',
      addMessage: vi.fn((m: Message) => { history.push(m); }),
      getHistory: vi.fn(() => [...history]),
      clear: vi.fn(() => { history.length = 0; }),
      replaceHistory: vi.fn((msgs: Message[]) => {
        history.length = 0;
        history.push(...msgs);
      }),
      getConfig: vi.fn(() => undefined),
      setConfig: vi.fn(),
      deleteConfig: vi.fn(),
    };

    const orchestrator = new AgentOrchestrator({
      modelRouter: mockRouter,
      systemPrompt: 'You are helpful.',
      session,
      primaryTier: 'default',
      delegation: {
        compaction: 'fast',
        memory_extraction: 'default',
        classification: 'complex',
        tool_summarisation: 'default',
        complex_reasoning: 'complex',
      },
      maxDelegationDepth: 10,
      memoryStore,
      contextWindow: 2000,
      compaction: {
        thresholdPct: 95,
        keepTurns: 1,
        summaryMaxTokens: 256,
        importanceThreshold: 1,
        proactive: {
          enabled: true,
          warnPct: 40,
          checkpointPct: 50,
          autoCompactPct: 95,
          checkpointCooldownMs: 1000,
          memoryNamespace: 'session/checkpoints',
        },
      },
    });

    await orchestrator.process('ping');

    const alert = orchestrator.consumeContextAlert();
    expect(alert?.level).toBe('checkpoint');
    expect(alert?.actions.checkpointSaved).toBe(true);
    expect(alert?.actions.checkpointNamespace).toContain('session/checkpoints');

    // The alert is single-use: a second read must find nothing pending.
    expect(orchestrator.consumeContextAlert()).toBeUndefined();

    // The ':' in the session id becomes a path separator in the namespace.
    const stored = memoryStore.read('session/checkpoints/ws/context-test');
    expect(stored.length).toBeGreaterThan(0);
  } finally {
    rmSync(tempDir, { recursive: true, force: true });
  }
});
it('auto-compacts proactively at critical threshold and emits alert', async () => {
  // Single stub client serves both the primary tier and the 'fast' tier used for compaction.
  const summaryClient: ModelClient = {
    chat: vi.fn().mockResolvedValue({
      content: 'summary',
      stopReason: 'end_turn',
      usage: { inputTokens: 8, outputTokens: 4 },
    }),
  };
  const router = new ModelRouter({
    default: summaryClient,
    fast: summaryClient,
    fallbackChain: [],
  });

  // Four large pre-seeded messages, intended to push usage past autoCompactPct (90%).
  const filler = 'y'.repeat(2400);
  const history: Message[] = [
    { role: 'user', content: filler },
    { role: 'assistant', content: filler },
    { role: 'user', content: filler },
    { role: 'assistant', content: filler },
  ];

  // Minimal in-memory Session backed by the `history` array above.
  const session: Session = {
    id: 'ws:auto-compact',
    addMessage: vi.fn((m: Message) => { history.push(m); }),
    getHistory: vi.fn(() => [...history]),
    clear: vi.fn(() => { history.length = 0; }),
    replaceHistory: vi.fn((msgs: Message[]) => {
      history.length = 0;
      history.push(...msgs);
    }),
    getConfig: vi.fn(() => undefined),
    setConfig: vi.fn(),
    deleteConfig: vi.fn(),
  };

  const orchestrator = new AgentOrchestrator({
    modelRouter: router,
    systemPrompt: 'You are helpful.',
    session,
    primaryTier: 'default',
    delegation: {
      compaction: 'fast',
      memory_extraction: 'fast',
      classification: 'complex',
      tool_summarisation: 'default',
      complex_reasoning: 'complex',
    },
    maxDelegationDepth: 10,
    contextWindow: 2000,
    compaction: {
      thresholdPct: 99,
      keepTurns: 1,
      summaryMaxTokens: 128,
      importanceThreshold: 1,
      proactive: {
        enabled: true,
        warnPct: 70,
        checkpointPct: 80,
        autoCompactPct: 90,
        checkpointCooldownMs: 1000,
        memoryNamespace: 'session/checkpoints',
      },
    },
  });

  await orchestrator.process('continue');

  const pendingAlert = orchestrator.consumeContextAlert();
  expect(pendingAlert?.actions.autoCompacted).toBe(true);
  // 4 seeded messages + the new turn would be 6 without compaction.
  expect(history.length).toBeLessThan(6);
});
});
});
+225 -1
View File
@@ -9,12 +9,13 @@ import type { Attachment } from '../../channels/types.js';
import { NativeAgent } from './agent.js';
import type { ToolUseEvent } from './agent.js';
import type { OutboundAttachmentCollector } from './attachments.js';
import { estimateMessageTokens, shouldCompact } from '../../context/tokens.js';
import { estimateMessageTokens, getContextWindow, shouldCompact } from '../../context/tokens.js';
import { compactHistory, type CompactionConfig, type CompactionResult, DEFAULT_COMPACTION_CONFIG } from '../../context/compaction.js';
import { estimateCost } from '../../models/costs.js';
import { auditLogger } from '../../audit/index.js';
import { buildAdaptiveMemoryContext, buildRecentMemoryContext } from '../../memory/adaptive.js';
import { buildUserMessage } from '../../models/media.js';
import { CONTEXT_CHECKPOINT_PROMPT } from './prompts.js';
// ── Public types ──────────────────────────────────────────────────────
@@ -70,6 +71,29 @@ export interface UsageReport {
};
}
/**
 * Severity of a proactive context alert, in increasing order of context pressure.
 * The orchestrator maps usage percentage to a level using the proactive
 * `warnPct` ('warning'), `checkpointPct` ('checkpoint') and `autoCompactPct`
 * ('critical') thresholds.
 */
export type ContextAlertLevel = 'warning' | 'checkpoint' | 'critical';
/**
 * Snapshot of how much of the model context window the current history occupies,
 * as returned by `AgentOrchestrator.getContextBudget()`.
 */
export interface ContextBudget {
/** Estimated token count of the current conversation history. */
estimatedTokens: number;
/** Context window size (in tokens) the estimate is measured against. */
contextWindow: number;
/** `contextWindow - estimatedTokens`, never below 0. */
remainingTokens: number;
/** Estimated usage as a percentage of the window, capped at 100 (0 when the window is not positive). */
usagePct: number;
/** Compaction threshold as a percentage of the window. */
thresholdPct: number;
/** `thresholdPct` expressed in tokens, rounded down. */
thresholdTokens: number;
/** True when `estimatedTokens` is strictly greater than `thresholdTokens`. */
shouldCompact: boolean;
}
/**
 * Alert produced by proactive context maintenance and handed out once via
 * `AgentOrchestrator.consumeContextAlert()`.
 */
export interface ContextAlert {
/** Severity the alert was raised at. */
level: ContextAlertLevel;
/** Human-readable summary: usage figures plus a sentence per action taken. */
message: string;
/** Budget snapshot at the time the alert was built (re-read after auto-compaction when that ran). */
budget: ContextBudget;
/** What the maintenance pass actually did while raising this alert. */
actions: {
/** True when a checkpoint summary was written to memory. */
checkpointSaved: boolean;
/** True when proactive compaction ran and compacted at least one message. */
autoCompacted: boolean;
/** Memory namespace the checkpoint was appended to; set only when a checkpoint was saved. */
checkpointNamespace?: string;
};
}
/** Full configuration for the AgentOrchestrator. */
export interface OrchestratorConfig {
modelRouter: ModelRouter;
@@ -131,6 +155,9 @@ export class AgentOrchestrator {
private _memoryMaxInjectionTokens: number;
private _systemPromptBase: string;
private _usageByTier: Map<string, TierUsageStats> = new Map();
private _lastContextAlertLevel: ContextAlertLevel | null = null;
private _pendingContextAlert?: ContextAlert;
private _lastCheckpointAt = 0;
constructor(config: OrchestratorConfig) {
this._modelRouter = config.modelRouter;
@@ -231,6 +258,7 @@ export class AgentOrchestrator {
*/
async process(userMessage: string, attachments?: Attachment[]): Promise<string> {
this._injectMemoryContext(userMessage);
await this._runProactiveContextMaintenance();
await this.compactIfNeeded();
// Snapshot history so we can rollback if the underlying tool loop returns an error message.
@@ -329,6 +357,9 @@ export class AgentOrchestrator {
reset(): void {
// Reset the primary agent and drop the per-tier usage statistics.
this._agent.reset();
this._usageByTier.clear();
// Forget proactive-context state so the next process() call starts fresh:
// no remembered alert level, no undelivered alert, and no checkpoint cooldown
// (a last-checkpoint time of 0 means the cooldown has long since elapsed).
this._lastContextAlertLevel = null;
this._pendingContextAlert = undefined;
this._lastCheckpointAt = 0;
}
/** Get the primary agent's conversation history. */
@@ -405,6 +436,36 @@ export class AgentOrchestrator {
};
}
/**
 * Compute the current context budget for this session.
 *
 * Token usage is an estimate over the primary agent's history; the window comes
 * from `getContextWindow` using the configured model name and window override.
 *
 * @returns Estimated usage, remaining headroom and the compaction threshold
 *          (both as a percentage and in tokens).
 */
getContextBudget(): ContextBudget {
  const history = this.getHistory();
  const compactionPct =
    this._compactionConfig?.thresholdPct ?? DEFAULT_COMPACTION_CONFIG.thresholdPct;
  const windowTokens = getContextWindow(this._modelName ?? 'unknown', this._contextWindow);
  const usedTokens = estimateMessageTokens(history);
  const compactionTokens = Math.floor((compactionPct / 100) * windowTokens);

  // Guard against a non-positive window; the percentage is capped at 100 even
  // when the estimate overshoots the window.
  let usedPct = 0;
  if (windowTokens > 0) {
    usedPct = Math.min(100, (usedTokens / windowTokens) * 100);
  }

  return {
    estimatedTokens: usedTokens,
    contextWindow: windowTokens,
    remainingTokens: Math.max(0, windowTokens - usedTokens),
    usagePct: usedPct,
    thresholdPct: compactionPct,
    thresholdTokens: compactionTokens,
    shouldCompact: usedTokens > compactionTokens,
  };
}
/**
 * Take the pending proactive context alert, if there is one.
 * The alert is cleared as it is read, so a second call returns `undefined`
 * until maintenance raises a new alert.
 */
consumeContextAlert(): ContextAlert | undefined {
  const pending = this._pendingContextAlert;
  this._pendingContextAlert = undefined;
  return pending;
}
/**
* Look up which model tier is configured for a given delegation task.
* Convenience method so callers don't need to access the config directly.
@@ -492,6 +553,169 @@ export class AgentOrchestrator {
await this.compact();
}
/**
 * Proactive context maintenance pass, run at the start of process() before the
 * regular threshold-based compaction check.
 *
 * Does nothing unless `compaction.proactive.enabled` is set. Otherwise it:
 *  1. classifies current usage into an alert level,
 *  2. writes a checkpoint summary to memory when the level and cooldown allow,
 *  3. runs compaction when the level is 'critical',
 *  4. stores a ContextAlert for consumeContextAlert() when the level changed
 *     since the previous pass or an action was taken.
 *
 * Compaction failures are logged and swallowed so a maintenance problem never
 * blocks the user's turn.
 */
private async _runProactiveContextMaintenance(): Promise<void> {
const proactive = this._compactionConfig?.proactive;
if (!proactive?.enabled) {
return;
}
let budget = this.getContextBudget();
let level = this._resolveContextAlertLevel(budget.usagePct);
// Remember the level that triggered maintenance; compaction may lower it below.
const levelBeforeMaintenance = level;
let checkpointSaved = false;
let autoCompacted = false;
let checkpointNamespace: string | undefined;
// Checkpoint first, so the summary is taken from the history before it is compacted.
if (level !== null && this._shouldCheckpoint(level)) {
const checkpointResult = await this._writeProactiveCheckpoint();
checkpointSaved = checkpointResult.saved;
checkpointNamespace = checkpointResult.namespace;
}
if (level === 'critical') {
try {
const result = await this.compact();
// Only count it as compacted when at least one message was actually compacted.
autoCompacted = Boolean(result && result.compactedCount > 0);
} catch (error) {
console.warn('[Flynn:compact] Proactive auto-compaction failed:', error);
}
// Re-read usage: the alert reports the budget as it stands after compaction.
budget = this.getContextBudget();
level = this._resolveContextAlertLevel(budget.usagePct);
}
// Usage dropped below every threshold, but work was done: still report it,
// using the level that triggered the work.
if (!level && (autoCompacted || checkpointSaved)) {
level = levelBeforeMaintenance ?? 'warning';
}
if (!level) {
// Below all thresholds and nothing done: clear the remembered level so the
// next crossing emits again.
this._lastContextAlertLevel = null;
return;
}
// Avoid repeating the same alert every turn: emit only on a level change or
// when an action was taken during this pass.
const shouldEmit = (
this._lastContextAlertLevel !== level ||
checkpointSaved ||
autoCompacted
);
this._lastContextAlertLevel = level;
if (!shouldEmit) {
return;
}
const pct = budget.usagePct.toFixed(1);
// Sentences for actions that did not happen are empty strings and are filtered out.
const message = [
`Context usage is ${pct}% (${budget.estimatedTokens.toLocaleString()}/${budget.contextWindow.toLocaleString()} estimated tokens).`,
checkpointSaved && checkpointNamespace ? `Checkpoint saved to memory namespace \`${checkpointNamespace}\`.` : '',
autoCompacted ? 'Auto-compaction ran proactively to preserve headroom.' : '',
].filter(Boolean).join(' ');
// Replaces any earlier alert that was never consumed.
this._pendingContextAlert = {
level,
message,
budget,
actions: {
checkpointSaved,
autoCompacted,
checkpointNamespace,
},
};
}
/**
 * Map a usage percentage to an alert level using the proactive thresholds
 * (configured values, or the defaults when none are configured).
 *
 * Thresholds are forced into non-decreasing order: a checkpoint threshold below
 * the warning threshold is raised to it, and likewise for the auto-compact
 * threshold relative to the checkpoint threshold.
 *
 * @param usagePct Estimated context usage as a percentage.
 * @returns The highest level whose threshold is reached, or `null` when usage
 *          is below the warning threshold or no thresholds are available.
 */
private _resolveContextAlertLevel(usagePct: number): ContextAlertLevel | null {
  const settings = this._compactionConfig?.proactive ?? DEFAULT_COMPACTION_CONFIG.proactive;
  if (!settings) {
    return null;
  }

  const warnAt = settings.warnPct;
  const checkpointAt = Math.max(settings.checkpointPct, warnAt);
  const criticalAt = Math.max(settings.autoCompactPct, checkpointAt);

  // Highest severity first, so the first threshold reached wins.
  const ladder: Array<[number, ContextAlertLevel]> = [
    [criticalAt, 'critical'],
    [checkpointAt, 'checkpoint'],
    [warnAt, 'warning'],
  ];
  for (const [floorPct, alertLevel] of ladder) {
    if (usagePct >= floorPct) {
      return alertLevel;
    }
  }
  return null;
}
/**
 * Decide whether a checkpoint may be written at the given alert level.
 *
 * @param level Current alert level.
 * @returns `false` for 'warning', when no proactive settings are available, or
 *          while the cooldown since the last successful checkpoint is still
 *          running; `true` otherwise.
 */
private _shouldCheckpoint(level: ContextAlertLevel): boolean {
  if (level === 'warning') {
    return false;
  }

  const settings = this._compactionConfig?.proactive ?? DEFAULT_COMPACTION_CONFIG.proactive;
  if (!settings) {
    return false;
  }

  const elapsedMs = Date.now() - this._lastCheckpointAt;
  const stillCoolingDown = elapsedMs < settings.checkpointCooldownMs;
  return !stillCoolingDown;
}
/**
 * Summarise the current conversation with the compaction-tier model and append
 * the summary to memory under a per-session namespace.
 *
 * Best effort: any failure is logged and reported as `{ saved: false }`.
 * The checkpoint cooldown timestamp is updated only after a successful write.
 *
 * @returns `{ saved: true, namespace }` on success; `{ saved: false }` when
 *          there is no memory store or session, nothing to summarise, the model
 *          returned an empty summary, or an error occurred.
 */
private async _writeProactiveCheckpoint(): Promise<{ saved: boolean; namespace?: string }> {
  const notSaved = (): { saved: boolean; namespace?: string } => ({ saved: false });

  if (!this._memoryStore || !this._session) {
    return notSaved();
  }

  const history = this.getHistory();
  if (history.length === 0) {
    return notSaved();
  }

  // Only the most recent 20,000 characters of the transcript are sent.
  const transcript = this._formatCheckpointConversation(history, 20_000);
  if (transcript.trim() === '') {
    return notSaved();
  }

  try {
    const delegated = await this.delegate({
      tier: this.getDelegationTier('compaction'),
      systemPrompt: CONTEXT_CHECKPOINT_PROMPT,
      message: transcript,
      // Checkpoints stay short: never more than 1024 output tokens.
      maxTokens: Math.min(this._compactionConfig?.summaryMaxTokens ?? 512, 1024),
    });

    const summary = delegated.content.trim();
    if (summary === '') {
      return notSaved();
    }

    const baseNamespace = this._compactionConfig?.proactive?.memoryNamespace
      ?? DEFAULT_COMPACTION_CONFIG.proactive?.memoryNamespace
      ?? 'session/checkpoints';
    const namespace = `${baseNamespace}/${this._sanitizeSessionId(this._session.id)}`;

    // Each checkpoint is appended as a section headed by its ISO timestamp.
    const entry = `## ${new Date().toISOString()}\n\n${summary}\n\n`;
    this._memoryStore.write(namespace, entry, 'append');
    this._lastCheckpointAt = Date.now();
    return { saved: true, namespace };
  } catch (error) {
    console.warn('[Flynn:memory] Proactive checkpoint write failed:', error);
    return notSaved();
  }
}
/**
 * Render messages as a plain-text transcript (`role: content`, separated by
 * blank lines) for the checkpoint prompt.
 *
 * Non-string content is replaced by the placeholder `[multimodal content]`.
 * When the transcript is longer than `maxChars`, only its last `maxChars`
 * characters are kept, so the most recent part of the conversation survives.
 *
 * @param messages Conversation history to render.
 * @param maxChars Maximum length of the returned string, in characters.
 */
private _formatCheckpointConversation(messages: Message[], maxChars: number): string {
  const entries: string[] = [];
  for (const message of messages) {
    const body = typeof message.content === 'string' ? message.content : '[multimodal content]';
    entries.push(`${message.role}: ${body}`);
  }

  const transcript = entries.join('\n\n');
  const overflow = transcript.length - maxChars;
  return overflow > 0 ? transcript.slice(overflow) : transcript;
}
/**
 * Turn a session id into a safe memory-namespace suffix.
 *
 * - `:` becomes `/`, so `ws:context-test` maps to `ws/context-test`.
 * - Every run of characters outside `[a-zA-Z0-9/_-]` becomes a single `_`,
 *   and consecutive underscores are collapsed.
 * - Repeated slashes are collapsed and leading/trailing slashes are removed,
 *   so ids such as `:a`, `a::b` or `a:` cannot introduce empty path segments.
 * - An id with nothing left after cleaning yields `_` rather than an empty
 *   suffix, so the resulting namespace never ends in `/`.
 *
 * @param sessionId Raw session identifier.
 * @returns A non-empty suffix made only of `[a-zA-Z0-9/_-]`, with no empty segments.
 */
private _sanitizeSessionId(sessionId: string): string {
  const cleaned = sessionId
    .replace(/:/g, '/')
    .replace(/[^a-zA-Z0-9/_-]+/g, '_')
    .replace(/_+/g, '_')
    .replace(/\/{2,}/g, '/')
    .replace(/^\/|\/$/g, '');
  return cleaned === '' ? '_' : cleaned;
}
/**
 * Check whether a response text is a tool-loop error report, recognised by
 * its fixed leading prefix.
 */
private _isToolLoopErrorMessage(text: string): boolean {
  const toolLoopErrorPrefix = 'Error in tool loop (iteration ';
  return text.startsWith(toolLoopErrorPrefix);
}
+21
View File
@@ -92,3 +92,24 @@ Rules:
Output format:
Return the summarised output directly. No preamble or meta-commentary.`;
/**
 * System prompt for generating a continuity checkpoint of an ongoing conversation.
 *
 * The orchestrator sends it, together with a text transcript of recent history,
 * to the compaction-tier model when context usage reaches the checkpoint
 * threshold. The returned markdown is appended to the memory store so that
 * decisions, preferences and work state remain available after the history is
 * compacted.
 */
export const CONTEXT_CHECKPOINT_PROMPT = `You are generating a continuity checkpoint for an ongoing conversation.
Focus on:
- durable facts and decisions
- explicit user preferences
- active work state (what is done, what is pending, blockers)
- concrete next steps
Rules:
- Use concise bullets with short headings.
- Preserve concrete values, file paths, IDs, and error text verbatim when present.
- Do not include sensitive secrets.
- Do not invent facts not present in the conversation.
Output format:
Return markdown only (no preamble).`;
+20
View File
@@ -1169,6 +1169,12 @@ describe('configSchema — compaction importance threshold', () => {
it('defaults compaction importance threshold to disabled behavior', () => {
  const { compaction } = configSchema.parse(minimalConfig);
  expect(compaction.importance_threshold).toBe(1);

  // Proactive maintenance is off by default, with the documented default thresholds.
  const { proactive } = compaction;
  expect(proactive.enabled).toBe(false);
  expect(proactive.warn_pct).toBe(75);
  expect(proactive.checkpoint_pct).toBe(85);
  expect(proactive.auto_compact_pct).toBe(95);
  expect(proactive.checkpoint_cooldown_ms).toBe(300000);
  expect(proactive.memory_namespace).toBe('session/checkpoints');
});
it('accepts a custom importance threshold', () => {
@@ -1176,9 +1182,23 @@ describe('configSchema — compaction importance threshold', () => {
...minimalConfig,
compaction: {
importance_threshold: 0.5,
proactive: {
enabled: true,
warn_pct: 70,
checkpoint_pct: 82,
auto_compact_pct: 93,
checkpoint_cooldown_ms: 120000,
memory_namespace: 'notes/checkpoints',
},
},
});
expect(result.compaction.importance_threshold).toBe(0.5);
expect(result.compaction.proactive.enabled).toBe(true);
expect(result.compaction.proactive.warn_pct).toBe(70);
expect(result.compaction.proactive.checkpoint_pct).toBe(82);
expect(result.compaction.proactive.auto_compact_pct).toBe(93);
expect(result.compaction.proactive.checkpoint_cooldown_ms).toBe(120000);
expect(result.compaction.proactive.memory_namespace).toBe('notes/checkpoints');
});
});
+8
View File
@@ -486,6 +486,14 @@ const compactionSchema = z.object({
keep_turns: z.number().min(1).max(50).default(4),
summary_max_tokens: z.number().min(128).max(4096).default(1024),
importance_threshold: z.number().min(0).max(1).default(1),
proactive: z.object({
enabled: z.boolean().default(false),
warn_pct: z.number().min(10).max(100).default(75),
checkpoint_pct: z.number().min(10).max(100).default(85),
auto_compact_pct: z.number().min(10).max(100).default(95),
checkpoint_cooldown_ms: z.number().min(1000).max(86_400_000).default(300_000),
memory_namespace: z.string().default('session/checkpoints'),
}).default({}),
}).default({});
const discordSchema = z.object({
+25
View File
@@ -15,6 +15,23 @@ export interface CompactionConfig {
summaryMaxTokens: number;
/** Preserve messages at or above this importance score from compaction. */
importanceThreshold: number;
/** Optional proactive context usage thresholds and actions. */
proactive?: ProactiveCompactionConfig;
}
/**
 * Settings for proactive context maintenance: warnings, memory checkpoints and
 * early compaction triggered by context usage, ahead of the regular compaction
 * threshold. The three `*Pct` values are percentages of the context window.
 */
export interface ProactiveCompactionConfig {
/** Master switch; when false the orchestrator skips proactive maintenance entirely. */
enabled: boolean;
/** Usage percentage at which a 'warning' alert is raised. */
warnPct: number;
/**
 * Usage percentage at which a checkpoint summary is saved to memory.
 * Treated as at least `warnPct` when evaluated.
 */
checkpointPct: number;
/**
 * Usage percentage at which compaction is run proactively ('critical').
 * Treated as at least the effective `checkpointPct` when evaluated.
 */
autoCompactPct: number;
/** Minimum time, in milliseconds, between two successful checkpoint writes. */
checkpointCooldownMs: number;
/** Base memory namespace for checkpoints; a sanitized session id is appended to it. */
memoryNamespace: string;
}
export interface CompactionResult {
@@ -33,6 +50,14 @@ export const DEFAULT_COMPACTION_CONFIG: CompactionConfig = {
keepTurns: 4,
summaryMaxTokens: 1024,
importanceThreshold: 1,
proactive: {
enabled: false,
warnPct: 75,
checkpointPct: 85,
autoCompactPct: 95,
checkpointCooldownMs: 300_000,
memoryNamespace: 'session/checkpoints',
},
};
export async function compactHistory(opts: {
+8
View File
@@ -218,6 +218,14 @@ export function createMessageRouter(deps: {
keepTurns: deps.config.compaction.keep_turns,
summaryMaxTokens: deps.config.compaction.summary_max_tokens,
importanceThreshold: deps.config.compaction.importance_threshold,
proactive: {
enabled: deps.config.compaction.proactive.enabled,
warnPct: deps.config.compaction.proactive.warn_pct,
checkpointPct: deps.config.compaction.proactive.checkpoint_pct,
autoCompactPct: deps.config.compaction.proactive.auto_compact_pct,
checkpointCooldownMs: deps.config.compaction.proactive.checkpoint_cooldown_ms,
memoryNamespace: deps.config.compaction.proactive.memory_namespace,
},
} : undefined,
modelName: effectiveModelName,
contextWindow: effectiveContextWindow,