// 514 lines · 17 KiB · TypeScript
import { statfsSync, accessSync, constants as fsConstants } from 'fs';
|
|
import { request } from 'http';
|
|
import type { HeartbeatConfig, HeartbeatCheck } from '../config/schema.js';
|
|
import type { ChannelAdapter, OutboundMessage } from '../channels/types.js';
|
|
import { getBackupHealthSnapshot, type BackupHealthSnapshot } from '../backup/index.js';
|
|
import { auditLogger } from '../audit/index.js';
|
|
|
|
/** Result of a single health check. */
export interface CheckResult {
  /** Which configured check produced this result. */
  name: HeartbeatCheck;
  /** `true` when the check passed. */
  healthy: boolean;
  /** Human-readable detail: a status summary on success, the failure reason otherwise. */
  message: string;
  /** Wall-clock time spent on the check, in milliseconds. */
  durationMs: number;
}
|
|
|
|
/** Result of a full heartbeat cycle. */
export interface HeartbeatResult {
  /** `true` only when every check in `checks` reported healthy. */
  healthy: boolean;
  /** Individual check results, in the order the checks were run. */
  checks: CheckResult[];
  /** `Date.now()` value captured when the cycle's checks had finished. */
  timestamp: number;
}
|
|
|
|
/** Minimal interface for sending notifications via a channel. */
interface ChannelLookup {
  /**
   * Look up a channel by name.
   *
   * @param name - Channel name to resolve.
   * @returns An object able to send `message` to `peerId`, or `undefined`
   *   when no channel with that name is available.
   */
  get(name: string): { send(peerId: string, message: OutboundMessage): Promise<void> } | undefined;
}
|
|
|
|
/** Minimal interface for listing channel adapters. */
interface ChannelLister {
  /** @returns The channel adapters whose connection status should be inspected. */
  list(): ChannelAdapter[];
}
|
|
|
|
/** Dependencies injected into HeartbeatMonitor. */
export interface HeartbeatDeps {
  /** Heartbeat settings: enablement, interval, check list, thresholds and notification target. */
  config: HeartbeatConfig;
  /** Returns the local port probed by the `gateway` check. */
  getGatewayPort: () => number;
  /** Model router inspected by the `model` check; `undefined` makes that check fail. */
  modelRouter: { getTier(): string } | undefined;
  /** Source of the channel adapters inspected by the `channels` check. */
  channelLister: ChannelLister;
  /** Directory verified by the `memory` check; `undefined` means the memory store is disabled. */
  memoryDir: string | undefined;
  /** Path whose filesystem free space is measured by the `disk` check. */
  dataDir: string;
  /** Resolves the channel used to deliver failure and recovery notifications. */
  channelLookup: ChannelLookup;
  /** Optional override returning process memory usage in MB; defaults to the process RSS. */
  processMemoryUsageMb?: () => number;
  /** Optional override for the backup health snapshot; defaults to `getBackupHealthSnapshot()`. */
  backupHealthProvider?: () => BackupHealthSnapshot;
  /** Optional source of recorded model calls for the `provider_errors` check; absent means no calls. */
  getModelCalls?: () => Array<{ provider: string; error?: string }>;
}
|
|
|
|
/**
 * Parse a human-friendly interval string into milliseconds.
 *
 * Supports: '60s', '5m', '1h'. Bare numbers are treated as seconds.
 * The number may be fractional ('1.5m'), the unit is case-insensitive, and
 * surrounding whitespace as well as whitespace between number and unit is
 * ignored.
 *
 * @param interval - Interval text such as '60s', '5m', '1h' or '90'.
 * @returns The interval in milliseconds, rounded to the nearest integer.
 * @throws Error when `interval` does not match the supported format
 *   (negative values, other units and empty strings are rejected).
 */
