feat(metrics): add phase-0 baseline counters
Diagrams reviewed: docs/architecture/AGENT_DIAGRAM.md, docs/architecture/GATEWAY_SESSIONS_AND_QUEUE.md, docs/api/PROTOCOL.md (no changes required).
This commit is contained in:
@@ -90,6 +90,8 @@ Emit new audit events without changing request handling behavior:
|
|||||||
|
|
||||||
## Ticket 0.3 — Metrics Collector Baseline Counters
|
## Ticket 0.3 — Metrics Collector Baseline Counters
|
||||||
|
|
||||||
|
Status: completed (2026-02-25)
|
||||||
|
|
||||||
### Scope
|
### Scope
|
||||||
|
|
||||||
Extend in-memory gateway metrics with baseline counters:
|
Extend in-memory gateway metrics with baseline counters:
|
||||||
|
|||||||
+18
-1
@@ -32,6 +32,22 @@
|
|||||||
],
|
],
|
||||||
"test_status": "pnpm test:run src/daemon/routing.test.ts + pnpm test:run src/gateway/handlers/agent.test.ts passing"
|
"test_status": "pnpm test:run src/daemon/routing.test.ts + pnpm test:run src/gateway/handlers/agent.test.ts passing"
|
||||||
},
|
},
|
||||||
|
"phase0-ticket-0.3-metrics-baseline-counters": {
|
||||||
|
"status": "completed",
|
||||||
|
"date": "2026-02-25",
|
||||||
|
"updated": "2026-02-25",
|
||||||
|
"summary": "Implemented Phase 0 Ticket 0.3 by adding run-state counters, cancel-latency samples, and reaction decision counters to gateway metrics with routing/gateway emitters to populate the new baselines.",
|
||||||
|
"files_modified": [
|
||||||
|
"src/gateway/metrics.ts",
|
||||||
|
"src/gateway/metrics.test.ts",
|
||||||
|
"src/gateway/handlers/agent.ts",
|
||||||
|
"src/daemon/routing.ts",
|
||||||
|
"src/daemon/index.ts",
|
||||||
|
"docs/plans/2026-02-25-phase0-instrumentation-ticket-checklist.md",
|
||||||
|
"docs/plans/state.json"
|
||||||
|
],
|
||||||
|
"test_status": "pnpm test:run src/gateway/metrics.test.ts src/daemon/routing.test.ts src/gateway/handlers/agent.test.ts passing"
|
||||||
|
},
|
||||||
"phase0-instrumentation-ticket-checklist": {
|
"phase0-instrumentation-ticket-checklist": {
|
||||||
"status": "completed",
|
"status": "completed",
|
||||||
"date": "2026-02-25",
|
"date": "2026-02-25",
|
||||||
@@ -6644,7 +6660,8 @@
|
|||||||
"deeper_surfaces_phase0_ticket_pack": "completed — produced an atomic implementation checklist for Phase 0 baseline observability work (audit events, router/gateway emitters, metrics counters, baseline summary tooling, docs sync)",
|
"deeper_surfaces_phase0_ticket_pack": "completed — produced an atomic implementation checklist for Phase 0 baseline observability work (audit events, router/gateway emitters, metrics counters, baseline summary tooling, docs sync)",
|
||||||
"deeper_surfaces_phase0_ticket_01": "completed — audit schema/logger now capture run lifecycle and reaction decision baseline events (`run.state`, `run.cancel`, `reaction.match`, `reaction.skip`) with regression test coverage",
|
"deeper_surfaces_phase0_ticket_01": "completed — audit schema/logger now capture run lifecycle and reaction decision baseline events (`run.state`, `run.cancel`, `reaction.match`, `reaction.skip`) with regression test coverage",
|
||||||
"deeper_surfaces_phase0_ticket_02": "completed — gateway + daemon routing emit run lifecycle/cancel telemetry and reaction match/skip audit events with filter summaries and cancellation latency, plus focused tests",
|
"deeper_surfaces_phase0_ticket_02": "completed — gateway + daemon routing emit run lifecycle/cancel telemetry and reaction match/skip audit events with filter summaries and cancellation latency, plus focused tests",
|
||||||
"next_up": "Implement Ticket 0.3 from docs/plans/2026-02-25-phase0-instrumentation-ticket-checklist.md",
|
"deeper_surfaces_phase0_ticket_03": "completed — gateway metrics now track run-state outcomes, cancel latency samples, and reaction decision counters with routing/gateway emitters",
|
||||||
|
"next_up": "Implement Ticket 0.4 from docs/plans/2026-02-25-phase0-instrumentation-ticket-checklist.md",
|
||||||
"pi_embedded_canary_spike": "completed — added optional pi_embedded backend adapter, canary-safe no-tools routing guard, backend success/fallback latency audit events, and docs/diagram updates while native remains default",
|
"pi_embedded_canary_spike": "completed — added optional pi_embedded backend adapter, canary-safe no-tools routing guard, backend success/fallback latency audit events, and docs/diagram updates while native remains default",
|
||||||
"pi_embedded_evaluation_phase": "completed — final decision rollback (applied in runtime config): Window A failed latency/fallback gates (p50 +259ms, p95 +5695ms, fallback 25%, categories: pi_module_interface/empty_assistant_text); Window B remained sample-insufficient; controlled probes verified guard coverage (pi_no_tools_mode/capability_query/attachments_present each hit once)",
|
"pi_embedded_evaluation_phase": "completed — final decision rollback (applied in runtime config): Window A failed latency/fallback gates (p50 +259ms, p95 +5695ms, fallback 25%, categories: pi_module_interface/empty_assistant_text); Window B remained sample-insufficient; controlled probes verified guard coverage (pi_no_tools_mode/capability_query/attachments_present each hit once)",
|
||||||
"pi_embedded_manual_mode": "completed — added persisted runtime backend controls for manual Pi activation/deactivation (`/runtime` preferred, `/backend` alias; `status`, `activate pi`, `deactivate pi`, `use config`) while keeping config-driven default routing",
|
"pi_embedded_manual_mode": "completed — added persisted runtime backend controls for manual Pi activation/deactivation (`/runtime` preferred, `/backend` alias; `status`, `activate pi`, `deactivate pi`, `use config`) while keeping config-driven default routing",
|
||||||
|
|||||||
@@ -260,6 +260,7 @@ export async function startDaemon(config: Config, options?: StartDaemonOptions):
|
|||||||
const messageRouter = createMessageRouter({
|
const messageRouter = createMessageRouter({
|
||||||
sessionManager, modelRouter, systemPrompt, toolRegistry, toolExecutor,
|
sessionManager, modelRouter, systemPrompt, toolRegistry, toolExecutor,
|
||||||
config, memoryStore, agentConfigRegistry, agentRouter, sandboxManager, commandRegistry, hookEngine, intentRegistry, routingPolicy, skillRegistry, skillInstaller,
|
config, memoryStore, agentConfigRegistry, agentRouter, sandboxManager, commandRegistry, hookEngine, intentRegistry, routingPolicy, skillRegistry, skillInstaller,
|
||||||
|
metrics: gateway.getMetrics(),
|
||||||
getBackendMode: () => backendMode,
|
getBackendMode: () => backendMode,
|
||||||
setBackendMode: (mode) => {
|
setBackendMode: (mode) => {
|
||||||
backendMode = mode;
|
backendMode = mode;
|
||||||
|
|||||||
+17
-3
@@ -32,6 +32,7 @@ import { loadSkillRegistryCatalog } from '../skills/index.js';
|
|||||||
import type { SkillInstaller, SkillRegistry, SkillRegistryEntry, SkillRegistrySource } from '../skills/index.js';
|
import type { SkillInstaller, SkillRegistry, SkillRegistryEntry, SkillRegistrySource } from '../skills/index.js';
|
||||||
import { auditLogger } from '../audit/index.js';
|
import { auditLogger } from '../audit/index.js';
|
||||||
import { getElevationStatusMessage, setElevationFromInput } from '../security/elevation.js';
|
import { getElevationStatusMessage, setElevationFromInput } from '../security/elevation.js';
|
||||||
|
import type { MetricsCollector } from '../gateway/metrics.js';
|
||||||
import { dirname, resolve } from 'path';
|
import { dirname, resolve } from 'path';
|
||||||
import { loadCouncilScaffoldSafe } from '../councils/scaffold.js';
|
import { loadCouncilScaffoldSafe } from '../councils/scaffold.js';
|
||||||
import { buildCouncilPreflightReport, shouldRunCouncilPreflight } from '../councils/preflight.js';
|
import { buildCouncilPreflightReport, shouldRunCouncilPreflight } from '../councils/preflight.js';
|
||||||
@@ -363,6 +364,7 @@ export function createMessageRouter(deps: {
|
|||||||
systemPrompt: string;
|
systemPrompt: string;
|
||||||
toolRegistry: ToolRegistry;
|
toolRegistry: ToolRegistry;
|
||||||
toolExecutor: ToolExecutor;
|
toolExecutor: ToolExecutor;
|
||||||
|
metrics?: MetricsCollector;
|
||||||
config: Config;
|
config: Config;
|
||||||
memoryStore?: MemoryStore;
|
memoryStore?: MemoryStore;
|
||||||
agentConfigRegistry?: AgentConfigRegistry;
|
agentConfigRegistry?: AgentConfigRegistry;
|
||||||
@@ -779,6 +781,7 @@ export function createMessageRouter(deps: {
|
|||||||
reason: 'no_rules',
|
reason: 'no_rules',
|
||||||
candidate_count: 0,
|
candidate_count: 0,
|
||||||
});
|
});
|
||||||
|
deps.metrics?.recordReactionDecision({ matched: false, reason: 'no_rules' });
|
||||||
} else {
|
} else {
|
||||||
const reactionMatch = matchReactionPrompt(automationReactions, {
|
const reactionMatch = matchReactionPrompt(automationReactions, {
|
||||||
channel: msg.channel,
|
channel: msg.channel,
|
||||||
@@ -799,6 +802,7 @@ export function createMessageRouter(deps: {
|
|||||||
candidate_count: automationReactions.length,
|
candidate_count: automationReactions.length,
|
||||||
filter_summary: buildReactionFilterSummary(matchedRule),
|
filter_summary: buildReactionFilterSummary(matchedRule),
|
||||||
});
|
});
|
||||||
|
deps.metrics?.recordReactionDecision({ matched: true });
|
||||||
} else {
|
} else {
|
||||||
auditLogger?.reactionSkip?.({
|
auditLogger?.reactionSkip?.({
|
||||||
session_id: sessionIdForRun,
|
session_id: sessionIdForRun,
|
||||||
@@ -808,6 +812,7 @@ export function createMessageRouter(deps: {
|
|||||||
reason: 'no_match',
|
reason: 'no_match',
|
||||||
candidate_count: automationReactions.length,
|
candidate_count: automationReactions.length,
|
||||||
});
|
});
|
||||||
|
deps.metrics?.recordReactionDecision({ matched: false, reason: 'no_match' });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1064,6 +1069,7 @@ export function createMessageRouter(deps: {
|
|||||||
const cancelStartedAt = Date.now();
|
const cancelStartedAt = Date.now();
|
||||||
const run = activeRuns.get(session.id);
|
const run = activeRuns.get(session.id);
|
||||||
if (!run || !run.isCancellable()) {
|
if (!run || !run.isCancellable()) {
|
||||||
|
deps.metrics?.recordCancelLatency(Date.now() - cancelStartedAt);
|
||||||
auditLogger?.runCancel?.({
|
auditLogger?.runCancel?.({
|
||||||
session_id: session.id,
|
session_id: session.id,
|
||||||
channel: msg.channel,
|
channel: msg.channel,
|
||||||
@@ -1078,6 +1084,7 @@ export function createMessageRouter(deps: {
|
|||||||
}
|
}
|
||||||
run.cancel();
|
run.cancel();
|
||||||
const cancelLatencyMs = Date.now() - cancelStartedAt;
|
const cancelLatencyMs = Date.now() - cancelStartedAt;
|
||||||
|
deps.metrics?.recordCancelLatency(cancelLatencyMs);
|
||||||
auditLogger?.runCancel?.({
|
auditLogger?.runCancel?.({
|
||||||
session_id: session.id,
|
session_id: session.id,
|
||||||
channel: msg.channel,
|
channel: msg.channel,
|
||||||
@@ -1097,6 +1104,7 @@ export function createMessageRouter(deps: {
|
|||||||
request_id: msg.id,
|
request_id: msg.id,
|
||||||
duration_ms: cancelLatencyMs,
|
duration_ms: cancelLatencyMs,
|
||||||
});
|
});
|
||||||
|
deps.metrics?.recordRunState('cancel_requested');
|
||||||
return 'Cancellation requested. The active operation will stop at the next safe point.';
|
return 'Cancellation requested. The active operation will stop at the next safe point.';
|
||||||
},
|
},
|
||||||
|
|
||||||
@@ -1503,6 +1511,7 @@ export function createMessageRouter(deps: {
|
|||||||
state: 'start',
|
state: 'start',
|
||||||
request_id: msg.id,
|
request_id: msg.id,
|
||||||
});
|
});
|
||||||
|
deps.metrics?.recordRunState('start');
|
||||||
|
|
||||||
// Determine if the active model supports native audio input
|
// Determine if the active model supports native audio input
|
||||||
let effectiveTier: string = deps.config.agents.primary_tier ?? 'default';
|
let effectiveTier: string = deps.config.agents.primary_tier ?? 'default';
|
||||||
@@ -1577,6 +1586,7 @@ export function createMessageRouter(deps: {
|
|||||||
request_id: msg.id,
|
request_id: msg.id,
|
||||||
duration_ms: Date.now() - runStartedAt,
|
duration_ms: Date.now() - runStartedAt,
|
||||||
});
|
});
|
||||||
|
deps.metrics?.recordRunState('complete');
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1673,6 +1683,7 @@ export function createMessageRouter(deps: {
|
|||||||
request_id: msg.id,
|
request_id: msg.id,
|
||||||
duration_ms: Date.now() - runStartedAt,
|
duration_ms: Date.now() - runStartedAt,
|
||||||
});
|
});
|
||||||
|
deps.metrics?.recordRunState('complete');
|
||||||
return;
|
return;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
const detail = error instanceof Error ? error.message : String(error);
|
const detail = error instanceof Error ? error.message : String(error);
|
||||||
@@ -1716,17 +1727,19 @@ export function createMessageRouter(deps: {
|
|||||||
replyTo: msg.id,
|
replyTo: msg.id,
|
||||||
attachments: mergedAttachments.length > 0 ? mergedAttachments : undefined,
|
attachments: mergedAttachments.length > 0 ? mergedAttachments : undefined,
|
||||||
});
|
});
|
||||||
|
const finalState = response.trim().toLowerCase() === 'operation cancelled by user.'
|
||||||
|
? 'cancelled'
|
||||||
|
: 'complete';
|
||||||
auditLogger?.runState?.({
|
auditLogger?.runState?.({
|
||||||
session_id: sessionIdForRun,
|
session_id: sessionIdForRun,
|
||||||
channel: msg.channel,
|
channel: msg.channel,
|
||||||
sender: msg.senderId,
|
sender: msg.senderId,
|
||||||
source: 'channel',
|
source: 'channel',
|
||||||
state: response.trim().toLowerCase() === 'operation cancelled by user.'
|
state: finalState,
|
||||||
? 'cancelled'
|
|
||||||
: 'complete',
|
|
||||||
request_id: msg.id,
|
request_id: msg.id,
|
||||||
duration_ms: Date.now() - runStartedAt,
|
duration_ms: Date.now() - runStartedAt,
|
||||||
});
|
});
|
||||||
|
deps.metrics?.recordRunState(finalState);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error(`Error processing message from ${msg.channel}:${msg.senderId}:`, error);
|
console.error(`Error processing message from ${msg.channel}:${msg.senderId}:`, error);
|
||||||
await reply({
|
await reply({
|
||||||
@@ -1742,6 +1755,7 @@ export function createMessageRouter(deps: {
|
|||||||
request_id: msg.id,
|
request_id: msg.id,
|
||||||
error: error instanceof Error ? error.message : String(error),
|
error: error instanceof Error ? error.message : String(error),
|
||||||
});
|
});
|
||||||
|
deps.metrics?.recordRunState('error');
|
||||||
} finally {
|
} finally {
|
||||||
activeRuns.delete(sessionIdForRun);
|
activeRuns.delete(sessionIdForRun);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -134,6 +134,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
|
|||||||
: deps.sessionBridge.cancel(connectionId);
|
: deps.sessionBridge.cancel(connectionId);
|
||||||
const cancelLatencyMs = Date.now() - cancelStartedAt;
|
const cancelLatencyMs = Date.now() - cancelStartedAt;
|
||||||
interruptedPreviousRun = cancelled;
|
interruptedPreviousRun = cancelled;
|
||||||
|
deps.metrics?.recordCancelLatency(cancelLatencyMs);
|
||||||
auditLogger?.queuePreempt?.({
|
auditLogger?.queuePreempt?.({
|
||||||
session_id: sessionIdForAudit,
|
session_id: sessionIdForAudit,
|
||||||
channel: 'ws',
|
channel: 'ws',
|
||||||
@@ -163,6 +164,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
|
|||||||
request_id: requestId,
|
request_id: requestId,
|
||||||
duration_ms: cancelLatencyMs,
|
duration_ms: cancelLatencyMs,
|
||||||
});
|
});
|
||||||
|
deps.metrics?.recordRunState('cancel_requested');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -208,6 +210,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
|
|||||||
state: 'start',
|
state: 'start',
|
||||||
request_id: requestId,
|
request_id: requestId,
|
||||||
});
|
});
|
||||||
|
deps.metrics?.recordRunState('start');
|
||||||
}
|
}
|
||||||
|
|
||||||
if (commandInput && deps.commandRegistry?.isCommand(commandInput)) {
|
if (commandInput && deps.commandRegistry?.isCommand(commandInput)) {
|
||||||
@@ -380,6 +383,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
|
|||||||
const cancelStartedAt = Date.now();
|
const cancelStartedAt = Date.now();
|
||||||
const cancelled = deps.sessionBridge.cancel(connectionId);
|
const cancelled = deps.sessionBridge.cancel(connectionId);
|
||||||
const cancelLatencyMs = Date.now() - cancelStartedAt;
|
const cancelLatencyMs = Date.now() - cancelStartedAt;
|
||||||
|
deps.metrics?.recordCancelLatency(cancelLatencyMs);
|
||||||
auditLogger?.runCancel?.({
|
auditLogger?.runCancel?.({
|
||||||
session_id: sessionIdForAudit,
|
session_id: sessionIdForAudit,
|
||||||
channel: 'ws',
|
channel: 'ws',
|
||||||
@@ -400,6 +404,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
|
|||||||
request_id: requestId,
|
request_id: requestId,
|
||||||
duration_ms: cancelLatencyMs,
|
duration_ms: cancelLatencyMs,
|
||||||
});
|
});
|
||||||
|
deps.metrics?.recordRunState('cancel_requested');
|
||||||
}
|
}
|
||||||
return cancelled
|
return cancelled
|
||||||
? 'Cancellation requested. The active operation will stop at the next safe point.'
|
? 'Cancellation requested. The active operation will stop at the next safe point.'
|
||||||
@@ -695,17 +700,19 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
send(makeEvent(request.id, 'done', { content: response }));
|
send(makeEvent(request.id, 'done', { content: response }));
|
||||||
|
const finalState = response.trim().toLowerCase() === 'operation cancelled by user.'
|
||||||
|
? 'cancelled'
|
||||||
|
: 'complete';
|
||||||
auditLogger?.runState?.({
|
auditLogger?.runState?.({
|
||||||
session_id: sessionIdForAudit,
|
session_id: sessionIdForAudit,
|
||||||
channel: 'ws',
|
channel: 'ws',
|
||||||
sender: connectionId,
|
sender: connectionId,
|
||||||
source: 'gateway',
|
source: 'gateway',
|
||||||
state: response.trim().toLowerCase() === 'operation cancelled by user.'
|
state: finalState,
|
||||||
? 'cancelled'
|
|
||||||
: 'complete',
|
|
||||||
request_id: requestId,
|
request_id: requestId,
|
||||||
duration_ms: Date.now() - runStartedAt,
|
duration_ms: Date.now() - runStartedAt,
|
||||||
});
|
});
|
||||||
|
deps.metrics?.recordRunState(finalState);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
const message = err instanceof Error ? err.message : 'Unknown error';
|
const message = err instanceof Error ? err.message : 'Unknown error';
|
||||||
deps.metrics?.incrementErrors();
|
deps.metrics?.incrementErrors();
|
||||||
@@ -727,6 +734,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
|
|||||||
duration_ms: Date.now() - runStartedAt,
|
duration_ms: Date.now() - runStartedAt,
|
||||||
error: message,
|
error: message,
|
||||||
});
|
});
|
||||||
|
deps.metrics?.recordRunState('error');
|
||||||
} finally {
|
} finally {
|
||||||
deps.sessionBridge.setBusy(connectionId, false);
|
deps.sessionBridge.setBusy(connectionId, false);
|
||||||
deps.sessionBridge.setOnToolUse(connectionId, undefined);
|
deps.sessionBridge.setOnToolUse(connectionId, undefined);
|
||||||
@@ -763,6 +771,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
|
|||||||
const cancelStartedAt = Date.now();
|
const cancelStartedAt = Date.now();
|
||||||
const cancelled = deps.sessionBridge.cancel(connectionId);
|
const cancelled = deps.sessionBridge.cancel(connectionId);
|
||||||
const cancelLatencyMs = Date.now() - cancelStartedAt;
|
const cancelLatencyMs = Date.now() - cancelStartedAt;
|
||||||
|
deps.metrics?.recordCancelLatency(cancelLatencyMs);
|
||||||
const sessionIdForAudit = sessionId ?? `ws:${connectionId}`;
|
const sessionIdForAudit = sessionId ?? `ws:${connectionId}`;
|
||||||
auditLogger?.runCancel?.({
|
auditLogger?.runCancel?.({
|
||||||
session_id: sessionIdForAudit,
|
session_id: sessionIdForAudit,
|
||||||
@@ -784,6 +793,7 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
|
|||||||
request_id: request.id.toString(),
|
request_id: request.id.toString(),
|
||||||
duration_ms: cancelLatencyMs,
|
duration_ms: cancelLatencyMs,
|
||||||
});
|
});
|
||||||
|
deps.metrics?.recordRunState('cancel_requested');
|
||||||
}
|
}
|
||||||
return {
|
return {
|
||||||
id: request.id,
|
id: request.id,
|
||||||
|
|||||||
@@ -28,6 +28,65 @@ describe('MetricsCollector', () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('run state counters', () => {
|
||||||
|
it('tracks run lifecycle states', () => {
|
||||||
|
collector.recordRunState('start');
|
||||||
|
collector.recordRunState('complete');
|
||||||
|
collector.recordRunState('cancel_requested');
|
||||||
|
collector.recordRunState('cancelled');
|
||||||
|
collector.recordRunState('error');
|
||||||
|
|
||||||
|
expect(collector.getRunStateCounters()).toEqual({
|
||||||
|
start: 1,
|
||||||
|
complete: 1,
|
||||||
|
cancel_requested: 1,
|
||||||
|
cancelled: 1,
|
||||||
|
error: 1,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('cancel latency tracking', () => {
|
||||||
|
it('records cancel latency samples', () => {
|
||||||
|
collector.recordCancelLatency(12.2);
|
||||||
|
collector.recordCancelLatency(30);
|
||||||
|
const snapshot = collector.getCancelLatencySnapshot();
|
||||||
|
expect(snapshot.sampleCount).toBe(2);
|
||||||
|
expect(snapshot.samples).toEqual([12, 30]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('enforces cancel latency buffer size', () => {
|
||||||
|
const small = new MetricsCollector({ cancelLatencyBufferSize: 3 });
|
||||||
|
small.recordCancelLatency(10);
|
||||||
|
small.recordCancelLatency(20);
|
||||||
|
small.recordCancelLatency(30);
|
||||||
|
small.recordCancelLatency(40);
|
||||||
|
|
||||||
|
const snapshot = small.getCancelLatencySnapshot();
|
||||||
|
expect(snapshot.sampleCount).toBe(4);
|
||||||
|
expect(snapshot.samples).toEqual([20, 30, 40]);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('reaction decision counters', () => {
|
||||||
|
it('tracks reaction matches and skips by reason', () => {
|
||||||
|
collector.recordReactionDecision({ matched: true });
|
||||||
|
collector.recordReactionDecision({ matched: true });
|
||||||
|
collector.recordReactionDecision({ matched: false, reason: 'no_match' });
|
||||||
|
collector.recordReactionDecision({ matched: false, reason: 'no_rules' });
|
||||||
|
collector.recordReactionDecision({ matched: false, reason: 'no_match' });
|
||||||
|
|
||||||
|
expect(collector.getReactionDecisionCounters()).toEqual({
|
||||||
|
matched: 2,
|
||||||
|
skipped: 3,
|
||||||
|
skipReasons: {
|
||||||
|
no_match: 2,
|
||||||
|
no_rules: 1,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe('model call ring buffer', () => {
|
describe('model call ring buffer', () => {
|
||||||
function makeCall(overrides?: Partial<ModelCallEntry>): ModelCallEntry {
|
function makeCall(overrides?: Partial<ModelCallEntry>): ModelCallEntry {
|
||||||
return {
|
return {
|
||||||
@@ -193,6 +252,15 @@ describe('MetricsCollector', () => {
|
|||||||
expect(snapshot.modelCalls.avgLatency).toBe(0);
|
expect(snapshot.modelCalls.avgLatency).toBe(0);
|
||||||
expect(snapshot.modelCalls.errorRate).toBe(0);
|
expect(snapshot.modelCalls.errorRate).toBe(0);
|
||||||
expect(snapshot.modelCalls.recentCalls).toEqual([]);
|
expect(snapshot.modelCalls.recentCalls).toEqual([]);
|
||||||
|
expect(snapshot.runStates).toEqual({
|
||||||
|
start: 0,
|
||||||
|
complete: 0,
|
||||||
|
cancel_requested: 0,
|
||||||
|
cancelled: 0,
|
||||||
|
error: 0,
|
||||||
|
});
|
||||||
|
expect(snapshot.cancelLatencyMs).toEqual({ sampleCount: 0, samples: [] });
|
||||||
|
expect(snapshot.reactions).toEqual({ matched: 0, skipped: 0, skipReasons: {} });
|
||||||
expect(snapshot.queueDepth).toBe(0);
|
expect(snapshot.queueDepth).toBe(0);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -33,6 +33,27 @@ export interface ActiveRequestInfo {
|
|||||||
durationMs: number;
|
durationMs: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export type RunState = 'start' | 'complete' | 'cancel_requested' | 'cancelled' | 'error';
|
||||||
|
|
||||||
|
export interface RunStateCounters {
|
||||||
|
start: number;
|
||||||
|
complete: number;
|
||||||
|
cancel_requested: number;
|
||||||
|
cancelled: number;
|
||||||
|
error: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface CancelLatencySnapshot {
|
||||||
|
sampleCount: number;
|
||||||
|
samples: number[];
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface ReactionDecisionCounters {
|
||||||
|
matched: number;
|
||||||
|
skipped: number;
|
||||||
|
skipReasons: Record<string, number>;
|
||||||
|
}
|
||||||
|
|
||||||
export interface MetricsSnapshot {
|
export interface MetricsSnapshot {
|
||||||
messagesProcessed: number;
|
messagesProcessed: number;
|
||||||
errors: number;
|
errors: number;
|
||||||
@@ -44,6 +65,9 @@ export interface MetricsSnapshot {
|
|||||||
errorRate: number;
|
errorRate: number;
|
||||||
recentCalls: ModelCallEntry[];
|
recentCalls: ModelCallEntry[];
|
||||||
};
|
};
|
||||||
|
runStates: RunStateCounters;
|
||||||
|
cancelLatencyMs: CancelLatencySnapshot;
|
||||||
|
reactions: ReactionDecisionCounters;
|
||||||
queueDepth: number;
|
queueDepth: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -51,6 +75,7 @@ export interface MetricsCollectorConfig {
|
|||||||
getQueueDepth?: () => number;
|
getQueueDepth?: () => number;
|
||||||
modelCallBufferSize?: number;
|
modelCallBufferSize?: number;
|
||||||
eventBufferSize?: number;
|
eventBufferSize?: number;
|
||||||
|
cancelLatencyBufferSize?: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Implementation ───────────────────────────────────────────────
|
// ── Implementation ───────────────────────────────────────────────
|
||||||
@@ -76,11 +101,28 @@ export class MetricsCollector {
|
|||||||
private _activeRequests: Map<string, ActiveRequest> = new Map();
|
private _activeRequests: Map<string, ActiveRequest> = new Map();
|
||||||
private _getQueueDepth: () => number;
|
private _getQueueDepth: () => number;
|
||||||
|
|
||||||
|
private _runStateCounts: RunStateCounters = {
|
||||||
|
start: 0,
|
||||||
|
complete: 0,
|
||||||
|
cancel_requested: 0,
|
||||||
|
cancelled: 0,
|
||||||
|
error: 0,
|
||||||
|
};
|
||||||
|
private _cancelLatencies: number[] = [];
|
||||||
|
private _cancelLatencyBufferSize: number;
|
||||||
|
private _cancelLatencyTotal = 0;
|
||||||
|
private _reactionDecisions: ReactionDecisionCounters = {
|
||||||
|
matched: 0,
|
||||||
|
skipped: 0,
|
||||||
|
skipReasons: {},
|
||||||
|
};
|
||||||
|
|
||||||
constructor(config?: MetricsCollectorConfig) {
|
constructor(config?: MetricsCollectorConfig) {
|
||||||
this._startTime = Date.now();
|
this._startTime = Date.now();
|
||||||
this._getQueueDepth = config?.getQueueDepth ?? (() => 0);
|
this._getQueueDepth = config?.getQueueDepth ?? (() => 0);
|
||||||
this._modelCallBufferSize = config?.modelCallBufferSize ?? 200;
|
this._modelCallBufferSize = config?.modelCallBufferSize ?? 200;
|
||||||
this._eventBufferSize = config?.eventBufferSize ?? 500;
|
this._eventBufferSize = config?.eventBufferSize ?? 500;
|
||||||
|
this._cancelLatencyBufferSize = config?.cancelLatencyBufferSize ?? 200;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── Counters ─────────────────────────────────────────────────
|
// ── Counters ─────────────────────────────────────────────────
|
||||||
@@ -105,6 +147,60 @@ export class MetricsCollector {
|
|||||||
return this._activeRequestCount;
|
return this._activeRequestCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Run state + cancel latency ──────────────────────────────
|
||||||
|
|
||||||
|
recordRunState(state: RunState): void {
|
||||||
|
this._runStateCounts[state] += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
recordCancelLatency(latencyMs: number): void {
|
||||||
|
if (!Number.isFinite(latencyMs) || latencyMs < 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const normalized = Math.round(latencyMs);
|
||||||
|
this._cancelLatencies.push(normalized);
|
||||||
|
this._cancelLatencyTotal += 1;
|
||||||
|
if (this._cancelLatencies.length > this._cancelLatencyBufferSize) {
|
||||||
|
this._cancelLatencies.shift();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
getRunStateCounters(): RunStateCounters {
|
||||||
|
return { ...this._runStateCounts };
|
||||||
|
}
|
||||||
|
|
||||||
|
getCancelLatencySnapshot(): CancelLatencySnapshot {
|
||||||
|
return {
|
||||||
|
sampleCount: this._cancelLatencyTotal,
|
||||||
|
samples: [...this._cancelLatencies],
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Reaction decisions ──────────────────────────────────────
|
||||||
|
|
||||||
|
recordReactionDecision(decision: { matched: boolean; reason?: string }): void {
|
||||||
|
if (decision.matched) {
|
||||||
|
this._reactionDecisions.matched += 1;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this._reactionDecisions.skipped += 1;
|
||||||
|
if (decision.reason) {
|
||||||
|
const key = decision.reason.trim();
|
||||||
|
if (key.length > 0) {
|
||||||
|
this._reactionDecisions.skipReasons[key] = (this._reactionDecisions.skipReasons[key] ?? 0) + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
getReactionDecisionCounters(): ReactionDecisionCounters {
|
||||||
|
return {
|
||||||
|
matched: this._reactionDecisions.matched,
|
||||||
|
skipped: this._reactionDecisions.skipped,
|
||||||
|
skipReasons: { ...this._reactionDecisions.skipReasons },
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
// ── Model call ring buffer ───────────────────────────────────
|
// ── Model call ring buffer ───────────────────────────────────
|
||||||
|
|
||||||
recordModelCall(entry: ModelCallEntry): void {
|
recordModelCall(entry: ModelCallEntry): void {
|
||||||
@@ -204,6 +300,9 @@ export class MetricsCollector {
|
|||||||
errorRate: Math.round(errorRate * 10000) / 10000,
|
errorRate: Math.round(errorRate * 10000) / 10000,
|
||||||
recentCalls: calls.slice(-20),
|
recentCalls: calls.slice(-20),
|
||||||
},
|
},
|
||||||
|
runStates: this.getRunStateCounters(),
|
||||||
|
cancelLatencyMs: this.getCancelLatencySnapshot(),
|
||||||
|
reactions: this.getReactionDecisionCounters(),
|
||||||
queueDepth: this._getQueueDepth(),
|
queueDepth: this._getQueueDepth(),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user