feat: add heartbeat monitor and vector memory search (Tier 2)

Heartbeat:
- HeartbeatMonitor with 5 checks: gateway, model, channels, memory, disk
- Configurable interval, failure threshold, notification channel
- Recovery notifications when health restores
- 25 new tests

Vector Memory Search:
- EmbeddingProvider interface with OpenAI, Gemini, Ollama, LlamaCpp backends
- SQLite-backed VectorStore with cosine similarity search
- Text chunker with paragraph-aware splitting and overlap
- HybridSearch merging keyword + vector results with configurable weight
- Background indexer with dirty-namespace tracking
- Graceful fallback to keyword search when embeddings unavailable
- 51 new tests

Config: automation.heartbeat + memory.embedding schema sections
Total: 950 tests passing, all types clean
This commit is contained in:
William Valentin
2026-02-07 14:45:11 -08:00
parent b50c140d25
commit 88731a50e3
17 changed files with 2354 additions and 7 deletions
+307
View File
@@ -0,0 +1,307 @@
import { statfsSync, accessSync, constants as fsConstants } from 'fs';
import { request } from 'http';
import type { HeartbeatConfig, HeartbeatCheck } from '../config/schema.js';
import type { ChannelAdapter, ChannelStatus, OutboundMessage } from '../channels/types.js';
/**
 * Result of a single health check, as produced by one of the monitor's
 * individual check routines.
 */
export interface CheckResult {
/** Identifier of the check that produced this result. */
name: HeartbeatCheck;
/** True when the check passed. */
healthy: boolean;
/** Human-readable detail: a status summary on success, or the failure reason. */
message: string;
/** Milliseconds elapsed between the start of the check and its result. */
durationMs: number;
}
/**
 * Result of a full heartbeat cycle: the aggregate verdict plus the
 * per-check results it was derived from.
 */
export interface HeartbeatResult {
/** True only when every entry in `checks` is healthy. */
healthy: boolean;
/** One result per configured check, in the order the checks were run. */
checks: CheckResult[];
/** `Date.now()` value captured once all checks of the cycle had finished. */
timestamp: number;
}
/**
 * Minimal interface for sending notifications via a channel.
 * Only the `send` capability of the looked-up channel is required.
 */
interface ChannelLookup {
/**
 * Look up a channel by name. The monitor treats an `undefined` return as
 * "channel not found" and skips the notification.
 */
get(name: string): { send(peerId: string, message: OutboundMessage): Promise<void> } | undefined;
}
/**
 * Minimal interface for listing channel adapters.
 * The monitor reads each adapter's `status` and `name` for the channels check.
 */
interface ChannelLister {
/** Return the channel adapters to inspect. */
list(): ChannelAdapter[];
}
/**
 * Dependencies injected into HeartbeatMonitor.
 * Everything the monitor needs from the rest of the application is passed
 * in through this object rather than imported directly.
 */
export interface HeartbeatDeps {
/** Heartbeat settings: enabled flag, interval, checks, thresholds, notify target. */
config: HeartbeatConfig;
/** Returns the local port used for the gateway health request. */
getGatewayPort: () => number;
/** Model router; when `undefined` the model check reports unhealthy. */
modelRouter: { getTier(): string } | undefined;
/** Source of the channel adapters inspected by the channels check. */
channelLister: ChannelLister;
/** Memory directory; when `undefined` the memory check reports "disabled" as healthy. */
memoryDir: string | undefined;
/** Path whose filesystem is inspected by the disk check. */
dataDir: string;
/** Resolves the notification channel named in `config.notify`. */
channelLookup: ChannelLookup;
}
/**
 * Convert a human-friendly duration string into milliseconds.
 *
 * Accepted forms are a non-negative decimal number optionally followed by a
 * unit suffix: `s` (seconds), `m` (minutes) or `h` (hours), case-insensitive,
 * e.g. '60s', '5m', '1h'. A number without a suffix is read as seconds.
 * Surrounding whitespace and whitespace between number and unit are ignored.
 *
 * @param interval - The duration text to parse.
 * @returns The duration in milliseconds, rounded to the nearest integer.
 * @throws Error when the text does not match the accepted format.
 */