export function parseInterval(interval: string): number {
  const parsed = /^(\d+(?:\.\d+)?)\s*(s|m|h)?$/i.exec(interval.trim());
  if (parsed === null) {
    throw new Error(`Invalid interval format: '${interval}'. Use e.g. '60s', '5m', '1h'.`);
  }

  // A missing unit group means the caller passed a bare number: seconds.
  const [, digits, suffix = 's'] = parsed;
  const amount = parseFloat(digits);
  const unit = suffix.toLowerCase();

  // The pattern only admits s, m or h, so anything that is not h or m is seconds.
  if (unit === 'h') {
    return Math.round(amount * 60 * 60 * 1000);
  }
  if (unit === 'm') {
    return Math.round(amount * 60 * 1000);
  }
  return Math.round(amount * 1000);
}
|
|
|
|
/**
 * Periodically runs the configured health checks, keeps the most recent
 * result, and sends failure / recovery notifications through the configured
 * notification channel.
 */
export class HeartbeatMonitor {
  /**
   * Largest delay accepted by Node.js timers. A delay above this value (or
   * below 1) is coerced to 1 ms by `setInterval`, which would turn a very long
   * configured interval into a hot loop.
   */
  private static readonly MAX_TIMER_DELAY_MS = 2147483647;

  /** Handle of the running interval timer; `undefined` while stopped. */
  private timer: ReturnType<typeof setInterval> | undefined;
  /** Result of the most recently completed cycle. */
  private lastResult: HeartbeatResult | undefined;
  /** Number of unhealthy cycles since the last healthy one. */
  private consecutiveFailures = 0;
  /** Whether a failure alert went out for the ongoing incident; gates the recovery notice. */
  private failureAlertSentForCurrentIncident = false;
  /** Whether the alert attempt for the ongoing incident already happened (at most one per incident). */
  private failureAlertProcessedForCurrentIncident = false;
  /** `Date.now()` of the last failure notification; 0 if none yet. */
  private lastFailureNotificationAt = 0;
  /** Signature (joined failed-check messages) of the last failure notification. */
  private lastFailureSignature = '';
  /** `Date.now()` of the last recovery notification; 0 if none yet. */
  private lastRecoveryNotificationAt = 0;
  private readonly deps: HeartbeatDeps;

  /**
   * @param deps - Configuration and collaborators used by the checks and notifications.
   */
  constructor(deps: HeartbeatDeps) {
    this.deps = deps;
  }

  /**
   * Start the heartbeat monitor. Does nothing if disabled or already running.
   *
   * Schedules a check cycle on the configured interval and also runs one
   * immediately.
   *
   * @throws Error when the configured interval cannot be parsed.
   */
  start(): void {
    if (!this.deps.config.enabled) {
      return;
    }

    // A second start() would overwrite the timer handle and leave the first
    // interval running with no way for stop() to clear it.
    if (this.timer !== undefined) {
      console.warn('HeartbeatMonitor: start() called while already running; ignoring');
      return;
    }

    const intervalMs = parseInterval(this.deps.config.interval);
    // Keep the delay inside the range Node timers honour; outside it the
    // runtime silently substitutes 1 ms.
    const delayMs = Math.min(Math.max(intervalMs, 1), HeartbeatMonitor.MAX_TIMER_DELAY_MS);
    if (delayMs !== intervalMs) {
      console.warn(`HeartbeatMonitor: interval ${intervalMs}ms is outside the supported timer range; using ${delayMs}ms`);
    }

    console.log(`HeartbeatMonitor: starting (interval=${this.deps.config.interval}, checks=[${this.deps.config.checks.join(', ')}])`);
    auditLogger?.systemStart('HeartbeatMonitor', { interval: this.deps.config.interval, checks: this.deps.config.checks });

    this.timer = setInterval(() => {
      this.runChecks().catch((err) => {
        console.error('HeartbeatMonitor: unexpected error during check cycle:', err);
      });
    }, delayMs);

    // Also run immediately on start
    this.runChecks().catch((err) => {
      console.error('HeartbeatMonitor: unexpected error during initial check:', err);
    });
  }

