From 8684c3a07d038a63bd32e62519a9ae4058ed627c Mon Sep 17 00:00:00 2001 From: William Valentin Date: Mon, 16 Feb 2026 13:46:35 -0800 Subject: [PATCH] feat(backup): add scheduler alerts and recovery notifications --- README.md | 5 + config/default.yaml | 5 + docs/plans/state.json | 9 +- src/backup/index.ts | 1 + src/backup/scheduler.test.ts | 123 ++++++++++++++++++++++ src/backup/scheduler.ts | 146 ++++++++++++++++++++++++++ src/config/schema.test.ts | 9 ++ src/config/schema.ts | 6 ++ src/daemon/index.ts | 79 ++------------ src/gateway/handlers/services.test.ts | 25 +++++ src/gateway/handlers/services.ts | 13 +++ 11 files changed, 349 insertions(+), 72 deletions(-) create mode 100644 src/backup/scheduler.test.ts create mode 100644 src/backup/scheduler.ts diff --git a/README.md b/README.md index eb1efef..1558409 100644 --- a/README.md +++ b/README.md @@ -583,6 +583,11 @@ backup: schedule: "0 2 * * *" # Optional cron schedule (nightly 2 AM) interval: "24h" # Fallback when schedule is not set run_on_start: true # Also run once on daemon start + notify: + channel: telegram + peer: "123456789" + failure_threshold: 1 # Notify after this many consecutive failures + notify_recovery: true # Send a recovery message after failure clears local_dir: ~/.local/share/flynn/backups include_vectors: true minio: diff --git a/config/default.yaml b/config/default.yaml index 052df22..17493d1 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -308,6 +308,11 @@ hooks: # schedule: "0 2 * * *" # interval: "24h" # run_on_start: false +# notify: +# channel: telegram +# peer: "123456789" +# failure_threshold: 1 +# notify_recovery: true # local_dir: ~/.local/share/flynn/backups # include_vectors: true # minio: diff --git a/docs/plans/state.json b/docs/plans/state.json index 4c025d5..b1855ef 100644 --- a/docs/plans/state.json +++ b/docs/plans/state.json @@ -7,13 +7,16 @@ "status": "completed", "date": "2026-02-16", "updated": "2026-02-16", - "summary": "Added first-class automation 
presets and scheduling upgrades: `automation.daily_briefing` now auto-registers an opinionated cron job for morning briefings, and backup scheduling now supports cron expressions via `backup.schedule` plus optional `backup.run_on_start` while preserving interval fallback.", + "summary": "Added first-class automation presets and scheduling upgrades: `automation.daily_briefing` now auto-registers an opinionated cron job for morning briefings, and backup scheduling now supports cron expressions via `backup.schedule` plus optional `backup.run_on_start` while preserving interval fallback. Added `BackupScheduler` with `backup.notify` channel alerts, configurable `backup.failure_threshold`, and recovery notifications (`backup.notify_recovery`) so backup failures/recoveries proactively notify operators.", "files_modified": [ "src/config/schema.ts", "src/config/schema.test.ts", "src/automation/index.ts", "src/automation/presets.ts", "src/automation/presets.test.ts", + "src/backup/index.ts", + "src/backup/scheduler.ts", + "src/backup/scheduler.test.ts", "src/daemon/channels.ts", "src/daemon/channels.test.ts", "src/daemon/index.ts", @@ -22,7 +25,7 @@ "config/default.yaml", "README.md" ], - "test_status": "pnpm test:run src/automation/presets.test.ts src/config/schema.test.ts src/daemon/channels.test.ts src/gateway/handlers/services.test.ts + pnpm typecheck passing" + "test_status": "pnpm test:run src/automation/presets.test.ts src/backup/scheduler.test.ts src/config/schema.test.ts src/daemon/channels.test.ts src/gateway/handlers/services.test.ts + pnpm typecheck passing" }, "backup-session-summary-audit-trail": { "status": "completed", @@ -3308,7 +3311,7 @@ } }, "overall_progress": { - "total_test_count": 1822, + "total_test_count": 1825, "all_tests_passing": true, "p0_completion": "3/3 (100%)", "p1_completion": "4/4 (100%)", diff --git a/src/backup/index.ts b/src/backup/index.ts index e0d44a3..49cf481 100644 --- a/src/backup/index.ts +++ b/src/backup/index.ts @@ -1 +1,2 @@ 
/**
 * Builds a complete `BackupConfig` for tests.
 *
 * Defaults describe an enabled, interval-driven (1h) backup with MinIO upload
 * disabled, no notification target and a failure threshold of 2. Any field can
 * be replaced through `overrides`.
 */
