feat(heartbeat): add process memory and backup health checks

This commit is contained in:
William Valentin
2026-02-16 13:50:39 -08:00
parent 8684c3a07d
commit 07340ff0af
11 changed files with 282 additions and 8 deletions
+7 -1
View File
@@ -652,12 +652,14 @@ automation:
heartbeat: heartbeat:
enabled: true enabled: true
interval: "5m" # Check every 5 minutes interval: "5m" # Check every 5 minutes
checks: [gateway, model, channels, memory, disk] checks: [gateway, model, channels, memory, disk, process_memory, backup]
notify: notify:
channel: telegram channel: telegram
peer: "123456789" peer: "123456789"
failure_threshold: 2 # Notify after 2 consecutive failures failure_threshold: 2 # Notify after 2 consecutive failures
disk_threshold_mb: 100 # Warn when <100MB free disk_threshold_mb: 100 # Warn when <100MB free
process_memory_threshold_mb: 1500 # Warn when RSS memory exceeds threshold
backup_failure_threshold: 1 # Warn when backup failures meet threshold
``` ```
### Heartbeat Checks ### Heartbeat Checks
@@ -669,6 +671,8 @@ automation:
| `channels` | At least one channel adapter is connected | | `channels` | At least one channel adapter is connected |
| `memory` | Memory directory is readable and writable | | `memory` | Memory directory is readable and writable |
| `disk` | Free disk space exceeds threshold | | `disk` | Free disk space exceeds threshold |
| `process_memory` | Flynn process RSS memory usage stays under threshold |
| `backup` | Backup scheduler consecutive failures stay under threshold |
The monitor sends a notification when failures reach the configured threshold and a recovery notification when all checks pass again. The monitor sends a notification when failures reach the configured threshold and a recovery notification when all checks pass again.
@@ -683,6 +687,8 @@ The monitor sends a notification when failures reach the configured threshold an
| `notify.peer` | no | Peer/chat ID for notifications | | `notify.peer` | no | Peer/chat ID for notifications |
| `failure_threshold` | no | Consecutive failures before notifying (default: `2`) | | `failure_threshold` | no | Consecutive failures before notifying (default: `2`) |
| `disk_threshold_mb` | no | Disk space warning threshold in MB (default: `100`) | | `disk_threshold_mb` | no | Disk space warning threshold in MB (default: `100`) |
| `process_memory_threshold_mb` | no | RSS memory threshold in MB for `process_memory` check (default: `1500`) |
| `backup_failure_threshold` | no | Consecutive backup failures threshold for `backup` check (default: `1`) |
## Gmail Pub/Sub Watcher ## Gmail Pub/Sub Watcher
+3 -1
View File
@@ -291,12 +291,14 @@ hooks:
# heartbeat: # heartbeat:
# enabled: false # enabled: false
# interval: "5m" # interval: "5m"
# checks: [gateway, model, channels, memory, disk] # checks: [gateway, model, channels, memory, disk, process_memory, backup]
# notify: # notify:
# channel: telegram # channel: telegram
# peer: "123456789" # peer: "123456789"
# failure_threshold: 2 # failure_threshold: 2
# disk_threshold_mb: 100 # disk_threshold_mb: 100
# process_memory_threshold_mb: 1500
# backup_failure_threshold: 1
# ── Backup ────────────────────────────────────────────────────────── # ── Backup ──────────────────────────────────────────────────────────
# Snapshot sessions.db, vectors.db (optional), and memory/ into a tarball. # Snapshot sessions.db, vectors.db (optional), and memory/ into a tarball.
+7 -3
View File
@@ -7,16 +7,20 @@
"status": "completed", "status": "completed",
"date": "2026-02-16", "date": "2026-02-16",
"updated": "2026-02-16", "updated": "2026-02-16",
"summary": "Added first-class automation presets and scheduling upgrades: `automation.daily_briefing` now auto-registers an opinionated cron job for morning briefings, and backup scheduling now supports cron expressions via `backup.schedule` plus optional `backup.run_on_start` while preserving interval fallback. Added `BackupScheduler` with `backup.notify` channel alerts, configurable `backup.failure_threshold`, and recovery notifications (`backup.notify_recovery`) so backup failures/recoveries proactively notify operators.", "summary": "Added first-class automation presets and scheduling upgrades: `automation.daily_briefing` now auto-registers an opinionated cron job for morning briefings, and backup scheduling now supports cron expressions via `backup.schedule` plus optional `backup.run_on_start` while preserving interval fallback. Added `BackupScheduler` with `backup.notify` channel alerts, configurable `backup.failure_threshold`, and recovery notifications (`backup.notify_recovery`) so backup failures/recoveries proactively notify operators. Extended heartbeat monitoring with `process_memory` and `backup` checks (with thresholds) so high RSS usage and backup failure streaks proactively trigger health alerts.",
"files_modified": [ "files_modified": [
"src/config/schema.ts", "src/config/schema.ts",
"src/config/schema.test.ts", "src/config/schema.test.ts",
"src/automation/index.ts", "src/automation/index.ts",
"src/automation/presets.ts", "src/automation/presets.ts",
"src/automation/presets.test.ts", "src/automation/presets.test.ts",
"src/automation/heartbeat.ts",
"src/automation/heartbeat.test.ts",
"src/backup/index.ts", "src/backup/index.ts",
"src/backup/scheduler.ts", "src/backup/scheduler.ts",
"src/backup/scheduler.test.ts", "src/backup/scheduler.test.ts",
"src/backup/status.ts",
"src/backup/status.test.ts",
"src/daemon/channels.ts", "src/daemon/channels.ts",
"src/daemon/channels.test.ts", "src/daemon/channels.test.ts",
"src/daemon/index.ts", "src/daemon/index.ts",
@@ -25,7 +29,7 @@
"config/default.yaml", "config/default.yaml",
"README.md" "README.md"
], ],
"test_status": "pnpm test:run src/automation/presets.test.ts src/backup/scheduler.test.ts src/config/schema.test.ts src/daemon/channels.test.ts src/gateway/handlers/services.test.ts + pnpm typecheck passing" "test_status": "pnpm test:run src/automation/presets.test.ts src/automation/heartbeat.test.ts src/backup/scheduler.test.ts src/backup/status.test.ts src/config/schema.test.ts src/daemon/channels.test.ts src/gateway/handlers/services.test.ts + pnpm typecheck passing"
}, },
"backup-session-summary-audit-trail": { "backup-session-summary-audit-trail": {
"status": "completed", "status": "completed",
@@ -3311,7 +3315,7 @@
} }
}, },
"overall_progress": { "overall_progress": {
"total_test_count": 1825, "total_test_count": 1830,
"all_tests_passing": true, "all_tests_passing": true,
"p0_completion": "3/3 (100%)", "p0_completion": "3/3 (100%)",
"p1_completion": "4/4 (100%)", "p1_completion": "4/4 (100%)",
+78 -1
View File
@@ -8,9 +8,11 @@ function makeConfig(overrides?: Partial<HeartbeatConfig>): HeartbeatConfig {
return { return {
enabled: true, enabled: true,
interval: '5m', interval: '5m',
checks: ['gateway', 'model', 'channels', 'memory', 'disk'], checks: ['gateway', 'model', 'channels', 'memory', 'disk', 'process_memory', 'backup'],
failure_threshold: 2, failure_threshold: 2,
disk_threshold_mb: 100, disk_threshold_mb: 100,
process_memory_threshold_mb: 1500,
backup_failure_threshold: 1,
...overrides, ...overrides,
}; };
} }
@@ -29,6 +31,12 @@ function makeDeps(overrides?: Partial<HeartbeatDeps>): HeartbeatDeps {
memoryDir: '/tmp/flynn-test-memory', memoryDir: '/tmp/flynn-test-memory',
dataDir: '/tmp', dataDir: '/tmp',
channelLookup: { get: vi.fn() }, channelLookup: { get: vi.fn() },
processMemoryUsageMb: () => 256,
backupHealthProvider: () => ({
enabled: false,
hasRun: false,
consecutiveFailures: 0,
}),
...overrides, ...overrides,
}; };
} }
@@ -436,4 +444,73 @@ describe('HeartbeatMonitor', () => {
expect(check.healthy).toBe(false); expect(check.healthy).toBe(false);
}); });
}); });
describe('process_memory check', () => {
it('passes when RSS is below threshold', async () => {
const deps = makeDeps({
config: makeConfig({ checks: ['process_memory'], process_memory_threshold_mb: 512 }),
processMemoryUsageMb: () => 200,
});
monitor = new HeartbeatMonitor(deps);
const result = await monitor.runChecks();
const check = result.checks.find((c) => c.name === 'process_memory');
if (!check) {throw new Error('Expected process_memory check result');}
expect(check.healthy).toBe(true);
});
it('fails when RSS is above threshold', async () => {
const deps = makeDeps({
config: makeConfig({ checks: ['process_memory'], process_memory_threshold_mb: 128 }),
processMemoryUsageMb: () => 512,
});
monitor = new HeartbeatMonitor(deps);
const result = await monitor.runChecks();
const check = result.checks.find((c) => c.name === 'process_memory');
if (!check) {throw new Error('Expected process_memory check result');}
expect(check.healthy).toBe(false);
expect(check.message).toContain('High memory usage');
});
});
describe('backup check', () => {
it('passes when backup is disabled', async () => {
const deps = makeDeps({
config: makeConfig({ checks: ['backup'] }),
backupHealthProvider: () => ({
enabled: false,
hasRun: false,
consecutiveFailures: 0,
}),
});
monitor = new HeartbeatMonitor(deps);
const result = await monitor.runChecks();
const check = result.checks.find((c) => c.name === 'backup');
if (!check) {throw new Error('Expected backup check result');}
expect(check.healthy).toBe(true);
expect(check.message).toContain('disabled');
});
it('fails when backup consecutive failures exceed threshold', async () => {
const deps = makeDeps({
config: makeConfig({ checks: ['backup'], backup_failure_threshold: 2 }),
backupHealthProvider: () => ({
enabled: true,
hasRun: true,
consecutiveFailures: 3,
lastError: 'minio unavailable',
}),
});
monitor = new HeartbeatMonitor(deps);
const result = await monitor.runChecks();
const check = result.checks.find((c) => c.name === 'backup');
if (!check) {throw new Error('Expected backup check result');}
expect(check.healthy).toBe(false);
expect(check.message).toContain('Backup failing');
expect(check.message).toContain('minio unavailable');
});
});
}); });
+76
View File
@@ -2,6 +2,7 @@ import { statfsSync, accessSync, constants as fsConstants } from 'fs';
import { request } from 'http'; import { request } from 'http';
import type { HeartbeatConfig, HeartbeatCheck } from '../config/schema.js'; import type { HeartbeatConfig, HeartbeatCheck } from '../config/schema.js';
import type { ChannelAdapter, OutboundMessage } from '../channels/types.js'; import type { ChannelAdapter, OutboundMessage } from '../channels/types.js';
import { getBackupHealthSnapshot, type BackupHealthSnapshot } from '../backup/index.js';
import { auditLogger } from '../audit/index.js'; import { auditLogger } from '../audit/index.js';
/** Result of a single health check. */ /** Result of a single health check. */
@@ -38,6 +39,8 @@ export interface HeartbeatDeps {
memoryDir: string | undefined; memoryDir: string | undefined;
dataDir: string; dataDir: string;
channelLookup: ChannelLookup; channelLookup: ChannelLookup;
processMemoryUsageMb?: () => number;
backupHealthProvider?: () => BackupHealthSnapshot;
} }
/** /**
@@ -126,6 +129,12 @@ export class HeartbeatMonitor {
case 'disk': case 'disk':
result = this.checkDisk(start); result = this.checkDisk(start);
break; break;
case 'process_memory':
result = this.checkProcessMemory(start);
break;
case 'backup':
result = this.checkBackup(start);
break;
default: default:
result = { name: check, healthy: false, message: `Unknown check: ${check}`, durationMs: Date.now() - start }; result = { name: check, healthy: false, message: `Unknown check: ${check}`, durationMs: Date.now() - start };
} }
@@ -312,6 +321,73 @@ export class HeartbeatMonitor {
} }
} }
private checkProcessMemory(start: number): CheckResult {
try {
const usageMb = this.deps.processMemoryUsageMb
? this.deps.processMemoryUsageMb()
: Math.round(process.memoryUsage().rss / (1024 * 1024));
const thresholdMb = this.deps.config.process_memory_threshold_mb;
const healthy = usageMb <= thresholdMb;
return {
name: 'process_memory',
healthy,
message: healthy
? `Process RSS ${usageMb} MB (threshold: ${thresholdMb} MB)`
: `High memory usage: ${usageMb} MB RSS (threshold: ${thresholdMb} MB)`,
durationMs: Date.now() - start,
};
} catch (err) {
return {
name: 'process_memory',
healthy: false,
message: err instanceof Error ? err.message : 'Failed to check process memory usage',
durationMs: Date.now() - start,
};
}
}
private checkBackup(start: number): CheckResult {
try {
const snapshot = this.deps.backupHealthProvider
? this.deps.backupHealthProvider()
: getBackupHealthSnapshot();
if (!snapshot.enabled) {
return { name: 'backup', healthy: true, message: 'Backup scheduler disabled', durationMs: Date.now() - start };
}
const threshold = this.deps.config.backup_failure_threshold;
const healthy = snapshot.consecutiveFailures < threshold;
if (!snapshot.hasRun) {
return { name: 'backup', healthy: true, message: 'No backup runs recorded yet', durationMs: Date.now() - start };
}
if (healthy) {
const successSuffix = snapshot.lastSuccessAt ? `, last success ${new Date(snapshot.lastSuccessAt).toISOString()}` : '';
return {
name: 'backup',
healthy: true,
message: `Consecutive failures: ${snapshot.consecutiveFailures}/${threshold}${successSuffix}`,
durationMs: Date.now() - start,
};
}
return {
name: 'backup',
healthy: false,
message: `Backup failing (${snapshot.consecutiveFailures} consecutive failures, threshold: ${threshold})${snapshot.lastError ? `${snapshot.lastError}` : ''}`,
durationMs: Date.now() - start,
};
} catch (err) {
return {
name: 'backup',
healthy: false,
message: err instanceof Error ? err.message : 'Failed to read backup health state',
durationMs: Date.now() - start,
};
}
}
// ── Notification ─────────────────────────────────────────────── // ── Notification ───────────────────────────────────────────────
private async notify(text: string): Promise<void> { private async notify(text: string): Promise<void> {
+7
View File
@@ -1,2 +1,9 @@
export { runBackupSnapshot, backupInternals, type BackupRunOptions, type BackupResult } from './run.js'; export { runBackupSnapshot, backupInternals, type BackupRunOptions, type BackupResult } from './run.js';
export { BackupScheduler, type BackupSchedulerDeps } from './scheduler.js'; export { BackupScheduler, type BackupSchedulerDeps } from './scheduler.js';
export {
getBackupHealthSnapshot,
initBackupHealth,
markBackupSuccess,
markBackupFailure,
type BackupHealthSnapshot,
} from './status.js';
+4
View File
@@ -3,6 +3,7 @@ import type { BackupConfig } from '../config/schema.js';
import type { OutboundMessage } from '../channels/types.js'; import type { OutboundMessage } from '../channels/types.js';
import { parseDuration } from '../session/index.js'; import { parseDuration } from '../session/index.js';
import { runBackupSnapshot } from './run.js'; import { runBackupSnapshot } from './run.js';
import { initBackupHealth, markBackupFailure, markBackupSuccess } from './status.js';
interface ChannelLookup { interface ChannelLookup {
get(name: string): { send(peerId: string, message: OutboundMessage): Promise<void> } | undefined; get(name: string): { send(peerId: string, message: OutboundMessage): Promise<void> } | undefined;
@@ -28,6 +29,7 @@ export class BackupScheduler {
} }
start(): void { start(): void {
initBackupHealth(this.deps.backupConfig.enabled);
if (!this.deps.backupConfig.enabled) { if (!this.deps.backupConfig.enabled) {
return; return;
} }
@@ -92,10 +94,12 @@ export class BackupScheduler {
backupConfig: this.deps.backupConfig, backupConfig: this.deps.backupConfig,
}); });
console.log(`Backup completed: ${result.archivePath}${result.uploaded && result.remotePath ? ` -> ${result.remotePath}` : ''}`); console.log(`Backup completed: ${result.archivePath}${result.uploaded && result.remotePath ? ` -> ${result.remotePath}` : ''}`);
markBackupSuccess();
await this.handleSuccess(); await this.handleSuccess();
} catch (error) { } catch (error) {
const message = error instanceof Error ? error.message : String(error); const message = error instanceof Error ? error.message : String(error);
console.error(`Backup failed: ${message}`); console.error(`Backup failed: ${message}`);
markBackupFailure(message);
await this.handleFailure(message); await this.handleFailure(message);
} finally { } finally {
this.running = false; this.running = false;
+42
View File
@@ -0,0 +1,42 @@
import { describe, expect, it } from 'vitest';
import {
getBackupHealthSnapshot,
initBackupHealth,
markBackupFailure,
markBackupSuccess,
} from './status.js';
describe('backup status tracker', () => {
it('tracks failures and resets on success', () => {
initBackupHealth(true);
expect(getBackupHealthSnapshot()).toMatchObject({
enabled: true,
hasRun: false,
consecutiveFailures: 0,
});
markBackupFailure('upload failed', 123);
expect(getBackupHealthSnapshot()).toMatchObject({
enabled: true,
hasRun: true,
consecutiveFailures: 1,
lastFailureAt: 123,
lastError: 'upload failed',
});
markBackupFailure('upload failed again', 456);
expect(getBackupHealthSnapshot()).toMatchObject({
consecutiveFailures: 2,
lastFailureAt: 456,
lastError: 'upload failed again',
});
markBackupSuccess(789);
expect(getBackupHealthSnapshot()).toMatchObject({
hasRun: true,
consecutiveFailures: 0,
lastSuccessAt: 789,
lastError: undefined,
});
});
});
+46
View File
@@ -0,0 +1,46 @@
export interface BackupHealthSnapshot {
enabled: boolean;
hasRun: boolean;
consecutiveFailures: number;
lastSuccessAt?: number;
lastFailureAt?: number;
lastError?: string;
}
let snapshot: BackupHealthSnapshot = {
enabled: false,
hasRun: false,
consecutiveFailures: 0,
};
export function initBackupHealth(enabled: boolean): void {
snapshot = {
enabled,
hasRun: false,
consecutiveFailures: 0,
};
}
export function markBackupSuccess(now = Date.now()): void {
snapshot = {
...snapshot,
hasRun: true,
consecutiveFailures: 0,
lastSuccessAt: now,
lastError: undefined,
};
}
export function markBackupFailure(error: string, now = Date.now()): void {
snapshot = {
...snapshot,
hasRun: true,
consecutiveFailures: snapshot.consecutiveFailures + 1,
lastFailureAt: now,
lastError: error,
};
}
export function getBackupHealthSnapshot(): BackupHealthSnapshot {
return { ...snapshot };
}
+8
View File
@@ -962,6 +962,14 @@ describe('configSchema automation', () => {
expect(result.automation.daily_briefing.prompt).toBe('Custom briefing prompt'); expect(result.automation.daily_briefing.prompt).toBe('Custom briefing prompt');
expect(result.automation.daily_briefing.model_tier).toBe('fast'); expect(result.automation.daily_briefing.model_tier).toBe('fast');
}); });
it('defaults heartbeat extended thresholds and checks', () => {
const result = configSchema.parse(baseConfig);
expect(result.automation.heartbeat.process_memory_threshold_mb).toBe(1500);
expect(result.automation.heartbeat.backup_failure_threshold).toBe(1);
expect(result.automation.heartbeat.checks).toContain('process_memory');
expect(result.automation.heartbeat.checks).toContain('backup');
});
}); });
describe('configSchema — intents', () => { describe('configSchema — intents', () => {
+4 -2
View File
@@ -302,18 +302,20 @@ const gmailSchema = z.object({
message: z.string().default('New email from {{from}}: {{subject}}\n\n{{snippet}}'), message: z.string().default('New email from {{from}}: {{subject}}\n\n{{snippet}}'),
}).optional(); }).optional();
const heartbeatCheckSchema = z.enum(['gateway', 'model', 'channels', 'memory', 'disk']); const heartbeatCheckSchema = z.enum(['gateway', 'model', 'channels', 'memory', 'disk', 'process_memory', 'backup']);
const heartbeatSchema = z.object({ const heartbeatSchema = z.object({
enabled: z.boolean().default(false), enabled: z.boolean().default(false),
interval: z.string().default('5m'), interval: z.string().default('5m'),
checks: z.array(heartbeatCheckSchema).default(['gateway', 'model', 'channels', 'memory', 'disk']), checks: z.array(heartbeatCheckSchema).default(['gateway', 'model', 'channels', 'memory', 'disk', 'process_memory', 'backup']),
notify: z.object({ notify: z.object({
channel: z.string().min(1), channel: z.string().min(1),
peer: z.string().min(1), peer: z.string().min(1),
}).optional(), }).optional(),
failure_threshold: z.number().min(1).max(10).default(2), failure_threshold: z.number().min(1).max(10).default(2),
disk_threshold_mb: z.number().min(10).default(100), disk_threshold_mb: z.number().min(10).default(100),
process_memory_threshold_mb: z.number().min(64).default(1500),
backup_failure_threshold: z.number().min(1).max(10).default(1),
}).default({}); }).default({});
const gcalSchema = z.object({ const gcalSchema = z.object({