  /** Stop the heartbeat monitor. */
  stop(): void {
    if (this.timer) {
      clearInterval(this.timer);
      this.timer = undefined;
    }
    auditLogger?.systemStop('HeartbeatMonitor');
  }

  /**
   * Run all configured checks and return the result.
   *
   * Checks run sequentially in configuration order. Each result is audit
   * logged, the aggregate is stored as the last result, and failure / recovery
   * tracking (including notifications) is updated before returning.
   *
   * @returns The aggregate result of this cycle.
   */
  async runChecks(): Promise<HeartbeatResult> {
    const checks: CheckResult[] = [];

    for (const check of this.deps.config.checks) {
      const result = await this.runSingleCheck(check);
      checks.push(result);
      auditLogger?.heartbeatCheck({
        check_name: result.name,
        healthy: result.healthy,
        message: result.message,
        duration_ms: result.durationMs,
      });
    }

    const healthy = checks.every((c) => c.healthy);
    const heartbeatResult: HeartbeatResult = {
      healthy,
      checks,
      timestamp: Date.now(),
    };

    this.lastResult = heartbeatResult;

    // Failure tracking and notification
    if (healthy) {
      await this.handleHealthyCycle();
    } else {
      await this.handleFailedCycle(checks);
    }

    auditLogger?.heartbeatCycle({
      interval_ms: parseInterval(this.deps.config.interval),
      checks: this.deps.config.checks,
      healthy,
      consecutive_failures: this.consecutiveFailures,
    });

    return heartbeatResult;
  }

  /** Get the most recent heartbeat result. */
  getLastResult(): HeartbeatResult | undefined {
    return this.lastResult;
  }

  // ── Cycle helpers ──────────────────────────────────────────────

  /** Dispatch one named check; anything it throws becomes an unhealthy result. */
  private async runSingleCheck(check: HeartbeatCheck): Promise<CheckResult> {
    const start = Date.now();

    try {
      switch (check) {
        case 'gateway':
          // `return await` so a rejection is handled by the catch below.
          return await this.checkGateway(start);
        case 'model':
          return this.checkModel(start);
        case 'channels':
          return this.checkChannels(start);
        case 'memory':
          return this.checkMemory(start);
        case 'disk':
          return this.checkDisk(start);
        case 'process_memory':
          return this.checkProcessMemory(start);
        case 'backup':
          return this.checkBackup(start);
        case 'provider_errors':
          return this.checkProviderErrors(start);
        default:
          return { name: check, healthy: false, message: `Unknown check: ${check}`, durationMs: Date.now() - start };
      }
    } catch (err) {
      return this.failedResult(check, err, 'Unknown error', start);
    }
  }

  /** Record an unhealthy cycle and, once the threshold is reached, attempt the incident's single alert. */
  private async handleFailedCycle(checks: CheckResult[]): Promise<void> {
    this.consecutiveFailures++;

    const thresholdReached = this.consecutiveFailures >= this.deps.config.failure_threshold;
    if (!thresholdReached || this.failureAlertProcessedForCurrentIncident) {
      return;
    }

    // Mark before awaiting so the alert is attempted only once per incident.
    this.failureAlertProcessedForCurrentIncident = true;

    const failedChecks = checks.filter((c) => !c.healthy).map((c) => `${c.name}: ${c.message}`);
    const signature = failedChecks.join('|');
    const sent = await this.notifyFailureWithCooldown(
      `Heartbeat FAILING (${this.consecutiveFailures} consecutive failures):\n${failedChecks.join('\n')}`,
      signature,
    );
    this.failureAlertSentForCurrentIncident = sent;

    if (sent) {
      auditLogger?.heartbeatFail({
        checks_failed: failedChecks,
        consecutive_failures: this.consecutiveFailures,
        threshold: this.deps.config.failure_threshold,
      });
    }
  }

