From 21c986b6717706c67be25cbcaa8297facf40673b Mon Sep 17 00:00:00 2001 From: William Valentin Date: Mon, 16 Feb 2026 14:25:18 -0800 Subject: [PATCH] feat(automation): add scheduled minio memory sync jobs --- README.md | 16 ++++ config/default.yaml | 17 ++++ docs/plans/state.json | 19 +++++ src/automation/index.ts | 2 + src/automation/minioSync.test.ts | 114 ++++++++++++++++++++++++++ src/automation/minioSync.ts | 133 +++++++++++++++++++++++++++++++ src/config/schema.test.ts | 41 ++++++++++ src/config/schema.ts | 25 ++++++ src/daemon/index.ts | 2 +- src/daemon/services.ts | 18 ++++- 10 files changed, 384 insertions(+), 3 deletions(-) create mode 100644 src/automation/minioSync.test.ts create mode 100644 src/automation/minioSync.ts diff --git a/README.md b/README.md index cfcb256..36b8707 100644 --- a/README.md +++ b/README.md @@ -560,6 +560,21 @@ automation: prompt: | Create my daily briefing. Summarize today's calendar, unread/important email, and top pending tasks. + + # Optional scheduled MinIO -> memory synchronization. + # Runs direct ingestion jobs without requiring interactive tool calls. + minio_sync: + enabled: true + interval: "6h" + run_on_start: false + notify_on_success: false + tasks: + - prefix: "knowledge/" + namespace_base: "global/knowledge/minio" + mode: append + max_objects: 20 + max_chars_per_object: 8000 + force: false ``` ### Cron Config Fields @@ -577,6 +592,7 @@ automation: | `model_tier` | no | Model tier for this job: `fast`, `default`, `complex`, or `local` | | `once_per_local_day` | no | If true, suppress duplicate triggers within the same local day (job timezone) | | `automation.daily_briefing.*` | no | Built-in daily briefing preset; generates an extra cron job when `enabled: true` and `output` is set | +| `automation.minio_sync.*` | no | Scheduled MinIO prefix ingestion into memory namespaces (direct daemon automation) | ## Backup Scheduling diff --git a/config/default.yaml b/config/default.yaml index 54a09ce..1e23003 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -261,6 +261,23 @@ hooks: # Create my daily briefing. # Summarize today's calendar, unread/important email, and top pending tasks. # +# # Optional scheduled MinIO -> memory synchronization +# minio_sync: +# enabled: false +# interval: "6h" +# run_on_start: false +# notify_on_success: false +# notify: +# channel: telegram +# peer: "123456789" +# tasks: +# - prefix: "knowledge/" +# namespace_base: "global/knowledge/minio" +# mode: append +# max_objects: 20 +# max_chars_per_object: 8000 +# force: false +# # webhooks: # - name: github-push # secret: "whsec_..." diff --git a/docs/plans/state.json b/docs/plans/state.json index 8c37843..5f88f62 100644 --- a/docs/plans/state.json +++ b/docs/plans/state.json @@ -99,6 +99,25 @@ ], "test_status": "pnpm test:run src/tools/builtin/minio-sync.test.ts src/tools/policy.test.ts + pnpm typecheck passing" }, + "automation-scheduled-minio-sync": { + "status": "completed", + "date": "2026-02-16", + "updated": "2026-02-16", + "summary": "Added daemon-side scheduled MinIO sync automation (`automation.minio_sync`) with interval execution, optional startup run, multiple prefix tasks, and optional notification routing on failures/success. Wired scheduler lifecycle into service startup/shutdown and updated config/docs/tests.", + "files_modified": [ + "src/automation/minioSync.ts", + "src/automation/minioSync.test.ts", + "src/automation/index.ts", + "src/config/schema.ts", + "src/config/schema.test.ts", + "src/daemon/services.ts", + "src/daemon/index.ts", + "config/default.yaml", + "README.md", + "docs/plans/state.json" + ], + "test_status": "pnpm test:run src/automation/minioSync.test.ts src/config/schema.test.ts + pnpm typecheck passing" + }, "backup-session-summary-audit-trail": { "status": "completed", "date": "2026-02-16", diff --git a/src/automation/index.ts b/src/automation/index.ts index 9d26f23..48267b9 100644 --- a/src/automation/index.ts +++ b/src/automation/index.ts @@ -4,3 +4,5 @@ export { GmailWatcher } from './gmail.js'; export { HeartbeatMonitor, parseInterval } from './heartbeat.js'; export type { HeartbeatResult, HeartbeatDeps, CheckResult } from './heartbeat.js'; export { buildPresetCronJobs } from './presets.js'; +export { MinioSyncScheduler } from './minioSync.js'; +export type { MinioSyncSchedulerDeps } from './minioSync.js'; diff --git a/src/automation/minioSync.test.ts b/src/automation/minioSync.test.ts new file mode 100644 index 0000000..0e2c46d --- /dev/null +++ b/src/automation/minioSync.test.ts @@ -0,0 +1,114 @@ +import { describe, expect, it, vi } from 'vitest'; +import type { BackupConfig, MinioSyncAutomationConfig } from '../config/schema.js'; +import type { MemoryStore } from '../memory/store.js'; +import { MinioSyncScheduler } from './minioSync.js'; + +function makeBackupConfig(): BackupConfig { + return { + enabled: true, + schedule: undefined, + interval: '24h', + run_on_start: false, + notify: undefined, + failure_threshold: 1, + notify_recovery: true, + local_dir: '~/.local/share/flynn/backups', + include_vectors: true, + minio: { + enabled: true, + endpoint: 'localhost:9000', + access_key: 'minio-admin', + secret_key: 'minio-secret', + bucket: 'flynn-knowledge', + prefix: 'flynn', + secure: false, + }, + }; +} + +function makeAutomationConfig(overrides?: Partial): MinioSyncAutomationConfig { + return { + enabled: true, + interval: '6h', + run_on_start: false, + tasks: [{ + prefix: 'knowledge/', + bucket: undefined, + namespace_base: 'global/knowledge/minio', + mode: 'append', + max_objects: 20, + max_chars_per_object: 8000, + force: false, + }], + notify: undefined, + notify_on_success: false, + ...overrides, + }; +} + +describe('MinioSyncScheduler', () => { + it('skips when memory store is unavailable', async () => { + const scheduler = new MinioSyncScheduler({ + config: makeAutomationConfig(), + backupConfig: makeBackupConfig(), + memoryStore: undefined, + channelLookup: { get: () => undefined }, + }); + + const result = await scheduler.runOnce(); + expect(result).toEqual({ succeeded: 0, failed: 0, details: [] }); + }); + + it('runs configured sync tasks and records success/failure counts', async () => { + const memoryStore = { write: vi.fn() } as unknown as MemoryStore; + const send = vi.fn(async () => undefined); + const scheduler = new MinioSyncScheduler({ + config: makeAutomationConfig({ + tasks: [ + { + prefix: 'knowledge/good/', + bucket: undefined, + namespace_base: 'global/knowledge/minio', + mode: 'append', + max_objects: 20, + max_chars_per_object: 8000, + force: false, + }, + { + prefix: 'knowledge/fail/', + bucket: undefined, + namespace_base: 'global/knowledge/minio', + mode: 'append', + max_objects: 20, + max_chars_per_object: 8000, + force: false, + }, + ], + notify: { channel: 'telegram', peer: '123' }, + }), + backupConfig: makeBackupConfig(), + memoryStore, + channelLookup: { get: () => ({ send }) }, + createSyncTool: () => ({ + name: 'minio.sync', + description: '', + inputSchema: { type: 'object', properties: {} }, + execute: async (rawArgs: unknown) => { + const prefix = (rawArgs as { prefix: string }).prefix; + if (prefix === 'knowledge/fail/') { + return { success: false, output: '', error: 'boom' }; + } + return { success: true, output: 'ok' }; + }, + }), + }); + + const result = await scheduler.runOnce(); + expect(result.succeeded).toBe(1); + expect(result.failed).toBe(1); + expect(result.details).toContain('OK knowledge/good/'); + expect(result.details).toContain('FAIL knowledge/fail/: boom'); + expect(send).toHaveBeenCalledTimes(1); + scheduler.stop(); + }); +}); diff --git a/src/automation/minioSync.ts b/src/automation/minioSync.ts new file mode 100644 index 0000000..e671b92 --- /dev/null +++ b/src/automation/minioSync.ts @@ -0,0 +1,133 @@ +import type { BackupConfig, MinioSyncAutomationConfig, MinioSyncTaskConfig } from '../config/schema.js'; +import type { MemoryStore } from '../memory/store.js'; +import type { OutboundMessage } from '../channels/types.js'; +import { parseInterval } from './heartbeat.js'; +import { createMinioSyncTool } from '../tools/builtin/minio-sync.js'; +import { auditLogger } from '../audit/index.js'; +import type { Tool } from '../tools/types.js'; + +interface ChannelLookup { + get(name: string): { send(peerId: string, message: OutboundMessage): Promise } | undefined; +} + +export interface MinioSyncSchedulerDeps { + config: MinioSyncAutomationConfig; + backupConfig: BackupConfig; + memoryStore?: MemoryStore; + channelLookup: ChannelLookup; + createSyncTool?: (backupConfig: BackupConfig, memoryStore: MemoryStore) => Tool; +} + +export class MinioSyncScheduler { + private readonly deps: MinioSyncSchedulerDeps; + private timer: ReturnType | undefined; + private running = false; + + constructor(deps: MinioSyncSchedulerDeps) { + this.deps = deps; + } + + start(): void { + if (!this.deps.config.enabled) {return;} + if (!this.deps.memoryStore) { + console.warn('MinioSyncScheduler: automation.minio_sync is enabled but memory is disabled; skipping scheduler'); + return; + } + if (this.deps.config.tasks.length === 0) { + console.warn('MinioSyncScheduler: automation.minio_sync is enabled but no tasks are configured; skipping scheduler'); + return; + } + + const intervalMs = parseInterval(this.deps.config.interval); + console.log(`MinioSyncScheduler: starting (interval=${this.deps.config.interval}, tasks=${this.deps.config.tasks.length})`); + auditLogger?.systemStart('MinioSyncScheduler', { + interval: this.deps.config.interval, + tasks: this.deps.config.tasks.length, + }); + + this.timer = setInterval(() => { + this.runOnce().catch((err) => { + console.error('MinioSyncScheduler: unexpected error during sync cycle:', err); + }); + }, intervalMs); + + if (this.deps.config.run_on_start) { + this.runOnce().catch((err) => { + console.error('MinioSyncScheduler: unexpected error during startup sync:', err); + }); + } + } + + stop(): void { + if (this.timer) { + clearInterval(this.timer); + this.timer = undefined; + } + auditLogger?.systemStop('MinioSyncScheduler'); + } + + async runOnce(): Promise<{ succeeded: number; failed: number; details: string[] }> { + if (!this.deps.memoryStore || this.running) { + return { succeeded: 0, failed: 0, details: [] }; + } + + this.running = true; + try { + const toolFactory = this.deps.createSyncTool ?? createMinioSyncTool; + const tool = toolFactory(this.deps.backupConfig, this.deps.memoryStore); + let succeeded = 0; + let failed = 0; + const details: string[] = []; + + for (const task of this.deps.config.tasks) { + const result = await tool.execute(this.toToolArgs(task)); + if (result.success) { + succeeded++; + details.push(`OK ${task.prefix}`); + } else { + failed++; + details.push(`FAIL ${task.prefix}: ${result.error ?? 'Unknown error'}`); + } + } + + if (failed > 0) { + await this.notify(`MinIO sync completed with failures.\n${details.join('\n')}`); + } else if (this.deps.config.notify_on_success) { + await this.notify(`MinIO sync completed successfully.\n${details.join('\n')}`); + } + + return { succeeded, failed, details }; + } finally { + this.running = false; + } + } + + private toToolArgs(task: MinioSyncTaskConfig): Record { + return { + prefix: task.prefix, + bucket: task.bucket, + namespace_base: task.namespace_base, + mode: task.mode, + max_objects: task.max_objects, + max_chars_per_object: task.max_chars_per_object, + force: task.force, + }; + } + + private async notify(text: string): Promise { + const notifyConfig = this.deps.config.notify; + if (!notifyConfig) {return;} + + const channel = this.deps.channelLookup.get(notifyConfig.channel); + if (!channel) { + console.warn(`MinioSyncScheduler: notification channel '${notifyConfig.channel}' not found`); + return; + } + + try { + await channel.send(notifyConfig.peer, { text }); + } catch (err) { + console.error('MinioSyncScheduler: failed to send notification:', err); + } + } +} diff --git a/src/config/schema.test.ts b/src/config/schema.test.ts index 10bb3d2..75f45ec 100644 --- a/src/config/schema.test.ts +++ b/src/config/schema.test.ts @@ -864,6 +864,9 @@ describe('configSchema automation', () => { expect(result.automation.daily_briefing.schedule).toBe('0 8 * * *'); expect(result.automation.daily_briefing.name).toBe('daily-briefing'); expect(result.automation.daily_briefing.dedupe_per_local_day).toBe(true); + expect(result.automation.minio_sync.enabled).toBe(false); + expect(result.automation.minio_sync.interval).toBe('6h'); + expect(result.automation.minio_sync.tasks).toEqual([]); }); it('accepts isolated automation delivery mode', () => { @@ -978,6 +981,44 @@ describe('configSchema automation', () => { expect(result.automation.heartbeat.checks).toContain('backup'); expect(result.automation.heartbeat.checks).toContain('provider_errors'); }); + + it('accepts scheduled minio sync automation config', () => { + const result = configSchema.parse({ + ...baseConfig, + automation: { + minio_sync: { + enabled: true, + interval: '3h', + run_on_start: true, + notify_on_success: true, + notify: { channel: 'telegram', peer: '123' }, + tasks: [{ + prefix: 'knowledge/', + namespace_base: 'global/knowledge/minio', + mode: 'replace', + max_objects: 50, + max_chars_per_object: 12000, + force: true, + }], + }, + }, + }); + + expect(result.automation.minio_sync.enabled).toBe(true); + expect(result.automation.minio_sync.interval).toBe('3h'); + expect(result.automation.minio_sync.run_on_start).toBe(true); + expect(result.automation.minio_sync.notify_on_success).toBe(true); + expect(result.automation.minio_sync.notify).toEqual({ channel: 'telegram', peer: '123' }); + expect(result.automation.minio_sync.tasks).toHaveLength(1); + expect(result.automation.minio_sync.tasks[0]).toMatchObject({ + prefix: 'knowledge/', + namespace_base: 'global/knowledge/minio', + mode: 'replace', + max_objects: 50, + max_chars_per_object: 12000, + force: true, + }); + }); }); describe('configSchema — intents', () => { diff --git a/src/config/schema.ts b/src/config/schema.ts index fbd2014..887da20 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -377,6 +377,28 @@ const dailyBriefingSchema = z.object({ model_tier: modelTierEnum.optional(), }).default({}); +const minioSyncTaskSchema = z.object({ + prefix: z.string().min(1, 'MinIO sync prefix is required'), + bucket: z.string().optional(), + namespace_base: z.string().min(1).default('global/knowledge/minio'), + mode: z.enum(['append', 'replace']).default('append'), + max_objects: z.number().min(1).max(500).default(20), + max_chars_per_object: z.number().min(1).max(1_000_000).default(8_000), + force: z.boolean().default(false), +}); + +const minioSyncAutomationSchema = z.object({ + enabled: z.boolean().default(false), + interval: z.string().default('6h'), + run_on_start: z.boolean().default(false), + tasks: z.array(minioSyncTaskSchema).default([]), + notify: z.object({ + channel: z.string().min(1), + peer: z.string().min(1), + }).optional(), + notify_on_success: z.boolean().default(false), +}).default({}); + const automationDeliveryModeSchema = z.enum(['shared_session', 'isolated_job']); const automationSchema = z.object({ @@ -390,6 +412,7 @@ const automationSchema = z.object({ gdrive: gdriveSchema, gtasks: gtasksSchema, daily_briefing: dailyBriefingSchema, + minio_sync: minioSyncAutomationSchema, heartbeat: heartbeatSchema, }).default({}); @@ -860,6 +883,8 @@ export type GdocsConfig = z.infer; export type GdriveConfig = z.infer; export type GtasksConfig = z.infer; export type DailyBriefingConfig = z.infer; +export type MinioSyncTaskConfig = z.infer; +export type MinioSyncAutomationConfig = z.infer; export type AutomationDeliveryMode = z.infer; export type PairingCodeConfig = z.infer; export type LogLevel = z.infer; diff --git a/src/daemon/index.ts b/src/daemon/index.ts index 76e9bee..4453010 100644 --- a/src/daemon/index.ts +++ b/src/daemon/index.ts @@ -200,7 +200,7 @@ export async function startDaemon(config: Config, options?: StartDaemonOptions): } // ── Lifecycle ── - await startServices({ config, lifecycle, channelRegistry, gateway, modelRouter, memoryDir, dataDir }); + await startServices({ config, lifecycle, channelRegistry, gateway, modelRouter, memoryStore, memoryDir, dataDir }); const backupScheduler = new BackupScheduler({ dataDir, backupConfig: config.backup, diff --git a/src/daemon/services.ts b/src/daemon/services.ts index 78193f9..dff1bbe 100644 --- a/src/daemon/services.ts +++ b/src/daemon/services.ts @@ -8,7 +8,7 @@ import { ModelRouter } from '../models/index.js'; import { SessionManager } from '../session/index.js'; import { GatewayServer } from '../gateway/index.js'; import { ChannelRegistry, PairingManager, type PairingStore } from '../channels/index.js'; -import { HeartbeatMonitor } from '../automation/index.js'; +import { HeartbeatMonitor, MinioSyncScheduler } from '../automation/index.js'; import { McpManager } from '../mcp/index.js'; import { SkillRegistry, @@ -430,10 +430,11 @@ export async function startServices(deps: { channelRegistry: ChannelRegistry; gateway: GatewayServer; modelRouter: ModelRouter; + memoryStore?: MemoryStore; memoryDir: string; dataDir: string; }): Promise { - const { config, lifecycle, channelRegistry, gateway, modelRouter, memoryDir, dataDir } = deps; + const { config, lifecycle, channelRegistry, gateway, modelRouter, memoryStore, memoryDir, dataDir } = deps; // Register shutdown handler for channels lifecycle.onShutdown(async () => { @@ -489,6 +490,19 @@ export async function startServices(deps: { console.log('Heartbeat monitor stopped'); }); + const minioSyncScheduler = new MinioSyncScheduler({ + config: config.automation.minio_sync, + backupConfig: config.backup, + memoryStore, + channelLookup: channelRegistry, + }); + minioSyncScheduler.start(); + + lifecycle.onShutdown(async () => { + minioSyncScheduler.stop(); + console.log('MinIO sync scheduler stopped'); + }); + // Signal handlers const signalHandler = () => { lifecycle.shutdown().then(() => process.exit(0));