From fee8be1de0db3d0e6274e72e6a4fe17c521fe1d7 Mon Sep 17 00:00:00 2001 From: William Valentin Date: Mon, 16 Feb 2026 15:44:09 -0800 Subject: [PATCH] feat(gateway): expose context usage and warning events --- src/daemon/services.ts | 31 ++ src/gateway/handlers/agent.test.ts | 64 +++ src/gateway/handlers/agent.ts | 682 +++++++++++++------------- src/gateway/handlers/handlers.test.ts | 64 +++ src/gateway/handlers/system.ts | 21 + src/gateway/protocol.test.ts | 25 + src/gateway/protocol.ts | 20 + src/gateway/server.ts | 5 +- src/gateway/session-bridge.ts | 49 ++ src/gateway/ui/pages/chat.js | 9 + src/gateway/ui/pages/usage.js | 9 + 11 files changed, 645 insertions(+), 334 deletions(-) diff --git a/src/daemon/services.ts b/src/daemon/services.ts index dff1bbe..71a534a 100644 --- a/src/daemon/services.ts +++ b/src/daemon/services.ts @@ -411,6 +411,37 @@ export function createGateway(deps: GatewayDeps): GatewayServer { } } + return results; + }, + getContextUsage: () => { + const results: Array<{ + sessionId: string; + budget: { + estimatedTokens: number; + contextWindow: number; + remainingTokens: number; + usagePct: number; + thresholdPct: number; + thresholdTokens: number; + shouldCompact: boolean; + }; + }> = []; + + const sessionBridge = gateway.getSessionBridge(); + for (const entry of sessionBridge.getAllContextUsage()) { + results.push(entry); + } + + const channelAgents = getChannelAgents(); + if (channelAgents) { + for (const [sessionId, { orchestrator }] of channelAgents) { + results.push({ + sessionId, + budget: orchestrator.getContextBudget(), + }); + } + } + return results; }, }); diff --git a/src/gateway/handlers/agent.test.ts b/src/gateway/handlers/agent.test.ts index 96f8670..2ca0e47 100644 --- a/src/gateway/handlers/agent.test.ts +++ b/src/gateway/handlers/agent.test.ts @@ -9,6 +9,16 @@ import { initAuditLogger } from '../../audit/index.js'; describe('createAgentHandlers command fast-path', () => { const mockAgent = { process: vi.fn(async () => 'agent response'), + consumeContextAlert: vi.fn(() => undefined as unknown), + getContextBudget: vi.fn(() => ({ + estimatedTokens: 100, + contextWindow: 200000, + remainingTokens: 199900, + usagePct: 0.05, + thresholdPct: 80, + thresholdTokens: 160000, + shouldCompact: false, + })), getUsage: vi.fn(() => ({ primary: { inputTokens: 10, outputTokens: 5, calls: 1 }, delegation: {}, @@ -169,12 +179,56 @@ describe('createAgentHandlers command fast-path', () => { expect(mockAgent.process).not.toHaveBeenCalled(); expect(((sent[0] as GatewayEvent).data as { content: string }).content).toContain('Set queue.mode=followup'); }); + + it('emits context_warning event before done when orchestrator reports an alert', async () => { + mockAgent.consumeContextAlert.mockReturnValueOnce({ + level: 'checkpoint', + message: 'Context usage is 86.0% (172000/200000 estimated tokens).', + budget: { + estimatedTokens: 172000, + contextWindow: 200000, + remainingTokens: 28000, + usagePct: 86, + thresholdPct: 80, + thresholdTokens: 160000, + shouldCompact: true, + }, + actions: { + checkpointSaved: true, + autoCompacted: false, + checkpointNamespace: 'session/checkpoints/ws/conn-1', + }, + }); + + const sent: OutboundMessage[] = []; + const send = vi.fn((msg: OutboundMessage) => sent.push(msg)); + + await handlers['agent.send']({ + id: 6, + method: 'agent.send', + params: { message: 'hello', connectionId: 'conn-1' }, + }, send); + + expect(sent).toHaveLength(2); + expect((sent[0] as GatewayEvent).event).toBe('context_warning'); + expect((sent[1] as GatewayEvent).event).toBe('done'); + }); }); describe('createAgentHandlers queue policy resolution', () => { it('passes resolved per-request queue policy into lane enqueue', async () => { const mockAgent = { process: vi.fn(async () => 'ok'), + consumeContextAlert: vi.fn(() => undefined), + getContextBudget: vi.fn(() => ({ + estimatedTokens: 0, + contextWindow: 128000, + remainingTokens: 128000, + usagePct: 0, + thresholdPct: 80, + thresholdTokens: 102400, + shouldCompact: false, + })), getUsage: vi.fn(() => ({ primary: { inputTokens: 0, outputTokens: 0, calls: 0 }, delegation: {}, @@ -234,6 +288,16 @@ describe('createAgentHandlers queue policy resolution', () => { const sessionBridge = { getAgent: vi.fn(() => ({ process: vi.fn(async () => 'ok'), + consumeContextAlert: vi.fn(() => undefined), + getContextBudget: vi.fn(() => ({ + estimatedTokens: 0, + contextWindow: 128000, + remainingTokens: 128000, + usagePct: 0, + thresholdPct: 80, + thresholdTokens: 102400, + shouldCompact: false, + })), getUsage: vi.fn(() => ({ primary: { inputTokens: 0, outputTokens: 0, calls: 0 }, delegation: {}, diff --git a/src/gateway/handlers/agent.ts b/src/gateway/handlers/agent.ts index c7bf68c..5354aa6 100644 --- a/src/gateway/handlers/agent.ts +++ b/src/gateway/handlers/agent.ts @@ -66,361 +66,377 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { try { return await deps.laneQueue.enqueue(laneId, async () => { - deps.sessionBridge.setBusy(connectionId, true); + deps.sessionBridge.setBusy(connectionId, true); - const commandInput = safeParams.metadata?.isCommand && typeof safeParams.metadata.command === 'string' - ? `/${safeParams.metadata.command}${safeParams.metadata.commandArgs ? ` ${safeParams.metadata.commandArgs}` : ''}` - : (safeParams.message ?? ''); + const commandInput = safeParams.metadata?.isCommand && typeof safeParams.metadata.command === 'string' + ? `/${safeParams.metadata.command}${safeParams.metadata.commandArgs ? ` ${safeParams.metadata.commandArgs}` : ''}` + : (safeParams.message ?? ''); - const parsedCommand = safeParams.metadata?.isCommand - ? safeParams.metadata.command - : commandInput.startsWith('/') ? commandInput.slice(1).split(/\s+/, 1)[0] : undefined; - auditLogger?.userAction({ - session_id: sessionId ?? `ws:${connectionId}`, - channel: 'ws', - sender: connectionId, - source: 'gateway', - action_type: parsedCommand ? 'command' : 'message', - content_length: commandInput.length, - attachments_count: safeParams.attachments?.length ?? 0, - command: parsedCommand, - }); - - if (commandInput && deps.commandRegistry?.isCommand(commandInput)) { - const sessionId = deps.sessionBridge.getSessionId(connectionId); - const commandResult = await deps.commandRegistry.execute(commandInput, { + const parsedCommand = safeParams.metadata?.isCommand + ? safeParams.metadata.command + : commandInput.startsWith('/') ? commandInput.slice(1).split(/\s+/, 1)[0] : undefined; + auditLogger?.userAction({ + session_id: sessionId ?? `ws:${connectionId}`, channel: 'ws', - senderId: connectionId, - sessionId: sessionId ?? `ws:${connectionId}`, - rawInput: commandInput, - services: { - getStatus: () => `Gateway session active. Current model tier: ${agent.getModelTier()}`, - getUsage: () => { - const usage = agent.getUsage(); - const lines = [ - '**Token Usage**', - '', - `Primary: ${usage.primary.inputTokens.toLocaleString()} in / ${usage.primary.outputTokens.toLocaleString()} out (${usage.primary.calls} calls)`, - ]; + sender: connectionId, + source: 'gateway', + action_type: parsedCommand ? 'command' : 'message', + content_length: commandInput.length, + attachments_count: safeParams.attachments?.length ?? 0, + command: parsedCommand, + }); - const delegationEntries = Object.entries(usage.delegation); - if (delegationEntries.length > 0) { - lines.push(''); - lines.push('Delegation:'); - for (const [tier, stats] of delegationEntries) { - lines.push(` ${tier}: ${stats.inputTokens.toLocaleString()} in / ${stats.outputTokens.toLocaleString()} out (${stats.calls} calls)`); + if (commandInput && deps.commandRegistry?.isCommand(commandInput)) { + const sessionId = deps.sessionBridge.getSessionId(connectionId); + const commandResult = await deps.commandRegistry.execute(commandInput, { + channel: 'ws', + senderId: connectionId, + sessionId: sessionId ?? `ws:${connectionId}`, + rawInput: commandInput, + services: { + getStatus: () => `Gateway session active. Current model tier: ${agent.getModelTier()}`, + getUsage: () => { + const usage = agent.getUsage(); + const budget = agent.getContextBudget(); + const lines = [ + '**Token Usage**', + '', + `Primary: ${usage.primary.inputTokens.toLocaleString()} in / ${usage.primary.outputTokens.toLocaleString()} out (${usage.primary.calls} calls)`, + ]; + + const delegationEntries = Object.entries(usage.delegation); + if (delegationEntries.length > 0) { + lines.push(''); + lines.push('Delegation:'); + for (const [tier, stats] of delegationEntries) { + lines.push(` ${tier}: ${stats.inputTokens.toLocaleString()} in / ${stats.outputTokens.toLocaleString()} out (${stats.calls} calls)`); + } } - } - lines.push(''); - lines.push(`**Total:** ${usage.total.inputTokens.toLocaleString()} in / ${usage.total.outputTokens.toLocaleString()} out (${usage.total.calls} calls)`); + lines.push(''); + lines.push(`**Total:** ${usage.total.inputTokens.toLocaleString()} in / ${usage.total.outputTokens.toLocaleString()} out (${usage.total.calls} calls)`); + lines.push(''); + lines.push('**Context usage (estimated):** ' + + `${budget.estimatedTokens.toLocaleString()}/${budget.contextWindow.toLocaleString()} ` + + `(${budget.usagePct.toFixed(1)}%)`); - if (usage.total.estimatedCost > 0) { - lines.push(`**Estimated cost:** $${usage.total.estimatedCost.toFixed(4)}`); - } + if (usage.total.estimatedCost > 0) { + lines.push(`**Estimated cost:** $${usage.total.estimatedCost.toFixed(4)}`); + } - return lines.join('\n'); - }, - getModel: () => `Current model tier: ${agent.getModelTier()}`, - setModel: (input) => { - const raw = input.trim(); - if (!raw) { - return 'Usage: /model '; - } - const [requestedTier, ...rest] = raw.split(/\s+/); - const validTiers: ModelTier[] = ['fast', 'default', 'complex', 'local']; - const modelTier = requestedTier as ModelTier; - if (!validTiers.includes(modelTier)) { - return `Invalid tier: ${requestedTier}. Available: ${validTiers.join(', ')}`; - } - if (rest.length > 0) { - return `Switched to model tier: ${modelTier}\nNote: provider/model switching is not available via gateway (/model ).`; - } - agent.setModelTier(modelTier); - if (sessionId && deps.sessionManager) { - deps.sessionManager.setSessionConfig('ws', sessionId, 'modelTier', modelTier); - } - return `Switched to model tier: ${modelTier}`; - }, - compact: async () => { - const result = await agent.compact(); - if (result && result.compactedCount > 0) { - return `Compacted ${result.compactedCount} messages: ${result.tokensBefore} → ${result.tokensAfter} tokens`; - } - return 'Nothing to compact.'; - }, - reset: () => { - agent.reset(); - if (sessionId && deps.sessionManager) { - deps.sessionManager.deleteSessionConfig('ws', sessionId, 'modelTier'); - } - return 'Session reset.'; - }, + return lines.join('\n'); + }, + getModel: () => `Current model tier: ${agent.getModelTier()}`, + setModel: (input) => { + const raw = input.trim(); + if (!raw) { + return 'Usage: /model '; + } + const [requestedTier, ...rest] = raw.split(/\s+/); + const validTiers: ModelTier[] = ['fast', 'default', 'complex', 'local']; + const modelTier = requestedTier as ModelTier; + if (!validTiers.includes(modelTier)) { + return `Invalid tier: ${requestedTier}. Available: ${validTiers.join(', ')}`; + } + if (rest.length > 0) { + return `Switched to model tier: ${modelTier}\nNote: provider/model switching is not available via gateway (/model ).`; + } + agent.setModelTier(modelTier); + if (sessionId && deps.sessionManager) { + deps.sessionManager.setSessionConfig('ws', sessionId, 'modelTier', modelTier); + } + return `Switched to model tier: ${modelTier}`; + }, + compact: async () => { + const result = await agent.compact(); + if (result && result.compactedCount > 0) { + return `Compacted ${result.compactedCount} messages: ${result.tokensBefore} → ${result.tokensAfter} tokens`; + } + return 'Nothing to compact.'; + }, + reset: () => { + agent.reset(); + if (sessionId && deps.sessionManager) { + deps.sessionManager.deleteSessionConfig('ws', sessionId, 'modelTier'); + } + return 'Session reset.'; + }, - getElevation: () => { - if (!sessionId || !deps.sessionManager) { - return 'Elevated mode: off'; - } - const untilRaw = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.until_ms'); - const reason = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.reason') ?? ''; - const id = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.id') ?? ''; - if (!untilRaw || !id) { - return 'Elevated mode: off'; - } - const untilMs = Number.parseInt(untilRaw, 10); - if (!Number.isFinite(untilMs)) { - return 'Elevated mode: off'; - } - const now = Date.now(); - if (untilMs <= now) { - deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.until_ms'); - deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.reason'); - deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.id'); - auditLogger?.securityElevationExpired({ + getElevation: () => { + if (!sessionId || !deps.sessionManager) { + return 'Elevated mode: off'; + } + const untilRaw = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.until_ms'); + const reason = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.reason') ?? ''; + const id = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.id') ?? ''; + if (!untilRaw || !id) { + return 'Elevated mode: off'; + } + const untilMs = Number.parseInt(untilRaw, 10); + if (!Number.isFinite(untilMs)) { + return 'Elevated mode: off'; + } + const now = Date.now(); + if (untilMs <= now) { + deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.until_ms'); + deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.reason'); + deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.id'); + auditLogger?.securityElevationExpired({ + session_id: `ws:${sessionId}`, + channel: 'ws', + sender: connectionId, + elevation_id: id, + until_ms: untilMs, + reason: reason || undefined, + }); + return 'Elevated mode: off (expired)'; + } + const remainingMs = untilMs - now; + const remainingSec = Math.ceil(remainingMs / 1000); + return `Elevated mode: on (${remainingSec}s remaining)${reason ? ` — ${reason}` : ''}`; + }, + + setElevation: (input: string) => { + if (!sessionId || !deps.sessionManager) { + return 'Elevate command is not available in this session.'; + } + const raw = input.trim(); + const parts = raw.split(/\s+/); + const hasYes = parts.includes('--yes') || parts.includes('--confirm'); + const filtered = parts.filter(p => p !== '--yes' && p !== '--confirm'); + + if (filtered.length === 0) { + return 'Usage: /elevate --yes | /elevate off --yes'; + } + + if (filtered[0] === 'off') { + if (!hasYes) { + return 'Refusing to disable elevation without explicit confirmation. Use: /elevate off --yes'; + } + const existingId = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.id') ?? randomUUID(); + const existingUntil = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.until_ms'); + const existingReason = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.reason') ?? ''; + deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.until_ms'); + deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.reason'); + deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.id'); + auditLogger?.securityElevationDisabled({ + session_id: `ws:${sessionId}`, + channel: 'ws', + sender: connectionId, + elevation_id: existingId, + until_ms: existingUntil ? Number.parseInt(existingUntil, 10) : undefined, + reason: existingReason || undefined, + }); + return 'Elevated mode: off'; + } + + if (!hasYes) { + return 'Refusing to enable elevation without explicit confirmation. Use: /elevate --yes'; + } + + const dur = filtered[0]; + const reason = filtered.slice(1).join(' ').trim(); + const ttlMs = (() => { + const m = dur.match(/^(\d+)([smhd])$/i); + if (!m) { + return null; + } + const n = Number.parseInt(m[1], 10); + if (!Number.isFinite(n) || n <= 0) { + return null; + } + const unit = m[2].toLowerCase(); + if (unit === 's') {return n * 1000;} + if (unit === 'm') {return n * 60_000;} + if (unit === 'h') {return n * 3_600_000;} + if (unit === 'd') {return n * 86_400_000;} + return null; + })(); + if (!ttlMs) { + return 'Invalid duration. Use one of: 30s, 10m, 1h, 1d'; + } + + const untilMs = Date.now() + ttlMs; + const id = randomUUID(); + deps.sessionManager.setSessionConfig('ws', sessionId, 'elevation.until_ms', String(untilMs)); + deps.sessionManager.setSessionConfig('ws', sessionId, 'elevation.id', id); + if (reason) { + deps.sessionManager.setSessionConfig('ws', sessionId, 'elevation.reason', reason); + } else { + deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.reason'); + } + + auditLogger?.securityElevationEnabled({ session_id: `ws:${sessionId}`, channel: 'ws', sender: connectionId, elevation_id: id, until_ms: untilMs, + ttl_ms: ttlMs, reason: reason || undefined, }); - return 'Elevated mode: off (expired)'; - } - const remainingMs = untilMs - now; - const remainingSec = Math.ceil(remainingMs / 1000); - return `Elevated mode: on (${remainingSec}s remaining)${reason ? ` — ${reason}` : ''}`; + + return `Elevated mode: on until ${new Date(untilMs).toISOString()}`; + }, + + getQueue: () => { + const mode = resolvedPolicy?.mode ?? 'collect'; + const cap = resolvedPolicy?.cap ?? 50; + const overflow = resolvedPolicy?.overflow ?? 'drop_old'; + const debounceMs = resolvedPolicy?.debounceMs ?? 0; + const summarizeOverflow = resolvedPolicy?.summarizeOverflow ?? true; + const source = deps.sessionManager && sessionId + ? deps.sessionManager.getSessionConfig('ws', sessionId, 'queue.mode') ? 'session override' : 'default/channel' + : 'default/channel'; + return [ + '**Queue policy**', + `mode: ${mode}`, + `cap: ${cap}`, + `overflow: ${overflow}`, + `debounce_ms: ${debounceMs}`, + `summarize_overflow: ${summarizeOverflow}`, + `source: ${source}`, + ].join('\n'); + }, + + setQueue: (input: string) => { + if (!deps.sessionManager || !sessionId) { + return 'Queue command is not available in this session.'; + } + const [rawKey, ...rest] = input.trim().split(/\s+/); + const value = rest.join(' ').trim(); + if (!rawKey || !value) { + return 'Usage: /queue '; + } + const key = rawKey.toLowerCase(); + if (key === 'mode') { + if (!['collect', 'followup', 'steer', 'steer_backlog', 'interrupt'].includes(value)) { + return 'Invalid mode. Use one of: collect, followup, steer, steer_backlog, interrupt'; + } + deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.mode', value); + return `Set queue.mode=${value} for this session`; + } + if (key === 'cap') { + const cap = Number.parseInt(value, 10); + if (!Number.isFinite(cap) || cap < 1 || cap > 1000) { + return 'Invalid cap. Use an integer between 1 and 1000'; + } + deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.cap', String(cap)); + return `Set queue.cap=${cap} for this session`; + } + if (key === 'overflow') { + if (value !== 'drop_old' && value !== 'drop_new') { + return 'Invalid overflow. Use drop_old or drop_new'; + } + deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.overflow', value); + return `Set queue.overflow=${value} for this session`; + } + if (key === 'debounce_ms') { + const debounceMs = Number.parseInt(value, 10); + if (!Number.isFinite(debounceMs) || debounceMs < 0 || debounceMs > 60_000) { + return 'Invalid debounce_ms. Use an integer between 0 and 60000'; + } + deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.debounce_ms', String(debounceMs)); + return `Set queue.debounce_ms=${debounceMs} for this session`; + } + if (key === 'summarize_overflow') { + const normalized = value.toLowerCase(); + if (normalized !== 'true' && normalized !== 'false') { + return 'Invalid summarize_overflow. Use true or false'; + } + deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.summarize_overflow', normalized); + return `Set queue.summarize_overflow=${normalized} for this session`; + } + return 'Unknown queue key. Use one of: mode, cap, overflow, debounce_ms, summarize_overflow'; + }, + + resetQueue: () => { + if (!deps.sessionManager || !sessionId) { + return 'Queue command is not available in this session.'; + } + deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.mode'); + deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.cap'); + deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.overflow'); + deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.debounce_ms'); + deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.summarize_overflow'); + return 'Reset session queue overrides.'; + }, }, + }); - setElevation: (input: string) => { - if (!sessionId || !deps.sessionManager) { - return 'Elevate command is not available in this session.'; - } - const raw = input.trim(); - const parts = raw.split(/\s+/); - const hasYes = parts.includes('--yes') || parts.includes('--confirm'); - const filtered = parts.filter(p => p !== '--yes' && p !== '--confirm'); - - if (filtered.length === 0) { - return 'Usage: /elevate --yes | /elevate off --yes'; - } - - if (filtered[0] === 'off') { - if (!hasYes) { - return 'Refusing to disable elevation without explicit confirmation. Use: /elevate off --yes'; - } - const existingId = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.id') ?? randomUUID(); - const existingUntil = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.until_ms'); - const existingReason = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.reason') ?? ''; - deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.until_ms'); - deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.reason'); - deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.id'); - auditLogger?.securityElevationDisabled({ - session_id: `ws:${sessionId}`, - channel: 'ws', - sender: connectionId, - elevation_id: existingId, - until_ms: existingUntil ? Number.parseInt(existingUntil, 10) : undefined, - reason: existingReason || undefined, - }); - return 'Elevated mode: off'; - } - - if (!hasYes) { - return 'Refusing to enable elevation without explicit confirmation. Use: /elevate --yes'; - } - - const dur = filtered[0]; - const reason = filtered.slice(1).join(' ').trim(); - const ttlMs = (() => { - const m = dur.match(/^(\d+)([smhd])$/i); - if (!m) { - return null; - } - const n = Number.parseInt(m[1], 10); - if (!Number.isFinite(n) || n <= 0) { - return null; - } - const unit = m[2].toLowerCase(); - if (unit === 's') {return n * 1000;} - if (unit === 'm') {return n * 60_000;} - if (unit === 'h') {return n * 3_600_000;} - if (unit === 'd') {return n * 86_400_000;} - return null; - })(); - if (!ttlMs) { - return 'Invalid duration. Use one of: 30s, 10m, 1h, 1d'; - } - - const untilMs = Date.now() + ttlMs; - const id = randomUUID(); - deps.sessionManager.setSessionConfig('ws', sessionId, 'elevation.until_ms', String(untilMs)); - deps.sessionManager.setSessionConfig('ws', sessionId, 'elevation.id', id); - if (reason) { - deps.sessionManager.setSessionConfig('ws', sessionId, 'elevation.reason', reason); - } else { - deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.reason'); - } - - auditLogger?.securityElevationEnabled({ - session_id: `ws:${sessionId}`, - channel: 'ws', - sender: connectionId, - elevation_id: id, - until_ms: untilMs, - ttl_ms: ttlMs, - reason: reason || undefined, - }); - - return `Elevated mode: on until ${new Date(untilMs).toISOString()}`; - }, - - getQueue: () => { - const mode = resolvedPolicy?.mode ?? 'collect'; - const cap = resolvedPolicy?.cap ?? 50; - const overflow = resolvedPolicy?.overflow ?? 'drop_old'; - const debounceMs = resolvedPolicy?.debounceMs ?? 0; - const summarizeOverflow = resolvedPolicy?.summarizeOverflow ?? true; - const source = deps.sessionManager && sessionId - ? deps.sessionManager.getSessionConfig('ws', sessionId, 'queue.mode') ? 'session override' : 'default/channel' - : 'default/channel'; - return [ - '**Queue policy**', - `mode: ${mode}`, - `cap: ${cap}`, - `overflow: ${overflow}`, - `debounce_ms: ${debounceMs}`, - `summarize_overflow: ${summarizeOverflow}`, - `source: ${source}`, - ].join('\n'); - }, - - setQueue: (input: string) => { - if (!deps.sessionManager || !sessionId) { - return 'Queue command is not available in this session.'; - } - const [rawKey, ...rest] = input.trim().split(/\s+/); - const value = rest.join(' ').trim(); - if (!rawKey || !value) { - return 'Usage: /queue '; - } - const key = rawKey.toLowerCase(); - if (key === 'mode') { - if (!['collect', 'followup', 'steer', 'steer_backlog', 'interrupt'].includes(value)) { - return 'Invalid mode. Use one of: collect, followup, steer, steer_backlog, interrupt'; - } - deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.mode', value); - return `Set queue.mode=${value} for this session`; - } - if (key === 'cap') { - const cap = Number.parseInt(value, 10); - if (!Number.isFinite(cap) || cap < 1 || cap > 1000) { - return 'Invalid cap. Use an integer between 1 and 1000'; - } - deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.cap', String(cap)); - return `Set queue.cap=${cap} for this session`; - } - if (key === 'overflow') { - if (value !== 'drop_old' && value !== 'drop_new') { - return 'Invalid overflow. Use drop_old or drop_new'; - } - deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.overflow', value); - return `Set queue.overflow=${value} for this session`; - } - if (key === 'debounce_ms') { - const debounceMs = Number.parseInt(value, 10); - if (!Number.isFinite(debounceMs) || debounceMs < 0 || debounceMs > 60_000) { - return 'Invalid debounce_ms. Use an integer between 0 and 60000'; - } - deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.debounce_ms', String(debounceMs)); - return `Set queue.debounce_ms=${debounceMs} for this session`; - } - if (key === 'summarize_overflow') { - const normalized = value.toLowerCase(); - if (normalized !== 'true' && normalized !== 'false') { - return 'Invalid summarize_overflow. Use true or false'; - } - deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.summarize_overflow', normalized); - return `Set queue.summarize_overflow=${normalized} for this session`; - } - return 'Unknown queue key. Use one of: mode, cap, overflow, debounce_ms, summarize_overflow'; - }, - - resetQueue: () => { - if (!deps.sessionManager || !sessionId) { - return 'Queue command is not available in this session.'; - } - deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.mode'); - deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.cap'); - deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.overflow'); - deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.debounce_ms'); - deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.summarize_overflow'); - return 'Reset session queue overrides.'; - }, - }, - }); - - if (commandResult.handled) { - send(makeEvent(request.id, 'done', { content: commandResult.text })); - return; - } - } - - // Set up tool use callback to emit streaming events - deps.sessionBridge.setOnToolUse(connectionId, (event) => { - if (event.type === 'start') { - send(makeEvent(request.id, 'tool_start', { tool: event.tool, args: event.args })); - } else if (event.type === 'end') { - send(makeEvent(request.id, 'tool_end', { - tool: event.tool, - result: event.result ? { - success: event.result.success, - output: event.result.output, - error: event.result.error, - } : undefined, - })); - // Record tool failures as error events - if (event.result && !event.result.success) { - deps.metrics?.incrementErrors(); - deps.metrics?.recordEvent({ - timestamp: Date.now(), - level: 'error', - source: 'tool', - message: `Tool '${event.tool}' failed: ${event.result.error ?? 'unknown error'}`, - context: { sessionId: laneId, tool: event.tool }, - }); + if (commandResult.handled) { + send(makeEvent(request.id, 'done', { content: commandResult.text })); + return; } } - }); - try { - // Convert gateway attachments to channel attachments - const attachments: Attachment[] | undefined = safeParams.attachments?.map(a => ({ - mimeType: a.mimeType, - data: a.data, - url: a.url, - filename: a.filename, - })); - - const response = await agent.process(safeParams.message ?? '', attachments); - deps.metrics?.incrementMessages(); - send(makeEvent(request.id, 'done', { content: response })); - } catch (err) { - const message = err instanceof Error ? err.message : 'Unknown error'; - deps.metrics?.incrementErrors(); - deps.metrics?.recordEvent({ - timestamp: Date.now(), - level: 'error', - source: 'agent.send', - message, - context: { sessionId: laneId }, + // Set up tool use callback to emit streaming events + deps.sessionBridge.setOnToolUse(connectionId, (event) => { + if (event.type === 'start') { + send(makeEvent(request.id, 'tool_start', { tool: event.tool, args: event.args })); + } else if (event.type === 'end') { + send(makeEvent(request.id, 'tool_end', { + tool: event.tool, + result: event.result ? { + success: event.result.success, + output: event.result.output, + error: event.result.error, + } : undefined, + })); + // Record tool failures as error events + if (event.result && !event.result.success) { + deps.metrics?.incrementErrors(); + deps.metrics?.recordEvent({ + timestamp: Date.now(), + level: 'error', + source: 'tool', + message: `Tool '${event.tool}' failed: ${event.result.error ?? 'unknown error'}`, + context: { sessionId: laneId, tool: event.tool }, + }); + } + } }); - send(makeEvent(request.id, 'error', { code: ErrorCode.InternalError, message })); - } finally { - deps.sessionBridge.setBusy(connectionId, false); - deps.sessionBridge.setOnToolUse(connectionId, undefined); - deps.metrics?.endRequest(requestId); - } - }, resolvedPolicy); + + try { + // Convert gateway attachments to channel attachments + const attachments: Attachment[] | undefined = safeParams.attachments?.map(a => ({ + mimeType: a.mimeType, + data: a.data, + url: a.url, + filename: a.filename, + })); + + const response = await agent.process(safeParams.message ?? '', attachments); + deps.metrics?.incrementMessages(); + const contextAlert = agent.consumeContextAlert(); + if (contextAlert) { + send(makeEvent(request.id, 'context_warning', contextAlert)); + deps.metrics?.recordEvent({ + timestamp: Date.now(), + level: 'warn', + source: 'context', + message: contextAlert.message, + context: { sessionId: laneId, level: contextAlert.level }, + }); + } + send(makeEvent(request.id, 'done', { content: response })); + } catch (err) { + const message = err instanceof Error ? err.message : 'Unknown error'; + deps.metrics?.incrementErrors(); + deps.metrics?.recordEvent({ + timestamp: Date.now(), + level: 'error', + source: 'agent.send', + message, + context: { sessionId: laneId }, + }); + send(makeEvent(request.id, 'error', { code: ErrorCode.InternalError, message })); + } finally { + deps.sessionBridge.setBusy(connectionId, false); + deps.sessionBridge.setOnToolUse(connectionId, undefined); + deps.metrics?.endRequest(requestId); + } + }, resolvedPolicy); } catch (err) { if (err instanceof LaneQueueRejectedError) { send(makeEvent(request.id, 'error', { diff --git a/src/gateway/handlers/handlers.test.ts b/src/gateway/handlers/handlers.test.ts index b3b95de..dbe946b 100644 --- a/src/gateway/handlers/handlers.test.ts +++ b/src/gateway/handlers/handlers.test.ts @@ -327,6 +327,60 @@ describe('system.tokenUsage handler', () => { }); }); +describe('system.contextUsage handler', () => { + it('returns empty sessions when no getContextUsage provided', async () => { + const handlers = createSystemHandlers({ + startTime: Date.now(), + version: '0.1.0', + getSessionCount: () => 0, + getToolCount: () => 0, + getConnectionCount: () => 0, + }); + + const req: GatewayRequest = { id: 21, method: 'system.contextUsage' }; + const result = await handlers['system.contextUsage'](req) as GatewayResponse; + + expect(result.id).toBe(21); + const r = result.result as { sessions: unknown[] }; + expect(r.sessions).toEqual([]); + }); + + it('returns session context budget data from getContextUsage callback', async () => { + const mockUsage = [ + { + sessionId: 'telegram:user1', + budget: { + estimatedTokens: 120000, + contextWindow: 200000, + remainingTokens: 80000, + usagePct: 60, + thresholdPct: 80, + thresholdTokens: 160000, + shouldCompact: false, + }, + }, + ]; + + const handlers = createSystemHandlers({ + startTime: Date.now(), + version: '0.1.0', + getSessionCount: () => 1, + getToolCount: () => 0, + getConnectionCount: () => 1, + getContextUsage: () => mockUsage, + }); + + const req: GatewayRequest = { id: 22, method: 'system.contextUsage' }; + const result = await handlers['system.contextUsage'](req) as GatewayResponse; + + const r = result.result as { sessions: typeof mockUsage }; + expect(r.sessions).toHaveLength(1); + expect(r.sessions[0].sessionId).toBe('telegram:user1'); + expect(r.sessions[0].budget.usagePct).toBe(60); + expect(r.sessions[0].budget.shouldCompact).toBe(false); + }); +}); + describe('system.sessionAnalytics handler', () => { it('returns empty analytics when callback is not provided', async () => { const handlers = createSystemHandlers({ @@ -614,6 +668,16 @@ describe('tool handlers', () => { describe('agent handlers', () => { const mockAgent = { process: vi.fn(async () => 'response text'), + consumeContextAlert: vi.fn(() => undefined), + getContextBudget: vi.fn(() => ({ + estimatedTokens: 0, + contextWindow: 128000, + remainingTokens: 128000, + usagePct: 0, + thresholdPct: 80, + thresholdTokens: 102400, + shouldCompact: false, + })), setOnToolUse: vi.fn(), }; diff --git a/src/gateway/handlers/system.ts b/src/gateway/handlers/system.ts index 5e88cd2..e364f52 100644 --- a/src/gateway/handlers/system.ts +++ b/src/gateway/handlers/system.ts @@ -13,6 +13,20 @@ export interface TokenUsageEntry { total: { inputTokens: number; outputTokens: number; calls: number; estimatedCost: number }; } +/** Per-session context budget report returned by system.contextUsage. */ +export interface ContextUsageEntry { + sessionId: string; + budget: { + estimatedTokens: number; + contextWindow: number; + remainingTokens: number; + usagePct: number; + thresholdPct: number; + thresholdTokens: number; + shouldCompact: boolean; + }; +} + export interface PresenceEntry { channel: string; senderId: string; @@ -63,6 +77,8 @@ export interface SystemHandlerDeps { getUsage?: () => { totalSessions: number; activeConnections: number }; /** Optional callback to retrieve per-session token usage data. */ getTokenUsage?: () => TokenUsageEntry[]; + /** Optional callback to retrieve per-session context budget data. */ + getContextUsage?: () => ContextUsageEntry[]; /** Optional callback to retrieve aggregated metrics snapshot. */ getMetrics?: () => MetricsSnapshot; /** Optional callback to retrieve session analytics. */ @@ -202,6 +218,11 @@ export function createSystemHandlers(deps: SystemHandlerDeps) { return makeResponse(request.id, { sessions }); }, + 'system.contextUsage': async (request: GatewayRequest): Promise => { + const sessions = deps.getContextUsage?.() ?? []; + return makeResponse(request.id, { sessions }); + }, + 'system.metrics': async (request: GatewayRequest): Promise => { if (!deps.getMetrics) { return makeResponse(request.id, {}); diff --git a/src/gateway/protocol.test.ts b/src/gateway/protocol.test.ts index 06fcce2..a1680a1 100644 --- a/src/gateway/protocol.test.ts +++ b/src/gateway/protocol.test.ts @@ -308,5 +308,30 @@ describe('protocol', () => { data, }); }); + + it('creates a context warning event message', () => { + const data = { + level: 'warning', + message: 'Context usage is 76.0%', + budget: { + estimatedTokens: 76000, + contextWindow: 100000, + remainingTokens: 24000, + usagePct: 76, + thresholdPct: 80, + thresholdTokens: 80000, + shouldCompact: false, + }, + actions: { + checkpointSaved: false, + autoCompacted: false, + }, + }; + expect(makeEvent(3, 'context_warning', data)).toEqual({ + id: 3, + event: 'context_warning', + data, + }); + }); }); }); diff --git a/src/gateway/protocol.ts b/src/gateway/protocol.ts index 70972ed..c57e566 100644 --- a/src/gateway/protocol.ts +++ b/src/gateway/protocol.ts @@ -93,6 +93,7 @@ export type EventType = | 'content' | 'tool_start' | 'tool_end' + | 'context_warning' | 'attachment' | 'done' | 'error'; @@ -115,6 +116,25 @@ export interface ToolEndEventData { }; } +export interface ContextWarningEventData { + level: 'warning' | 'checkpoint' | 'critical'; + message: string; + budget: { + estimatedTokens: number; + contextWindow: number; + remainingTokens: number; + usagePct: number; + thresholdPct: number; + thresholdTokens: number; + shouldCompact: boolean; + }; + actions: { + checkpointSaved: boolean; + autoCompacted: boolean; + checkpointNamespace?: string; + }; +} + export interface AttachmentEventData { mimeType: string; data?: string; diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 56f273a..4ae4a67 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -33,7 +33,7 @@ import { createNodeHandlers, } from './handlers/index.js'; import { discoverServices } from './handlers/services.js'; -import type { TokenUsageEntry } from './handlers/system.js'; +import type { TokenUsageEntry, ContextUsageEntry } from './handlers/system.js'; import type { NodeConnectionState } from './handlers/node.js'; import type { SessionManager } from '../session/manager.js'; import type { Config } from '../config/index.js'; @@ -82,6 +82,8 @@ export interface GatewayServerConfig { gmailHandler?: GmailWatcher; /** Optional callback to retrieve per-session token usage data for the dashboard. */ getTokenUsage?: () => TokenUsageEntry[]; + /** Optional callback to retrieve per-session context usage data for the dashboard. */ + getContextUsage?: () => ContextUsageEntry[]; /** Maximum allowed request body size for inbound HTTP POST bodies. */ maxRequestBodyBytes?: number; /** Per-connection WebSocket ingress rate limiting. */ @@ -294,6 +296,7 @@ export class GatewayServer { activeConnections: this.sessionBridge.connectionCount, }), getTokenUsage: this.config.getTokenUsage, + getContextUsage: this.config.getContextUsage, getMetrics: () => this.metrics.getSnapshot(), getEvents: (opts) => this.metrics.getEvents(opts), getActiveRequests: () => this.metrics.getActiveRequests(), diff --git a/src/gateway/session-bridge.ts b/src/gateway/session-bridge.ts index 367c544..fc67c88 100644 --- a/src/gateway/session-bridge.ts +++ b/src/gateway/session-bridge.ts @@ -200,6 +200,47 @@ export class SessionBridge { return results; } + /** Get estimated context budget for all active sessions. */ + getAllContextUsage(): Array<{ + sessionId: string; + budget: { + estimatedTokens: number; + contextWindow: number; + remainingTokens: number; + usagePct: number; + thresholdPct: number; + thresholdTokens: number; + shouldCompact: boolean; + }; + }> { + const results: Array<{ + sessionId: string; + budget: { + estimatedTokens: number; + contextWindow: number; + remainingTokens: number; + usagePct: number; + thresholdPct: number; + thresholdTokens: number; + shouldCompact: boolean; + }; + }> = []; + + const seen = new Set(); + for (const client of this.clients.values()) { + if (seen.has(client.sessionId)) { + continue; + } + seen.add(client.sessionId); + results.push({ + sessionId: client.sessionId, + budget: client.agent.getContextBudget(), + }); + } + + return results; + } + private getOrCreateAgent(sessionId: string): AgentOrchestrator { let agent = this.agents.get(sessionId); if (!agent) { @@ -233,6 +274,14 @@ export class SessionBridge { keepTurns: config.compaction.keep_turns, summaryMaxTokens: config.compaction.summary_max_tokens, importanceThreshold: config.compaction.importance_threshold, + proactive: { + enabled: config.compaction.proactive.enabled, + warnPct: config.compaction.proactive.warn_pct, + checkpointPct: config.compaction.proactive.checkpoint_pct, + autoCompactPct: config.compaction.proactive.auto_compact_pct, + checkpointCooldownMs: config.compaction.proactive.checkpoint_cooldown_ms, + memoryNamespace: config.compaction.proactive.memory_namespace, + }, } : undefined, modelName: config?.models.default.model, contextWindow: config?.models.default.context_window, diff --git a/src/gateway/ui/pages/chat.js b/src/gateway/ui/pages/chat.js index d573a9b..1d5453d 100644 --- a/src/gateway/ui/pages/chat.js +++ b/src/gateway/ui/pages/chat.js @@ -593,6 +593,15 @@ async function sendMessage(client, overrideText) { scrollToBottom(); }); + stream.on('context_warning', (data) => { + const note = document.createElement('div'); + note.className = 'message assistant'; + const text = data?.message || 'Context usage is getting high.'; + note.innerHTML = renderSafeMarkdown(`> ${text}`); + _elements.messages.insertBefore(note, placeholder); + scrollToBottom(); + }); + const done = await stream.result; // Replace placeholder with actual response placeholder.classList.remove('streaming-cursor'); diff --git a/src/gateway/ui/pages/usage.js b/src/gateway/ui/pages/usage.js index 5b4d743..a0337f8 100644 --- a/src/gateway/ui/pages/usage.js +++ b/src/gateway/ui/pages/usage.js @@ -26,15 +26,18 @@ function truncateId(id) { async function loadUsage(el, client) { let data; + let contextData; try { data = await client.call('system.tokenUsage'); + contextData = await client.call('system.contextUsage'); } catch (err) { el.innerHTML = `
Failed to load usage: ${err.message}
`; return; } const sessions = data?.sessions ?? []; + const contextBySession = new Map((contextData?.sessions ?? []).map(s => [s.sessionId, s.budget])); // Compute totals across all sessions let totalInput = 0; @@ -89,6 +92,10 @@ async function loadUsage(el, client) { const outTok = s.total?.outputTokens ?? 0; const calls = s.total?.calls ?? 0; const cost = s.total?.estimatedCost ?? 0; + const budget = contextBySession.get(s.sessionId); + const contextCell = budget + ? `${budget.usagePct.toFixed(1)}% (${formatNumber(budget.estimatedTokens)}/${formatNumber(budget.contextWindow)})` + : '-'; // Build delegation breakdown if present const delegationEntries = Object.entries(s.delegation ?? {}); @@ -107,6 +114,7 @@ async function loadUsage(el, client) { ${formatNumber(inTok + outTok)} ${formatNumber(calls)} ${formatCost(cost)} + ${contextCell} ${delegationCell} `; @@ -122,6 +130,7 @@ async function loadUsage(el, client) { Total Calls Cost + Context Delegation