  /** Announce recovery if the incident was alerted, then reset the incident state. */
  private async handleHealthyCycle(): Promise<void> {
    if (this.failureAlertSentForCurrentIncident) {
      // Recovery notification
      await this.notifyRecoveryWithCooldown(
        `Heartbeat RECOVERED after ${this.consecutiveFailures} consecutive failure(s). All checks passing.`,
      );

      auditLogger?.heartbeatRecover({
        consecutive_failures_before: this.consecutiveFailures,
      });
    }

    this.consecutiveFailures = 0;
    this.failureAlertSentForCurrentIncident = false;
    this.failureAlertProcessedForCurrentIncident = false;
  }

  /** Build an unhealthy result from a caught value, using `fallback` when it is not an `Error`. */
  private failedResult(name: HeartbeatCheck, err: unknown, fallback: string, start: number): CheckResult {
    return {
      name,
      healthy: false,
      message: err instanceof Error ? err.message : fallback,
      durationMs: Date.now() - start,
    };
  }

  // ── Individual checks ──────────────────────────────────────────

  /**
   * Probe `GET /api/health` on 127.0.0.1 at the gateway port.
   * Healthy when the response status is in the 200–399 range; request errors
   * and the 5000 ms timeout are reported as unhealthy.
   */
  private async checkGateway(start: number): Promise<CheckResult> {
    const port = this.deps.getGatewayPort();

    return new Promise<CheckResult>((resolve) => {
      // Only the first call settles the promise; later calls are no-ops.
      const finish = (healthy: boolean, message: string): void => {
        resolve({ name: 'gateway', healthy, message, durationMs: Date.now() - start });
      };

      const req = request(
        { hostname: '127.0.0.1', port, path: '/api/health', method: 'GET', timeout: 5000 },
        (res) => {
          // Consume the response body
          res.resume();
          const status = res.statusCode;
          const healthy = status !== undefined && status >= 200 && status < 400;
          finish(healthy, `HTTP ${status ?? 'no response'}`);
        },
      );

      req.on('error', (err) => {
        finish(false, err.message);
      });

      req.on('timeout', () => {
        // The timeout event does not abort the request by itself.
        req.destroy();
        finish(false, 'Request timed out');
      });

      req.end();
    });
  }

  /**
   * Lightweight check: healthy whenever a model router is present. The active
   * tier is reported in the message but is not validated.
   */
  private checkModel(start: number): CheckResult {
    const router = this.deps.modelRouter;
    if (!router) {
      return { name: 'model', healthy: false, message: 'Model router not available', durationMs: Date.now() - start };
    }

    const tier = router.getTier();
    return { name: 'model', healthy: true, message: `Active tier: ${tier}`, durationMs: Date.now() - start };
  }

  /** Healthy if at least one adapter is connected; disconnected adapters are listed in the message. */
  private checkChannels(start: number): CheckResult {
    const adapters = this.deps.channelLister.list();
    const connected = adapters.filter((a) => a.status === 'connected');
    const disconnected = adapters.filter((a) => a.status !== 'connected');

    const healthy = connected.length > 0;
    const details = `${connected.length}/${adapters.length} connected`;
    const message = disconnected.length > 0
      ? `${details} (disconnected: ${disconnected.map((a) => a.name).join(', ')})`
      : details;

    return { name: 'channels', healthy, message, durationMs: Date.now() - start };
  }

  /** Healthy when the memory directory is readable and writable, or when no directory is configured. */
  private checkMemory(start: number): CheckResult {
    const memoryDir = this.deps.memoryDir;
    if (!memoryDir) {
      return { name: 'memory', healthy: true, message: 'Memory store disabled', durationMs: Date.now() - start };
    }

    try {
      accessSync(memoryDir, fsConstants.R_OK | fsConstants.W_OK);
      return { name: 'memory', healthy: true, message: 'Directory accessible', durationMs: Date.now() - start };
    } catch (err) {
      return this.failedResult('memory', err, 'Directory not accessible', start);
    }
  }

