diff --git a/README.md b/README.md index 1558409..08273de 100644 --- a/README.md +++ b/README.md @@ -652,12 +652,14 @@ automation: heartbeat: enabled: true interval: "5m" # Check every 5 minutes - checks: [gateway, model, channels, memory, disk] + checks: [gateway, model, channels, memory, disk, process_memory, backup] notify: channel: telegram peer: "123456789" failure_threshold: 2 # Notify after 2 consecutive failures disk_threshold_mb: 100 # Warn when <100MB free + process_memory_threshold_mb: 1500 # Warn when RSS memory exceeds threshold + backup_failure_threshold: 1 # Warn when backup failures meet threshold ``` ### Heartbeat Checks @@ -669,6 +671,8 @@ automation: | `channels` | At least one channel adapter is connected | | `memory` | Memory directory is readable and writable | | `disk` | Free disk space exceeds threshold | +| `process_memory` | Flynn process RSS memory usage stays under threshold | +| `backup` | Backup scheduler consecutive failures stay under threshold | The monitor sends a notification when failures reach the configured threshold and a recovery notification when all checks pass again. @@ -683,6 +687,8 @@ The monitor sends a notification when failures reach the configured threshold an | `notify.peer` | no | Peer/chat ID for notifications | | `failure_threshold` | no | Consecutive failures before notifying (default: `2`) | | `disk_threshold_mb` | no | Disk space warning threshold in MB (default: `100`) | +| `process_memory_threshold_mb` | no | RSS memory threshold in MB for `process_memory` check (default: `1500`) | +| `backup_failure_threshold` | no | Consecutive backup failures threshold for `backup` check (default: `1`) | ## Gmail Pub/Sub Watcher diff --git a/config/default.yaml b/config/default.yaml index 17493d1..d2eeab4 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -291,12 +291,14 @@ hooks: # heartbeat: # enabled: false # interval: "5m" -# checks: [gateway, model, channels, memory, disk] +# checks: [gateway, model, channels, memory, disk, process_memory, backup] # notify: # channel: telegram # peer: "123456789" # failure_threshold: 2 # disk_threshold_mb: 100 +# process_memory_threshold_mb: 1500 +# backup_failure_threshold: 1 # ── Backup ────────────────────────────────────────────────────────── # Snapshot sessions.db, vectors.db (optional), and memory/ into a tarball. diff --git a/docs/plans/state.json b/docs/plans/state.json index b1855ef..f715197 100644 --- a/docs/plans/state.json +++ b/docs/plans/state.json @@ -7,16 +7,20 @@ "status": "completed", "date": "2026-02-16", "updated": "2026-02-16", - "summary": "Added first-class automation presets and scheduling upgrades: `automation.daily_briefing` now auto-registers an opinionated cron job for morning briefings, and backup scheduling now supports cron expressions via `backup.schedule` plus optional `backup.run_on_start` while preserving interval fallback. Added `BackupScheduler` with `backup.notify` channel alerts, configurable `backup.failure_threshold`, and recovery notifications (`backup.notify_recovery`) so backup failures/recoveries proactively notify operators.", + "summary": "Added first-class automation presets and scheduling upgrades: `automation.daily_briefing` now auto-registers an opinionated cron job for morning briefings, and backup scheduling now supports cron expressions via `backup.schedule` plus optional `backup.run_on_start` while preserving interval fallback. Added `BackupScheduler` with `backup.notify` channel alerts, configurable `backup.failure_threshold`, and recovery notifications (`backup.notify_recovery`) so backup failures/recoveries proactively notify operators. Extended heartbeat monitoring with `process_memory` and `backup` checks (with thresholds) so high RSS usage and backup failure streaks proactively trigger health alerts.", "files_modified": [ "src/config/schema.ts", "src/config/schema.test.ts", "src/automation/index.ts", "src/automation/presets.ts", "src/automation/presets.test.ts", + "src/automation/heartbeat.ts", + "src/automation/heartbeat.test.ts", "src/backup/index.ts", "src/backup/scheduler.ts", "src/backup/scheduler.test.ts", + "src/backup/status.ts", + "src/backup/status.test.ts", "src/daemon/channels.ts", "src/daemon/channels.test.ts", "src/daemon/index.ts", @@ -25,7 +29,7 @@ "config/default.yaml", "README.md" ], - "test_status": "pnpm test:run src/automation/presets.test.ts src/backup/scheduler.test.ts src/config/schema.test.ts src/daemon/channels.test.ts src/gateway/handlers/services.test.ts + pnpm typecheck passing" + "test_status": "pnpm test:run src/automation/presets.test.ts src/automation/heartbeat.test.ts src/backup/scheduler.test.ts src/backup/status.test.ts src/config/schema.test.ts src/daemon/channels.test.ts src/gateway/handlers/services.test.ts + pnpm typecheck passing" }, "backup-session-summary-audit-trail": { "status": "completed", @@ -3311,7 +3315,7 @@ } }, "overall_progress": { - "total_test_count": 1825, + "total_test_count": 1830, "all_tests_passing": true, "p0_completion": "3/3 (100%)", "p1_completion": "4/4 (100%)", diff --git a/src/automation/heartbeat.test.ts b/src/automation/heartbeat.test.ts index 3e8acf7..3f41042 100644 --- a/src/automation/heartbeat.test.ts +++ b/src/automation/heartbeat.test.ts @@ -8,9 +8,11 @@ function makeConfig(overrides?: Partial): HeartbeatConfig { return { enabled: true, interval: '5m', - checks: ['gateway', 'model', 'channels', 'memory', 'disk'], + checks: ['gateway', 'model', 'channels', 'memory', 'disk', 'process_memory', 'backup'], failure_threshold: 2, disk_threshold_mb: 100, + process_memory_threshold_mb: 1500, + backup_failure_threshold: 1, ...overrides, }; } @@ -29,6 +31,12 @@ function makeDeps(overrides?: Partial): HeartbeatDeps { memoryDir: '/tmp/flynn-test-memory', dataDir: '/tmp', channelLookup: { get: vi.fn() }, + processMemoryUsageMb: () => 256, + backupHealthProvider: () => ({ + enabled: false, + hasRun: false, + consecutiveFailures: 0, + }), ...overrides, }; } @@ -436,4 +444,73 @@ describe('HeartbeatMonitor', () => { expect(check.healthy).toBe(false); }); }); + + describe('process_memory check', () => { + it('passes when RSS is below threshold', async () => { + const deps = makeDeps({ + config: makeConfig({ checks: ['process_memory'], process_memory_threshold_mb: 512 }), + processMemoryUsageMb: () => 200, + }); + monitor = new HeartbeatMonitor(deps); + + const result = await monitor.runChecks(); + const check = result.checks.find((c) => c.name === 'process_memory'); + if (!check) {throw new Error('Expected process_memory check result');} + expect(check.healthy).toBe(true); + }); + + it('fails when RSS is above threshold', async () => { + const deps = makeDeps({ + config: makeConfig({ checks: ['process_memory'], process_memory_threshold_mb: 128 }), + processMemoryUsageMb: () => 512, + }); + monitor = new HeartbeatMonitor(deps); + + const result = await monitor.runChecks(); + const check = result.checks.find((c) => c.name === 'process_memory'); + if (!check) {throw new Error('Expected process_memory check result');} + expect(check.healthy).toBe(false); + expect(check.message).toContain('High memory usage'); + }); + }); + + describe('backup check', () => { + it('passes when backup is disabled', async () => { + const deps = makeDeps({ + config: makeConfig({ checks: ['backup'] }), + backupHealthProvider: () => ({ + enabled: false, + hasRun: false, + consecutiveFailures: 0, + }), + }); + monitor = new HeartbeatMonitor(deps); + + const result = await monitor.runChecks(); + const check = result.checks.find((c) => c.name === 'backup'); + if (!check) {throw new Error('Expected backup check result');} + expect(check.healthy).toBe(true); + expect(check.message).toContain('disabled'); + }); + + it('fails when backup consecutive failures exceed threshold', async () => { + const deps = makeDeps({ + config: makeConfig({ checks: ['backup'], backup_failure_threshold: 2 }), + backupHealthProvider: () => ({ + enabled: true, + hasRun: true, + consecutiveFailures: 3, + lastError: 'minio unavailable', + }), + }); + monitor = new HeartbeatMonitor(deps); + + const result = await monitor.runChecks(); + const check = result.checks.find((c) => c.name === 'backup'); + if (!check) {throw new Error('Expected backup check result');} + expect(check.healthy).toBe(false); + expect(check.message).toContain('Backup failing'); + expect(check.message).toContain('minio unavailable'); + }); + }); }); diff --git a/src/automation/heartbeat.ts b/src/automation/heartbeat.ts index 8084d64..223fd5f 100644 --- a/src/automation/heartbeat.ts +++ b/src/automation/heartbeat.ts @@ -2,6 +2,7 @@ import { statfsSync, accessSync, constants as fsConstants } from 'fs'; import { request } from 'http'; import type { HeartbeatConfig, HeartbeatCheck } from '../config/schema.js'; import type { ChannelAdapter, OutboundMessage } from '../channels/types.js'; +import { getBackupHealthSnapshot, type BackupHealthSnapshot } from '../backup/index.js'; import { auditLogger } from '../audit/index.js'; /** Result of a single health check. */ @@ -38,6 +39,8 @@ export interface HeartbeatDeps { memoryDir: string | undefined; dataDir: string; channelLookup: ChannelLookup; + processMemoryUsageMb?: () => number; + backupHealthProvider?: () => BackupHealthSnapshot; } /** @@ -126,6 +129,12 @@ export class HeartbeatMonitor { case 'disk': result = this.checkDisk(start); break; + case 'process_memory': + result = this.checkProcessMemory(start); + break; + case 'backup': + result = this.checkBackup(start); + break; default: result = { name: check, healthy: false, message: `Unknown check: ${check}`, durationMs: Date.now() - start }; } @@ -312,6 +321,73 @@ export class HeartbeatMonitor { } } + private checkProcessMemory(start: number): CheckResult { + try { + const usageMb = this.deps.processMemoryUsageMb + ? this.deps.processMemoryUsageMb() + : Math.round(process.memoryUsage().rss / (1024 * 1024)); + const thresholdMb = this.deps.config.process_memory_threshold_mb; + const healthy = usageMb <= thresholdMb; + + return { + name: 'process_memory', + healthy, + message: healthy + ? `Process RSS ${usageMb} MB (threshold: ${thresholdMb} MB)` + : `High memory usage: ${usageMb} MB RSS (threshold: ${thresholdMb} MB)`, + durationMs: Date.now() - start, + }; + } catch (err) { + return { + name: 'process_memory', + healthy: false, + message: err instanceof Error ? err.message : 'Failed to check process memory usage', + durationMs: Date.now() - start, + }; + } + } + + private checkBackup(start: number): CheckResult { + try { + const snapshot = this.deps.backupHealthProvider + ? this.deps.backupHealthProvider() + : getBackupHealthSnapshot(); + if (!snapshot.enabled) { + return { name: 'backup', healthy: true, message: 'Backup scheduler disabled', durationMs: Date.now() - start }; + } + + const threshold = this.deps.config.backup_failure_threshold; + const healthy = snapshot.consecutiveFailures < threshold; + if (!snapshot.hasRun) { + return { name: 'backup', healthy: true, message: 'No backup runs recorded yet', durationMs: Date.now() - start }; + } + + if (healthy) { + const successSuffix = snapshot.lastSuccessAt ? `, last success ${new Date(snapshot.lastSuccessAt).toISOString()}` : ''; + return { + name: 'backup', + healthy: true, + message: `Consecutive failures: ${snapshot.consecutiveFailures}/${threshold}${successSuffix}`, + durationMs: Date.now() - start, + }; + } + + return { + name: 'backup', + healthy: false, + message: `Backup failing (${snapshot.consecutiveFailures} consecutive failures, threshold: ${threshold})${snapshot.lastError ? ` — ${snapshot.lastError}` : ''}`, + durationMs: Date.now() - start, + }; + } catch (err) { + return { + name: 'backup', + healthy: false, + message: err instanceof Error ? err.message : 'Failed to read backup health state', + durationMs: Date.now() - start, + }; + } + } + // ── Notification ─────────────────────────────────────────────── private async notify(text: string): Promise { diff --git a/src/backup/index.ts b/src/backup/index.ts index 49cf481..4837d90 100644 --- a/src/backup/index.ts +++ b/src/backup/index.ts @@ -1,2 +1,9 @@ export { runBackupSnapshot, backupInternals, type BackupRunOptions, type BackupResult } from './run.js'; export { BackupScheduler, type BackupSchedulerDeps } from './scheduler.js'; +export { + getBackupHealthSnapshot, + initBackupHealth, + markBackupSuccess, + markBackupFailure, + type BackupHealthSnapshot, +} from './status.js'; diff --git a/src/backup/scheduler.ts b/src/backup/scheduler.ts index e7a93e0..78e5e4f 100644 --- a/src/backup/scheduler.ts +++ b/src/backup/scheduler.ts @@ -3,6 +3,7 @@ import type { BackupConfig } from '../config/schema.js'; import type { OutboundMessage } from '../channels/types.js'; import { parseDuration } from '../session/index.js'; import { runBackupSnapshot } from './run.js'; +import { initBackupHealth, markBackupFailure, markBackupSuccess } from './status.js'; interface ChannelLookup { get(name: string): { send(peerId: string, message: OutboundMessage): Promise } | undefined; @@ -28,6 +29,7 @@ export class BackupScheduler { } start(): void { + initBackupHealth(this.deps.backupConfig.enabled); if (!this.deps.backupConfig.enabled) { return; } @@ -92,10 +94,12 @@ export class BackupScheduler { backupConfig: this.deps.backupConfig, }); console.log(`Backup completed: ${result.archivePath}${result.uploaded && result.remotePath ? ` -> ${result.remotePath}` : ''}`); + markBackupSuccess(); await this.handleSuccess(); } catch (error) { const message = error instanceof Error ? error.message : String(error); console.error(`Backup failed: ${message}`); + markBackupFailure(message); await this.handleFailure(message); } finally { this.running = false; diff --git a/src/backup/status.test.ts b/src/backup/status.test.ts new file mode 100644 index 0000000..e31d6c7 --- /dev/null +++ b/src/backup/status.test.ts @@ -0,0 +1,42 @@ +import { describe, expect, it } from 'vitest'; +import { + getBackupHealthSnapshot, + initBackupHealth, + markBackupFailure, + markBackupSuccess, +} from './status.js'; + +describe('backup status tracker', () => { + it('tracks failures and resets on success', () => { + initBackupHealth(true); + expect(getBackupHealthSnapshot()).toMatchObject({ + enabled: true, + hasRun: false, + consecutiveFailures: 0, + }); + + markBackupFailure('upload failed', 123); + expect(getBackupHealthSnapshot()).toMatchObject({ + enabled: true, + hasRun: true, + consecutiveFailures: 1, + lastFailureAt: 123, + lastError: 'upload failed', + }); + + markBackupFailure('upload failed again', 456); + expect(getBackupHealthSnapshot()).toMatchObject({ + consecutiveFailures: 2, + lastFailureAt: 456, + lastError: 'upload failed again', + }); + + markBackupSuccess(789); + expect(getBackupHealthSnapshot()).toMatchObject({ + hasRun: true, + consecutiveFailures: 0, + lastSuccessAt: 789, + lastError: undefined, + }); + }); +}); diff --git a/src/backup/status.ts b/src/backup/status.ts new file mode 100644 index 0000000..a9e1980 --- /dev/null +++ b/src/backup/status.ts @@ -0,0 +1,46 @@ +export interface BackupHealthSnapshot { + enabled: boolean; + hasRun: boolean; + consecutiveFailures: number; + lastSuccessAt?: number; + lastFailureAt?: number; + lastError?: string; +} + +let snapshot: BackupHealthSnapshot = { + enabled: false, + hasRun: false, + consecutiveFailures: 0, +}; + +export function initBackupHealth(enabled: boolean): void { + snapshot = { + enabled, + hasRun: false, + consecutiveFailures: 0, + }; +} + +export function markBackupSuccess(now = Date.now()): void { + snapshot = { + ...snapshot, + hasRun: true, + consecutiveFailures: 0, + lastSuccessAt: now, + lastError: undefined, + }; +} + +export function markBackupFailure(error: string, now = Date.now()): void { + snapshot = { + ...snapshot, + hasRun: true, + consecutiveFailures: snapshot.consecutiveFailures + 1, + lastFailureAt: now, + lastError: error, + }; +} + +export function getBackupHealthSnapshot(): BackupHealthSnapshot { + return { ...snapshot }; +} diff --git a/src/config/schema.test.ts b/src/config/schema.test.ts index 92f414e..b59000b 100644 --- a/src/config/schema.test.ts +++ b/src/config/schema.test.ts @@ -962,6 +962,14 @@ describe('configSchema automation', () => { expect(result.automation.daily_briefing.prompt).toBe('Custom briefing prompt'); expect(result.automation.daily_briefing.model_tier).toBe('fast'); }); + + it('defaults heartbeat extended thresholds and checks', () => { + const result = configSchema.parse(baseConfig); + expect(result.automation.heartbeat.process_memory_threshold_mb).toBe(1500); + expect(result.automation.heartbeat.backup_failure_threshold).toBe(1); + expect(result.automation.heartbeat.checks).toContain('process_memory'); + expect(result.automation.heartbeat.checks).toContain('backup'); + }); }); describe('configSchema — intents', () => { diff --git a/src/config/schema.ts b/src/config/schema.ts index 17b8fed..93d19ea 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -302,18 +302,20 @@ const gmailSchema = z.object({ message: z.string().default('New email from {{from}}: {{subject}}\n\n{{snippet}}'), }).optional(); -const heartbeatCheckSchema = z.enum(['gateway', 'model', 'channels', 'memory', 'disk']); +const heartbeatCheckSchema = z.enum(['gateway', 'model', 'channels', 'memory', 'disk', 'process_memory', 'backup']); const heartbeatSchema = z.object({ enabled: z.boolean().default(false), interval: z.string().default('5m'), - checks: z.array(heartbeatCheckSchema).default(['gateway', 'model', 'channels', 'memory', 'disk']), + checks: z.array(heartbeatCheckSchema).default(['gateway', 'model', 'channels', 'memory', 'disk', 'process_memory', 'backup']), notify: z.object({ channel: z.string().min(1), peer: z.string().min(1), }).optional(), failure_threshold: z.number().min(1).max(10).default(2), disk_threshold_mb: z.number().min(10).default(100), + process_memory_threshold_mb: z.number().min(64).default(1500), + backup_failure_threshold: z.number().min(1).max(10).default(1), }).default({}); const gcalSchema = z.object({