diff --git a/docs/plans/2026-02-25-phase0-instrumentation-ticket-checklist.md b/docs/plans/2026-02-25-phase0-instrumentation-ticket-checklist.md index d6e2cf9..0bcf8d6 100644 --- a/docs/plans/2026-02-25-phase0-instrumentation-ticket-checklist.md +++ b/docs/plans/2026-02-25-phase0-instrumentation-ticket-checklist.md @@ -90,6 +90,8 @@ Emit new audit events without changing request handling behavior: ## Ticket 0.3 — Metrics Collector Baseline Counters +Status: completed (2026-02-25) + ### Scope Extend in-memory gateway metrics with baseline counters: diff --git a/docs/plans/state.json b/docs/plans/state.json index b750e00..9ae3b40 100644 --- a/docs/plans/state.json +++ b/docs/plans/state.json @@ -32,6 +32,22 @@ ], "test_status": "pnpm test:run src/daemon/routing.test.ts + pnpm test:run src/gateway/handlers/agent.test.ts passing" }, + "phase0-ticket-0.3-metrics-baseline-counters": { + "status": "completed", + "date": "2026-02-25", + "updated": "2026-02-25", + "summary": "Implemented Phase 0 Ticket 0.3 by adding run-state counters, cancel-latency samples, and reaction decision counters to gateway metrics with routing/gateway emitters to populate the new baselines.", + "files_modified": [ + "src/gateway/metrics.ts", + "src/gateway/metrics.test.ts", + "src/gateway/handlers/agent.ts", + "src/daemon/routing.ts", + "src/daemon/index.ts", + "docs/plans/2026-02-25-phase0-instrumentation-ticket-checklist.md", + "docs/plans/state.json" + ], + "test_status": "pnpm test:run src/gateway/metrics.test.ts src/daemon/routing.test.ts src/gateway/handlers/agent.test.ts passing" + }, "phase0-instrumentation-ticket-checklist": { "status": "completed", "date": "2026-02-25", @@ -6644,7 +6660,8 @@ "deeper_surfaces_phase0_ticket_pack": "completed — produced an atomic implementation checklist for Phase 0 baseline observability work (audit events, router/gateway emitters, metrics counters, baseline summary tooling, docs sync)", "deeper_surfaces_phase0_ticket_01": "completed — audit schema/logger now capture run lifecycle and reaction decision baseline events (`run.state`, `run.cancel`, `reaction.match`, `reaction.skip`) with regression test coverage", "deeper_surfaces_phase0_ticket_02": "completed — gateway + daemon routing emit run lifecycle/cancel telemetry and reaction match/skip audit events with filter summaries and cancellation latency, plus focused tests", - "next_up": "Implement Ticket 0.3 from docs/plans/2026-02-25-phase0-instrumentation-ticket-checklist.md", + "deeper_surfaces_phase0_ticket_03": "completed — gateway metrics now track run-state outcomes, cancel latency samples, and reaction decision counters with routing/gateway emitters", + "next_up": "Implement Ticket 0.4 from docs/plans/2026-02-25-phase0-instrumentation-ticket-checklist.md", "pi_embedded_canary_spike": "completed — added optional pi_embedded backend adapter, canary-safe no-tools routing guard, backend success/fallback latency audit events, and docs/diagram updates while native remains default", "pi_embedded_evaluation_phase": "completed — final decision rollback (applied in runtime config): Window A failed latency/fallback gates (p50 +259ms, p95 +5695ms, fallback 25%, categories: pi_module_interface/empty_assistant_text); Window B remained sample-insufficient; controlled probes verified guard coverage (pi_no_tools_mode/capability_query/attachments_present each hit once)", "pi_embedded_manual_mode": "completed — added persisted runtime backend controls for manual Pi activation/deactivation (`/runtime` preferred, `/backend` alias; `status`, `activate pi`, `deactivate pi`, `use config`) while keeping config-driven default routing", diff --git a/src/daemon/index.ts b/src/daemon/index.ts index 366eca3..1ae8789 100644 --- a/src/daemon/index.ts +++ b/src/daemon/index.ts @@ -260,6 +260,7 @@ export async function startDaemon(config: Config, options?: StartDaemonOptions): const messageRouter = createMessageRouter({ sessionManager, modelRouter, systemPrompt, toolRegistry, toolExecutor, config, memoryStore, agentConfigRegistry, agentRouter, sandboxManager, commandRegistry, hookEngine, intentRegistry, routingPolicy, skillRegistry, skillInstaller, + metrics: gateway.getMetrics(), getBackendMode: () => backendMode, setBackendMode: (mode) => { backendMode = mode; diff --git a/src/daemon/routing.ts b/src/daemon/routing.ts index 159a994..9284b94 100644 --- a/src/daemon/routing.ts +++ b/src/daemon/routing.ts @@ -32,6 +32,7 @@ import { loadSkillRegistryCatalog } from '../skills/index.js'; import type { SkillInstaller, SkillRegistry, SkillRegistryEntry, SkillRegistrySource } from '../skills/index.js'; import { auditLogger } from '../audit/index.js'; import { getElevationStatusMessage, setElevationFromInput } from '../security/elevation.js'; +import type { MetricsCollector } from '../gateway/metrics.js'; import { dirname, resolve } from 'path'; import { loadCouncilScaffoldSafe } from '../councils/scaffold.js'; import { buildCouncilPreflightReport, shouldRunCouncilPreflight } from '../councils/preflight.js'; @@ -363,6 +364,7 @@ export function createMessageRouter(deps: { systemPrompt: string; toolRegistry: ToolRegistry; toolExecutor: ToolExecutor; + metrics?: MetricsCollector; config: Config; memoryStore?: MemoryStore; agentConfigRegistry?: AgentConfigRegistry; @@ -779,6 +781,7 @@ export function createMessageRouter(deps: { reason: 'no_rules', candidate_count: 0, }); + deps.metrics?.recordReactionDecision({ matched: false, reason: 'no_rules' }); } else { const reactionMatch = matchReactionPrompt(automationReactions, { channel: msg.channel, @@ -799,6 +802,7 @@ export function createMessageRouter(deps: { candidate_count: automationReactions.length, filter_summary: buildReactionFilterSummary(matchedRule), }); + deps.metrics?.recordReactionDecision({ matched: true }); } else { auditLogger?.reactionSkip?.({ session_id: sessionIdForRun, @@ -808,6 +812,7 @@ export function createMessageRouter(deps: { reason: 'no_match', candidate_count: automationReactions.length, }); + deps.metrics?.recordReactionDecision({ matched: false, reason: 'no_match' }); } } } @@ -1064,6 +1069,7 @@ export function createMessageRouter(deps: { const cancelStartedAt = Date.now(); const run = activeRuns.get(session.id); if (!run || !run.isCancellable()) { + deps.metrics?.recordCancelLatency(Date.now() - cancelStartedAt); auditLogger?.runCancel?.({ session_id: session.id, channel: msg.channel, @@ -1078,6 +1084,7 @@ export function createMessageRouter(deps: { } run.cancel(); const cancelLatencyMs = Date.now() - cancelStartedAt; + deps.metrics?.recordCancelLatency(cancelLatencyMs); auditLogger?.runCancel?.({ session_id: session.id, channel: msg.channel, @@ -1097,6 +1104,7 @@ export function createMessageRouter(deps: { request_id: msg.id, duration_ms: cancelLatencyMs, }); + deps.metrics?.recordRunState('cancel_requested'); return 'Cancellation requested. The active operation will stop at the next safe point.'; }, @@ -1503,6 +1511,7 @@ export function createMessageRouter(deps: { state: 'start', request_id: msg.id, }); + deps.metrics?.recordRunState('start'); // Determine if the active model supports native audio input let effectiveTier: string = deps.config.agents.primary_tier ?? 'default'; @@ -1577,6 +1586,7 @@ export function createMessageRouter(deps: { request_id: msg.id, duration_ms: Date.now() - runStartedAt, }); + deps.metrics?.recordRunState('complete'); return; } @@ -1673,6 +1683,7 @@ export function createMessageRouter(deps: { request_id: msg.id, duration_ms: Date.now() - runStartedAt, }); + deps.metrics?.recordRunState('complete'); return; } catch (error) { const detail = error instanceof Error ? error.message : String(error); @@ -1716,17 +1727,19 @@ export function createMessageRouter(deps: { replyTo: msg.id, attachments: mergedAttachments.length > 0 ? mergedAttachments : undefined, }); + const finalState = response.trim().toLowerCase() === 'operation cancelled by user.' + ? 'cancelled' + : 'complete'; auditLogger?.runState?.({ session_id: sessionIdForRun, channel: msg.channel, sender: msg.senderId, source: 'channel', - state: response.trim().toLowerCase() === 'operation cancelled by user.' - ? 'cancelled' - : 'complete', + state: finalState, request_id: msg.id, duration_ms: Date.now() - runStartedAt, }); + deps.metrics?.recordRunState(finalState); } catch (error) { console.error(`Error processing message from ${msg.channel}:${msg.senderId}:`, error); await reply({ @@ -1742,6 +1755,7 @@ export function createMessageRouter(deps: { request_id: msg.id, error: error instanceof Error ? error.message : String(error), }); + deps.metrics?.recordRunState('error'); } finally { activeRuns.delete(sessionIdForRun); } diff --git a/src/gateway/handlers/agent.ts b/src/gateway/handlers/agent.ts index 09a3af0..a3f244a 100644 --- a/src/gateway/handlers/agent.ts +++ b/src/gateway/handlers/agent.ts @@ -134,6 +134,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { : deps.sessionBridge.cancel(connectionId); const cancelLatencyMs = Date.now() - cancelStartedAt; interruptedPreviousRun = cancelled; + deps.metrics?.recordCancelLatency(cancelLatencyMs); auditLogger?.queuePreempt?.({ session_id: sessionIdForAudit, channel: 'ws', @@ -163,6 +164,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { request_id: requestId, duration_ms: cancelLatencyMs, }); + deps.metrics?.recordRunState('cancel_requested'); } } @@ -208,6 +210,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { state: 'start', request_id: requestId, }); + deps.metrics?.recordRunState('start'); } if (commandInput && deps.commandRegistry?.isCommand(commandInput)) { @@ -380,6 +383,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { const cancelStartedAt = Date.now(); const cancelled = deps.sessionBridge.cancel(connectionId); const cancelLatencyMs = Date.now() - cancelStartedAt; + deps.metrics?.recordCancelLatency(cancelLatencyMs); auditLogger?.runCancel?.({ session_id: sessionIdForAudit, channel: 'ws', @@ -400,6 +404,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { request_id: requestId, duration_ms: cancelLatencyMs, }); + deps.metrics?.recordRunState('cancel_requested'); } return cancelled ? 'Cancellation requested. The active operation will stop at the next safe point.' @@ -695,17 +700,19 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { }); } send(makeEvent(request.id, 'done', { content: response })); + const finalState = response.trim().toLowerCase() === 'operation cancelled by user.' + ? 'cancelled' + : 'complete'; auditLogger?.runState?.({ session_id: sessionIdForAudit, channel: 'ws', sender: connectionId, source: 'gateway', - state: response.trim().toLowerCase() === 'operation cancelled by user.' - ? 'cancelled' - : 'complete', + state: finalState, request_id: requestId, duration_ms: Date.now() - runStartedAt, }); + deps.metrics?.recordRunState(finalState); } catch (err) { const message = err instanceof Error ? err.message : 'Unknown error'; deps.metrics?.incrementErrors(); @@ -727,6 +734,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { duration_ms: Date.now() - runStartedAt, error: message, }); + deps.metrics?.recordRunState('error'); } finally { deps.sessionBridge.setBusy(connectionId, false); deps.sessionBridge.setOnToolUse(connectionId, undefined); @@ -763,6 +771,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { const cancelStartedAt = Date.now(); const cancelled = deps.sessionBridge.cancel(connectionId); const cancelLatencyMs = Date.now() - cancelStartedAt; + deps.metrics?.recordCancelLatency(cancelLatencyMs); const sessionIdForAudit = sessionId ?? `ws:${connectionId}`; auditLogger?.runCancel?.({ session_id: sessionIdForAudit, @@ -784,6 +793,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { request_id: request.id.toString(), duration_ms: cancelLatencyMs, }); + deps.metrics?.recordRunState('cancel_requested'); } return { id: request.id, diff --git a/src/gateway/metrics.test.ts b/src/gateway/metrics.test.ts index 6cf21e9..545271f 100644 --- a/src/gateway/metrics.test.ts +++ b/src/gateway/metrics.test.ts @@ -28,6 +28,65 @@ describe('MetricsCollector', () => { }); }); + describe('run state counters', () => { + it('tracks run lifecycle states', () => { + collector.recordRunState('start'); + collector.recordRunState('complete'); + collector.recordRunState('cancel_requested'); + collector.recordRunState('cancelled'); + collector.recordRunState('error'); + + expect(collector.getRunStateCounters()).toEqual({ + start: 1, + complete: 1, + cancel_requested: 1, + cancelled: 1, + error: 1, + }); + }); + }); + + describe('cancel latency tracking', () => { + it('records cancel latency samples', () => { + collector.recordCancelLatency(12.2); + collector.recordCancelLatency(30); + const snapshot = collector.getCancelLatencySnapshot(); + expect(snapshot.sampleCount).toBe(2); + expect(snapshot.samples).toEqual([12, 30]); + }); + + it('enforces cancel latency buffer size', () => { + const small = new MetricsCollector({ cancelLatencyBufferSize: 3 }); + small.recordCancelLatency(10); + small.recordCancelLatency(20); + small.recordCancelLatency(30); + small.recordCancelLatency(40); + + const snapshot = small.getCancelLatencySnapshot(); + expect(snapshot.sampleCount).toBe(4); + expect(snapshot.samples).toEqual([20, 30, 40]); + }); + }); + + describe('reaction decision counters', () => { + it('tracks reaction matches and skips by reason', () => { + collector.recordReactionDecision({ matched: true }); + collector.recordReactionDecision({ matched: true }); + collector.recordReactionDecision({ matched: false, reason: 'no_match' }); + collector.recordReactionDecision({ matched: false, reason: 'no_rules' }); + collector.recordReactionDecision({ matched: false, reason: 'no_match' }); + + expect(collector.getReactionDecisionCounters()).toEqual({ + matched: 2, + skipped: 3, + skipReasons: { + no_match: 2, + no_rules: 1, + }, + }); + }); + }); + describe('model call ring buffer', () => { function makeCall(overrides?: Partial): ModelCallEntry { return { @@ -193,6 +252,15 @@ describe('MetricsCollector', () => { expect(snapshot.modelCalls.avgLatency).toBe(0); expect(snapshot.modelCalls.errorRate).toBe(0); expect(snapshot.modelCalls.recentCalls).toEqual([]); + expect(snapshot.runStates).toEqual({ + start: 0, + complete: 0, + cancel_requested: 0, + cancelled: 0, + error: 0, + }); + expect(snapshot.cancelLatencyMs).toEqual({ sampleCount: 0, samples: [] }); + expect(snapshot.reactions).toEqual({ matched: 0, skipped: 0, skipReasons: {} }); expect(snapshot.queueDepth).toBe(0); }); diff --git a/src/gateway/metrics.ts b/src/gateway/metrics.ts index 6d0ae71..c977764 100644 --- a/src/gateway/metrics.ts +++ b/src/gateway/metrics.ts @@ -33,6 +33,27 @@ export interface ActiveRequestInfo { durationMs: number; } +export type RunState = 'start' | 'complete' | 'cancel_requested' | 'cancelled' | 'error'; + +export interface RunStateCounters { + start: number; + complete: number; + cancel_requested: number; + cancelled: number; + error: number; +} + +export interface CancelLatencySnapshot { + sampleCount: number; + samples: number[]; +} + +export interface ReactionDecisionCounters { + matched: number; + skipped: number; + skipReasons: Record; +} + export interface MetricsSnapshot { messagesProcessed: number; errors: number; @@ -44,6 +65,9 @@ export interface MetricsSnapshot { errorRate: number; recentCalls: ModelCallEntry[]; }; + runStates: RunStateCounters; + cancelLatencyMs: CancelLatencySnapshot; + reactions: ReactionDecisionCounters; queueDepth: number; } @@ -51,6 +75,7 @@ export interface MetricsCollectorConfig { getQueueDepth?: () => number; modelCallBufferSize?: number; eventBufferSize?: number; + cancelLatencyBufferSize?: number; } // ── Implementation ─────────────────────────────────────────────── @@ -76,11 +101,28 @@ export class MetricsCollector { private _activeRequests: Map = new Map(); private _getQueueDepth: () => number; + private _runStateCounts: RunStateCounters = { + start: 0, + complete: 0, + cancel_requested: 0, + cancelled: 0, + error: 0, + }; + private _cancelLatencies: number[] = []; + private _cancelLatencyBufferSize: number; + private _cancelLatencyTotal = 0; + private _reactionDecisions: ReactionDecisionCounters = { + matched: 0, + skipped: 0, + skipReasons: {}, + }; + constructor(config?: MetricsCollectorConfig) { this._startTime = Date.now(); this._getQueueDepth = config?.getQueueDepth ?? (() => 0); this._modelCallBufferSize = config?.modelCallBufferSize ?? 200; this._eventBufferSize = config?.eventBufferSize ?? 500; + this._cancelLatencyBufferSize = config?.cancelLatencyBufferSize ?? 200; } // ── Counters ───────────────────────────────────────────────── @@ -105,6 +147,60 @@ export class MetricsCollector { return this._activeRequestCount; } + // ── Run state + cancel latency ────────────────────────────── + + recordRunState(state: RunState): void { + this._runStateCounts[state] += 1; + } + + recordCancelLatency(latencyMs: number): void { + if (!Number.isFinite(latencyMs) || latencyMs < 0) { + return; + } + const normalized = Math.round(latencyMs); + this._cancelLatencies.push(normalized); + this._cancelLatencyTotal += 1; + if (this._cancelLatencies.length > this._cancelLatencyBufferSize) { + this._cancelLatencies.shift(); + } + } + + getRunStateCounters(): RunStateCounters { + return { ...this._runStateCounts }; + } + + getCancelLatencySnapshot(): CancelLatencySnapshot { + return { + sampleCount: this._cancelLatencyTotal, + samples: [...this._cancelLatencies], + }; + } + + // ── Reaction decisions ────────────────────────────────────── + + recordReactionDecision(decision: { matched: boolean; reason?: string }): void { + if (decision.matched) { + this._reactionDecisions.matched += 1; + return; + } + + this._reactionDecisions.skipped += 1; + if (decision.reason) { + const key = decision.reason.trim(); + if (key.length > 0) { + this._reactionDecisions.skipReasons[key] = (this._reactionDecisions.skipReasons[key] ?? 0) + 1; + } + } + } + + getReactionDecisionCounters(): ReactionDecisionCounters { + return { + matched: this._reactionDecisions.matched, + skipped: this._reactionDecisions.skipped, + skipReasons: { ...this._reactionDecisions.skipReasons }, + }; + } + // ── Model call ring buffer ─────────────────────────────────── recordModelCall(entry: ModelCallEntry): void { @@ -204,6 +300,9 @@ export class MetricsCollector { errorRate: Math.round(errorRate * 10000) / 10000, recentCalls: calls.slice(-20), }, + runStates: this.getRunStateCounters(), + cancelLatencyMs: this.getCancelLatencySnapshot(), + reactions: this.getReactionDecisionCounters(), queueDepth: this._getQueueDepth(), }; }