  /** Healthy when the space available on the data directory's filesystem meets `disk_threshold_mb`. */
  private checkDisk(start: number): CheckResult {
    try {
      const stats = statfsSync(this.deps.dataDir);
      // Available blocks × block size, converted from bytes to MB.
      const availableMb = (stats.bavail * stats.bsize) / (1024 * 1024);
      const thresholdMb = this.deps.config.disk_threshold_mb;
      const healthy = availableMb >= thresholdMb;

      return {
        name: 'disk',
        healthy,
        message: healthy
          ? `${Math.round(availableMb)} MB available`
          : `Low disk space: ${Math.round(availableMb)} MB available (threshold: ${thresholdMb} MB)`,
        durationMs: Date.now() - start,
      };
    } catch (err) {
      return this.failedResult('disk', err, 'Failed to check disk', start);
    }
  }

  /** Healthy when process memory usage (injected provider, else RSS) is at or below `process_memory_threshold_mb`. */
  private checkProcessMemory(start: number): CheckResult {
    try {
      const usageMb = this.deps.processMemoryUsageMb
        ? this.deps.processMemoryUsageMb()
        : Math.round(process.memoryUsage().rss / (1024 * 1024));
      const thresholdMb = this.deps.config.process_memory_threshold_mb;
      const healthy = usageMb <= thresholdMb;

      return {
        name: 'process_memory',
        healthy,
        message: healthy
          ? `Process RSS ${usageMb} MB (threshold: ${thresholdMb} MB)`
          : `High memory usage: ${usageMb} MB RSS (threshold: ${thresholdMb} MB)`,
        durationMs: Date.now() - start,
      };
    } catch (err) {
      return this.failedResult('process_memory', err, 'Failed to check process memory usage', start);
    }
  }

  /**
   * Healthy when backups are disabled, have not run yet, or have fewer
   * consecutive failures than `backup_failure_threshold`.
   */
  private checkBackup(start: number): CheckResult {
    try {
      const snapshot = this.deps.backupHealthProvider
        ? this.deps.backupHealthProvider()
        : getBackupHealthSnapshot();

      if (!snapshot.enabled) {
        return { name: 'backup', healthy: true, message: 'Backup scheduler disabled', durationMs: Date.now() - start };
      }
      if (!snapshot.hasRun) {
        return { name: 'backup', healthy: true, message: 'No backup runs recorded yet', durationMs: Date.now() - start };
      }

      const threshold = this.deps.config.backup_failure_threshold;
      if (snapshot.consecutiveFailures < threshold) {
        const successSuffix = snapshot.lastSuccessAt ? `, last success ${new Date(snapshot.lastSuccessAt).toISOString()}` : '';
        return {
          name: 'backup',
          healthy: true,
          message: `Consecutive failures: ${snapshot.consecutiveFailures}/${threshold}${successSuffix}`,
          durationMs: Date.now() - start,
        };
      }

      return {
        name: 'backup',
        healthy: false,
        message: `Backup failing (${snapshot.consecutiveFailures} consecutive failures, threshold: ${threshold})${snapshot.lastError ? ` — ${snapshot.lastError}` : ''}`,
        durationMs: Date.now() - start,
      };
    } catch (err) {
      return this.failedResult('backup', err, 'Failed to read backup health state', start);
    }
  }

