feat(backup): add scheduler alerts and recovery notifications

This commit is contained in:
William Valentin
2026-02-16 13:46:35 -08:00
parent ce621d1b72
commit 8684c3a07d
11 changed files with 349 additions and 72 deletions
+1
View File
@@ -1 +1,2 @@
export { runBackupSnapshot, backupInternals, type BackupRunOptions, type BackupResult } from './run.js';
export { BackupScheduler, type BackupSchedulerDeps } from './scheduler.js';
+123
View File
@@ -0,0 +1,123 @@
import { afterEach, describe, expect, it, vi } from 'vitest';
import type { BackupConfig } from '../config/schema.js';
import { BackupScheduler } from './scheduler.js';
function makeBackupConfig(overrides: Partial<BackupConfig> = {}): BackupConfig {
return {
enabled: true,
schedule: undefined,
interval: '1h',
run_on_start: false,
local_dir: '/tmp',
include_vectors: true,
minio: {
enabled: false,
endpoint: undefined,
access_key: undefined,
secret_key: undefined,
bucket: undefined,
prefix: 'flynn',
secure: true,
},
failure_threshold: 2,
notify_recovery: true,
notify: undefined,
...overrides,
};
}
describe('BackupScheduler', () => {
afterEach(() => {
vi.useRealTimers();
vi.restoreAllMocks();
});
it('runs on interval and avoids overlapping executions', async () => {
vi.useFakeTimers();
let runCount = 0;
let release: (() => void) | undefined;
const runSnapshot = vi.fn(async () => {
runCount += 1;
await new Promise<void>((resolve) => {
release = resolve;
});
return { archivePath: '/tmp/a.tar.gz', fileName: 'a.tar.gz', uploaded: false };
});
const scheduler = new BackupScheduler({
dataDir: '/data',
backupConfig: makeBackupConfig({ interval: '1h' }),
runSnapshot,
});
scheduler.start();
await vi.advanceTimersByTimeAsync(3_600_000);
await vi.advanceTimersByTimeAsync(10_800_000);
expect(runCount).toBe(1);
release?.();
await vi.runOnlyPendingTimersAsync();
await vi.advanceTimersByTimeAsync(3_600_000);
expect(runCount).toBeGreaterThanOrEqual(2);
scheduler.stop();
});
it('notifies after failure threshold and sends recovery notification', async () => {
vi.useFakeTimers();
const send = vi.fn<(peerId: string, message: { text: string }) => Promise<void>>(async () => {});
const channelLookup = {
get: vi.fn(() => ({ send })),
};
let attempts = 0;
const runSnapshot = vi.fn(async () => {
attempts += 1;
if (attempts <= 2) {
throw new Error('minio upload failed');
}
return { archivePath: '/tmp/ok.tar.gz', fileName: 'ok.tar.gz', uploaded: false };
});
const scheduler = new BackupScheduler({
dataDir: '/data',
backupConfig: makeBackupConfig({
interval: '1h',
failure_threshold: 2,
notify: { channel: 'telegram', peer: '123' },
notify_recovery: true,
}),
runSnapshot,
channelLookup,
});
scheduler.start();
await vi.advanceTimersByTimeAsync(3_600_000);
await vi.advanceTimersByTimeAsync(3_600_000);
expect(send).toHaveBeenCalledTimes(1);
const failureCall = send.mock.calls.at(0);
expect(failureCall?.[1].text).toContain('Backup FAILING');
await vi.advanceTimersByTimeAsync(3_600_000);
expect(send).toHaveBeenCalledTimes(2);
const recoveryCall = send.mock.calls.at(1);
expect(recoveryCall?.[1].text).toContain('Backup RECOVERED');
scheduler.stop();
});
it('runs immediately when run_on_start is enabled', async () => {
const runSnapshot = vi.fn(async () => {
return { archivePath: '/tmp/start.tar.gz', fileName: 'start.tar.gz', uploaded: false };
});
const scheduler = new BackupScheduler({
dataDir: '/data',
backupConfig: makeBackupConfig({ run_on_start: true, interval: '1h' }),
runSnapshot,
});
scheduler.start();
await Promise.resolve();
expect(runSnapshot).toHaveBeenCalledTimes(1);
scheduler.stop();
});
});
+146
View File
@@ -0,0 +1,146 @@
import { Cron } from 'croner';
import type { BackupConfig } from '../config/schema.js';
import type { OutboundMessage } from '../channels/types.js';
import { parseDuration } from '../session/index.js';
import { runBackupSnapshot } from './run.js';
interface ChannelLookup {
get(name: string): { send(peerId: string, message: OutboundMessage): Promise<void> } | undefined;
}
export interface BackupSchedulerDeps {
dataDir: string;
backupConfig: BackupConfig;
channelLookup?: ChannelLookup;
runSnapshot?: typeof runBackupSnapshot;
}
export class BackupScheduler {
private cronJob: Cron | undefined;
private intervalJob: ReturnType<typeof setInterval> | undefined;
private running = false;
private consecutiveFailures = 0;
private notifiedFailure = false;
private readonly deps: BackupSchedulerDeps;
constructor(deps: BackupSchedulerDeps) {
this.deps = deps;
}
start(): void {
if (!this.deps.backupConfig.enabled) {
return;
}
const backupSchedule = this.deps.backupConfig.schedule?.trim();
const backupIntervalMs = parseDuration(this.deps.backupConfig.interval);
if (!backupSchedule && !backupIntervalMs) {
console.warn(`Backup enabled but interval is invalid: ${this.deps.backupConfig.interval}`);
return;
}
if (backupSchedule) {
try {
this.cronJob = new Cron(backupSchedule, { paused: false }, () => {
void this.runScheduledBackup();
});
console.log(`Backup scheduler enabled (cron: ${backupSchedule})`);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
console.warn(`Backup cron schedule is invalid (${backupSchedule}): ${message}`);
}
}
if (!this.cronJob && backupIntervalMs) {
this.intervalJob = setInterval(() => {
void this.runScheduledBackup();
}, backupIntervalMs);
console.log(`Backup scheduler enabled (interval: ${this.deps.backupConfig.interval})`);
}
if (!this.cronJob && !this.intervalJob) {
console.warn('Backup scheduler disabled: no valid backup.schedule or backup.interval');
return;
}
if (this.deps.backupConfig.run_on_start) {
void this.runScheduledBackup();
}
}
stop(): void {
if (this.cronJob) {
this.cronJob.stop();
this.cronJob = undefined;
}
if (this.intervalJob) {
clearInterval(this.intervalJob);
this.intervalJob = undefined;
}
}
private async runScheduledBackup(): Promise<void> {
if (this.running) {
return;
}
this.running = true;
try {
const runner = this.deps.runSnapshot ?? runBackupSnapshot;
const result = await runner({
dataDir: this.deps.dataDir,
backupConfig: this.deps.backupConfig,
});
console.log(`Backup completed: ${result.archivePath}${result.uploaded && result.remotePath ? ` -> ${result.remotePath}` : ''}`);
await this.handleSuccess();
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
console.error(`Backup failed: ${message}`);
await this.handleFailure(message);
} finally {
this.running = false;
}
}
private async handleFailure(message: string): Promise<void> {
this.consecutiveFailures += 1;
const threshold = this.deps.backupConfig.failure_threshold;
if (this.consecutiveFailures < threshold || this.notifiedFailure) {
return;
}
this.notifiedFailure = true;
await this.notify(
`Backup FAILING (${this.consecutiveFailures} consecutive failures).\nError: ${message}`,
);
}
private async handleSuccess(): Promise<void> {
if (this.notifiedFailure && this.deps.backupConfig.notify_recovery) {
await this.notify(
`Backup RECOVERED after ${this.consecutiveFailures} consecutive failure(s).`,
);
}
this.consecutiveFailures = 0;
this.notifiedFailure = false;
}
private async notify(text: string): Promise<void> {
const notifyConfig = this.deps.backupConfig.notify;
if (!notifyConfig || !this.deps.channelLookup) {
return;
}
const adapter = this.deps.channelLookup.get(notifyConfig.channel);
if (!adapter) {
console.warn(`BackupScheduler: notification channel '${notifyConfig.channel}' not found`);
return;
}
try {
await adapter.send(notifyConfig.peer, { text });
} catch (err) {
console.error('BackupScheduler: failed to send notification:', err);
}
}
}