feat(automation): add scheduled minio memory sync jobs

This commit is contained in:
William Valentin
2026-02-16 14:25:18 -08:00
parent 22f225998f
commit 21c986b671
10 changed files with 384 additions and 3 deletions
+2
View File
@@ -4,3 +4,5 @@ export { GmailWatcher } from './gmail.js';
export { HeartbeatMonitor, parseInterval } from './heartbeat.js';
export type { HeartbeatResult, HeartbeatDeps, CheckResult } from './heartbeat.js';
export { buildPresetCronJobs } from './presets.js';
export { MinioSyncScheduler } from './minioSync.js';
export type { MinioSyncSchedulerDeps } from './minioSync.js';
+114
View File
@@ -0,0 +1,114 @@
import { describe, expect, it, vi } from 'vitest';
import type { BackupConfig, MinioSyncAutomationConfig } from '../config/schema.js';
import type { MemoryStore } from '../memory/store.js';
import { MinioSyncScheduler } from './minioSync.js';
function makeBackupConfig(): BackupConfig {
return {
enabled: true,
schedule: undefined,
interval: '24h',
run_on_start: false,
notify: undefined,
failure_threshold: 1,
notify_recovery: true,
local_dir: '~/.local/share/flynn/backups',
include_vectors: true,
minio: {
enabled: true,
endpoint: 'localhost:9000',
access_key: 'minio-admin',
secret_key: 'minio-secret',
bucket: 'flynn-knowledge',
prefix: 'flynn',
secure: false,
},
};
}
function makeAutomationConfig(overrides?: Partial<MinioSyncAutomationConfig>): MinioSyncAutomationConfig {
return {
enabled: true,
interval: '6h',
run_on_start: false,
tasks: [{
prefix: 'knowledge/',
bucket: undefined,
namespace_base: 'global/knowledge/minio',
mode: 'append',
max_objects: 20,
max_chars_per_object: 8000,
force: false,
}],
notify: undefined,
notify_on_success: false,
...overrides,
};
}
describe('MinioSyncScheduler', () => {
it('skips when memory store is unavailable', async () => {
const scheduler = new MinioSyncScheduler({
config: makeAutomationConfig(),
backupConfig: makeBackupConfig(),
memoryStore: undefined,
channelLookup: { get: () => undefined },
});
const result = await scheduler.runOnce();
expect(result).toEqual({ succeeded: 0, failed: 0, details: [] });
});
it('runs configured sync tasks and records success/failure counts', async () => {
const memoryStore = { write: vi.fn() } as unknown as MemoryStore;
const send = vi.fn(async () => undefined);
const scheduler = new MinioSyncScheduler({
config: makeAutomationConfig({
tasks: [
{
prefix: 'knowledge/good/',
bucket: undefined,
namespace_base: 'global/knowledge/minio',
mode: 'append',
max_objects: 20,
max_chars_per_object: 8000,
force: false,
},
{
prefix: 'knowledge/fail/',
bucket: undefined,
namespace_base: 'global/knowledge/minio',
mode: 'append',
max_objects: 20,
max_chars_per_object: 8000,
force: false,
},
],
notify: { channel: 'telegram', peer: '123' },
}),
backupConfig: makeBackupConfig(),
memoryStore,
channelLookup: { get: () => ({ send }) },
createSyncTool: () => ({
name: 'minio.sync',
description: '',
inputSchema: { type: 'object', properties: {} },
execute: async (rawArgs: unknown) => {
const prefix = (rawArgs as { prefix: string }).prefix;
if (prefix === 'knowledge/fail/') {
return { success: false, output: '', error: 'boom' };
}
return { success: true, output: 'ok' };
},
}),
});
const result = await scheduler.runOnce();
expect(result.succeeded).toBe(1);
expect(result.failed).toBe(1);
expect(result.details).toContain('OK knowledge/good/');
expect(result.details).toContain('FAIL knowledge/fail/: boom');
expect(send).toHaveBeenCalledTimes(1);
scheduler.stop();
});
});
+133
View File
@@ -0,0 +1,133 @@
import type { BackupConfig, MinioSyncAutomationConfig, MinioSyncTaskConfig } from '../config/schema.js';
import type { MemoryStore } from '../memory/store.js';
import type { OutboundMessage } from '../channels/types.js';
import { parseInterval } from './heartbeat.js';
import { createMinioSyncTool } from '../tools/builtin/minio-sync.js';
import { auditLogger } from '../audit/index.js';
import type { Tool } from '../tools/types.js';
interface ChannelLookup {
get(name: string): { send(peerId: string, message: OutboundMessage): Promise<void> } | undefined;
}
export interface MinioSyncSchedulerDeps {
config: MinioSyncAutomationConfig;
backupConfig: BackupConfig;
memoryStore?: MemoryStore;
channelLookup: ChannelLookup;
createSyncTool?: (backupConfig: BackupConfig, memoryStore: MemoryStore) => Tool;
}
export class MinioSyncScheduler {
private readonly deps: MinioSyncSchedulerDeps;
private timer: ReturnType<typeof setInterval> | undefined;
private running = false;
constructor(deps: MinioSyncSchedulerDeps) {
this.deps = deps;
}
start(): void {
if (!this.deps.config.enabled) {return;}
if (!this.deps.memoryStore) {
console.warn('MinioSyncScheduler: automation.minio_sync is enabled but memory is disabled; skipping scheduler');
return;
}
if (this.deps.config.tasks.length === 0) {
console.warn('MinioSyncScheduler: automation.minio_sync is enabled but no tasks are configured; skipping scheduler');
return;
}
const intervalMs = parseInterval(this.deps.config.interval);
console.log(`MinioSyncScheduler: starting (interval=${this.deps.config.interval}, tasks=${this.deps.config.tasks.length})`);
auditLogger?.systemStart('MinioSyncScheduler', {
interval: this.deps.config.interval,
tasks: this.deps.config.tasks.length,
});
this.timer = setInterval(() => {
this.runOnce().catch((err) => {
console.error('MinioSyncScheduler: unexpected error during sync cycle:', err);
});
}, intervalMs);
if (this.deps.config.run_on_start) {
this.runOnce().catch((err) => {
console.error('MinioSyncScheduler: unexpected error during startup sync:', err);
});
}
}
stop(): void {
if (this.timer) {
clearInterval(this.timer);
this.timer = undefined;
}
auditLogger?.systemStop('MinioSyncScheduler');
}
async runOnce(): Promise<{ succeeded: number; failed: number; details: string[] }> {
if (!this.deps.memoryStore || this.running) {
return { succeeded: 0, failed: 0, details: [] };
}
this.running = true;
try {
const toolFactory = this.deps.createSyncTool ?? createMinioSyncTool;
const tool = toolFactory(this.deps.backupConfig, this.deps.memoryStore);
let succeeded = 0;
let failed = 0;
const details: string[] = [];
for (const task of this.deps.config.tasks) {
const result = await tool.execute(this.toToolArgs(task));
if (result.success) {
succeeded++;
details.push(`OK ${task.prefix}`);
} else {
failed++;
details.push(`FAIL ${task.prefix}: ${result.error ?? 'Unknown error'}`);
}
}
if (failed > 0) {
await this.notify(`MinIO sync completed with failures.\n${details.join('\n')}`);
} else if (this.deps.config.notify_on_success) {
await this.notify(`MinIO sync completed successfully.\n${details.join('\n')}`);
}
return { succeeded, failed, details };
} finally {
this.running = false;
}
}
private toToolArgs(task: MinioSyncTaskConfig): Record<string, unknown> {
return {
prefix: task.prefix,
bucket: task.bucket,
namespace_base: task.namespace_base,
mode: task.mode,
max_objects: task.max_objects,
max_chars_per_object: task.max_chars_per_object,
force: task.force,
};
}
private async notify(text: string): Promise<void> {
const notifyConfig = this.deps.config.notify;
if (!notifyConfig) {return;}
const channel = this.deps.channelLookup.get(notifyConfig.channel);
if (!channel) {
console.warn(`MinioSyncScheduler: notification channel '${notifyConfig.channel}' not found`);
return;
}
try {
await channel.send(notifyConfig.peer, { text });
} catch (err) {
console.error('MinioSyncScheduler: failed to send notification:', err);
}
}
}