export function parseInterval(interval: string): number {
  const parsed = /^(\d+(?:\.\d+)?)\s*(s|m|h)?$/i.exec(interval.trim());
  if (parsed === null) {
    throw new Error(`Invalid interval format: '${interval}'. Use e.g. '60s', '5m', '1h'.`);
  }

  const [, amountText, suffixText] = parsed;
  const amount = parseFloat(amountText);
  const suffix = (suffixText ?? 's').toLowerCase();

  // Multiplication order is kept per unit so rounding matches exactly.
  if (suffix === 'h') {
    return Math.round(amount * 60 * 60 * 1000);
  }
  if (suffix === 'm') {
    return Math.round(amount * 60 * 1000);
  }
  // 's' — also the fallback for a missing suffix.
  return Math.round(amount * 1000);
}
/**
 * Periodically runs the configured health checks (gateway, model, channels,
 * memory, disk) and sends a notification through the configured channel when
 * health degrades past the failure threshold and again when it recovers.
 */
export class HeartbeatMonitor {
  /** Handle of the active interval timer; `undefined` while stopped. */
  private timer: ReturnType<typeof setInterval> | undefined;
  /** Outcome of the most recently completed check cycle. */
  private lastResult: HeartbeatResult | undefined;
  /** Number of unhealthy cycles since the last healthy one. */
  private consecutiveFailures = 0;
  /** True once a failure notification has been issued and not yet cleared by a recovery. */
  private notifiedFailure = false;
  private readonly deps: HeartbeatDeps;

  constructor(deps: HeartbeatDeps) {
    this.deps = deps;
  }

  /**
   * Start the heartbeat monitor. Does nothing if disabled or already running.
   *
   * Schedules a check cycle every configured interval and also runs one
   * immediately.
   *
   * @throws Error when the configured interval cannot be parsed.
   */
  start(): void {
    if (!this.deps.config.enabled) return;
    // Starting twice would overwrite the handle of the first interval, leaving
    // a timer that stop() can never clear. Treat a repeated start as a no-op.
    if (this.timer !== undefined) return;

    const intervalMs = parseInterval(this.deps.config.interval);
    console.log(`HeartbeatMonitor: starting (interval=${this.deps.config.interval}, checks=[${this.deps.config.checks.join(', ')}])`);

    this.timer = setInterval(() => {
      this.runChecks().catch((err) => {
        console.error('HeartbeatMonitor: unexpected error during check cycle:', err);
      });
    }, intervalMs);

    // Also run immediately on start
    this.runChecks().catch((err) => {
      console.error('HeartbeatMonitor: unexpected error during initial check:', err);
    });
  }

  /** Stop the heartbeat monitor. Safe to call when not running. */
  stop(): void {
    if (this.timer !== undefined) {
      clearInterval(this.timer);
      this.timer = undefined;
    }
  }

