diff --git a/docs/plans/2026-02-25-phase0-instrumentation-ticket-checklist.md b/docs/plans/2026-02-25-phase0-instrumentation-ticket-checklist.md index fb5e5c0..d6e2cf9 100644 --- a/docs/plans/2026-02-25-phase0-instrumentation-ticket-checklist.md +++ b/docs/plans/2026-02-25-phase0-instrumentation-ticket-checklist.md @@ -58,6 +58,8 @@ Add additive audit event types and payload contracts: ## Ticket 0.2 — Gateway/Router Emitters for Baseline Events +Status: completed (2026-02-25) + ### Scope Emit new audit events without changing request handling behavior: diff --git a/docs/plans/state.json b/docs/plans/state.json index 29843ce..b750e00 100644 --- a/docs/plans/state.json +++ b/docs/plans/state.json @@ -17,6 +17,21 @@ ], "test_status": "pnpm test:run src/audit/logger.test.ts + pnpm typecheck passing" }, + "phase0-ticket-0.2-gateway-router-emitters": { + "status": "completed", + "date": "2026-02-25", + "updated": "2026-02-25", + "summary": "Implemented Phase 0 Ticket 0.2 by emitting run lifecycle/cancel and reaction match/skip audit events from gateway and daemon routing paths, including filter summaries and cancellation latency, without changing request handling behavior.", + "files_modified": [ + "src/daemon/routing.ts", + "src/daemon/routing.test.ts", + "src/gateway/handlers/agent.ts", + "src/gateway/handlers/agent.test.ts", + "docs/plans/2026-02-25-phase0-instrumentation-ticket-checklist.md", + "docs/plans/state.json" + ], + "test_status": "pnpm test:run src/daemon/routing.test.ts + pnpm test:run src/gateway/handlers/agent.test.ts passing" + }, "phase0-instrumentation-ticket-checklist": { "status": "completed", "date": "2026-02-25", @@ -6628,7 +6643,8 @@ "deeper_surfaces_behavior_stack_plan": "completed — documented a decision-complete balanced-hybrid roadmap for OpenClaw-like end-user surface depth plus integrated behavior semantics with phased scope, acceptance gates, and rollout constraints", "deeper_surfaces_phase0_ticket_pack": "completed — produced an atomic implementation checklist for Phase 0 baseline observability work (audit events, router/gateway emitters, metrics counters, baseline summary tooling, docs sync)", "deeper_surfaces_phase0_ticket_01": "completed — audit schema/logger now capture run lifecycle and reaction decision baseline events (`run.state`, `run.cancel`, `reaction.match`, `reaction.skip`) with regression test coverage", - "next_up": "Implement Ticket 0.2 from docs/plans/2026-02-25-phase0-instrumentation-ticket-checklist.md", + "deeper_surfaces_phase0_ticket_02": "completed — gateway + daemon routing emit run lifecycle/cancel telemetry and reaction match/skip audit events with filter summaries and cancellation latency, plus focused tests", + "next_up": "Implement Ticket 0.3 from docs/plans/2026-02-25-phase0-instrumentation-ticket-checklist.md", "pi_embedded_canary_spike": "completed — added optional pi_embedded backend adapter, canary-safe no-tools routing guard, backend success/fallback latency audit events, and docs/diagram updates while native remains default", "pi_embedded_evaluation_phase": "completed — final decision rollback (applied in runtime config): Window A failed latency/fallback gates (p50 +259ms, p95 +5695ms, fallback 25%, categories: pi_module_interface/empty_assistant_text); Window B remained sample-insufficient; controlled probes verified guard coverage (pi_no_tools_mode/capability_query/attachments_present each hit once)", "pi_embedded_manual_mode": "completed — added persisted runtime backend controls for manual Pi activation/deactivation (`/runtime` preferred, `/backend` alias; `status`, `activate pi`, `deactivate pi`, `use config`) while keeping config-driven default routing", diff --git a/src/daemon/routing.test.ts b/src/daemon/routing.test.ts index 9826b56..1bb1384 100644 --- a/src/daemon/routing.test.ts +++ b/src/daemon/routing.test.ts @@ -1,4 +1,4 @@ -import { describe, it, expect, vi, afterEach } from 'vitest'; +import { describe, it, expect, vi, afterEach, beforeEach } from 'vitest'; import { AgentRouter } from '../agents/router.js'; import { AgentConfigRegistry } from '../agents/registry.js'; import { HookEngine } from '../hooks/index.js'; @@ -355,6 +355,165 @@ describe('daemon command fast-path integration', () => { ); }); + it('emits run.cancel telemetry for /stop command fast-path', async () => { + const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process'); + const mockAuditLogger = { + userAction: vi.fn(), + runCancel: vi.fn(), + runState: vi.fn(), + }; + initAuditLogger(mockAuditLogger as any); + + const session = { + id: 'telegram:user-stop', + addMessage: vi.fn(), + getHistory: vi.fn(() => []), + clear: vi.fn(), + replaceHistory: vi.fn(), + getConfig: vi.fn(() => undefined), + setConfig: vi.fn(), + deleteConfig: vi.fn(), + }; + + const commandRegistry = new CommandRegistry(); + registerBuiltinCommands(commandRegistry); + + const router = createMessageRouter({ + sessionManager: { + getSession: vi.fn(() => session), + } as unknown as MessageRouterDeps['sessionManager'], + modelRouter: { + getAvailableTiers: () => ['fast', 'default', 'complex', 'local'], + getAllLabels: () => ({ fast: 'fast', default: 'default', complex: 'complex', local: 'local' }), + getLabel: (tier: string) => tier, + } as unknown as MessageRouterDeps['modelRouter'], + systemPrompt: 'test prompt', + toolRegistry: { + clone() { return this; }, + register: vi.fn(), + } as unknown as MessageRouterDeps['toolRegistry'], + toolExecutor: {} as unknown as MessageRouterDeps['toolExecutor'], + config: { + agents: { + primary_tier: 'default', + delegation: { + compaction: 'fast', + memory_extraction: 'fast', + classification: 'fast', + tool_summarisation: 'fast', + complex_reasoning: 'complex', + }, + max_delegation_depth: 3, + max_iterations: 10, + }, + compaction: { enabled: false }, + models: { default: { provider: 'anthropic', model: 'claude' } }, + } as unknown as MessageRouterDeps['config'], + commandRegistry, + }); + + const reply = vi.fn(async (_message: OutboundMessage) => {}); + await router.handler({ + id: 'm-stop', + channel: 'telegram', + senderId: 'user-stop', + text: '/stop', + timestamp: Date.now(), + metadata: { isCommand: true, command: 'stop' }, + } as MessageRouterInput, reply); + + expect(processSpy).not.toHaveBeenCalled(); + expect(mockAuditLogger.runCancel).toHaveBeenCalledWith( + expect.objectContaining({ + session_id: 'telegram:user-stop', + source: 'channel', + requested: true, + acknowledged: false, + }), + ); + expect(reply).toHaveBeenCalledWith({ + text: 'No active operation to cancel.', + replyTo: 'm-stop', + }); + }); + + it('emits run.state start and complete for non-command channel messages', async () => { + const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process').mockResolvedValue('ok'); + const mockAuditLogger = { + userAction: vi.fn(), + runState: vi.fn(), + }; + initAuditLogger(mockAuditLogger as any); + + const session = { + id: 'telegram:user-runstate', + addMessage: vi.fn(), + getHistory: vi.fn(() => []), + clear: vi.fn(), + replaceHistory: vi.fn(), + getConfig: vi.fn(() => undefined), + setConfig: vi.fn(), + deleteConfig: vi.fn(), + }; + + const router = createMessageRouter({ + sessionManager: { + getSession: vi.fn(() => session), + } as unknown as MessageRouterDeps['sessionManager'], + modelRouter: { + getAvailableTiers: () => ['fast', 'default', 'complex', 'local'], + getAllLabels: () => ({ fast: 'fast', default: 'default', complex: 'complex', local: 'local' }), + getLabel: (tier: string) => tier, + } as unknown as MessageRouterDeps['modelRouter'], + systemPrompt: 'test prompt', + toolRegistry: { + clone() { return this; }, + register: vi.fn(), + } as unknown as MessageRouterDeps['toolRegistry'], + toolExecutor: {} as unknown as MessageRouterDeps['toolExecutor'], + config: { + agents: { + primary_tier: 'default', + delegation: { + compaction: 'fast', + memory_extraction: 'fast', + classification: 'fast', + tool_summarisation: 'fast', + complex_reasoning: 'complex', + }, + max_delegation_depth: 3, + max_iterations: 10, + }, + compaction: { enabled: false }, + models: { default: { provider: 'anthropic', model: 'claude' } }, + } as unknown as MessageRouterDeps['config'], + }); + + await router.handler({ + id: 'm-runstate', + channel: 'telegram', + senderId: 'user-runstate', + text: 'hello', + timestamp: Date.now(), + } as MessageRouterInput, vi.fn(async (_message: OutboundMessage) => {})); + + expect(processSpy).toHaveBeenCalledTimes(1); + expect(mockAuditLogger.runState).toHaveBeenCalledWith( + expect.objectContaining({ + session_id: 'telegram:user-runstate', + source: 'channel', + state: 'start', + }), + ); + expect(mockAuditLogger.runState).toHaveBeenCalledWith( + expect.objectContaining({ + session_id: 'telegram:user-runstate', + source: 'channel', + state: 'complete', + }), + ); + }); + it('handles model command via fast-path and persists tier override', async () => { const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process'); const setModelTierSpy = vi.spyOn(AgentOrchestrator.prototype, 'setModelTier'); @@ -2081,6 +2240,17 @@ describe('daemon tts routing integration', () => { }); describe('daemon reactions routing integration', () => { + const mockAuditLogger = { + userAction: vi.fn(), + reactionMatch: vi.fn(), + reactionSkip: vi.fn(), + }; + + beforeEach(() => { + vi.clearAllMocks(); + initAuditLogger(mockAuditLogger as any); + }); + afterEach(() => { vi.restoreAllMocks(); }); @@ -2149,6 +2319,14 @@ describe('daemon reactions routing integration', () => { expect(prompt).toBe( 'Summarize and suggest next steps:\n\nNew email from boss@company.com: Please share timeline', ); + expect(mockAuditLogger.reactionMatch).toHaveBeenCalledWith( + expect.objectContaining({ + session_id: 'gmail:reaction-user-1', + source: 'channel', + rule_name: 'boss-email', + candidate_count: 1, + }), + ); }); it('keeps original prompt when no reaction rule matches', async () => { @@ -2213,6 +2391,14 @@ describe('daemon reactions routing integration', () => { expect(processSpy).toHaveBeenCalledTimes(1); const [prompt] = processSpy.mock.calls[0] ?? []; expect(prompt).toBe('New email from teammate@company.com: FYI'); + expect(mockAuditLogger.reactionSkip).toHaveBeenCalledWith( + expect.objectContaining({ + session_id: 'gmail:reaction-user-2', + source: 'channel', + reason: 'no_match', + candidate_count: 1, + }), + ); }); }); diff --git a/src/daemon/routing.ts b/src/daemon/routing.ts index 7118aef..159a994 100644 --- a/src/daemon/routing.ts +++ b/src/daemon/routing.ts @@ -148,6 +148,35 @@ function parseResearchPrefix(text: string): string | undefined { return undefined; } +function buildReactionFilterSummary( + rule: { + on?: string[]; + filter?: { + contains?: string; + regex?: string; + metadata?: Record; + }; + } | undefined, +): string | undefined { + if (!rule) { + return undefined; + } + const parts: string[] = []; + if (rule.on && rule.on.length > 0) { + parts.push(`on:${rule.on.join('|')}`); + } + if (rule.filter?.contains) { + parts.push(`contains:${rule.filter.contains}`); + } + if (rule.filter?.regex) { + parts.push(`regex:${rule.filter.regex}`); + } + if (rule.filter?.metadata && Object.keys(rule.filter.metadata).length > 0) { + parts.push(`metadata:${Object.keys(rule.filter.metadata).join('|')}`); + } + return parts.length > 0 ? parts.join(', ') : undefined; +} + function shouldForceNativeForCapabilityQuery(text: string): boolean { const normalized = text.trim().toLowerCase(); if (!normalized) { @@ -740,16 +769,46 @@ export function createMessageRouter(deps: { } const automationReactions = deps.config.automation?.reactions ?? []; - if (!msg.metadata?.isCommand && automationReactions.length > 0) { - const reactionMatch = matchReactionPrompt(automationReactions, { - channel: msg.channel, - senderId: msg.senderId, - text: incomingText, - metadata: msg.metadata, - }); - if (reactionMatch) { - matchedReactionName = reactionMatch.name; - incomingText = reactionMatch.prompt; + if (!msg.metadata?.isCommand) { + if (automationReactions.length === 0) { + auditLogger?.reactionSkip?.({ + session_id: sessionIdForRun, + channel: msg.channel, + sender: msg.senderId, + source: 'channel', + reason: 'no_rules', + candidate_count: 0, + }); + } else { + const reactionMatch = matchReactionPrompt(automationReactions, { + channel: msg.channel, + senderId: msg.senderId, + text: incomingText, + metadata: msg.metadata, + }); + if (reactionMatch) { + matchedReactionName = reactionMatch.name; + incomingText = reactionMatch.prompt; + const matchedRule = automationReactions.find((rule) => rule.name === reactionMatch.name); + auditLogger?.reactionMatch?.({ + session_id: sessionIdForRun, + channel: msg.channel, + sender: msg.senderId, + source: 'channel', + rule_name: reactionMatch.name, + candidate_count: automationReactions.length, + filter_summary: buildReactionFilterSummary(matchedRule), + }); + } else { + auditLogger?.reactionSkip?.({ + session_id: sessionIdForRun, + channel: msg.channel, + sender: msg.senderId, + source: 'channel', + reason: 'no_match', + candidate_count: automationReactions.length, + }); + } } } @@ -1002,11 +1061,42 @@ export function createMessageRouter(deps: { return ''; }, cancelRun: () => { + const cancelStartedAt = Date.now(); const run = activeRuns.get(session.id); if (!run || !run.isCancellable()) { + auditLogger?.runCancel?.({ + session_id: session.id, + channel: msg.channel, + sender: msg.senderId, + source: 'channel', + requested: true, + acknowledged: false, + request_id: msg.id, + latency_ms: Date.now() - cancelStartedAt, + }); return 'No active operation to cancel.'; } run.cancel(); + const cancelLatencyMs = Date.now() - cancelStartedAt; + auditLogger?.runCancel?.({ + session_id: session.id, + channel: msg.channel, + sender: msg.senderId, + source: 'channel', + requested: true, + acknowledged: true, + request_id: msg.id, + latency_ms: cancelLatencyMs, + }); + auditLogger?.runState?.({ + session_id: session.id, + channel: msg.channel, + sender: msg.senderId, + source: 'channel', + state: 'cancel_requested', + request_id: msg.id, + duration_ms: cancelLatencyMs, + }); return 'Cancellation requested. The active operation will stop at the next safe point.'; }, @@ -1404,6 +1494,16 @@ export function createMessageRouter(deps: { } try { + const runStartedAt = Date.now(); + auditLogger?.runState?.({ + session_id: sessionIdForRun, + channel: msg.channel, + sender: msg.senderId, + source: 'channel', + state: 'start', + request_id: msg.id, + }); + // Determine if the active model supports native audio input let effectiveTier: string = deps.config.agents.primary_tier ?? 'default'; const session = deps.sessionManager.getSession(msg.channel, msg.senderId); @@ -1465,9 +1565,18 @@ export function createMessageRouter(deps: { 'Workarounds:', '1. Paste the transcription text.', '2. Upload the audio file somewhere and send me a direct URL.', - ].join('\n'), + ].join('\n'), replyTo: msg.id, }); + auditLogger?.runState?.({ + session_id: sessionIdForRun, + channel: msg.channel, + sender: msg.senderId, + source: 'channel', + state: 'complete', + request_id: msg.id, + duration_ms: Date.now() - runStartedAt, + }); return; } @@ -1508,13 +1617,12 @@ export function createMessageRouter(deps: { forcedNativeGuardReason = 'attachments_present'; } } - const sessionIdForAudit = `${msg.channel}:${msg.senderId}`; const selectedBackendForAudit: 'native' | ExternalBackendName = selectedBackend && requestedBackend && !forcedNativeGuardReason ? requestedBackend : 'native'; auditLogger?.backendRoute?.({ - session_id: sessionIdForAudit, + session_id: sessionIdForRun, channel: msg.channel, sender: msg.senderId, selected_backend: selectedBackendForAudit, @@ -1542,7 +1650,7 @@ export function createMessageRouter(deps: { ...(externalSystemPrompt ? { systemPrompt: externalSystemPrompt } : {}), }); auditLogger?.backendSuccess?.({ - session_id: sessionIdForAudit, + session_id: sessionIdForRun, channel: msg.channel, sender: msg.senderId, backend: selectedBackend.name, @@ -1556,12 +1664,21 @@ export function createMessageRouter(deps: { replyTo: msg.id, attachments: ttsAttachment ? [ttsAttachment] : undefined, }); + auditLogger?.runState?.({ + session_id: sessionIdForRun, + channel: msg.channel, + sender: msg.senderId, + source: 'channel', + state: 'complete', + request_id: msg.id, + duration_ms: Date.now() - runStartedAt, + }); return; } catch (error) { const detail = error instanceof Error ? error.message : String(error); console.warn(`External backend "${selectedBackend.name}" failed, falling back to native: ${detail}`); auditLogger?.backendFallback?.({ - session_id: sessionIdForAudit, + session_id: sessionIdForRun, channel: msg.channel, sender: msg.senderId, from_backend: (requestedBackend && requestedBackend !== 'native') @@ -1599,12 +1716,32 @@ export function createMessageRouter(deps: { replyTo: msg.id, attachments: mergedAttachments.length > 0 ? mergedAttachments : undefined, }); + auditLogger?.runState?.({ + session_id: sessionIdForRun, + channel: msg.channel, + sender: msg.senderId, + source: 'channel', + state: response.trim().toLowerCase() === 'operation cancelled by user.' + ? 'cancelled' + : 'complete', + request_id: msg.id, + duration_ms: Date.now() - runStartedAt, + }); } catch (error) { console.error(`Error processing message from ${msg.channel}:${msg.senderId}:`, error); await reply({ text: 'Sorry, an error occurred while processing your message.', replyTo: msg.id, }); + auditLogger?.runState?.({ + session_id: sessionIdForRun, + channel: msg.channel, + sender: msg.senderId, + source: 'channel', + state: 'error', + request_id: msg.id, + error: error instanceof Error ? error.message : String(error), + }); } finally { activeRuns.delete(sessionIdForRun); } diff --git a/src/gateway/handlers/agent.test.ts b/src/gateway/handlers/agent.test.ts index 699da78..56b8578 100644 --- a/src/gateway/handlers/agent.test.ts +++ b/src/gateway/handlers/agent.test.ts @@ -50,6 +50,8 @@ describe('createAgentHandlers command fast-path', () => { const mockAuditLogger = { userAction: vi.fn(), queuePreempt: vi.fn(), + runState: vi.fn(), + runCancel: vi.fn(), }; const handlers = createAgentHandlers({ @@ -237,6 +239,21 @@ describe('createAgentHandlers command fast-path', () => { await handlers['agent.send'](req, send); expect(sessionBridge.cancel).toHaveBeenCalledWith('conn-1'); + expect(mockAuditLogger.runCancel).toHaveBeenCalledWith( + expect.objectContaining({ + session_id: 'ws:conn-1', + source: 'gateway', + requested: true, + acknowledged: true, + }), + ); + expect(mockAuditLogger.runState).toHaveBeenCalledWith( + expect.objectContaining({ + session_id: 'ws:conn-1', + source: 'gateway', + state: 'cancel_requested', + }), + ); expect(mockAgent.process).not.toHaveBeenCalled(); expect(((sent[0] as GatewayEvent).data as { content: string }).content).toContain('Cancellation requested'); }); @@ -368,6 +385,62 @@ describe('createAgentHandlers command fast-path', () => { sender: 'conn-1', }), ); + expect(mockAuditLogger.runState).toHaveBeenCalledWith( + expect.objectContaining({ + session_id: 'ws:conn-1', + source: 'gateway', + state: 'start', + }), + ); + expect(mockAuditLogger.runState).toHaveBeenCalledWith( + expect.objectContaining({ + session_id: 'ws:conn-1', + source: 'gateway', + state: 'complete', + }), + ); + }); + + it('emits cancelled run state when agent returns cancellation text', async () => { + mockAgent.process.mockResolvedValueOnce('Operation cancelled by user.'); + + const sent: OutboundMessage[] = []; + const send = vi.fn((msg: OutboundMessage) => sent.push(msg)); + const req: GatewayRequest = { + id: 15, + method: 'agent.send', + params: { message: 'cancel me', connectionId: 'conn-1' }, + }; + + await handlers['agent.send'](req, send); + + expect(mockAuditLogger.runState).toHaveBeenCalledWith( + expect.objectContaining({ + session_id: 'ws:conn-1', + source: 'gateway', + state: 'cancelled', + }), + ); + expect((sent[0] as GatewayEvent).event).toBe('done'); + }); + + it('emits run.cancel telemetry for agent.cancel requests', async () => { + const result = await handlers['agent.cancel']({ + id: 16, + method: 'agent.cancel', + params: { connectionId: 'conn-1' }, + }); + + expect(sessionBridge.cancel).toHaveBeenCalledWith('conn-1'); + expect(mockAuditLogger.runCancel).toHaveBeenCalledWith( + expect.objectContaining({ + session_id: 'ws:conn-1', + source: 'gateway', + requested: true, + acknowledged: true, + }), + ); + expect((result as { result: { cancelled: boolean } }).result.cancelled).toBe(true); }); it('handles /queue command via fast-path and persists queue session config', async () => { diff --git a/src/gateway/handlers/agent.ts b/src/gateway/handlers/agent.ts index 8b067f7..09a3af0 100644 --- a/src/gateway/handlers/agent.ts +++ b/src/gateway/handlers/agent.ts @@ -121,17 +121,21 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { ? laneQueueWithProcessing.isProcessing(laneId) : false; const requestId = request.id.toString(); + const sessionIdForAudit = sessionId ?? `ws:${connectionId}`; + const runStartedAt = Date.now(); let interruptedPreviousRun = false; // Interrupt mode should preempt active work when a newer request arrives. // LaneQueue itself only rejects queued entries, so we also request agent cancellation. if (resolvedPolicy?.mode === 'interrupt' && laneIsProcessing) { + const cancelStartedAt = Date.now(); const cancelled = sessionId ? deps.sessionBridge.cancelSession(sessionId) : deps.sessionBridge.cancel(connectionId); + const cancelLatencyMs = Date.now() - cancelStartedAt; interruptedPreviousRun = cancelled; auditLogger?.queuePreempt?.({ - session_id: sessionId ?? `ws:${connectionId}`, + session_id: sessionIdForAudit, channel: 'ws', sender: connectionId, lane_id: laneId, @@ -139,6 +143,27 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { mode: 'interrupt', cancelled_active_run: cancelled, }); + auditLogger?.runCancel?.({ + session_id: sessionIdForAudit, + channel: 'ws', + sender: connectionId, + source: 'gateway', + requested: true, + acknowledged: cancelled, + request_id: requestId, + latency_ms: cancelLatencyMs, + }); + if (cancelled) { + auditLogger?.runState?.({ + session_id: sessionIdForAudit, + channel: 'ws', + sender: connectionId, + source: 'gateway', + state: 'cancel_requested', + request_id: requestId, + duration_ms: cancelLatencyMs, + }); + } } // Enqueue the work — if the lane is idle it runs immediately, @@ -163,7 +188,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { ? safeParams.metadata.command : commandInput.startsWith('/') ? commandInput.slice(1).split(/\s+/, 1)[0] : undefined; auditLogger?.userAction({ - session_id: sessionId ?? `ws:${connectionId}`, + session_id: sessionIdForAudit, channel: 'ws', sender: connectionId, source: 'gateway', @@ -173,6 +198,18 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { command: parsedCommand, }); + const isCommand = Boolean(commandInput && deps.commandRegistry?.isCommand(commandInput)); + if (!isCommand) { + auditLogger?.runState?.({ + session_id: sessionIdForAudit, + channel: 'ws', + sender: connectionId, + source: 'gateway', + state: 'start', + request_id: requestId, + }); + } + if (commandInput && deps.commandRegistry?.isCommand(commandInput)) { const sessionId = deps.sessionBridge.getSessionId(connectionId); const commandResult = await deps.commandRegistry.execute(commandInput, { @@ -340,7 +377,30 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { return 'Session reset.'; }, cancelRun: () => { + const cancelStartedAt = Date.now(); const cancelled = deps.sessionBridge.cancel(connectionId); + const cancelLatencyMs = Date.now() - cancelStartedAt; + auditLogger?.runCancel?.({ + session_id: sessionIdForAudit, + channel: 'ws', + sender: connectionId, + source: 'gateway', + requested: true, + acknowledged: cancelled, + request_id: requestId, + latency_ms: cancelLatencyMs, + }); + if (cancelled) { + auditLogger?.runState?.({ + session_id: sessionIdForAudit, + channel: 'ws', + sender: connectionId, + source: 'gateway', + state: 'cancel_requested', + request_id: requestId, + duration_ms: cancelLatencyMs, + }); + } return cancelled ? 'Cancellation requested. The active operation will stop at the next safe point.' : 'No active operation to cancel.'; @@ -635,6 +695,17 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { }); } send(makeEvent(request.id, 'done', { content: response })); + auditLogger?.runState?.({ + session_id: sessionIdForAudit, + channel: 'ws', + sender: connectionId, + source: 'gateway', + state: response.trim().toLowerCase() === 'operation cancelled by user.' + ? 'cancelled' + : 'complete', + request_id: requestId, + duration_ms: Date.now() - runStartedAt, + }); } catch (err) { const message = err instanceof Error ? err.message : 'Unknown error'; deps.metrics?.incrementErrors(); @@ -646,6 +717,16 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { context: { sessionId: laneId }, }); send(makeEvent(request.id, 'error', { code: ErrorCode.InternalError, message })); + auditLogger?.runState?.({ + session_id: sessionIdForAudit, + channel: 'ws', + sender: connectionId, + source: 'gateway', + state: 'error', + request_id: requestId, + duration_ms: Date.now() - runStartedAt, + error: message, + }); } finally { deps.sessionBridge.setBusy(connectionId, false); deps.sessionBridge.setOnToolUse(connectionId, undefined); @@ -679,7 +760,31 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { // Clear any queued (not-yet-started) work first. deps.laneQueue.cancel(laneId); + const cancelStartedAt = Date.now(); const cancelled = deps.sessionBridge.cancel(connectionId); + const cancelLatencyMs = Date.now() - cancelStartedAt; + const sessionIdForAudit = sessionId ?? `ws:${connectionId}`; + auditLogger?.runCancel?.({ + session_id: sessionIdForAudit, + channel: 'ws', + sender: connectionId, + source: 'gateway', + requested: true, + acknowledged: cancelled, + request_id: request.id.toString(), + latency_ms: cancelLatencyMs, + }); + if (cancelled) { + auditLogger?.runState?.({ + session_id: sessionIdForAudit, + channel: 'ws', + sender: connectionId, + source: 'gateway', + state: 'cancel_requested', + request_id: request.id.toString(), + duration_ms: cancelLatencyMs, + }); + } return { id: request.id, result: {