function makeBackupConfig(overrides: Partial<BackupConfig> = {}): BackupConfig {
  return {
    enabled: true,
    schedule: undefined,
    interval: '1h',
    run_on_start: false,
    local_dir: '/tmp',
    include_vectors: true,
    minio: {
      enabled: false,
      endpoint: undefined,
      access_key: undefined,
      secret_key: undefined,
      bucket: undefined,
      prefix: 'flynn',
      secure: true,
    },
    failure_threshold: 2,
    notify_recovery: true,
    notify: undefined,
    ...overrides,
  };
}

describe('BackupScheduler', () => {
  afterEach(() => {
    vi.useRealTimers();
    vi.restoreAllMocks();
  });

  it('runs on interval and avoids overlapping executions', async () => {
    vi.useFakeTimers();
    let runCount = 0;
    // Resolver of the in-flight snapshot; calling it lets the pending run finish.
    let release: (() => void) | undefined;
    const runSnapshot = vi.fn(async () => {
      runCount += 1;
      await new Promise<void>((resolve) => {
        release = resolve;
      });
      return { archivePath: '/tmp/a.tar.gz', fileName: 'a.tar.gz', uploaded: false };
    });

    const scheduler = new BackupScheduler({
      dataDir: '/data',
      backupConfig: makeBackupConfig({ interval: '1h' }),
      runSnapshot,
    });

    scheduler.start();
    // First tick starts a run that never settles; later ticks must be skipped.
    await vi.advanceTimersByTimeAsync(3_600_000);
    await vi.advanceTimersByTimeAsync(10_800_000);
    expect(runCount).toBe(1);

    release?.();
    await vi.runOnlyPendingTimersAsync();
    await vi.advanceTimersByTimeAsync(3_600_000);
    expect(runCount).toBeGreaterThanOrEqual(2);
    scheduler.stop();
  });

  it('notifies after failure threshold and sends recovery notification', async () => {
    vi.useFakeTimers();
    const send = vi.fn<(peerId: string, message: { text: string }) => Promise<void>>(async () => {});
    const channelLookup = {
      get: vi.fn(() => ({ send })),
    };

    // Fail the first two runs, then succeed.
    let attempts = 0;
    const runSnapshot = vi.fn(async () => {
      attempts += 1;
      if (attempts <= 2) {
        throw new Error('minio upload failed');
      }
      return { archivePath: '/tmp/ok.tar.gz', fileName: 'ok.tar.gz', uploaded: false };
    });

    const scheduler = new BackupScheduler({
      dataDir: '/data',
      backupConfig: makeBackupConfig({
        interval: '1h',
        failure_threshold: 2,
        notify: { channel: 'telegram', peer: '123' },
        notify_recovery: true,
      }),
      runSnapshot,
      channelLookup,
    });

    scheduler.start();
    await vi.advanceTimersByTimeAsync(3_600_000);
    await vi.advanceTimersByTimeAsync(3_600_000);
    // Threshold (2) reached on the second failure: exactly one alert so far.
    expect(send).toHaveBeenCalledTimes(1);
    const failureCall = send.mock.calls.at(0);
    expect(failureCall?.[1].text).toContain('Backup FAILING');

    await vi.advanceTimersByTimeAsync(3_600_000);
    expect(send).toHaveBeenCalledTimes(2);
    const recoveryCall = send.mock.calls.at(1);
    expect(recoveryCall?.[1].text).toContain('Backup RECOVERED');
    scheduler.stop();
  });

  it('runs immediately when run_on_start is enabled', async () => {
    const runSnapshot = vi.fn(async () => {
      return { archivePath: '/tmp/start.tar.gz', fileName: 'start.tar.gz', uploaded: false };
    });

    const scheduler = new BackupScheduler({
      dataDir: '/data',
      backupConfig: makeBackupConfig({ run_on_start: true, interval: '1h' }),
      runSnapshot,
    });

    scheduler.start();
    // One microtask turn is enough for the fire-and-forget start-up run to begin.
    await Promise.resolve();
    expect(runSnapshot).toHaveBeenCalledTimes(1);
    scheduler.stop();
  });
});
/**
 * Narrow view of a channel registry: resolves a channel adapter by name so the
 * scheduler can deliver alert messages.
 */
