feat(03-01): create MetricsCollector and wire into gateway
- Add MetricsCollector class with counters, model call ring buffer, event ring buffer, and active request tracking - Add system.metrics, system.events, system.activeRequests RPC handlers - Add GET /health unauthenticated HTTP endpoint for Docker HEALTHCHECK - Add totalPending() to LaneQueue for queue depth metrics - Add 20 tests for MetricsCollector
This commit is contained in:
@@ -0,0 +1,210 @@
|
||||
/**
 * MetricsCollector — single source of truth for all ops metrics.
 *
 * Tracks counters (messages, errors, active requests), model call latency/tokens,
 * event stream, and active request durations. All synchronous, no external deps.
 */
|
||||
|
||||
// ── Types ────────────────────────────────────────────────────────
|
||||
|
||||
/**
 * One recorded model invocation, as stored in the collector's model-call buffer.
 */
export interface ModelCallEntry {
  /** When the call was recorded (presumably epoch milliseconds — confirm with callers). */
  timestamp: number;
  /** Name of the model provider that served the call. */
  provider: string;
  /** Call latency; averaged and rounded in snapshots (presumably milliseconds — confirm). */
  latency: number;
  /** Number of input tokens reported for the call. */
  inputTokens: number;
  /** Number of output tokens reported for the call. */
  outputTokens: number;
  /** Throughput figure supplied by the caller; the collector stores it as-is. */
  tokensPerSec: number;
  /** Error description; a truthy value marks the call as failed in snapshot error rates. */
  error?: string;
}
|
||||
|
||||
/**
 * One entry in the collector's event stream.
 */
export interface EventEntry {
  /** When the event occurred (presumably epoch milliseconds — confirm with callers). */
  timestamp: number;
  /** Severity; `getEvents` can filter on this value. */
  level: 'info' | 'warn' | 'error';
  /** Component that emitted the event. */
  source: string;
  /** Human-readable description of the event. */
  message: string;
  /** Optional free-form structured data attached to the event. */
  context?: { [key: string]: unknown };
}
|
||||
|
||||
/**
 * Public view of a request that is currently in flight.
 */
export interface ActiveRequestInfo {
  /** Identifier the request was registered under. */
  id: string;
  /** Session the request belongs to. */
  sessionId: string;
  /** Channel the request arrived on. */
  channel: string;
  /** `Date.now()` value captured when the request was registered. */
  startedAt: number;
  /** Milliseconds elapsed between `startedAt` and the moment this view was built. */
  durationMs: number;
}
|
||||
|
||||
/**
 * Point-in-time summary of everything the collector tracks.
 */
export interface MetricsSnapshot {
  /** Value of the processed-message counter. */
  messagesProcessed: number;
  /** Value of the error counter. */
  errors: number;
  /** Number of requests in flight when the snapshot was taken. */
  activeRequests: number;
  /** Whole seconds elapsed since the collector was constructed. */
  uptime: number;
  /** Aggregates computed over the model calls currently held in the buffer. */
  modelCalls: {
    /** Number of buffered calls (bounded by the buffer size, not a lifetime total). */
    total: number;
    /** Mean latency of the buffered calls, rounded to an integer; 0 when empty. */
    avgLatency: number;
    /** Fraction of buffered calls with an error, rounded to 4 decimals; 0 when empty. */
    errorRate: number;
    /** The most recent buffered calls (at most 20), oldest first. */
    recentCalls: Array<ModelCallEntry>;
  };
  /** Queue depth reported by the configured callback. */
  queueDepth: number;
}
|
||||
|
||||
/**
 * Optional settings accepted by the `MetricsCollector` constructor.
 */
export interface MetricsCollectorConfig {
  /** Callback queried for the current queue depth on every snapshot; defaults to one returning 0. */
  getQueueDepth?: () => number;
  /** Maximum number of model calls retained; defaults to 200. */
  modelCallBufferSize?: number;
  /** Maximum number of events retained; defaults to 500. */
  eventBufferSize?: number;
}
|
||||
|
||||
// ── Implementation ───────────────────────────────────────────────
|
||||
|
||||
/**
 * Internal bookkeeping record for an in-flight request, stored per request id.
 */
interface ActiveRequest {
  /** Session the request belongs to. */
  sessionId: string;
  /** Channel the request arrived on. */
  channel: string;
  /** `Date.now()` value captured when the request was registered. */
  startedAt: number;
}
|
||||
|
||||
/**
 * In-memory collector for operational metrics.
 *
 * Holds two monotonically increasing counters (messages, errors), two bounded
 * FIFO buffers (model calls, events) that drop their oldest entry once the
 * configured capacity is exceeded, and a map of in-flight requests keyed by id.
 * Every method is synchronous.
 */
export class MetricsCollector {
  private _messagesProcessed = 0;
  private _errors = 0;
  private readonly _startTime: number;

  private readonly _modelCalls: ModelCallEntry[] = [];
  private readonly _modelCallBufferSize: number;

  private readonly _events: EventEntry[] = [];
  private readonly _eventBufferSize: number;

  // The map is the only record of in-flight requests; the active count is
  // derived from its size so the two can never disagree.
  private readonly _activeRequests = new Map<string, ActiveRequest>();
  private readonly _getQueueDepth: () => number;

