import { promisify } from 'node:util';
import { execFile } from 'node:child_process';
import type { BackupConfig } from '../../config/schema.js';
import type { MemoryStore } from '../../memory/store.js';
import type { Tool, ToolResult } from '../types.js';
import { backupInternals } from '../../backup/index.js';
import { minioIngestInternals } from './minio-ingest.js';

const execFileAsync = promisify(execFile);

/** Process runner used to invoke the `mc` CLI; injectable through {@link MinioSyncDeps}. */
type ExecRunner = (
  file: string,
  args: string[],
  options?: { env?: NodeJS.ProcessEnv; maxBuffer?: number },
) => Promise<{ stdout: string | Buffer; stderr: string | Buffer }>;

/** Arguments accepted by the `minio.sync` tool (mirrors its `inputSchema`). */
interface MinioSyncArgs {
  prefix: string;
  bucket?: string;
  namespace_base?: string;
  mode?: 'append' | 'replace';
  max_objects?: number;
  max_chars_per_object?: number;
  force?: boolean;
}

/** Optional dependency overrides for {@link createMinioSyncTool}. */
export interface MinioSyncDeps {
  /** Replaces the default `execFile`-based runner. */
  execRunner?: ExecRunner;
  /** Clock used for the `imported_at` stamp written into each payload. */
  now?: () => Date;
}

/**
 * Parses newline-delimited JSON (as produced by `mc ls --json`) into object keys.
 *
 * Each non-blank line is parsed on its own; lines that fail to parse are ignored.
 * The key is taken from `key`, falling back to `name`. An entry is dropped when it
 * has no key, has a `type` other than `'file'`, ends in `/`, or is named `.keep`.
 *
 * @param stdout - Raw stdout of the listing command.
 * @returns Keys in listing order, exactly as reported by the command.
 */
function parseListedObjectKeys(stdout: string): string[] {
  const keys: string[] = [];
  const lines = stdout.split('\n').map((line) => line.trim()).filter(Boolean);
  for (const line of lines) {
    try {
      const parsed = JSON.parse(line) as Record<string, unknown>;
      const key =
        typeof parsed.key === 'string' ? parsed.key : typeof parsed.name === 'string' ? parsed.name : null;
      const type = typeof parsed.type === 'string' ? parsed.type : null;
      if (!key) {
        continue;
      }
      if (type && type !== 'file') {
        continue;
      }
      if (key.endsWith('/')) {
        continue;
      }
      if (key === '.keep' || key.endsWith('/.keep')) {
        continue;
      }
      keys.push(key);
    } catch {
      // Non-JSON output lines are skipped rather than failing the whole listing.
      continue;
    }
  }
  return keys;
}

/**
 * Returns the directory part of a listing prefix: everything up to and including
 * the last `/`, or `''` when the prefix contains no `/`.
 */
function prefixDirectory(prefix: string): string {
  const slash = prefix.lastIndexOf('/');
  return slash === -1 ? '' : prefix.slice(0, slash + 1);
}

/**
 * Converts the keys reported by a recursive listing into bucket-absolute keys.
 *
 * NOTE(review): `mc ls` prints names relative to the directory of the listed path
 * (like POSIX `ls`), so listing `bucket/knowledge/` yields `a.md`, not
 * `knowledge/a.md`. If every key already starts with the prefix directory, the
 * listing is treated as bucket-absolute and returned unchanged; otherwise the
 * prefix directory is prepended to every key. Confirm against the mc version in use.
 *
 * @param prefix - The prefix that was listed (relative to the bucket).
 * @param listedKeys - Keys as returned by {@link parseListedObjectKeys}.
 * @returns Keys usable as `${alias}/${bucket}/${key}`.
 */
function resolveListedKeys(prefix: string, listedKeys: string[]): string[] {
  const dir = prefixDirectory(prefix);
  if (!dir || listedKeys.every((key) => key.startsWith(dir))) {
    return listedKeys;
  }
  return listedKeys.map((key) => `${dir}${key}`);
}

/**
 * Reports whether an error message looks like a missing/unreadable object.
 *
 * Matching is a case-insensitive substring check. NOTE(review): `'not found'` is
 * broad and will also match unrelated failures whose message contains that phrase.
 */