interface ChannelLookup {
  get(name: string): { send(peerId: string, message: OutboundMessage): Promise<void> } | undefined;
}

/** Configuration and collaborators injected into {@link BackupScheduler}. */
export interface BackupSchedulerDeps {
  /** Data directory handed to the snapshot runner. */
  dataDir: string;
  /** The `backup` configuration section (schedule, interval, notify, thresholds). */
  backupConfig: BackupConfig;
  /** Resolver for the notification channel; when omitted no alerts are sent. */
  channelLookup?: ChannelLookup;
  /** Snapshot runner override; `runBackupSnapshot` is used when omitted. */
  runSnapshot?: typeof runBackupSnapshot;
}

/**
 * Runs backup snapshots on a cron schedule or a fixed interval and alerts a
 * configured channel when backups keep failing or recover.
 *
 * - `backup.schedule` (cron) takes precedence; `backup.interval` is the fallback
 *   when no schedule is set or the cron expression is rejected.
 * - Runs never overlap: a trigger that fires while a run is in flight is dropped.
 * - One failure alert is sent once `failure_threshold` consecutive failures are
 *   reached, and (optionally) one recovery message on the next success.
 */
export class BackupScheduler {
  private cronJob: Cron | undefined;
  private intervalJob: ReturnType<typeof setInterval> | undefined;
  private running = false;
  private consecutiveFailures = 0;
  /** True only once a failure alert has actually been delivered. */
  private notifiedFailure = false;
  private readonly deps: BackupSchedulerDeps;

  constructor(deps: BackupSchedulerDeps) {
    this.deps = deps;
  }

  /**
   * Arms the cron job or interval timer and, when `run_on_start` is set, kicks
   * off one run immediately. Does nothing when backups are disabled or when the
   * scheduler is already started.
   */
  start(): void {
    const { backupConfig } = this.deps;
    if (!backupConfig.enabled) {
      return;
    }

    // Idempotent start: re-arming would overwrite (and leak) the active timer.
    if (this.cronJob || this.intervalJob) {
      return;
    }

    const backupSchedule = backupConfig.schedule?.trim();
    // A falsy result is treated as "no usable interval" below.
    const backupIntervalMs = parseDuration(backupConfig.interval);

    if (!backupSchedule && !backupIntervalMs) {
      console.warn(`Backup enabled but interval is invalid: ${backupConfig.interval}`);
      return;
    }

    if (backupSchedule) {
      try {
        this.cronJob = new Cron(backupSchedule, { paused: false }, () => {
          void this.runScheduledBackup();
        });
        console.log(`Backup scheduler enabled (cron: ${backupSchedule})`);
      } catch (error) {
        // Constructing the cron job threw; fall through to the interval fallback.
        const message = error instanceof Error ? error.message : String(error);
        console.warn(`Backup cron schedule is invalid (${backupSchedule}): ${message}`);
      }
    }

    if (!this.cronJob && backupIntervalMs) {
      this.intervalJob = setInterval(() => {
        void this.runScheduledBackup();
      }, backupIntervalMs);
      console.log(`Backup scheduler enabled (interval: ${backupConfig.interval})`);
    }

    if (!this.cronJob && !this.intervalJob) {
      console.warn('Backup scheduler disabled: no valid backup.schedule or backup.interval');
      return;
    }

    if (backupConfig.run_on_start) {
      void this.runScheduledBackup();
    }
  }

