feat(gateway): expose context usage and warning events

This commit is contained in:
William Valentin
2026-02-16 15:44:09 -08:00
parent 8758ea8f1c
commit fee8be1de0
11 changed files with 645 additions and 334 deletions
+31
View File
@@ -411,6 +411,37 @@ export function createGateway(deps: GatewayDeps): GatewayServer {
}
}
return results;
},
getContextUsage: () => {
const results: Array<{
sessionId: string;
budget: {
estimatedTokens: number;
contextWindow: number;
remainingTokens: number;
usagePct: number;
thresholdPct: number;
thresholdTokens: number;
shouldCompact: boolean;
};
}> = [];
const sessionBridge = gateway.getSessionBridge();
for (const entry of sessionBridge.getAllContextUsage()) {
results.push(entry);
}
const channelAgents = getChannelAgents();
if (channelAgents) {
for (const [sessionId, { orchestrator }] of channelAgents) {
results.push({
sessionId,
budget: orchestrator.getContextBudget(),
});
}
}
return results;
},
});
+64
View File
@@ -9,6 +9,16 @@ import { initAuditLogger } from '../../audit/index.js';
describe('createAgentHandlers command fast-path', () => {
const mockAgent = {
process: vi.fn(async () => 'agent response'),
consumeContextAlert: vi.fn(() => undefined as unknown),
getContextBudget: vi.fn(() => ({
estimatedTokens: 100,
contextWindow: 200000,
remainingTokens: 199900,
usagePct: 0.05,
thresholdPct: 80,
thresholdTokens: 160000,
shouldCompact: false,
})),
getUsage: vi.fn(() => ({
primary: { inputTokens: 10, outputTokens: 5, calls: 1 },
delegation: {},
@@ -169,12 +179,56 @@ describe('createAgentHandlers command fast-path', () => {
expect(mockAgent.process).not.toHaveBeenCalled();
expect(((sent[0] as GatewayEvent).data as { content: string }).content).toContain('Set queue.mode=followup');
});
it('emits context_warning event before done when orchestrator reports an alert', async () => {
mockAgent.consumeContextAlert.mockReturnValueOnce({
level: 'checkpoint',
message: 'Context usage is 86.0% (172000/200000 estimated tokens).',
budget: {
estimatedTokens: 172000,
contextWindow: 200000,
remainingTokens: 28000,
usagePct: 86,
thresholdPct: 80,
thresholdTokens: 160000,
shouldCompact: true,
},
actions: {
checkpointSaved: true,
autoCompacted: false,
checkpointNamespace: 'session/checkpoints/ws/conn-1',
},
});
const sent: OutboundMessage[] = [];
const send = vi.fn((msg: OutboundMessage) => sent.push(msg));
await handlers['agent.send']({
id: 6,
method: 'agent.send',
params: { message: 'hello', connectionId: 'conn-1' },
}, send);
expect(sent).toHaveLength(2);
expect((sent[0] as GatewayEvent).event).toBe('context_warning');
expect((sent[1] as GatewayEvent).event).toBe('done');
});
});
describe('createAgentHandlers queue policy resolution', () => {
it('passes resolved per-request queue policy into lane enqueue', async () => {
const mockAgent = {
process: vi.fn(async () => 'ok'),
consumeContextAlert: vi.fn(() => undefined),
getContextBudget: vi.fn(() => ({
estimatedTokens: 0,
contextWindow: 128000,
remainingTokens: 128000,
usagePct: 0,
thresholdPct: 80,
thresholdTokens: 102400,
shouldCompact: false,
})),
getUsage: vi.fn(() => ({
primary: { inputTokens: 0, outputTokens: 0, calls: 0 },
delegation: {},
@@ -234,6 +288,16 @@ describe('createAgentHandlers queue policy resolution', () => {
const sessionBridge = {
getAgent: vi.fn(() => ({
process: vi.fn(async () => 'ok'),
consumeContextAlert: vi.fn(() => undefined),
getContextBudget: vi.fn(() => ({
estimatedTokens: 0,
contextWindow: 128000,
remainingTokens: 128000,
usagePct: 0,
thresholdPct: 80,
thresholdTokens: 102400,
shouldCompact: false,
})),
getUsage: vi.fn(() => ({
primary: { inputTokens: 0, outputTokens: 0, calls: 0 },
delegation: {},
+349 -333
View File
@@ -66,361 +66,377 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
try {
return await deps.laneQueue.enqueue(laneId, async () => {
deps.sessionBridge.setBusy(connectionId, true);
deps.sessionBridge.setBusy(connectionId, true);
const commandInput = safeParams.metadata?.isCommand && typeof safeParams.metadata.command === 'string'
? `/${safeParams.metadata.command}${safeParams.metadata.commandArgs ? ` ${safeParams.metadata.commandArgs}` : ''}`
: (safeParams.message ?? '');
const commandInput = safeParams.metadata?.isCommand && typeof safeParams.metadata.command === 'string'
? `/${safeParams.metadata.command}${safeParams.metadata.commandArgs ? ` ${safeParams.metadata.commandArgs}` : ''}`
: (safeParams.message ?? '');
const parsedCommand = safeParams.metadata?.isCommand
? safeParams.metadata.command
: commandInput.startsWith('/') ? commandInput.slice(1).split(/\s+/, 1)[0] : undefined;
auditLogger?.userAction({
session_id: sessionId ?? `ws:${connectionId}`,
channel: 'ws',
sender: connectionId,
source: 'gateway',
action_type: parsedCommand ? 'command' : 'message',
content_length: commandInput.length,
attachments_count: safeParams.attachments?.length ?? 0,
command: parsedCommand,
});
if (commandInput && deps.commandRegistry?.isCommand(commandInput)) {
const sessionId = deps.sessionBridge.getSessionId(connectionId);
const commandResult = await deps.commandRegistry.execute(commandInput, {
const parsedCommand = safeParams.metadata?.isCommand
? safeParams.metadata.command
: commandInput.startsWith('/') ? commandInput.slice(1).split(/\s+/, 1)[0] : undefined;
auditLogger?.userAction({
session_id: sessionId ?? `ws:${connectionId}`,
channel: 'ws',
senderId: connectionId,
sessionId: sessionId ?? `ws:${connectionId}`,
rawInput: commandInput,
services: {
getStatus: () => `Gateway session active. Current model tier: ${agent.getModelTier()}`,
getUsage: () => {
const usage = agent.getUsage();
const lines = [
'**Token Usage**',
'',
`Primary: ${usage.primary.inputTokens.toLocaleString()} in / ${usage.primary.outputTokens.toLocaleString()} out (${usage.primary.calls} calls)`,
];
sender: connectionId,
source: 'gateway',
action_type: parsedCommand ? 'command' : 'message',
content_length: commandInput.length,
attachments_count: safeParams.attachments?.length ?? 0,
command: parsedCommand,
});
const delegationEntries = Object.entries(usage.delegation);
if (delegationEntries.length > 0) {
lines.push('');
lines.push('Delegation:');
for (const [tier, stats] of delegationEntries) {
lines.push(` ${tier}: ${stats.inputTokens.toLocaleString()} in / ${stats.outputTokens.toLocaleString()} out (${stats.calls} calls)`);
if (commandInput && deps.commandRegistry?.isCommand(commandInput)) {
const sessionId = deps.sessionBridge.getSessionId(connectionId);
const commandResult = await deps.commandRegistry.execute(commandInput, {
channel: 'ws',
senderId: connectionId,
sessionId: sessionId ?? `ws:${connectionId}`,
rawInput: commandInput,
services: {
getStatus: () => `Gateway session active. Current model tier: ${agent.getModelTier()}`,
getUsage: () => {
const usage = agent.getUsage();
const budget = agent.getContextBudget();
const lines = [
'**Token Usage**',
'',
`Primary: ${usage.primary.inputTokens.toLocaleString()} in / ${usage.primary.outputTokens.toLocaleString()} out (${usage.primary.calls} calls)`,
];
const delegationEntries = Object.entries(usage.delegation);
if (delegationEntries.length > 0) {
lines.push('');
lines.push('Delegation:');
for (const [tier, stats] of delegationEntries) {
lines.push(` ${tier}: ${stats.inputTokens.toLocaleString()} in / ${stats.outputTokens.toLocaleString()} out (${stats.calls} calls)`);
}
}
}
lines.push('');
lines.push(`**Total:** ${usage.total.inputTokens.toLocaleString()} in / ${usage.total.outputTokens.toLocaleString()} out (${usage.total.calls} calls)`);
lines.push('');
lines.push(`**Total:** ${usage.total.inputTokens.toLocaleString()} in / ${usage.total.outputTokens.toLocaleString()} out (${usage.total.calls} calls)`);
lines.push('');
lines.push('**Context usage (estimated):** '
+ `${budget.estimatedTokens.toLocaleString()}/${budget.contextWindow.toLocaleString()} `
+ `(${budget.usagePct.toFixed(1)}%)`);
if (usage.total.estimatedCost > 0) {
lines.push(`**Estimated cost:** $${usage.total.estimatedCost.toFixed(4)}`);
}
if (usage.total.estimatedCost > 0) {
lines.push(`**Estimated cost:** $${usage.total.estimatedCost.toFixed(4)}`);
}
return lines.join('\n');
},
getModel: () => `Current model tier: ${agent.getModelTier()}`,
setModel: (input) => {
const raw = input.trim();
if (!raw) {
return 'Usage: /model <tier>';
}
const [requestedTier, ...rest] = raw.split(/\s+/);
const validTiers: ModelTier[] = ['fast', 'default', 'complex', 'local'];
const modelTier = requestedTier as ModelTier;
if (!validTiers.includes(modelTier)) {
return `Invalid tier: ${requestedTier}. Available: ${validTiers.join(', ')}`;
}
if (rest.length > 0) {
return `Switched to model tier: ${modelTier}\nNote: provider/model switching is not available via gateway (/model <tier> <provider/model>).`;
}
agent.setModelTier(modelTier);
if (sessionId && deps.sessionManager) {
deps.sessionManager.setSessionConfig('ws', sessionId, 'modelTier', modelTier);
}
return `Switched to model tier: ${modelTier}`;
},
compact: async () => {
const result = await agent.compact();
if (result && result.compactedCount > 0) {
return `Compacted ${result.compactedCount} messages: ${result.tokensBefore}${result.tokensAfter} tokens`;
}
return 'Nothing to compact.';
},
reset: () => {
agent.reset();
if (sessionId && deps.sessionManager) {
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'modelTier');
}
return 'Session reset.';
},
return lines.join('\n');
},
getModel: () => `Current model tier: ${agent.getModelTier()}`,
setModel: (input) => {
const raw = input.trim();
if (!raw) {
return 'Usage: /model <tier>';
}
const [requestedTier, ...rest] = raw.split(/\s+/);
const validTiers: ModelTier[] = ['fast', 'default', 'complex', 'local'];
const modelTier = requestedTier as ModelTier;
if (!validTiers.includes(modelTier)) {
return `Invalid tier: ${requestedTier}. Available: ${validTiers.join(', ')}`;
}
if (rest.length > 0) {
return `Switched to model tier: ${modelTier}\nNote: provider/model switching is not available via gateway (/model <tier> <provider/model>).`;
}
agent.setModelTier(modelTier);
if (sessionId && deps.sessionManager) {
deps.sessionManager.setSessionConfig('ws', sessionId, 'modelTier', modelTier);
}
return `Switched to model tier: ${modelTier}`;
},
compact: async () => {
const result = await agent.compact();
if (result && result.compactedCount > 0) {
return `Compacted ${result.compactedCount} messages: ${result.tokensBefore}${result.tokensAfter} tokens`;
}
return 'Nothing to compact.';
},
reset: () => {
agent.reset();
if (sessionId && deps.sessionManager) {
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'modelTier');
}
return 'Session reset.';
},
getElevation: () => {
if (!sessionId || !deps.sessionManager) {
return 'Elevated mode: off';
}
const untilRaw = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.until_ms');
const reason = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.reason') ?? '';
const id = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.id') ?? '';
if (!untilRaw || !id) {
return 'Elevated mode: off';
}
const untilMs = Number.parseInt(untilRaw, 10);
if (!Number.isFinite(untilMs)) {
return 'Elevated mode: off';
}
const now = Date.now();
if (untilMs <= now) {
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.until_ms');
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.reason');
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.id');
auditLogger?.securityElevationExpired({
getElevation: () => {
if (!sessionId || !deps.sessionManager) {
return 'Elevated mode: off';
}
const untilRaw = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.until_ms');
const reason = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.reason') ?? '';
const id = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.id') ?? '';
if (!untilRaw || !id) {
return 'Elevated mode: off';
}
const untilMs = Number.parseInt(untilRaw, 10);
if (!Number.isFinite(untilMs)) {
return 'Elevated mode: off';
}
const now = Date.now();
if (untilMs <= now) {
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.until_ms');
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.reason');
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.id');
auditLogger?.securityElevationExpired({
session_id: `ws:${sessionId}`,
channel: 'ws',
sender: connectionId,
elevation_id: id,
until_ms: untilMs,
reason: reason || undefined,
});
return 'Elevated mode: off (expired)';
}
const remainingMs = untilMs - now;
const remainingSec = Math.ceil(remainingMs / 1000);
return `Elevated mode: on (${remainingSec}s remaining)${reason ? `${reason}` : ''}`;
},
setElevation: (input: string) => {
if (!sessionId || !deps.sessionManager) {
return 'Elevate command is not available in this session.';
}
const raw = input.trim();
const parts = raw.split(/\s+/);
const hasYes = parts.includes('--yes') || parts.includes('--confirm');
const filtered = parts.filter(p => p !== '--yes' && p !== '--confirm');
if (filtered.length === 0) {
return 'Usage: /elevate <duration> <reason...> --yes | /elevate off --yes';
}
if (filtered[0] === 'off') {
if (!hasYes) {
return 'Refusing to disable elevation without explicit confirmation. Use: /elevate off --yes';
}
const existingId = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.id') ?? randomUUID();
const existingUntil = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.until_ms');
const existingReason = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.reason') ?? '';
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.until_ms');
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.reason');
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.id');
auditLogger?.securityElevationDisabled({
session_id: `ws:${sessionId}`,
channel: 'ws',
sender: connectionId,
elevation_id: existingId,
until_ms: existingUntil ? Number.parseInt(existingUntil, 10) : undefined,
reason: existingReason || undefined,
});
return 'Elevated mode: off';
}
if (!hasYes) {
return 'Refusing to enable elevation without explicit confirmation. Use: /elevate <duration> <reason...> --yes';
}
const dur = filtered[0];
const reason = filtered.slice(1).join(' ').trim();
const ttlMs = (() => {
const m = dur.match(/^(\d+)([smhd])$/i);
if (!m) {
return null;
}
const n = Number.parseInt(m[1], 10);
if (!Number.isFinite(n) || n <= 0) {
return null;
}
const unit = m[2].toLowerCase();
if (unit === 's') {return n * 1000;}
if (unit === 'm') {return n * 60_000;}
if (unit === 'h') {return n * 3_600_000;}
if (unit === 'd') {return n * 86_400_000;}
return null;
})();
if (!ttlMs) {
return 'Invalid duration. Use one of: 30s, 10m, 1h, 1d';
}
const untilMs = Date.now() + ttlMs;
const id = randomUUID();
deps.sessionManager.setSessionConfig('ws', sessionId, 'elevation.until_ms', String(untilMs));
deps.sessionManager.setSessionConfig('ws', sessionId, 'elevation.id', id);
if (reason) {
deps.sessionManager.setSessionConfig('ws', sessionId, 'elevation.reason', reason);
} else {
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.reason');
}
auditLogger?.securityElevationEnabled({
session_id: `ws:${sessionId}`,
channel: 'ws',
sender: connectionId,
elevation_id: id,
until_ms: untilMs,
ttl_ms: ttlMs,
reason: reason || undefined,
});
return 'Elevated mode: off (expired)';
}
const remainingMs = untilMs - now;
const remainingSec = Math.ceil(remainingMs / 1000);
return `Elevated mode: on (${remainingSec}s remaining)${reason ? `${reason}` : ''}`;
return `Elevated mode: on until ${new Date(untilMs).toISOString()}`;
},
getQueue: () => {
const mode = resolvedPolicy?.mode ?? 'collect';
const cap = resolvedPolicy?.cap ?? 50;
const overflow = resolvedPolicy?.overflow ?? 'drop_old';
const debounceMs = resolvedPolicy?.debounceMs ?? 0;
const summarizeOverflow = resolvedPolicy?.summarizeOverflow ?? true;
const source = deps.sessionManager && sessionId
? deps.sessionManager.getSessionConfig('ws', sessionId, 'queue.mode') ? 'session override' : 'default/channel'
: 'default/channel';
return [
'**Queue policy**',
`mode: ${mode}`,
`cap: ${cap}`,
`overflow: ${overflow}`,
`debounce_ms: ${debounceMs}`,
`summarize_overflow: ${summarizeOverflow}`,
`source: ${source}`,
].join('\n');
},
setQueue: (input: string) => {
if (!deps.sessionManager || !sessionId) {
return 'Queue command is not available in this session.';
}
const [rawKey, ...rest] = input.trim().split(/\s+/);
const value = rest.join(' ').trim();
if (!rawKey || !value) {
return 'Usage: /queue <mode|cap|overflow|debounce_ms|summarize_overflow> <value>';
}
const key = rawKey.toLowerCase();
if (key === 'mode') {
if (!['collect', 'followup', 'steer', 'steer_backlog', 'interrupt'].includes(value)) {
return 'Invalid mode. Use one of: collect, followup, steer, steer_backlog, interrupt';
}
deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.mode', value);
return `Set queue.mode=${value} for this session`;
}
if (key === 'cap') {
const cap = Number.parseInt(value, 10);
if (!Number.isFinite(cap) || cap < 1 || cap > 1000) {
return 'Invalid cap. Use an integer between 1 and 1000';
}
deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.cap', String(cap));
return `Set queue.cap=${cap} for this session`;
}
if (key === 'overflow') {
if (value !== 'drop_old' && value !== 'drop_new') {
return 'Invalid overflow. Use drop_old or drop_new';
}
deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.overflow', value);
return `Set queue.overflow=${value} for this session`;
}
if (key === 'debounce_ms') {
const debounceMs = Number.parseInt(value, 10);
if (!Number.isFinite(debounceMs) || debounceMs < 0 || debounceMs > 60_000) {
return 'Invalid debounce_ms. Use an integer between 0 and 60000';
}
deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.debounce_ms', String(debounceMs));
return `Set queue.debounce_ms=${debounceMs} for this session`;
}
if (key === 'summarize_overflow') {
const normalized = value.toLowerCase();
if (normalized !== 'true' && normalized !== 'false') {
return 'Invalid summarize_overflow. Use true or false';
}
deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.summarize_overflow', normalized);
return `Set queue.summarize_overflow=${normalized} for this session`;
}
return 'Unknown queue key. Use one of: mode, cap, overflow, debounce_ms, summarize_overflow';
},
resetQueue: () => {
if (!deps.sessionManager || !sessionId) {
return 'Queue command is not available in this session.';
}
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.mode');
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.cap');
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.overflow');
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.debounce_ms');
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.summarize_overflow');
return 'Reset session queue overrides.';
},
},
});
setElevation: (input: string) => {
if (!sessionId || !deps.sessionManager) {
return 'Elevate command is not available in this session.';
}
const raw = input.trim();
const parts = raw.split(/\s+/);
const hasYes = parts.includes('--yes') || parts.includes('--confirm');
const filtered = parts.filter(p => p !== '--yes' && p !== '--confirm');
if (filtered.length === 0) {
return 'Usage: /elevate <duration> <reason...> --yes | /elevate off --yes';
}
if (filtered[0] === 'off') {
if (!hasYes) {
return 'Refusing to disable elevation without explicit confirmation. Use: /elevate off --yes';
}
const existingId = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.id') ?? randomUUID();
const existingUntil = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.until_ms');
const existingReason = deps.sessionManager.getSessionConfig('ws', sessionId, 'elevation.reason') ?? '';
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.until_ms');
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.reason');
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.id');
auditLogger?.securityElevationDisabled({
session_id: `ws:${sessionId}`,
channel: 'ws',
sender: connectionId,
elevation_id: existingId,
until_ms: existingUntil ? Number.parseInt(existingUntil, 10) : undefined,
reason: existingReason || undefined,
});
return 'Elevated mode: off';
}
if (!hasYes) {
return 'Refusing to enable elevation without explicit confirmation. Use: /elevate <duration> <reason...> --yes';
}
const dur = filtered[0];
const reason = filtered.slice(1).join(' ').trim();
const ttlMs = (() => {
const m = dur.match(/^(\d+)([smhd])$/i);
if (!m) {
return null;
}
const n = Number.parseInt(m[1], 10);
if (!Number.isFinite(n) || n <= 0) {
return null;
}
const unit = m[2].toLowerCase();
if (unit === 's') {return n * 1000;}
if (unit === 'm') {return n * 60_000;}
if (unit === 'h') {return n * 3_600_000;}
if (unit === 'd') {return n * 86_400_000;}
return null;
})();
if (!ttlMs) {
return 'Invalid duration. Use one of: 30s, 10m, 1h, 1d';
}
const untilMs = Date.now() + ttlMs;
const id = randomUUID();
deps.sessionManager.setSessionConfig('ws', sessionId, 'elevation.until_ms', String(untilMs));
deps.sessionManager.setSessionConfig('ws', sessionId, 'elevation.id', id);
if (reason) {
deps.sessionManager.setSessionConfig('ws', sessionId, 'elevation.reason', reason);
} else {
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'elevation.reason');
}
auditLogger?.securityElevationEnabled({
session_id: `ws:${sessionId}`,
channel: 'ws',
sender: connectionId,
elevation_id: id,
until_ms: untilMs,
ttl_ms: ttlMs,
reason: reason || undefined,
});
return `Elevated mode: on until ${new Date(untilMs).toISOString()}`;
},
getQueue: () => {
const mode = resolvedPolicy?.mode ?? 'collect';
const cap = resolvedPolicy?.cap ?? 50;
const overflow = resolvedPolicy?.overflow ?? 'drop_old';
const debounceMs = resolvedPolicy?.debounceMs ?? 0;
const summarizeOverflow = resolvedPolicy?.summarizeOverflow ?? true;
const source = deps.sessionManager && sessionId
? deps.sessionManager.getSessionConfig('ws', sessionId, 'queue.mode') ? 'session override' : 'default/channel'
: 'default/channel';
return [
'**Queue policy**',
`mode: ${mode}`,
`cap: ${cap}`,
`overflow: ${overflow}`,
`debounce_ms: ${debounceMs}`,
`summarize_overflow: ${summarizeOverflow}`,
`source: ${source}`,
].join('\n');
},
setQueue: (input: string) => {
if (!deps.sessionManager || !sessionId) {
return 'Queue command is not available in this session.';
}
const [rawKey, ...rest] = input.trim().split(/\s+/);
const value = rest.join(' ').trim();
if (!rawKey || !value) {
return 'Usage: /queue <mode|cap|overflow|debounce_ms|summarize_overflow> <value>';
}
const key = rawKey.toLowerCase();
if (key === 'mode') {
if (!['collect', 'followup', 'steer', 'steer_backlog', 'interrupt'].includes(value)) {
return 'Invalid mode. Use one of: collect, followup, steer, steer_backlog, interrupt';
}
deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.mode', value);
return `Set queue.mode=${value} for this session`;
}
if (key === 'cap') {
const cap = Number.parseInt(value, 10);
if (!Number.isFinite(cap) || cap < 1 || cap > 1000) {
return 'Invalid cap. Use an integer between 1 and 1000';
}
deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.cap', String(cap));
return `Set queue.cap=${cap} for this session`;
}
if (key === 'overflow') {
if (value !== 'drop_old' && value !== 'drop_new') {
return 'Invalid overflow. Use drop_old or drop_new';
}
deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.overflow', value);
return `Set queue.overflow=${value} for this session`;
}
if (key === 'debounce_ms') {
const debounceMs = Number.parseInt(value, 10);
if (!Number.isFinite(debounceMs) || debounceMs < 0 || debounceMs > 60_000) {
return 'Invalid debounce_ms. Use an integer between 0 and 60000';
}
deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.debounce_ms', String(debounceMs));
return `Set queue.debounce_ms=${debounceMs} for this session`;
}
if (key === 'summarize_overflow') {
const normalized = value.toLowerCase();
if (normalized !== 'true' && normalized !== 'false') {
return 'Invalid summarize_overflow. Use true or false';
}
deps.sessionManager.setSessionConfig('ws', sessionId, 'queue.summarize_overflow', normalized);
return `Set queue.summarize_overflow=${normalized} for this session`;
}
return 'Unknown queue key. Use one of: mode, cap, overflow, debounce_ms, summarize_overflow';
},
resetQueue: () => {
if (!deps.sessionManager || !sessionId) {
return 'Queue command is not available in this session.';
}
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.mode');
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.cap');
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.overflow');
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.debounce_ms');
deps.sessionManager.deleteSessionConfig('ws', sessionId, 'queue.summarize_overflow');
return 'Reset session queue overrides.';
},
},
});
if (commandResult.handled) {
send(makeEvent(request.id, 'done', { content: commandResult.text }));
return;
}
}
// Set up tool use callback to emit streaming events
deps.sessionBridge.setOnToolUse(connectionId, (event) => {
if (event.type === 'start') {
send(makeEvent(request.id, 'tool_start', { tool: event.tool, args: event.args }));
} else if (event.type === 'end') {
send(makeEvent(request.id, 'tool_end', {
tool: event.tool,
result: event.result ? {
success: event.result.success,
output: event.result.output,
error: event.result.error,
} : undefined,
}));
// Record tool failures as error events
if (event.result && !event.result.success) {
deps.metrics?.incrementErrors();
deps.metrics?.recordEvent({
timestamp: Date.now(),
level: 'error',
source: 'tool',
message: `Tool '${event.tool}' failed: ${event.result.error ?? 'unknown error'}`,
context: { sessionId: laneId, tool: event.tool },
});
if (commandResult.handled) {
send(makeEvent(request.id, 'done', { content: commandResult.text }));
return;
}
}
});
try {
// Convert gateway attachments to channel attachments
const attachments: Attachment[] | undefined = safeParams.attachments?.map(a => ({
mimeType: a.mimeType,
data: a.data,
url: a.url,
filename: a.filename,
}));
const response = await agent.process(safeParams.message ?? '', attachments);
deps.metrics?.incrementMessages();
send(makeEvent(request.id, 'done', { content: response }));
} catch (err) {
const message = err instanceof Error ? err.message : 'Unknown error';
deps.metrics?.incrementErrors();
deps.metrics?.recordEvent({
timestamp: Date.now(),
level: 'error',
source: 'agent.send',
message,
context: { sessionId: laneId },
// Set up tool use callback to emit streaming events
deps.sessionBridge.setOnToolUse(connectionId, (event) => {
if (event.type === 'start') {
send(makeEvent(request.id, 'tool_start', { tool: event.tool, args: event.args }));
} else if (event.type === 'end') {
send(makeEvent(request.id, 'tool_end', {
tool: event.tool,
result: event.result ? {
success: event.result.success,
output: event.result.output,
error: event.result.error,
} : undefined,
}));
// Record tool failures as error events
if (event.result && !event.result.success) {
deps.metrics?.incrementErrors();
deps.metrics?.recordEvent({
timestamp: Date.now(),
level: 'error',
source: 'tool',
message: `Tool '${event.tool}' failed: ${event.result.error ?? 'unknown error'}`,
context: { sessionId: laneId, tool: event.tool },
});
}
}
});
send(makeEvent(request.id, 'error', { code: ErrorCode.InternalError, message }));
} finally {
deps.sessionBridge.setBusy(connectionId, false);
deps.sessionBridge.setOnToolUse(connectionId, undefined);
deps.metrics?.endRequest(requestId);
}
}, resolvedPolicy);
try {
// Convert gateway attachments to channel attachments
const attachments: Attachment[] | undefined = safeParams.attachments?.map(a => ({
mimeType: a.mimeType,
data: a.data,
url: a.url,
filename: a.filename,
}));
const response = await agent.process(safeParams.message ?? '', attachments);
deps.metrics?.incrementMessages();
const contextAlert = agent.consumeContextAlert();
if (contextAlert) {
send(makeEvent(request.id, 'context_warning', contextAlert));
deps.metrics?.recordEvent({
timestamp: Date.now(),
level: 'warn',
source: 'context',
message: contextAlert.message,
context: { sessionId: laneId, level: contextAlert.level },
});
}
send(makeEvent(request.id, 'done', { content: response }));
} catch (err) {
const message = err instanceof Error ? err.message : 'Unknown error';
deps.metrics?.incrementErrors();
deps.metrics?.recordEvent({
timestamp: Date.now(),
level: 'error',
source: 'agent.send',
message,
context: { sessionId: laneId },
});
send(makeEvent(request.id, 'error', { code: ErrorCode.InternalError, message }));
} finally {
deps.sessionBridge.setBusy(connectionId, false);
deps.sessionBridge.setOnToolUse(connectionId, undefined);
deps.metrics?.endRequest(requestId);
}
}, resolvedPolicy);
} catch (err) {
if (err instanceof LaneQueueRejectedError) {
send(makeEvent(request.id, 'error', {
+64
View File
@@ -327,6 +327,60 @@ describe('system.tokenUsage handler', () => {
});
});
describe('system.contextUsage handler', () => {
it('returns empty sessions when no getContextUsage provided', async () => {
const handlers = createSystemHandlers({
startTime: Date.now(),
version: '0.1.0',
getSessionCount: () => 0,
getToolCount: () => 0,
getConnectionCount: () => 0,
});
const req: GatewayRequest = { id: 21, method: 'system.contextUsage' };
const result = await handlers['system.contextUsage'](req) as GatewayResponse;
expect(result.id).toBe(21);
const r = result.result as { sessions: unknown[] };
expect(r.sessions).toEqual([]);
});
it('returns session context budget data from getContextUsage callback', async () => {
const mockUsage = [
{
sessionId: 'telegram:user1',
budget: {
estimatedTokens: 120000,
contextWindow: 200000,
remainingTokens: 80000,
usagePct: 60,
thresholdPct: 80,
thresholdTokens: 160000,
shouldCompact: false,
},
},
];
const handlers = createSystemHandlers({
startTime: Date.now(),
version: '0.1.0',
getSessionCount: () => 1,
getToolCount: () => 0,
getConnectionCount: () => 1,
getContextUsage: () => mockUsage,
});
const req: GatewayRequest = { id: 22, method: 'system.contextUsage' };
const result = await handlers['system.contextUsage'](req) as GatewayResponse;
const r = result.result as { sessions: typeof mockUsage };
expect(r.sessions).toHaveLength(1);
expect(r.sessions[0].sessionId).toBe('telegram:user1');
expect(r.sessions[0].budget.usagePct).toBe(60);
expect(r.sessions[0].budget.shouldCompact).toBe(false);
});
});
describe('system.sessionAnalytics handler', () => {
it('returns empty analytics when callback is not provided', async () => {
const handlers = createSystemHandlers({
@@ -614,6 +668,16 @@ describe('tool handlers', () => {
describe('agent handlers', () => {
const mockAgent = {
process: vi.fn(async () => 'response text'),
consumeContextAlert: vi.fn(() => undefined),
getContextBudget: vi.fn(() => ({
estimatedTokens: 0,
contextWindow: 128000,
remainingTokens: 128000,
usagePct: 0,
thresholdPct: 80,
thresholdTokens: 102400,
shouldCompact: false,
})),
setOnToolUse: vi.fn(),
};
+21
View File
@@ -13,6 +13,20 @@ export interface TokenUsageEntry {
total: { inputTokens: number; outputTokens: number; calls: number; estimatedCost: number };
}
/** Per-session context budget report returned by system.contextUsage. */
export interface ContextUsageEntry {
sessionId: string;
budget: {
estimatedTokens: number;
contextWindow: number;
remainingTokens: number;
usagePct: number;
thresholdPct: number;
thresholdTokens: number;
shouldCompact: boolean;
};
}
export interface PresenceEntry {
channel: string;
senderId: string;
@@ -63,6 +77,8 @@ export interface SystemHandlerDeps {
getUsage?: () => { totalSessions: number; activeConnections: number };
/** Optional callback to retrieve per-session token usage data. */
getTokenUsage?: () => TokenUsageEntry[];
/** Optional callback to retrieve per-session context budget data. */
getContextUsage?: () => ContextUsageEntry[];
/** Optional callback to retrieve aggregated metrics snapshot. */
getMetrics?: () => MetricsSnapshot;
/** Optional callback to retrieve session analytics. */
@@ -202,6 +218,11 @@ export function createSystemHandlers(deps: SystemHandlerDeps) {
return makeResponse(request.id, { sessions });
},
'system.contextUsage': async (request: GatewayRequest): Promise<OutboundMessage> => {
const sessions = deps.getContextUsage?.() ?? [];
return makeResponse(request.id, { sessions });
},
'system.metrics': async (request: GatewayRequest): Promise<OutboundMessage> => {
if (!deps.getMetrics) {
return makeResponse(request.id, {});
+25
View File
@@ -308,5 +308,30 @@ describe('protocol', () => {
data,
});
});
it('creates a context warning event message', () => {
const data = {
level: 'warning',
message: 'Context usage is 76.0%',
budget: {
estimatedTokens: 76000,
contextWindow: 100000,
remainingTokens: 24000,
usagePct: 76,
thresholdPct: 80,
thresholdTokens: 80000,
shouldCompact: false,
},
actions: {
checkpointSaved: false,
autoCompacted: false,
},
};
expect(makeEvent(3, 'context_warning', data)).toEqual({
id: 3,
event: 'context_warning',
data,
});
});
});
});
+20
View File
@@ -93,6 +93,7 @@ export type EventType =
| 'content'
| 'tool_start'
| 'tool_end'
| 'context_warning'
| 'attachment'
| 'done'
| 'error';
@@ -115,6 +116,25 @@ export interface ToolEndEventData {
};
}
export interface ContextWarningEventData {
level: 'warning' | 'checkpoint' | 'critical';
message: string;
budget: {
estimatedTokens: number;
contextWindow: number;
remainingTokens: number;
usagePct: number;
thresholdPct: number;
thresholdTokens: number;
shouldCompact: boolean;
};
actions: {
checkpointSaved: boolean;
autoCompacted: boolean;
checkpointNamespace?: string;
};
}
export interface AttachmentEventData {
mimeType: string;
data?: string;
+4 -1
View File
@@ -33,7 +33,7 @@ import {
createNodeHandlers,
} from './handlers/index.js';
import { discoverServices } from './handlers/services.js';
import type { TokenUsageEntry } from './handlers/system.js';
import type { TokenUsageEntry, ContextUsageEntry } from './handlers/system.js';
import type { NodeConnectionState } from './handlers/node.js';
import type { SessionManager } from '../session/manager.js';
import type { Config } from '../config/index.js';
@@ -82,6 +82,8 @@ export interface GatewayServerConfig {
gmailHandler?: GmailWatcher;
/** Optional callback to retrieve per-session token usage data for the dashboard. */
getTokenUsage?: () => TokenUsageEntry[];
/** Optional callback to retrieve per-session context usage data for the dashboard. */
getContextUsage?: () => ContextUsageEntry[];
/** Maximum allowed request body size for inbound HTTP POST bodies. */
maxRequestBodyBytes?: number;
/** Per-connection WebSocket ingress rate limiting. */
@@ -294,6 +296,7 @@ export class GatewayServer {
activeConnections: this.sessionBridge.connectionCount,
}),
getTokenUsage: this.config.getTokenUsage,
getContextUsage: this.config.getContextUsage,
getMetrics: () => this.metrics.getSnapshot(),
getEvents: (opts) => this.metrics.getEvents(opts),
getActiveRequests: () => this.metrics.getActiveRequests(),
+49
View File
@@ -200,6 +200,47 @@ export class SessionBridge {
return results;
}
/** Get estimated context budget for all active sessions. */
getAllContextUsage(): Array<{
sessionId: string;
budget: {
estimatedTokens: number;
contextWindow: number;
remainingTokens: number;
usagePct: number;
thresholdPct: number;
thresholdTokens: number;
shouldCompact: boolean;
};
}> {
const results: Array<{
sessionId: string;
budget: {
estimatedTokens: number;
contextWindow: number;
remainingTokens: number;
usagePct: number;
thresholdPct: number;
thresholdTokens: number;
shouldCompact: boolean;
};
}> = [];
const seen = new Set<string>();
for (const client of this.clients.values()) {
if (seen.has(client.sessionId)) {
continue;
}
seen.add(client.sessionId);
results.push({
sessionId: client.sessionId,
budget: client.agent.getContextBudget(),
});
}
return results;
}
private getOrCreateAgent(sessionId: string): AgentOrchestrator {
let agent = this.agents.get(sessionId);
if (!agent) {
@@ -233,6 +274,14 @@ export class SessionBridge {
keepTurns: config.compaction.keep_turns,
summaryMaxTokens: config.compaction.summary_max_tokens,
importanceThreshold: config.compaction.importance_threshold,
proactive: {
enabled: config.compaction.proactive.enabled,
warnPct: config.compaction.proactive.warn_pct,
checkpointPct: config.compaction.proactive.checkpoint_pct,
autoCompactPct: config.compaction.proactive.auto_compact_pct,
checkpointCooldownMs: config.compaction.proactive.checkpoint_cooldown_ms,
memoryNamespace: config.compaction.proactive.memory_namespace,
},
} : undefined,
modelName: config?.models.default.model,
contextWindow: config?.models.default.context_window,
+9
View File
@@ -593,6 +593,15 @@ async function sendMessage(client, overrideText) {
scrollToBottom();
});
stream.on('context_warning', (data) => {
const note = document.createElement('div');
note.className = 'message assistant';
const text = data?.message || 'Context usage is getting high.';
note.innerHTML = renderSafeMarkdown(`> ${text}`);
_elements.messages.insertBefore(note, placeholder);
scrollToBottom();
});
const done = await stream.result;
// Replace placeholder with actual response
placeholder.classList.remove('streaming-cursor');
+9
View File
@@ -26,15 +26,18 @@ function truncateId(id) {
async function loadUsage(el, client) {
let data;
let contextData;
try {
data = await client.call('system.tokenUsage');
contextData = await client.call('system.contextUsage');
} catch (err) {
el.innerHTML = `<div class="empty-state">Failed to load usage: ${err.message}</div>`;
return;
}
const sessions = data?.sessions ?? [];
const contextBySession = new Map((contextData?.sessions ?? []).map(s => [s.sessionId, s.budget]));
// Compute totals across all sessions
let totalInput = 0;
@@ -89,6 +92,10 @@ async function loadUsage(el, client) {
const outTok = s.total?.outputTokens ?? 0;
const calls = s.total?.calls ?? 0;
const cost = s.total?.estimatedCost ?? 0;
const budget = contextBySession.get(s.sessionId);
const contextCell = budget
? `${budget.usagePct.toFixed(1)}% (${formatNumber(budget.estimatedTokens)}/${formatNumber(budget.contextWindow)})`
: '<span class="text-muted">-</span>';
// Build delegation breakdown if present
const delegationEntries = Object.entries(s.delegation ?? {});
@@ -107,6 +114,7 @@ async function loadUsage(el, client) {
<td>${formatNumber(inTok + outTok)}</td>
<td>${formatNumber(calls)}</td>
<td>${formatCost(cost)}</td>
<td>${contextCell}</td>
<td>${delegationCell}</td>
</tr>
`;
@@ -122,6 +130,7 @@ async function loadUsage(el, client) {
<th>Total</th>
<th>Calls</th>
<th>Cost</th>
<th>Context</th>
<th>Delegation</th>
</tr>
</thead>