Files
flynn/src/gateway/metrics.ts
T
William Valentin bd1880a44c feat(03-01): create MetricsCollector and wire into gateway
- Add MetricsCollector class with counters, model call ring buffer, event ring buffer, and active request tracking
- Add system.metrics, system.events, system.activeRequests RPC handlers
- Add GET /health unauthenticated HTTP endpoint for Docker HEALTHCHECK
- Add totalPending() to LaneQueue for queue depth metrics
- Add 20 tests for MetricsCollector
2026-02-09 21:28:05 -08:00

211 lines
5.6 KiB
TypeScript

/**
* MetricsCollector — single source of truth for all ops metrics.
*
* Tracks counters (messages, errors, active requests), model call latency/tokens,
* event stream, and active request durations. All synchronous, no external deps.
*/
// ── Types ────────────────────────────────────────────────────────
export interface ModelCallEntry {
timestamp: number;
provider: string;
latency: number;
inputTokens: number;
outputTokens: number;
tokensPerSec: number;
error?: string;
}
export interface EventEntry {
timestamp: number;
level: 'info' | 'warn' | 'error';
source: string;
message: string;
context?: Record<string, unknown>;
}
export interface ActiveRequestInfo {
id: string;
sessionId: string;
channel: string;
startedAt: number;
durationMs: number;
}
export interface MetricsSnapshot {
messagesProcessed: number;
errors: number;
activeRequests: number;
uptime: number;
modelCalls: {
total: number;
avgLatency: number;
errorRate: number;
recentCalls: ModelCallEntry[];
};
queueDepth: number;
}
export interface MetricsCollectorConfig {
getQueueDepth?: () => number;
modelCallBufferSize?: number;
eventBufferSize?: number;
}
// ── Implementation ───────────────────────────────────────────────
interface ActiveRequest {
sessionId: string;
channel: string;
startedAt: number;
}
export class MetricsCollector {
private _messagesProcessed = 0;
private _errors = 0;
private _activeRequestCount = 0;
private _startTime: number;
private _modelCalls: ModelCallEntry[] = [];
private _modelCallBufferSize: number;
private _events: EventEntry[] = [];
private _eventBufferSize: number;
private _activeRequests: Map<string, ActiveRequest> = new Map();
private _getQueueDepth: () => number;
constructor(config?: MetricsCollectorConfig) {
this._startTime = Date.now();
this._getQueueDepth = config?.getQueueDepth ?? (() => 0);
this._modelCallBufferSize = config?.modelCallBufferSize ?? 200;
this._eventBufferSize = config?.eventBufferSize ?? 500;
}
// ── Counters ─────────────────────────────────────────────────
incrementMessages(): void {
this._messagesProcessed++;
}
incrementErrors(): void {
this._errors++;
}
get messagesProcessed(): number {
return this._messagesProcessed;
}
get errors(): number {
return this._errors;
}
get activeRequestCount(): number {
return this._activeRequestCount;
}
// ── Model call ring buffer ───────────────────────────────────
recordModelCall(entry: ModelCallEntry): void {
this._modelCalls.push(entry);
if (this._modelCalls.length > this._modelCallBufferSize) {
this._modelCalls.shift();
}
}
getModelMetrics(): ModelCallEntry[] {
return [...this._modelCalls];
}
// ── Event ring buffer ────────────────────────────────────────
recordEvent(event: EventEntry): void {
this._events.push(event);
if (this._events.length > this._eventBufferSize) {
this._events.shift();
}
}
getEvents(opts?: { level?: string; limit?: number }): EventEntry[] {
let filtered = this._events;
if (opts?.level) {
filtered = filtered.filter(e => e.level === opts.level);
}
// Newest first
const reversed = [...filtered].reverse();
if (opts?.limit && opts.limit > 0) {
return reversed.slice(0, opts.limit);
}
return reversed;
}
// ── Active request tracking ──────────────────────────────────
startRequest(id: string, info: { sessionId: string; channel: string }): void {
this._activeRequests.set(id, {
sessionId: info.sessionId,
channel: info.channel,
startedAt: Date.now(),
});
this._activeRequestCount++;
}
endRequest(id: string): void {
if (this._activeRequests.delete(id)) {
this._activeRequestCount--;
}
}
getActiveRequests(): ActiveRequestInfo[] {
const now = Date.now();
const result: ActiveRequestInfo[] = [];
for (const [id, req] of this._activeRequests) {
result.push({
id,
sessionId: req.sessionId,
channel: req.channel,
startedAt: req.startedAt,
durationMs: now - req.startedAt,
});
}
return result;
}
// ── Snapshot ─────────────────────────────────────────────────
getSnapshot(): MetricsSnapshot {
const calls = this._modelCalls;
const totalCalls = calls.length;
const errorCalls = calls.filter(c => c.error).length;
const avgLatency = totalCalls > 0
? calls.reduce((sum, c) => sum + c.latency, 0) / totalCalls
: 0;
const errorRate = totalCalls > 0
? errorCalls / totalCalls
: 0;
return {
messagesProcessed: this._messagesProcessed,
errors: this._errors,
activeRequests: this._activeRequestCount,
uptime: Math.floor((Date.now() - this._startTime) / 1000),
modelCalls: {
total: totalCalls,
avgLatency: Math.round(avgLatency),
errorRate: Math.round(errorRate * 10000) / 10000,
recentCalls: calls.slice(-20),
},
queueDepth: this._getQueueDepth(),
};
}
}