  /**
   * @param config Optional settings. When omitted, queue depth is reported as 0,
   *   the model-call buffer holds 200 entries and the event buffer holds 500.
   */
  constructor(config?: MetricsCollectorConfig) {
    this._startTime = Date.now();
    this._getQueueDepth = config?.getQueueDepth ?? (() => 0);
    this._modelCallBufferSize = config?.modelCallBufferSize ?? 200;
    this._eventBufferSize = config?.eventBufferSize ?? 500;
  }

  // ── Counters ─────────────────────────────────────────────────

  /** Adds one to the processed-message counter. */
  incrementMessages(): void {
    this._messagesProcessed++;
  }

  /** Adds one to the error counter. */
  incrementErrors(): void {
    this._errors++;
  }

  /** Number of messages counted so far. */
  get messagesProcessed(): number {
    return this._messagesProcessed;
  }

  /** Number of errors counted so far. */
  get errors(): number {
    return this._errors;
  }

  /** Number of distinct request ids currently registered as in flight. */
  get activeRequestCount(): number {
    return this._activeRequests.size;
  }

  // ── Model call ring buffer ───────────────────────────────────

  /**
   * Appends a model call to the buffer, evicting the oldest entry when the
   * buffer grows past its configured size.
   *
   * @param entry The call to record; stored by reference.
   */
  recordModelCall(entry: ModelCallEntry): void {
    this._modelCalls.push(entry);
    if (this._modelCalls.length > this._modelCallBufferSize) {
      this._modelCalls.shift();
    }
  }

  /**
   * @returns A shallow copy of the buffered model calls, oldest first.
   */
  getModelMetrics(): ModelCallEntry[] {
    return [...this._modelCalls];
  }

  // ── Event ring buffer ────────────────────────────────────────

  /**
   * Appends an event to the buffer, evicting the oldest entry when the buffer
   * grows past its configured size.
   *
   * @param event The event to record; stored by reference.
   */
  recordEvent(event: EventEntry): void {
    this._events.push(event);
    if (this._events.length > this._eventBufferSize) {
      this._events.shift();
    }
  }

  /**
   * Returns buffered events, newest first.
   *
   * @param opts Optional filters. `level` keeps only events whose level equals
   *   it exactly (an empty string is treated as "no filter"). `limit` caps the
   *   number of returned events; values that are missing, zero or negative
   *   mean "no cap".
   * @returns A new array; the internal buffer is not exposed.
   */
  getEvents(opts?: { level?: string; limit?: number }): EventEntry[] {
    const level = opts?.level;
    const limit = opts?.limit;

    // Both branches yield a fresh array, so the in-place reverse below never
    // touches the internal buffer.
    const newestFirst = level
      ? this._events.filter(e => e.level === level)
      : [...this._events];
    newestFirst.reverse();

    if (limit && limit > 0) {
      return newestFirst.slice(0, limit);
    }
    return newestFirst;
  }

  // ── Active request tracking ──────────────────────────────────

  /**
   * Registers a request as in flight, stamping it with the current time.
   *
   * Calling this again with an id that is already registered replaces the
   * stored record (including its start time) and does not count the request
   * twice.
   *
   * @param id Identifier used later to end the request.
   * @param info Session and channel the request belongs to.
   */
  startRequest(id: string, info: { sessionId: string; channel: string }): void {
    this._activeRequests.set(id, {
      sessionId: info.sessionId,
      channel: info.channel,
      startedAt: Date.now(),
    });
  }

  /**
   * Marks a request as finished. Unknown ids are ignored.
   *
   * @param id Identifier passed to `startRequest`.
   */
  endRequest(id: string): void {
    this._activeRequests.delete(id);
  }

  /**
   * @returns One entry per in-flight request, each with `durationMs` measured
   *   against a single `Date.now()` reading taken at the start of the call.
   */
  getActiveRequests(): ActiveRequestInfo[] {
    const now = Date.now();
    return Array.from(this._activeRequests, ([id, req]) => ({
      id,
      sessionId: req.sessionId,
      channel: req.channel,
      startedAt: req.startedAt,
      durationMs: now - req.startedAt,
    }));
  }

  // ── Snapshot ─────────────────────────────────────────────────

  /**
   * Builds a summary of all tracked metrics.
   *
   * Model-call aggregates cover only the calls currently held in the buffer,
   * so `modelCalls.total` is bounded by the buffer size rather than being a
   * lifetime count. `avgLatency` is rounded to an integer, `errorRate` to four
   * decimal places, and both are 0 when the buffer is empty. `uptime` is in
   * whole seconds since construction. The queue-depth callback is invoked on
   * every call.
   */
  getSnapshot(): MetricsSnapshot {
    const calls = this._modelCalls;
    const totalCalls = calls.length;

    let latencySum = 0;
    let errorCalls = 0;
    for (const call of calls) {
      latencySum += call.latency;
      // Any truthy error string marks the call as failed.
      if (call.error) {
        errorCalls++;
      }
    }

    const avgLatency = totalCalls > 0 ? latencySum / totalCalls : 0;
    const errorRate = totalCalls > 0 ? errorCalls / totalCalls : 0;

    return {
      messagesProcessed: this._messagesProcessed,
      errors: this._errors,
      activeRequests: this._activeRequests.size,
      uptime: Math.floor((Date.now() - this._startTime) / 1000),
      modelCalls: {
        total: totalCalls,
        avgLatency: Math.round(avgLatency),
        errorRate: Math.round(errorRate * 10000) / 10000,
        recentCalls: calls.slice(-20),
      },
      queueDepth: this._getQueueDepth(),
    };
  }
}
|
||||
Reference in New Issue
Block a user