function isBenignMissingObjectError(error: unknown): boolean {
  const message = error instanceof Error ? error.message : String(error);
  const lowered = message.toLowerCase();
  return (
    lowered.includes('object does not exist') ||
    lowered.includes('unable to read from') ||
    lowered.includes('no such key') ||
    lowered.includes('not found')
  );
}

/**
 * Converts an object key into a memory-namespace path segment.
 *
 * Steps:
 * - Strip the file extension of the final path segment only.
 * - Replace characters other than `[a-zA-Z0-9/_-]` with `_`.
 * - Collapse repeated slashes.
 * - Trim leading and trailing slashes.
 *
 * Example: `docs/guide.md` becomes `docs/guide`, and `dir.v2/readme` becomes `dir_v2/readme`.
 */
function normalizeNamespaceSegment(value: string): string {
  return (
    value
      // `[^./]` keeps the match inside the last segment; requiring a preceding
      // non-slash character keeps dotfile names such as `.env` (→ `_env`) non-empty.
      .replace(/([^/])\.[^./]+$/, '$1')
      .replace(/[^a-zA-Z0-9/_-]/g, '_')
      .replace(/\/+/g, '/')
      .replace(/^\/+|\/+$/g, '')
  );
}

/**
 * Coerces a numeric tool option to an integer >= 1.
 * `undefined`/`null` and values that coerce to `NaN` fall back to `fallback`.
 */
function toPositiveInteger(value: unknown, fallback: number): number {
  const numeric = value === undefined || value === null ? fallback : Number(value);
  return Number.isNaN(numeric) ? fallback : Math.max(1, Math.floor(numeric));
}

/** Internal helpers exposed for unit tests. */
export const minioSyncInternals = {
  isBenignMissingObjectError,
  parseListedObjectKeys,
  normalizeNamespaceSegment,
  resolveListedKeys,
};

/**
 * Creates the `minio.sync` tool.
 *
 * The tool lists a MinIO prefix recursively with the `mc` CLI, reads up to
 * `max_objects` listed objects, and writes each text payload (clipped to
 * `max_chars_per_object`) into `${namespace_base}/${normalized key}` via `store.write`.
 *
 * @param config - Backup config; `config.minio` supplies the endpoint, credentials, bucket and mc path.
 * @param store - Memory store that receives one write per imported object.
 * @param deps - Optional runner/clock overrides.
 * @returns A tool whose `execute` never throws for CLI failures. Those failures are
 *   reported as `{ success: false, error }`.
 */
