From bd1880a44ca76b04b339b336cb440e0f9f8b7088 Mon Sep 17 00:00:00 2001 From: William Valentin Date: Mon, 9 Feb 2026 21:28:05 -0800 Subject: [PATCH] feat(03-01): create MetricsCollector and wire into gateway - Add MetricsCollector class with counters, model call ring buffer, event ring buffer, and active request tracking - Add system.metrics, system.events, system.activeRequests RPC handlers - Add GET /health unauthenticated HTTP endpoint for Docker HEALTHCHECK - Add totalPending() to LaneQueue for queue depth metrics - Add 20 tests for MetricsCollector --- src/gateway/handlers/agent.ts | 2 + src/gateway/handlers/system.ts | 30 ++++ src/gateway/lane-queue.ts | 9 ++ src/gateway/metrics.test.ts | 254 +++++++++++++++++++++++++++++++++ src/gateway/metrics.ts | 210 +++++++++++++++++++++++++++ src/gateway/server.ts | 31 ++++ 6 files changed, 536 insertions(+) create mode 100644 src/gateway/metrics.test.ts create mode 100644 src/gateway/metrics.ts diff --git a/src/gateway/handlers/agent.ts b/src/gateway/handlers/agent.ts index 2768e27..edc3c3e 100644 --- a/src/gateway/handlers/agent.ts +++ b/src/gateway/handlers/agent.ts @@ -3,11 +3,13 @@ import type { SendFn } from '../router.js'; import { makeEvent, makeError, ErrorCode } from '../protocol.js'; import type { SessionBridge } from '../session-bridge.js'; import type { LaneQueue } from '../lane-queue.js'; +import type { MetricsCollector } from '../metrics.js'; import type { Attachment } from '../../channels/types.js'; export interface AgentHandlerDeps { sessionBridge: SessionBridge; laneQueue: LaneQueue; + metrics?: MetricsCollector; } export function createAgentHandlers(deps: AgentHandlerDeps) { diff --git a/src/gateway/handlers/system.ts b/src/gateway/handlers/system.ts index 4a805a8..bb15381 100644 --- a/src/gateway/handlers/system.ts +++ b/src/gateway/handlers/system.ts @@ -1,5 +1,6 @@ import type { GatewayRequest, OutboundMessage } from '../protocol.js'; import { makeResponse, makeError, ErrorCode } from '../protocol.js'; +import type { MetricsSnapshot, EventEntry, ActiveRequestInfo } from '../metrics.js'; /** Per-session token usage report returned by system.tokenUsage. */ export interface TokenUsageEntry { @@ -21,6 +22,12 @@ export interface SystemHandlerDeps { getUsage?: () => { totalSessions: number; activeConnections: number }; /** Optional callback to retrieve per-session token usage data. */ getTokenUsage?: () => TokenUsageEntry[]; + /** Optional callback to retrieve aggregated metrics snapshot. */ + getMetrics?: () => MetricsSnapshot; + /** Optional callback to retrieve recent events. */ + getEvents?: (opts?: { level?: string; limit?: number }) => EventEntry[]; + /** Optional callback to retrieve active requests. */ + getActiveRequests?: () => ActiveRequestInfo[]; } export function createSystemHandlers(deps: SystemHandlerDeps) { @@ -75,5 +82,28 @@ export function createSystemHandlers(deps: SystemHandlerDeps) { const sessions = deps.getTokenUsage?.() ?? []; return makeResponse(request.id, { sessions }); }, + + 'system.metrics': async (request: GatewayRequest): Promise => { + if (!deps.getMetrics) { + return makeResponse(request.id, {}); + } + return makeResponse(request.id, deps.getMetrics()); + }, + + 'system.events': async (request: GatewayRequest): Promise => { + if (!deps.getEvents) { + return makeResponse(request.id, { events: [] }); + } + const params = request.params as { level?: string; limit?: number } | undefined; + const events = deps.getEvents({ level: params?.level, limit: params?.limit }); + return makeResponse(request.id, { events }); + }, + + 'system.activeRequests': async (request: GatewayRequest): Promise => { + if (!deps.getActiveRequests) { + return makeResponse(request.id, { requests: [] }); + } + return makeResponse(request.id, { requests: deps.getActiveRequests() }); + }, }; } diff --git a/src/gateway/lane-queue.ts b/src/gateway/lane-queue.ts index a08bdf4..f9109da 100644 --- a/src/gateway/lane-queue.ts +++ b/src/gateway/lane-queue.ts @@ -67,6 +67,15 @@ export class LaneQueue { return this.lanes.get(laneId)?.queue.length ?? 0; } + /** Get the total number of pending items across all lanes. */ + totalPending(): number { + let total = 0; + for (const lane of this.lanes.values()) { + total += lane.queue.length; + } + return total; + } + /** * Cancel all pending entries in a lane. * Active work is NOT interrupted — only queued items are rejected. diff --git a/src/gateway/metrics.test.ts b/src/gateway/metrics.test.ts new file mode 100644 index 0000000..6cf21e9 --- /dev/null +++ b/src/gateway/metrics.test.ts @@ -0,0 +1,254 @@ +import { describe, it, expect, beforeEach } from 'vitest'; +import { MetricsCollector } from './metrics.js'; +import type { ModelCallEntry, EventEntry } from './metrics.js'; + +describe('MetricsCollector', () => { + let collector: MetricsCollector; + + beforeEach(() => { + collector = new MetricsCollector(); + }); + + describe('counters', () => { + it('starts with zero counters', () => { + expect(collector.messagesProcessed).toBe(0); + expect(collector.errors).toBe(0); + expect(collector.activeRequestCount).toBe(0); + }); + + it('increments messages processed', () => { + collector.incrementMessages(); + collector.incrementMessages(); + expect(collector.messagesProcessed).toBe(2); + }); + + it('increments errors', () => { + collector.incrementErrors(); + expect(collector.errors).toBe(1); + }); + }); + + describe('model call ring buffer', () => { + function makeCall(overrides?: Partial): ModelCallEntry { + return { + timestamp: Date.now(), + provider: 'anthropic', + latency: 500, + inputTokens: 100, + outputTokens: 50, + tokensPerSec: 100, + ...overrides, + }; + } + + it('records model calls', () => { + collector.recordModelCall(makeCall()); + expect(collector.getModelMetrics()).toHaveLength(1); + }); + + it('enforces max buffer size (default 200)', () => { + for (let i = 0; i < 210; i++) { + collector.recordModelCall(makeCall({ latency: i })); + } + const calls = collector.getModelMetrics(); + expect(calls).toHaveLength(200); + // First entry should be index 10 (the first 10 were evicted) + expect(calls[0].latency).toBe(10); + }); + + it('respects custom buffer size', () => { + const small = new MetricsCollector({ modelCallBufferSize: 5 }); + for (let i = 0; i < 8; i++) { + small.recordModelCall(makeCall({ latency: i })); + } + const calls = small.getModelMetrics(); + expect(calls).toHaveLength(5); + expect(calls[0].latency).toBe(3); + }); + + it('returns a copy, not the internal array', () => { + collector.recordModelCall(makeCall()); + const a = collector.getModelMetrics(); + const b = collector.getModelMetrics(); + expect(a).not.toBe(b); + }); + }); + + describe('event ring buffer', () => { + function makeEvent(overrides?: Partial): EventEntry { + return { + timestamp: Date.now(), + level: 'info', + source: 'test', + message: 'test event', + ...overrides, + }; + } + + it('records events', () => { + collector.recordEvent(makeEvent()); + expect(collector.getEvents()).toHaveLength(1); + }); + + it('enforces max buffer size (default 500)', () => { + for (let i = 0; i < 510; i++) { + collector.recordEvent(makeEvent({ message: `event-${i}` })); + } + const events = collector.getEvents(); + expect(events).toHaveLength(500); + }); + + it('returns events newest first', () => { + collector.recordEvent(makeEvent({ message: 'first', timestamp: 1000 })); + collector.recordEvent(makeEvent({ message: 'second', timestamp: 2000 })); + collector.recordEvent(makeEvent({ message: 'third', timestamp: 3000 })); + + const events = collector.getEvents(); + expect(events[0].message).toBe('third'); + expect(events[2].message).toBe('first'); + }); + + it('filters by level', () => { + collector.recordEvent(makeEvent({ level: 'info', message: 'info-1' })); + collector.recordEvent(makeEvent({ level: 'error', message: 'error-1' })); + collector.recordEvent(makeEvent({ level: 'info', message: 'info-2' })); + collector.recordEvent(makeEvent({ level: 'warn', message: 'warn-1' })); + + const errors = collector.getEvents({ level: 'error' }); + expect(errors).toHaveLength(1); + expect(errors[0].message).toBe('error-1'); + + const infos = collector.getEvents({ level: 'info' }); + expect(infos).toHaveLength(2); + }); + + it('limits results', () => { + for (let i = 0; i < 10; i++) { + collector.recordEvent(makeEvent({ message: `event-${i}` })); + } + + const limited = collector.getEvents({ limit: 3 }); + expect(limited).toHaveLength(3); + // Should be the 3 newest + expect(limited[0].message).toBe('event-9'); + expect(limited[2].message).toBe('event-7'); + }); + + it('combines level filter and limit', () => { + for (let i = 0; i < 10; i++) { + collector.recordEvent(makeEvent({ level: i % 2 === 0 ? 'error' : 'info', message: `event-${i}` })); + } + + const result = collector.getEvents({ level: 'error', limit: 2 }); + expect(result).toHaveLength(2); + expect(result[0].message).toBe('event-8'); + expect(result[1].message).toBe('event-6'); + }); + }); + + describe('active request tracking', () => { + it('tracks start and end of requests', () => { + collector.startRequest('req-1', { sessionId: 'ws:abc', channel: 'ws' }); + expect(collector.activeRequestCount).toBe(1); + + const active = collector.getActiveRequests(); + expect(active).toHaveLength(1); + expect(active[0].id).toBe('req-1'); + expect(active[0].sessionId).toBe('ws:abc'); + expect(active[0].channel).toBe('ws'); + expect(active[0].durationMs).toBeGreaterThanOrEqual(0); + + collector.endRequest('req-1'); + expect(collector.activeRequestCount).toBe(0); + expect(collector.getActiveRequests()).toHaveLength(0); + }); + + it('handles ending non-existent request', () => { + collector.endRequest('nonexistent'); + expect(collector.activeRequestCount).toBe(0); + }); + + it('tracks multiple concurrent requests', () => { + collector.startRequest('req-1', { sessionId: 'ws:a', channel: 'ws' }); + collector.startRequest('req-2', { sessionId: 'tg:b', channel: 'telegram' }); + expect(collector.activeRequestCount).toBe(2); + expect(collector.getActiveRequests()).toHaveLength(2); + + collector.endRequest('req-1'); + expect(collector.activeRequestCount).toBe(1); + expect(collector.getActiveRequests()[0].id).toBe('req-2'); + }); + }); + + describe('getSnapshot', () => { + it('returns correct shape with zero data', () => { + const snapshot = collector.getSnapshot(); + + expect(snapshot.messagesProcessed).toBe(0); + expect(snapshot.errors).toBe(0); + expect(snapshot.activeRequests).toBe(0); + expect(typeof snapshot.uptime).toBe('number'); + expect(snapshot.uptime).toBeGreaterThanOrEqual(0); + expect(snapshot.modelCalls.total).toBe(0); + expect(snapshot.modelCalls.avgLatency).toBe(0); + expect(snapshot.modelCalls.errorRate).toBe(0); + expect(snapshot.modelCalls.recentCalls).toEqual([]); + expect(snapshot.queueDepth).toBe(0); + }); + + it('reflects accumulated data', () => { + collector.incrementMessages(); + collector.incrementMessages(); + collector.incrementErrors(); + collector.recordModelCall({ + timestamp: Date.now(), + provider: 'anthropic', + latency: 200, + inputTokens: 100, + outputTokens: 50, + tokensPerSec: 250, + }); + collector.recordModelCall({ + timestamp: Date.now(), + provider: 'openai', + latency: 400, + inputTokens: 200, + outputTokens: 100, + tokensPerSec: 250, + error: 'rate limit', + }); + + const snapshot = collector.getSnapshot(); + expect(snapshot.messagesProcessed).toBe(2); + expect(snapshot.errors).toBe(1); + expect(snapshot.modelCalls.total).toBe(2); + expect(snapshot.modelCalls.avgLatency).toBe(300); + expect(snapshot.modelCalls.errorRate).toBe(0.5); + expect(snapshot.modelCalls.recentCalls).toHaveLength(2); + }); + + it('uses getQueueDepth callback', () => { + const withQueue = new MetricsCollector({ getQueueDepth: () => 5 }); + const snapshot = withQueue.getSnapshot(); + expect(snapshot.queueDepth).toBe(5); + }); + + it('limits recentCalls in snapshot to 20', () => { + for (let i = 0; i < 30; i++) { + collector.recordModelCall({ + timestamp: Date.now(), + provider: 'anthropic', + latency: i * 10, + inputTokens: 100, + outputTokens: 50, + tokensPerSec: 100, + }); + } + + const snapshot = collector.getSnapshot(); + expect(snapshot.modelCalls.recentCalls).toHaveLength(20); + // Should be the last 20 + expect(snapshot.modelCalls.recentCalls[0].latency).toBe(100); + }); + }); +}); diff --git a/src/gateway/metrics.ts b/src/gateway/metrics.ts new file mode 100644 index 0000000..6d0ae71 --- /dev/null +++ b/src/gateway/metrics.ts @@ -0,0 +1,210 @@ +/** + * MetricsCollector — single source of truth for all ops metrics. + * + * Tracks counters (messages, errors, active requests), model call latency/tokens, + * event stream, and active request durations. All synchronous, no external deps. + */ + +// ── Types ──────────────────────────────────────────────────────── + +export interface ModelCallEntry { + timestamp: number; + provider: string; + latency: number; + inputTokens: number; + outputTokens: number; + tokensPerSec: number; + error?: string; +} + +export interface EventEntry { + timestamp: number; + level: 'info' | 'warn' | 'error'; + source: string; + message: string; + context?: Record; +} + +export interface ActiveRequestInfo { + id: string; + sessionId: string; + channel: string; + startedAt: number; + durationMs: number; +} + +export interface MetricsSnapshot { + messagesProcessed: number; + errors: number; + activeRequests: number; + uptime: number; + modelCalls: { + total: number; + avgLatency: number; + errorRate: number; + recentCalls: ModelCallEntry[]; + }; + queueDepth: number; +} + +export interface MetricsCollectorConfig { + getQueueDepth?: () => number; + modelCallBufferSize?: number; + eventBufferSize?: number; +} + +// ── Implementation ─────────────────────────────────────────────── + +interface ActiveRequest { + sessionId: string; + channel: string; + startedAt: number; +} + +export class MetricsCollector { + private _messagesProcessed = 0; + private _errors = 0; + private _activeRequestCount = 0; + private _startTime: number; + + private _modelCalls: ModelCallEntry[] = []; + private _modelCallBufferSize: number; + + private _events: EventEntry[] = []; + private _eventBufferSize: number; + + private _activeRequests: Map = new Map(); + private _getQueueDepth: () => number; + + constructor(config?: MetricsCollectorConfig) { + this._startTime = Date.now(); + this._getQueueDepth = config?.getQueueDepth ?? (() => 0); + this._modelCallBufferSize = config?.modelCallBufferSize ?? 200; + this._eventBufferSize = config?.eventBufferSize ?? 500; + } + + // ── Counters ───────────────────────────────────────────────── + + incrementMessages(): void { + this._messagesProcessed++; + } + + incrementErrors(): void { + this._errors++; + } + + get messagesProcessed(): number { + return this._messagesProcessed; + } + + get errors(): number { + return this._errors; + } + + get activeRequestCount(): number { + return this._activeRequestCount; + } + + // ── Model call ring buffer ─────────────────────────────────── + + recordModelCall(entry: ModelCallEntry): void { + this._modelCalls.push(entry); + if (this._modelCalls.length > this._modelCallBufferSize) { + this._modelCalls.shift(); + } + } + + getModelMetrics(): ModelCallEntry[] { + return [...this._modelCalls]; + } + + // ── Event ring buffer ──────────────────────────────────────── + + recordEvent(event: EventEntry): void { + this._events.push(event); + if (this._events.length > this._eventBufferSize) { + this._events.shift(); + } + } + + getEvents(opts?: { level?: string; limit?: number }): EventEntry[] { + let filtered = this._events; + + if (opts?.level) { + filtered = filtered.filter(e => e.level === opts.level); + } + + // Newest first + const reversed = [...filtered].reverse(); + + if (opts?.limit && opts.limit > 0) { + return reversed.slice(0, opts.limit); + } + + return reversed; + } + + // ── Active request tracking ────────────────────────────────── + + startRequest(id: string, info: { sessionId: string; channel: string }): void { + this._activeRequests.set(id, { + sessionId: info.sessionId, + channel: info.channel, + startedAt: Date.now(), + }); + this._activeRequestCount++; + } + + endRequest(id: string): void { + if (this._activeRequests.delete(id)) { + this._activeRequestCount--; + } + } + + getActiveRequests(): ActiveRequestInfo[] { + const now = Date.now(); + const result: ActiveRequestInfo[] = []; + + for (const [id, req] of this._activeRequests) { + result.push({ + id, + sessionId: req.sessionId, + channel: req.channel, + startedAt: req.startedAt, + durationMs: now - req.startedAt, + }); + } + + return result; + } + + // ── Snapshot ───────────────────────────────────────────────── + + getSnapshot(): MetricsSnapshot { + const calls = this._modelCalls; + const totalCalls = calls.length; + const errorCalls = calls.filter(c => c.error).length; + + const avgLatency = totalCalls > 0 + ? calls.reduce((sum, c) => sum + c.latency, 0) / totalCalls + : 0; + + const errorRate = totalCalls > 0 + ? errorCalls / totalCalls + : 0; + + return { + messagesProcessed: this._messagesProcessed, + errors: this._errors, + activeRequests: this._activeRequestCount, + uptime: Math.floor((Date.now() - this._startTime) / 1000), + modelCalls: { + total: totalCalls, + avgLatency: Math.round(avgLatency), + errorRate: Math.round(errorRate * 10000) / 10000, + recentCalls: calls.slice(-20), + }, + queueDepth: this._getQueueDepth(), + }; + } +} diff --git a/src/gateway/server.ts b/src/gateway/server.ts index 4c67ca1..4bf2a14 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -6,6 +6,7 @@ import { serveStatic } from './static.js'; import { SessionBridge } from './session-bridge.js'; import type { SessionBridgeConfig } from './session-bridge.js'; import { LaneQueue } from './lane-queue.js'; +import { MetricsCollector } from './metrics.js'; import { authenticateRequest } from './auth.js'; import type { AuthConfig } from './auth.js'; import { @@ -67,6 +68,7 @@ export class GatewayServer { private router: Router; private sessionBridge: SessionBridge; private laneQueue: LaneQueue; + private metrics: MetricsCollector; private connectionMap: Map = new Map(); private config: GatewayServerConfig; private startTime: number = Date.now(); @@ -83,6 +85,9 @@ export class GatewayServer { }); this.laneQueue = new LaneQueue(); + this.metrics = new MetricsCollector({ + getQueueDepth: () => this.laneQueue.totalPending(), + }); this.router = new Router(); this.registerHandlers(); } @@ -103,6 +108,9 @@ export class GatewayServer { activeConnections: this.sessionBridge.connectionCount, }), getTokenUsage: this.config.getTokenUsage, + getMetrics: () => this.metrics.getSnapshot(), + getEvents: (opts) => this.metrics.getEvents(opts), + getActiveRequests: () => this.metrics.getActiveRequests(), }); const sessionHandlers = createSessionHandlers({ @@ -118,6 +126,7 @@ export class GatewayServer { const agentHandlers = createAgentHandlers({ sessionBridge: this.sessionBridge, laneQueue: this.laneQueue, + metrics: this.metrics, }); // Config handlers (only if config object is provided) @@ -264,6 +273,23 @@ export class GatewayServer { } } + // Health endpoint — unauthenticated for Docker HEALTHCHECK / monitoring + if (req.method === 'GET' && req.url === '/health') { + const channelList = this.config.channelRegistry?.list().map(a => a.name) ?? []; + const body = JSON.stringify({ + status: 'ok', + uptime: Math.floor((Date.now() - this.startTime) / 1000), + version: this.config.version ?? '0.1.0', + sessions: this.sessionBridge.listSessions().length, + connections: this.sessionBridge.connectionCount, + tools: this.config.toolRegistry.list().length, + channels: channelList, + }); + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(body); + return; + } + // Gmail Pub/Sub push route — bypass gateway auth (Google sends push notifications directly) if (this.config.gmailHandler && req.method === 'POST' && req.url?.startsWith('/gmail/push')) { try { @@ -350,6 +376,11 @@ export class GatewayServer { return this.sessionBridge; } + /** Get the metrics collector (for external wiring). */ + getMetrics(): MetricsCollector { + return this.metrics; + } + /** Get list of registered methods. */ getMethods(): string[] { return this.router.listMethods();