feat(observability): emit run and reaction baseline audit events

Diagrams reviewed: docs/architecture/AGENT_DIAGRAM.md, docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md, docs/api/PROTOCOL.md (no changes required).
This commit is contained in:
William Valentin
2026-02-25 08:54:19 -08:00
parent 23b813a92f
commit 6bcdb13bf4
6 changed files with 538 additions and 19 deletions
+187 -1
View File
@@ -1,4 +1,4 @@
import { describe, it, expect, vi, afterEach } from 'vitest';
import { describe, it, expect, vi, afterEach, beforeEach } from 'vitest';
import { AgentRouter } from '../agents/router.js';
import { AgentConfigRegistry } from '../agents/registry.js';
import { HookEngine } from '../hooks/index.js';
@@ -355,6 +355,165 @@ describe('daemon command fast-path integration', () => {
);
});
it('emits run.cancel telemetry for /stop command fast-path', async () => {
const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process');
const mockAuditLogger = {
userAction: vi.fn(),
runCancel: vi.fn(),
runState: vi.fn(),
};
initAuditLogger(mockAuditLogger as any);
const session = {
id: 'telegram:user-stop',
addMessage: vi.fn(),
getHistory: vi.fn(() => []),
clear: vi.fn(),
replaceHistory: vi.fn(),
getConfig: vi.fn(() => undefined),
setConfig: vi.fn(),
deleteConfig: vi.fn(),
};
const commandRegistry = new CommandRegistry();
registerBuiltinCommands(commandRegistry);
const router = createMessageRouter({
sessionManager: {
getSession: vi.fn(() => session),
} as unknown as MessageRouterDeps['sessionManager'],
modelRouter: {
getAvailableTiers: () => ['fast', 'default', 'complex', 'local'],
getAllLabels: () => ({ fast: 'fast', default: 'default', complex: 'complex', local: 'local' }),
getLabel: (tier: string) => tier,
} as unknown as MessageRouterDeps['modelRouter'],
systemPrompt: 'test prompt',
toolRegistry: {
clone() { return this; },
register: vi.fn(),
} as unknown as MessageRouterDeps['toolRegistry'],
toolExecutor: {} as unknown as MessageRouterDeps['toolExecutor'],
config: {
agents: {
primary_tier: 'default',
delegation: {
compaction: 'fast',
memory_extraction: 'fast',
classification: 'fast',
tool_summarisation: 'fast',
complex_reasoning: 'complex',
},
max_delegation_depth: 3,
max_iterations: 10,
},
compaction: { enabled: false },
models: { default: { provider: 'anthropic', model: 'claude' } },
} as unknown as MessageRouterDeps['config'],
commandRegistry,
});
const reply = vi.fn(async (_message: OutboundMessage) => {});
await router.handler({
id: 'm-stop',
channel: 'telegram',
senderId: 'user-stop',
text: '/stop',
timestamp: Date.now(),
metadata: { isCommand: true, command: 'stop' },
} as MessageRouterInput, reply);
expect(processSpy).not.toHaveBeenCalled();
expect(mockAuditLogger.runCancel).toHaveBeenCalledWith(
expect.objectContaining({
session_id: 'telegram:user-stop',
source: 'channel',
requested: true,
acknowledged: false,
}),
);
expect(reply).toHaveBeenCalledWith({
text: 'No active operation to cancel.',
replyTo: 'm-stop',
});
});
it('emits run.state start and complete for non-command channel messages', async () => {
const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process').mockResolvedValue('ok');
const mockAuditLogger = {
userAction: vi.fn(),
runState: vi.fn(),
};
initAuditLogger(mockAuditLogger as any);
const session = {
id: 'telegram:user-runstate',
addMessage: vi.fn(),
getHistory: vi.fn(() => []),
clear: vi.fn(),
replaceHistory: vi.fn(),
getConfig: vi.fn(() => undefined),
setConfig: vi.fn(),
deleteConfig: vi.fn(),
};
const router = createMessageRouter({
sessionManager: {
getSession: vi.fn(() => session),
} as unknown as MessageRouterDeps['sessionManager'],
modelRouter: {
getAvailableTiers: () => ['fast', 'default', 'complex', 'local'],
getAllLabels: () => ({ fast: 'fast', default: 'default', complex: 'complex', local: 'local' }),
getLabel: (tier: string) => tier,
} as unknown as MessageRouterDeps['modelRouter'],
systemPrompt: 'test prompt',
toolRegistry: {
clone() { return this; },
register: vi.fn(),
} as unknown as MessageRouterDeps['toolRegistry'],
toolExecutor: {} as unknown as MessageRouterDeps['toolExecutor'],
config: {
agents: {
primary_tier: 'default',
delegation: {
compaction: 'fast',
memory_extraction: 'fast',
classification: 'fast',
tool_summarisation: 'fast',
complex_reasoning: 'complex',
},
max_delegation_depth: 3,
max_iterations: 10,
},
compaction: { enabled: false },
models: { default: { provider: 'anthropic', model: 'claude' } },
} as unknown as MessageRouterDeps['config'],
});
await router.handler({
id: 'm-runstate',
channel: 'telegram',
senderId: 'user-runstate',
text: 'hello',
timestamp: Date.now(),
} as MessageRouterInput, vi.fn(async (_message: OutboundMessage) => {}));
expect(processSpy).toHaveBeenCalledTimes(1);
expect(mockAuditLogger.runState).toHaveBeenCalledWith(
expect.objectContaining({
session_id: 'telegram:user-runstate',
source: 'channel',
state: 'start',
}),
);
expect(mockAuditLogger.runState).toHaveBeenCalledWith(
expect.objectContaining({
session_id: 'telegram:user-runstate',
source: 'channel',
state: 'complete',
}),
);
});
it('handles model command via fast-path and persists tier override', async () => {
const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process');
const setModelTierSpy = vi.spyOn(AgentOrchestrator.prototype, 'setModelTier');
@@ -2081,6 +2240,17 @@ describe('daemon tts routing integration', () => {
});
describe('daemon reactions routing integration', () => {
const mockAuditLogger = {
userAction: vi.fn(),
reactionMatch: vi.fn(),
reactionSkip: vi.fn(),
};
beforeEach(() => {
vi.clearAllMocks();
initAuditLogger(mockAuditLogger as any);
});
afterEach(() => {
vi.restoreAllMocks();
});
@@ -2149,6 +2319,14 @@ describe('daemon reactions routing integration', () => {
expect(prompt).toBe(
'Summarize and suggest next steps:\n\nNew email from boss@company.com: Please share timeline',
);
expect(mockAuditLogger.reactionMatch).toHaveBeenCalledWith(
expect.objectContaining({
session_id: 'gmail:reaction-user-1',
source: 'channel',
rule_name: 'boss-email',
candidate_count: 1,
}),
);
});
it('keeps original prompt when no reaction rule matches', async () => {
@@ -2213,6 +2391,14 @@ describe('daemon reactions routing integration', () => {
expect(processSpy).toHaveBeenCalledTimes(1);
const [prompt] = processSpy.mock.calls[0] ?? [];
expect(prompt).toBe('New email from teammate@company.com: FYI');
expect(mockAuditLogger.reactionSkip).toHaveBeenCalledWith(
expect.objectContaining({
session_id: 'gmail:reaction-user-2',
source: 'channel',
reason: 'no_match',
candidate_count: 1,
}),
);
});
});
+152 -15
View File
@@ -148,6 +148,35 @@ function parseResearchPrefix(text: string): string | undefined {
return undefined;
}
function buildReactionFilterSummary(
rule: {
on?: string[];
filter?: {
contains?: string;
regex?: string;
metadata?: Record<string, string>;
};
} | undefined,
): string | undefined {
if (!rule) {
return undefined;
}
const parts: string[] = [];
if (rule.on && rule.on.length > 0) {
parts.push(`on:${rule.on.join('|')}`);
}
if (rule.filter?.contains) {
parts.push(`contains:${rule.filter.contains}`);
}
if (rule.filter?.regex) {
parts.push(`regex:${rule.filter.regex}`);
}
if (rule.filter?.metadata && Object.keys(rule.filter.metadata).length > 0) {
parts.push(`metadata:${Object.keys(rule.filter.metadata).join('|')}`);
}
return parts.length > 0 ? parts.join(', ') : undefined;
}
function shouldForceNativeForCapabilityQuery(text: string): boolean {
const normalized = text.trim().toLowerCase();
if (!normalized) {
@@ -740,16 +769,46 @@ export function createMessageRouter(deps: {
}
const automationReactions = deps.config.automation?.reactions ?? [];
if (!msg.metadata?.isCommand && automationReactions.length > 0) {
const reactionMatch = matchReactionPrompt(automationReactions, {
channel: msg.channel,
senderId: msg.senderId,
text: incomingText,
metadata: msg.metadata,
});
if (reactionMatch) {
matchedReactionName = reactionMatch.name;
incomingText = reactionMatch.prompt;
if (!msg.metadata?.isCommand) {
if (automationReactions.length === 0) {
auditLogger?.reactionSkip?.({
session_id: sessionIdForRun,
channel: msg.channel,
sender: msg.senderId,
source: 'channel',
reason: 'no_rules',
candidate_count: 0,
});
} else {
const reactionMatch = matchReactionPrompt(automationReactions, {
channel: msg.channel,
senderId: msg.senderId,
text: incomingText,
metadata: msg.metadata,
});
if (reactionMatch) {
matchedReactionName = reactionMatch.name;
incomingText = reactionMatch.prompt;
const matchedRule = automationReactions.find((rule) => rule.name === reactionMatch.name);
auditLogger?.reactionMatch?.({
session_id: sessionIdForRun,
channel: msg.channel,
sender: msg.senderId,
source: 'channel',
rule_name: reactionMatch.name,
candidate_count: automationReactions.length,
filter_summary: buildReactionFilterSummary(matchedRule),
});
} else {
auditLogger?.reactionSkip?.({
session_id: sessionIdForRun,
channel: msg.channel,
sender: msg.senderId,
source: 'channel',
reason: 'no_match',
candidate_count: automationReactions.length,
});
}
}
}
@@ -1002,11 +1061,42 @@ export function createMessageRouter(deps: {
return '';
},
cancelRun: () => {
const cancelStartedAt = Date.now();
const run = activeRuns.get(session.id);
if (!run || !run.isCancellable()) {
auditLogger?.runCancel?.({
session_id: session.id,
channel: msg.channel,
sender: msg.senderId,
source: 'channel',
requested: true,
acknowledged: false,
request_id: msg.id,
latency_ms: Date.now() - cancelStartedAt,
});
return 'No active operation to cancel.';
}
run.cancel();
const cancelLatencyMs = Date.now() - cancelStartedAt;
auditLogger?.runCancel?.({
session_id: session.id,
channel: msg.channel,
sender: msg.senderId,
source: 'channel',
requested: true,
acknowledged: true,
request_id: msg.id,
latency_ms: cancelLatencyMs,
});
auditLogger?.runState?.({
session_id: session.id,
channel: msg.channel,
sender: msg.senderId,
source: 'channel',
state: 'cancel_requested',
request_id: msg.id,
duration_ms: cancelLatencyMs,
});
return 'Cancellation requested. The active operation will stop at the next safe point.';
},
@@ -1404,6 +1494,16 @@ export function createMessageRouter(deps: {
}
try {
const runStartedAt = Date.now();
auditLogger?.runState?.({
session_id: sessionIdForRun,
channel: msg.channel,
sender: msg.senderId,
source: 'channel',
state: 'start',
request_id: msg.id,
});
// Determine if the active model supports native audio input
let effectiveTier: string = deps.config.agents.primary_tier ?? 'default';
const session = deps.sessionManager.getSession(msg.channel, msg.senderId);
@@ -1465,9 +1565,18 @@ export function createMessageRouter(deps: {
'Workarounds:',
'1. Paste the transcription text.',
'2. Upload the audio file somewhere and send me a direct URL.',
].join('\n'),
].join('\n'),
replyTo: msg.id,
});
auditLogger?.runState?.({
session_id: sessionIdForRun,
channel: msg.channel,
sender: msg.senderId,
source: 'channel',
state: 'complete',
request_id: msg.id,
duration_ms: Date.now() - runStartedAt,
});
return;
}
@@ -1508,13 +1617,12 @@ export function createMessageRouter(deps: {
forcedNativeGuardReason = 'attachments_present';
}
}
const sessionIdForAudit = `${msg.channel}:${msg.senderId}`;
const selectedBackendForAudit: 'native' | ExternalBackendName = selectedBackend && requestedBackend && !forcedNativeGuardReason
? requestedBackend
: 'native';
auditLogger?.backendRoute?.({
session_id: sessionIdForAudit,
session_id: sessionIdForRun,
channel: msg.channel,
sender: msg.senderId,
selected_backend: selectedBackendForAudit,
@@ -1542,7 +1650,7 @@ export function createMessageRouter(deps: {
...(externalSystemPrompt ? { systemPrompt: externalSystemPrompt } : {}),
});
auditLogger?.backendSuccess?.({
session_id: sessionIdForAudit,
session_id: sessionIdForRun,
channel: msg.channel,
sender: msg.senderId,
backend: selectedBackend.name,
@@ -1556,12 +1664,21 @@ export function createMessageRouter(deps: {
replyTo: msg.id,
attachments: ttsAttachment ? [ttsAttachment] : undefined,
});
auditLogger?.runState?.({
session_id: sessionIdForRun,
channel: msg.channel,
sender: msg.senderId,
source: 'channel',
state: 'complete',
request_id: msg.id,
duration_ms: Date.now() - runStartedAt,
});
return;
} catch (error) {
const detail = error instanceof Error ? error.message : String(error);
console.warn(`External backend "${selectedBackend.name}" failed, falling back to native: ${detail}`);
auditLogger?.backendFallback?.({
session_id: sessionIdForAudit,
session_id: sessionIdForRun,
channel: msg.channel,
sender: msg.senderId,
from_backend: (requestedBackend && requestedBackend !== 'native')
@@ -1599,12 +1716,32 @@ export function createMessageRouter(deps: {
replyTo: msg.id,
attachments: mergedAttachments.length > 0 ? mergedAttachments : undefined,
});
auditLogger?.runState?.({
session_id: sessionIdForRun,
channel: msg.channel,
sender: msg.senderId,
source: 'channel',
state: response.trim().toLowerCase() === 'operation cancelled by user.'
? 'cancelled'
: 'complete',
request_id: msg.id,
duration_ms: Date.now() - runStartedAt,
});
} catch (error) {
console.error(`Error processing message from ${msg.channel}:${msg.senderId}:`, error);
await reply({
text: 'Sorry, an error occurred while processing your message.',
replyTo: msg.id,
});
auditLogger?.runState?.({
session_id: sessionIdForRun,
channel: msg.channel,
sender: msg.senderId,
source: 'channel',
state: 'error',
request_id: msg.id,
error: error instanceof Error ? error.message : String(error),
});
} finally {
activeRuns.delete(sessionIdForRun);
}