export function createMinioSyncTool(config: BackupConfig, store: MemoryStore, deps?: MinioSyncDeps): Tool {
  return {
    name: 'minio.sync',
    description: 'Sync text-like objects from a MinIO prefix into memory namespaces.',
    inputSchema: {
      type: 'object',
      properties: {
        prefix: {
          type: 'string',
          description: 'MinIO object prefix to sync recursively (for example: "knowledge/")',
        },
        bucket: {
          type: 'string',
          description: 'Optional bucket override. Defaults to backup.minio.bucket.',
        },
        namespace_base: {
          type: 'string',
          description: 'Base memory namespace. Per-object namespaces are nested under this path.',
        },
        mode: {
          type: 'string',
          enum: ['append', 'replace'],
          description: 'Write mode per object namespace. Default: append.',
        },
        max_objects: {
          type: 'number',
          description: 'Maximum number of objects to ingest per run. Default: 20.',
        },
        max_chars_per_object: {
          type: 'number',
          description: 'Maximum characters ingested per object. Default: 8000.',
        },
        force: {
          type: 'boolean',
          description: 'Allow non-text-like files/extensions.',
        },
      },
      required: ['prefix'],
    },
    execute: async (rawArgs: unknown): Promise<ToolResult> => {
      // A missing argument object is reported below as a missing prefix instead of throwing.
      const args = (rawArgs ?? {}) as MinioSyncArgs;
      const minio = config.minio;
      // Non-string prefixes are treated as missing rather than crashing on `.trim()`.
      const prefix = typeof args.prefix === 'string' ? args.prefix.trim() : '';
      const bucket = args.bucket ?? minio.bucket;
      const namespaceBase = args.namespace_base ?? 'global/knowledge/minio';
      const mode = args.mode ?? 'append';
      const maxObjects = toPositiveInteger(args.max_objects, 20);
      const maxChars = toPositiveInteger(args.max_chars_per_object, 8_000);
      const force = args.force ?? false;

      if (!prefix) {
        return { success: false, output: '', error: 'prefix is required' };
      }
      if (!minio.enabled) {
        return { success: false, output: '', error: 'MinIO sync requires backup.minio.enabled=true' };
      }
      if (!minio.endpoint || !minio.access_key || !minio.secret_key || !bucket) {
        return {
          success: false,
          output: '',
          error: 'Missing MinIO credentials in backup.minio (endpoint/access_key/secret_key/bucket)',
        };
      }

      // The alias is defined through an MC_HOST_<alias> variable passed only to child processes.
      const alias = 'flynnsync';
      const host = backupInternals.buildMinioHost({
        endpoint: minio.endpoint,
        accessKey: minio.access_key,
        secretKey: minio.secret_key,
        secure: minio.secure,
      });
      const env = { ...process.env, [`MC_HOST_${alias}`]: host };
      const mcPath = backupInternals.resolveMcPath(minio.mc_path);
      const runner =
        deps?.execRunner ??
        (async (file: string, cmdArgs: string[], options?: { env?: NodeJS.ProcessEnv; maxBuffer?: number }) => {
          return execFileAsync(file, cmdArgs, options);
        });

      try {
        const basePath = `${alias}/${bucket}/${prefix}`;
        const { stdout: listed } = await runner(mcPath, ['ls', '--json', '--recursive', basePath], {
          env,
          maxBuffer: 20 * 1024 * 1024,
        });
        const listedText = typeof listed === 'string' ? listed : listed.toString('utf-8');
        const keys = resolveListedKeys(prefix, parseListedObjectKeys(listedText));
        if (keys.length === 0) {
          return {
            success: true,
            output: `No objects found under prefix minio://${bucket}/${prefix}`,
          };
        }

        // The cap applies to listed objects, so skipped objects still count toward it.
        const selected = keys.slice(0, maxObjects);
        let imported = 0;
        let skipped = 0;
        const importedNamespaces: string[] = [];

        for (const key of selected) {
          // Filter on the key alone before reading the object.
          if (!force && !minioIngestInternals.isLikelyTextObject(key)) {
            skipped++;
            continue;
          }

          let text = '';
          try {
            const remotePath = `${alias}/${bucket}/${key}`;
            text = await minioIngestInternals.readObjectText(runner, mcPath, remotePath, key, env);
          } catch (error) {
            // Errors that look like a missing object skip this object; anything else aborts the sync.
            if (isBenignMissingObjectError(error)) {
              skipped++;
              continue;
            }
            throw error;
          }

          if (
            !force &&
            !minioIngestInternals.isExtractableBinaryObject(key) &&
            !minioIngestInternals.isLikelyText(text)
          ) {
            skipped++;
            continue;
          }

          const trimmed = text.trim();
          if (!trimmed) {
            skipped++;
            continue;
          }

          const clipped =
            trimmed.length > maxChars ? `${trimmed.slice(0, maxChars)}\n\n[truncated to ${maxChars} chars]` : trimmed;
          const importedAt = (deps?.now ? deps.now() : new Date()).toISOString();
          const targetNamespace = `${namespaceBase}/${normalizeNamespaceSegment(key)}`;
          const payload = `## MinIO Sync Import\nsource: minio://${bucket}/${key}\nimported_at: ${importedAt}\n\n${clipped}`;
          store.write(targetNamespace, payload, mode);
          imported++;
          importedNamespaces.push(targetNamespace);
        }

        return {
          success: true,
          output: [
            'MinIO sync completed.',
            `Prefix: minio://${bucket}/${prefix}`,
            `Scanned: ${selected.length} object(s)`,
            `Imported: ${imported}`,
            `Skipped: ${skipped}`,
            importedNamespaces.length > 0
              ? `Namespaces:\n- ${importedNamespaces.join('\n- ')}`
              : 'Namespaces:\n- (none)',
          ].join('\n'),
        };
      } catch (error) {
        return {
          success: false,
          output: '',
          error: backupInternals.formatMinioCliError(error, mcPath),
        };
      }
    },
  };
}