  /**
   * Unhealthy when any provider with at least `provider_error_min_calls`
   * recorded calls has an error rate at or above
   * `provider_error_rate_threshold`.
   */
  private checkProviderErrors(start: number): CheckResult {
    try {
      const calls = this.deps.getModelCalls ? this.deps.getModelCalls() : [];
      if (calls.length === 0) {
        return { name: 'provider_errors', healthy: true, message: 'No model calls recorded yet', durationMs: Date.now() - start };
      }

      // Tally total and failed calls per provider.
      const providers = new Map<string, { total: number; errors: number }>();
      for (const call of calls) {
        const current = providers.get(call.provider) ?? { total: 0, errors: 0 };
        current.total += 1;
        if (call.error) {
          current.errors += 1;
        }
        providers.set(call.provider, current);
      }

      const minCalls = this.deps.config.provider_error_min_calls;
      const threshold = this.deps.config.provider_error_rate_threshold;
      const offenders: string[] = [];

      for (const [provider, stats] of providers) {
        // Providers with too few calls are not judged.
        if (stats.total < minCalls) {
          continue;
        }
        const errorRate = stats.errors / stats.total;
        if (errorRate >= threshold) {
          offenders.push(`${provider} ${Math.round(errorRate * 100)}% (${stats.errors}/${stats.total})`);
        }
      }

      if (offenders.length > 0) {
        return {
          name: 'provider_errors',
          healthy: false,
          message: `High provider error rate: ${offenders.join(', ')}`,
          durationMs: Date.now() - start,
        };
      }

      return {
        name: 'provider_errors',
        healthy: true,
        message: `Provider error rates below threshold across ${providers.size} provider(s)`,
        durationMs: Date.now() - start,
      };
    } catch (err) {
      return this.failedResult('provider_errors', err, 'Failed to check provider error rates', start);
    }
  }

  // ── Notification ───────────────────────────────────────────────

  /**
   * Send `text` to the configured notification peer. Best effort: a missing
   * notify config is a silent no-op, and an unknown channel or a send failure
   * is logged rather than thrown.
   */
  private async notify(text: string): Promise<void> {
    const notifyConfig = this.deps.config.notify;
    if (!notifyConfig) {
      return;
    }

    const adapter = this.deps.channelLookup.get(notifyConfig.channel);
    if (!adapter) {
      console.warn(`HeartbeatMonitor: notification channel '${notifyConfig.channel}' not found`);
      return;
    }

    try {
      await adapter.send(notifyConfig.peer, { text });
    } catch (err) {
      console.error('HeartbeatMonitor: failed to send notification:', err);
    }
  }

  /** True when at least `cooldownMs` has elapsed since `lastAt`. */
  private shouldNotifyByCooldown(lastAt: number, cooldownMs: number): boolean {
    return Date.now() - lastAt >= cooldownMs;
  }

  /**
   * Send a failure notification unless the same failure signature was already
   * notified within the cooldown (`notify_cooldown`, default '30m').
   *
   * @returns `true` when a notification was attempted, `false` when suppressed.
   */
  private async notifyFailureWithCooldown(text: string, signature: string): Promise<boolean> {
    const cooldownMs = parseInterval(this.deps.config.notify_cooldown ?? '30m');
    const signatureChanged = signature !== this.lastFailureSignature;
    const cooldownPassed = this.shouldNotifyByCooldown(this.lastFailureNotificationAt, cooldownMs);
    if (!signatureChanged && !cooldownPassed) {
      return false;
    }

    await this.notify(text);
    this.lastFailureNotificationAt = Date.now();
    this.lastFailureSignature = signature;
    return true;
  }

  /**
   * Send a recovery notification unless one was already sent within the
   * cooldown (`notify_cooldown`, default '30m').
   *
   * @returns `true` when a notification was attempted, `false` when suppressed.
   */
  private async notifyRecoveryWithCooldown(text: string): Promise<boolean> {
    const cooldownMs = parseInterval(this.deps.config.notify_cooldown ?? '30m');
    const cooldownPassed = this.shouldNotifyByCooldown(this.lastRecoveryNotificationAt, cooldownMs);
    if (!cooldownPassed) {
      return false;
    }

    await this.notify(text);
    this.lastRecoveryNotificationAt = Date.now();
    return true;
  }
}
|