  /** Cancels the cron job / interval timer. A run already in flight is not interrupted. */
  stop(): void {
    if (this.cronJob) {
      this.cronJob.stop();
      this.cronJob = undefined;
    }
    if (this.intervalJob) {
      clearInterval(this.intervalJob);
      this.intervalJob = undefined;
    }
  }

  /**
   * Executes one snapshot unless another is still running, then updates the
   * failure/recovery bookkeeping. Never rejects: runner errors are logged and
   * routed to {@link handleFailure}.
   */
  private async runScheduledBackup(): Promise<void> {
    if (this.running) {
      return;
    }
    this.running = true;
    try {
      const runner = this.deps.runSnapshot ?? runBackupSnapshot;
      const result = await runner({
        dataDir: this.deps.dataDir,
        backupConfig: this.deps.backupConfig,
      });
      console.log(`Backup completed: ${result.archivePath}${result.uploaded && result.remotePath ? ` -> ${result.remotePath}` : ''}`);
      await this.handleSuccess();
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      console.error(`Backup failed: ${message}`);
      await this.handleFailure(message);
    } finally {
      this.running = false;
    }
  }

  /**
   * Counts the failure and, once the threshold is reached, sends a single
   * failure alert. The alert is latched only when it was delivered, so an
   * undelivered alert is retried on the next failing run instead of being
   * silently lost for the rest of the outage.
   */
  private async handleFailure(message: string): Promise<void> {
    this.consecutiveFailures += 1;
    const threshold = this.deps.backupConfig.failure_threshold;
    if (this.notifiedFailure || this.consecutiveFailures < threshold) {
      return;
    }

    this.notifiedFailure = await this.notify(
      `Backup FAILING (${this.consecutiveFailures} consecutive failures).\nError: ${message}`,
    );
  }

  /**
   * Sends a recovery message when a failure alert was previously delivered and
   * `notify_recovery` is enabled, then resets the failure state.
   */
  private async handleSuccess(): Promise<void> {
    if (this.notifiedFailure && this.deps.backupConfig.notify_recovery) {
      await this.notify(
        `Backup RECOVERED after ${this.consecutiveFailures} consecutive failure(s).`,
      );
    }
    this.consecutiveFailures = 0;
    this.notifiedFailure = false;
  }