  /**
   * Run all configured checks and return the result.
   *
   * Checks run one after another in configuration order. The cycle is healthy
   * only if every check is healthy. After `failure_threshold` consecutive
   * unhealthy cycles a single failure notification is sent; the next healthy
   * cycle sends a single recovery notification.
   *
   * @returns The aggregate result, which is also stored for getLastResult().
   */
  async runChecks(): Promise<HeartbeatResult> {
    const checks: CheckResult[] = [];
    for (const check of this.deps.config.checks) {
      checks.push(await this.runSingleCheck(check));
    }

    const healthy = checks.every((c) => c.healthy);
    const heartbeatResult: HeartbeatResult = {
      healthy,
      checks,
      timestamp: Date.now(),
    };
    this.lastResult = heartbeatResult;

    // Failure tracking and notification
    if (!healthy) {
      this.consecutiveFailures++;
      if (this.consecutiveFailures >= this.deps.config.failure_threshold && !this.notifiedFailure) {
        // Flag is set before awaiting so an overlapping cycle does not notify again.
        this.notifiedFailure = true;
        const failedChecks = checks.filter((c) => !c.healthy).map((c) => `${c.name}: ${c.message}`);
        await this.notify(`Heartbeat FAILING (${this.consecutiveFailures} consecutive failures):\n${failedChecks.join('\n')}`);
      }
    } else {
      // Snapshot and reset the state BEFORE awaiting the notification. Cycles
      // can overlap (interval tick while a previous cycle is still awaiting),
      // and resetting afterwards would let a second healthy cycle send a
      // duplicate recovery message or wipe a failure counted in the meantime.
      const failuresBeforeRecovery = this.consecutiveFailures;
      const shouldAnnounceRecovery = this.notifiedFailure;
      this.consecutiveFailures = 0;
      this.notifiedFailure = false;
      if (shouldAnnounceRecovery) {
        // Recovery notification
        await this.notify(`Heartbeat RECOVERED after ${failuresBeforeRecovery} consecutive failure(s). All checks passing.`);
      }
    }

    return heartbeatResult;
  }

  /** Get the most recent heartbeat result, or `undefined` if no cycle has completed. */
  getLastResult(): HeartbeatResult | undefined {
    return this.lastResult;
  }

  /**
   * Dispatch one named check and convert any thrown error into an unhealthy
   * result, so a single misbehaving check cannot abort the cycle.
   */
  private async runSingleCheck(check: HeartbeatCheck): Promise<CheckResult> {
    const start = Date.now();
    try {
      switch (check) {
        case 'gateway':
          return await this.checkGateway(start);
        case 'model':
          return this.checkModel(start);
        case 'channels':
          return this.checkChannels(start);
        case 'memory':
          return this.checkMemory(start);
        case 'disk':
          return this.checkDisk(start);
        default:
          // Reached only if the configuration holds a value outside the known set.
          return { name: check, healthy: false, message: `Unknown check: ${check}`, durationMs: Date.now() - start };
      }
    } catch (err) {
      return {
        name: check,
        healthy: false,
        message: err instanceof Error ? err.message : 'Unknown error',
        durationMs: Date.now() - start,
      };
    }
  }

  // ── Individual checks ──────────────────────────────────────────

  /**
   * Probe the local gateway with `GET /api/health` on 127.0.0.1.
   * Healthy when the response status is in the 200–399 range. Request errors
   * and the 5000 ms timeout resolve to an unhealthy result; this method never
   * rejects.
   */
  private async checkGateway(start: number): Promise<CheckResult> {
    const port = this.deps.getGatewayPort();
    return new Promise<CheckResult>((resolve) => {
      const req = request(
        { hostname: '127.0.0.1', port, path: '/api/health', method: 'GET', timeout: 5000 },
        (res) => {
          // Consume the response body
          res.resume();
          const healthy = res.statusCode !== undefined && res.statusCode >= 200 && res.statusCode < 400;
          resolve({
            name: 'gateway',
            healthy,
            message: healthy ? `HTTP ${res.statusCode}` : `HTTP ${res.statusCode ?? 'no response'}`,
            durationMs: Date.now() - start,
          });
        },
      );
      req.on('error', (err) => {
        resolve({
          name: 'gateway',
          healthy: false,
          message: err.message,
          durationMs: Date.now() - start,
        });
      });
      req.on('timeout', () => {
        // Destroying the request may also emit 'error'; the promise keeps the
        // first resolution, so the timeout message below wins.
        req.destroy();
        resolve({
          name: 'gateway',
          healthy: false,
          message: 'Request timed out',
          durationMs: Date.now() - start,
        });
      });
      req.end();
    });
  }