  /**
   * Best-effort delivery of `text` to the configured channel/peer.
   *
   * @returns `true` when the adapter accepted the message; `false` when
   * notifications are not configured, the channel is unknown, or sending threw
   * (the error is logged, never propagated).
   */
  private async notify(text: string): Promise<boolean> {
    const notifyConfig = this.deps.backupConfig.notify;
    if (!notifyConfig || !this.deps.channelLookup) {
      return false;
    }

    const adapter = this.deps.channelLookup.get(notifyConfig.channel);
    if (!adapter) {
      console.warn(`BackupScheduler: notification channel '${notifyConfig.channel}' not found`);
      return false;
    }

    try {
      await adapter.send(notifyConfig.peer, { text });
      return true;
    } catch (err) {
      console.error('BackupScheduler: failed to send notification:', err);
      return false;
    }
  }
}
expect(result.backup.minio.prefix).toBe('flynn'); @@ -222,6 +225,9 @@ describe('configSchema — backup', () => { schedule: '0 2 * * *', interval: '12h', run_on_start: true, + notify: { channel: 'telegram', peer: '123' }, + failure_threshold: 3, + notify_recovery: false, local_dir: '/tmp/flynn-backups', include_vectors: false, minio: { @@ -240,6 +246,9 @@ describe('configSchema — backup', () => { expect(result.backup.schedule).toBe('0 2 * * *'); expect(result.backup.interval).toBe('12h'); expect(result.backup.run_on_start).toBe(true); + expect(result.backup.notify).toEqual({ channel: 'telegram', peer: '123' }); + expect(result.backup.failure_threshold).toBe(3); + expect(result.backup.notify_recovery).toBe(false); expect(result.backup.local_dir).toBe('/tmp/flynn-backups'); expect(result.backup.include_vectors).toBe(false); expect(result.backup.minio.enabled).toBe(true); diff --git a/src/config/schema.ts b/src/config/schema.ts index 58e3372..17b8fed 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -717,6 +717,12 @@ const backupSchema = z.object({ schedule: z.string().optional(), interval: z.string().default('24h'), run_on_start: z.boolean().default(false), + notify: z.object({ + channel: z.string().min(1), + peer: z.string().min(1), + }).optional(), + failure_threshold: z.number().min(1).max(10).default(1), + notify_recovery: z.boolean().default(true), local_dir: z.string().default('~/.local/share/flynn/backups'), include_vectors: z.boolean().default(true), minio: z.object({ diff --git a/src/daemon/index.ts b/src/daemon/index.ts index 2d36f75..79a5f80 100644 --- a/src/daemon/index.ts +++ b/src/daemon/index.ts @@ -2,7 +2,6 @@ import { resolve } from 'path'; import { homedir } from 'os'; import { mkdirSync } from 'fs'; -import { Cron } from 'croner'; // ── Config & Types ── import type { Config } from '../config/index.js'; @@ -34,7 +33,7 @@ import type { McpManager } from '../mcp/index.js'; import type { SkillRegistry, SkillInstaller } from 
'../skills/index.js'; import type { GatewayServer } from '../gateway/index.js'; import { AuditLogger, initAuditLogger } from '../audit/index.js'; -import { runBackupSnapshot } from '../backup/index.js'; +import { BackupScheduler } from '../backup/index.js'; export interface DaemonContext { config: Config; @@ -105,73 +104,6 @@ export async function startDaemon(config: Config, options?: StartDaemonOptions): lifecycle.onShutdown(async () => { clearInterval(pruneInterval); }); } - if (config.backup.enabled) { - const backupIntervalMs = parseDuration(config.backup.interval); - const backupSchedule = config.backup.schedule?.trim(); - if (!backupSchedule && !backupIntervalMs) { - console.warn(`Backup enabled but interval is invalid: ${config.backup.interval}`); - } else { - let backupRunning = false; - const runScheduledBackup = async (): Promise => { - if (backupRunning) { - return; - } - backupRunning = true; - try { - const result = await runBackupSnapshot({ - dataDir, - backupConfig: config.backup, - }); - console.log(`Backup completed: ${result.archivePath}${result.uploaded && result.remotePath ? ` -> ${result.remotePath}` : ''}`); - } catch (error) { - const message = error instanceof Error ? error.message : String(error); - console.error(`Backup failed: ${message}`); - } finally { - backupRunning = false; - } - }; - - let backupCron: Cron | undefined; - let backupInterval: ReturnType | undefined; - - if (backupSchedule) { - try { - backupCron = new Cron(backupSchedule, { paused: false }, () => { - void runScheduledBackup(); - }); - console.log(`Backup scheduler enabled (cron: ${backupSchedule})`); - } catch (error) { - const message = error instanceof Error ? 
error.message : String(error); - console.warn(`Backup cron schedule is invalid (${backupSchedule}): ${message}`); - } - } - - if (!backupCron && backupIntervalMs) { - backupInterval = setInterval(() => { - void runScheduledBackup(); - }, backupIntervalMs); - console.log(`Backup scheduler enabled (interval: ${config.backup.interval})`); - } - - if (!backupCron && !backupInterval) { - console.warn('Backup scheduler disabled: no valid backup.schedule or backup.interval'); - } else { - if (config.backup.run_on_start) { - void runScheduledBackup(); - } - - lifecycle.onShutdown(async () => { - if (backupCron) { - backupCron.stop(); - } - if (backupInterval) { - clearInterval(backupInterval); - } - }); - } - } - } - // ── Core Services ── const hookEngine = new HookEngine(config.hooks); const { toolRegistry, toolExecutor, browserManager } = initTools({ config, lifecycle, hookEngine }); @@ -256,6 +188,15 @@ export async function startDaemon(config: Config, options?: StartDaemonOptions): // ── Lifecycle ── await startServices({ config, lifecycle, channelRegistry, gateway, modelRouter, memoryDir, dataDir }); + const backupScheduler = new BackupScheduler({ + dataDir, + backupConfig: config.backup, + channelLookup: channelRegistry, + }); + backupScheduler.start(); + lifecycle.onShutdown(async () => { + backupScheduler.stop(); + }); return { config, lifecycle, sessionStore, sessionManager, hookEngine, modelRouter, diff --git a/src/gateway/handlers/services.test.ts b/src/gateway/handlers/services.test.ts index ebcbdd5..702bb33 100644 --- a/src/gateway/handlers/services.test.ts +++ b/src/gateway/handlers/services.test.ts @@ -15,6 +15,26 @@ function makeBaseConfig(): Config { backends: { native: { enabled: true }, opencode: { enabled: false }, claude_code: { enabled: false } }, hooks: { confirm: [], log: [], silent: [] }, mcp: { servers: [] }, + backup: { + enabled: false, + schedule: undefined, + interval: '24h', + run_on_start: false, + notify: undefined, + failure_threshold: 1, 
+ notify_recovery: true, + local_dir: '~/.local/share/flynn/backups', + include_vectors: true, + minio: { + enabled: false, + endpoint: undefined, + access_key: undefined, + secret_key: undefined, + bucket: undefined, + prefix: 'flynn', + secure: true, + }, + }, automation: { cron: [], webhooks: [], @@ -61,6 +81,7 @@ describe('discoverServices', () => { expect.objectContaining({ name: 'zalo', status: 'not_configured' }), expect.objectContaining({ name: 'cron', status: 'not_configured' }), expect.objectContaining({ name: 'daily_briefing', status: 'not_configured' }), + expect.objectContaining({ name: 'backup', status: 'not_configured' }), expect.objectContaining({ name: 'mcp', status: 'not_configured' }), expect.objectContaining({ name: 'web_search', status: 'configured' }), expect.objectContaining({ name: 'audio_transcription', status: 'not_configured' }), @@ -105,6 +126,8 @@ describe('discoverServices', () => { output: { channel: 'webchat', peer: 'x' }, }; cfg.mcp.servers = [{ name: 'srv', command: 'x', args: [] }]; + cfg.backup.enabled = true; + cfg.backup.schedule = '0 2 * * *'; const reg = new ChannelRegistry(); const services = discoverServices(cfg, reg); @@ -112,6 +135,8 @@ describe('discoverServices', () => { expect(services.find(s => s.name === 'cron')?.status).toBe('configured'); expect(services.find(s => s.name === 'cron')?.itemCount).toBe(2); expect(services.find(s => s.name === 'daily_briefing')?.status).toBe('configured'); + expect(services.find(s => s.name === 'backup')?.status).toBe('configured'); + expect(services.find(s => s.name === 'backup')?.metadata).toMatchObject({ schedule: '0 2 * * *' }); expect(services.find(s => s.name === 'mcp')?.metadata).toEqual({ serverCount: 1 }); }); diff --git a/src/gateway/handlers/services.ts b/src/gateway/handlers/services.ts index 93ae997..192716d 100644 --- a/src/gateway/handlers/services.ts +++ b/src/gateway/handlers/services.ts @@ -121,6 +121,19 @@ export function discoverServices( }, }); + services.push({ + 
name: 'backup', + type: 'automation', + status: config.backup?.enabled ? 'configured' : 'not_configured', + description: 'Snapshot backup scheduler', + metadata: { + schedule: config.backup?.schedule, + interval: config.backup?.interval, + run_on_start: config.backup?.run_on_start, + minio_enabled: config.backup?.minio?.enabled ?? false, + }, + }); + const automation = config.automation; const dailyBriefingEnabled = Boolean(automation.daily_briefing?.enabled && automation.daily_briefing.output); const totalCronJobs = automation.cron.length + (dailyBriefingEnabled ? 1 : 0);