  /**
   * Lightweight check: verify the model router is present and report its
   * active tier. No request is made to the model itself.
   */
  private checkModel(start: number): CheckResult {
    const router = this.deps.modelRouter;
    if (!router) {
      return { name: 'model', healthy: false, message: 'Model router not available', durationMs: Date.now() - start };
    }
    const tier = router.getTier();
    return { name: 'model', healthy: true, message: `Active tier: ${tier}`, durationMs: Date.now() - start };
  }

  /**
   * Summarise channel adapter connectivity. Healthy if at least one adapter
   * is connected; the message lists any adapters that are not.
   */
  private checkChannels(start: number): CheckResult {
    const adapters = this.deps.channelLister.list();
    const connected = adapters.filter((a) => a.status === 'connected');
    const disconnected = adapters.filter((a) => a.status !== 'connected');
    // Healthy if at least one adapter is connected
    const healthy = connected.length > 0;
    const details = `${connected.length}/${adapters.length} connected`;
    const message = disconnected.length > 0
      ? `${details} (disconnected: ${disconnected.map((a) => a.name).join(', ')})`
      : details;
    return { name: 'channels', healthy, message, durationMs: Date.now() - start };
  }

  /**
   * Verify the memory directory is readable and writable. When no memory
   * directory is configured the check passes with a "disabled" message.
   */
  private checkMemory(start: number): CheckResult {
    const memoryDir = this.deps.memoryDir;
    if (!memoryDir) {
      return { name: 'memory', healthy: true, message: 'Memory store disabled', durationMs: Date.now() - start };
    }
    try {
      accessSync(memoryDir, fsConstants.R_OK | fsConstants.W_OK);
      return { name: 'memory', healthy: true, message: 'Directory accessible', durationMs: Date.now() - start };
    } catch (err) {
      return {
        name: 'memory',
        healthy: false,
        message: err instanceof Error ? err.message : 'Directory not accessible',
        durationMs: Date.now() - start,
      };
    }
  }

  /**
   * Compare the space available on the filesystem holding `dataDir`
   * (`bavail * bsize`, expressed in MB of 1024 * 1024 bytes) against
   * `config.disk_threshold_mb`. A failure to stat the filesystem is reported
   * as unhealthy.
   */
  private checkDisk(start: number): CheckResult {
    try {
      const stats = statfsSync(this.deps.dataDir);
      const availableMb = (stats.bavail * stats.bsize) / (1024 * 1024);
      const thresholdMb = this.deps.config.disk_threshold_mb;
      const healthy = availableMb >= thresholdMb;
      return {
        name: 'disk',
        healthy,
        message: healthy
          ? `${Math.round(availableMb)} MB available`
          : `Low disk space: ${Math.round(availableMb)} MB available (threshold: ${thresholdMb} MB)`,
        durationMs: Date.now() - start,
      };
    } catch (err) {
      return {
        name: 'disk',
        healthy: false,
        message: err instanceof Error ? err.message : 'Failed to check disk',
        durationMs: Date.now() - start,
      };
    }
  }

  // ── Notification ───────────────────────────────────────────────

  /**
   * Send `text` to the peer and channel named in `config.notify`.
   * Best-effort: a missing notify config is silently skipped, an unknown
   * channel is logged as a warning, and send failures are logged rather than
   * propagated.
   */
  private async notify(text: string): Promise<void> {
    const notifyConfig = this.deps.config.notify;
    if (!notifyConfig) return;
    const adapter = this.deps.channelLookup.get(notifyConfig.channel);
    if (!adapter) {
      console.warn(`HeartbeatMonitor: notification channel '${notifyConfig.channel}' not found`);
      return;
    }
    try {
      await adapter.send(notifyConfig.peer, { text });
    } catch (err) {
      console.error('HeartbeatMonitor: failed to send notification:', err);
    }
  }
}