216 lines
7.2 KiB
TypeScript
216 lines
7.2 KiB
TypeScript
import { promisify } from 'node:util';
|
|
import { execFile } from 'node:child_process';
|
|
import type { BackupConfig } from '../../config/schema.js';
|
|
import type { MemoryStore } from '../../memory/store.js';
|
|
import type { Tool, ToolResult } from '../types.js';
|
|
import { backupInternals } from '../../backup/index.js';
|
|
import { minioIngestInternals } from './minio-ingest.js';
|
|
|
|
const execFileAsync = promisify(execFile);
|
|
|
|
type ExecRunner = (
|
|
file: string,
|
|
args: string[],
|
|
options?: { env?: NodeJS.ProcessEnv; maxBuffer?: number },
|
|
) => Promise<{ stdout: string | Buffer; stderr: string | Buffer }>;
|
|
|
|
interface MinioSyncArgs {
|
|
prefix: string;
|
|
bucket?: string;
|
|
namespace_base?: string;
|
|
mode?: 'append' | 'replace';
|
|
max_objects?: number;
|
|
max_chars_per_object?: number;
|
|
force?: boolean;
|
|
}
|
|
|
|
export interface MinioSyncDeps {
|
|
execRunner?: ExecRunner;
|
|
now?: () => Date;
|
|
}
|
|
|
|
function parseListedObjectKeys(stdout: string): string[] {
|
|
const keys: string[] = [];
|
|
const lines = stdout.split('\n').map((line) => line.trim()).filter(Boolean);
|
|
for (const line of lines) {
|
|
try {
|
|
const parsed = JSON.parse(line) as Record<string, unknown>;
|
|
const key = typeof parsed.key === 'string'
|
|
? parsed.key
|
|
: typeof parsed.name === 'string'
|
|
? parsed.name
|
|
: null;
|
|
const type = typeof parsed.type === 'string' ? parsed.type : null;
|
|
if (!key) {continue;}
|
|
if (type && type !== 'file') {continue;}
|
|
if (key.endsWith('/')) {continue;}
|
|
keys.push(key);
|
|
} catch {
|
|
continue;
|
|
}
|
|
}
|
|
return keys;
|
|
}
|
|
|
|
function normalizeNamespaceSegment(value: string): string {
|
|
return value
|
|
.replace(/\.[^.]+$/, '')
|
|
.replace(/[^a-zA-Z0-9/_-]/g, '_')
|
|
.replace(/\/+/g, '/')
|
|
.replace(/^\/+|\/+$/g, '');
|
|
}
|
|
|
|
export const minioSyncInternals = {
|
|
parseListedObjectKeys,
|
|
normalizeNamespaceSegment,
|
|
};
|
|
|
|
export function createMinioSyncTool(config: BackupConfig, store: MemoryStore, deps?: MinioSyncDeps): Tool {
|
|
return {
|
|
name: 'minio.sync',
|
|
description: 'Sync text-like objects from a MinIO prefix into memory namespaces.',
|
|
inputSchema: {
|
|
type: 'object',
|
|
properties: {
|
|
prefix: {
|
|
type: 'string',
|
|
description: 'MinIO object prefix to sync recursively (for example: "knowledge/")',
|
|
},
|
|
bucket: {
|
|
type: 'string',
|
|
description: 'Optional bucket override. Defaults to backup.minio.bucket.',
|
|
},
|
|
namespace_base: {
|
|
type: 'string',
|
|
description: 'Base memory namespace. Per-object namespaces are nested under this path.',
|
|
},
|
|
mode: {
|
|
type: 'string',
|
|
enum: ['append', 'replace'],
|
|
description: 'Write mode per object namespace. Default: append.',
|
|
},
|
|
max_objects: {
|
|
type: 'number',
|
|
description: 'Maximum number of objects to ingest per run. Default: 20.',
|
|
},
|
|
max_chars_per_object: {
|
|
type: 'number',
|
|
description: 'Maximum characters ingested per object. Default: 8000.',
|
|
},
|
|
force: {
|
|
type: 'boolean',
|
|
description: 'Allow non-text-like files/extensions.',
|
|
},
|
|
},
|
|
required: ['prefix'],
|
|
},
|
|
execute: async (rawArgs: unknown): Promise<ToolResult> => {
|
|
const args = rawArgs as MinioSyncArgs;
|
|
const minio = config.minio;
|
|
const prefix = args.prefix?.trim();
|
|
const bucket = args.bucket ?? minio.bucket;
|
|
const namespaceBase = args.namespace_base ?? 'global/knowledge/minio';
|
|
const mode = args.mode ?? 'append';
|
|
const maxObjects = Math.max(1, Math.floor(args.max_objects ?? 20));
|
|
const maxChars = Math.max(1, Math.floor(args.max_chars_per_object ?? 8_000));
|
|
const force = args.force ?? false;
|
|
|
|
if (!prefix) {
|
|
return { success: false, output: '', error: 'prefix is required' };
|
|
}
|
|
if (!minio.enabled) {
|
|
return { success: false, output: '', error: 'MinIO sync requires backup.minio.enabled=true' };
|
|
}
|
|
if (!minio.endpoint || !minio.access_key || !minio.secret_key || !bucket) {
|
|
return {
|
|
success: false,
|
|
output: '',
|
|
error: 'Missing MinIO credentials in backup.minio (endpoint/access_key/secret_key/bucket)',
|
|
};
|
|
}
|
|
|
|
const alias = 'flynnsync';
|
|
const host = backupInternals.buildMinioHost({
|
|
endpoint: minio.endpoint,
|
|
accessKey: minio.access_key,
|
|
secretKey: minio.secret_key,
|
|
secure: minio.secure,
|
|
});
|
|
const env = { ...process.env, [`MC_HOST_${alias}`]: host };
|
|
const runner = deps?.execRunner ?? (async (file: string, cmdArgs: string[], options?: { env?: NodeJS.ProcessEnv; maxBuffer?: number }) => {
|
|
return execFileAsync(file, cmdArgs, options);
|
|
});
|
|
|
|
try {
|
|
const basePath = `${alias}/${bucket}/${prefix}`;
|
|
const { stdout: listed } = await runner('mc', ['ls', '--json', '--recursive', basePath], {
|
|
env,
|
|
maxBuffer: 20 * 1024 * 1024,
|
|
});
|
|
|
|
const keys = parseListedObjectKeys(typeof listed === 'string' ? listed : listed.toString('utf-8'));
|
|
if (keys.length === 0) {
|
|
return {
|
|
success: true,
|
|
output: `No objects found under prefix minio://${bucket}/${prefix}`,
|
|
};
|
|
}
|
|
|
|
const selected = keys.slice(0, maxObjects);
|
|
let imported = 0;
|
|
let skipped = 0;
|
|
const importedNamespaces: string[] = [];
|
|
|
|
for (const key of selected) {
|
|
if (!force && !minioIngestInternals.isLikelyTextObject(key)) {
|
|
skipped++;
|
|
continue;
|
|
}
|
|
|
|
const remotePath = `${alias}/${bucket}/${key}`;
|
|
const text = await minioIngestInternals.readObjectText(runner, remotePath, key, env);
|
|
|
|
if (!force && !minioIngestInternals.isExtractableBinaryObject(key) && !minioIngestInternals.isLikelyText(text)) {
|
|
skipped++;
|
|
continue;
|
|
}
|
|
const trimmed = text.trim();
|
|
if (!trimmed) {
|
|
skipped++;
|
|
continue;
|
|
}
|
|
|
|
const clipped = trimmed.length > maxChars
|
|
? `${trimmed.slice(0, maxChars)}\n\n[truncated to ${maxChars} chars]`
|
|
: trimmed;
|
|
const importedAt = (deps?.now ? deps.now() : new Date()).toISOString();
|
|
const objectNamespace = normalizeNamespaceSegment(key);
|
|
const targetNamespace = `${namespaceBase}/${objectNamespace}`;
|
|
const payload = `## MinIO Sync Import\nsource: minio://${bucket}/${key}\nimported_at: ${importedAt}\n\n${clipped}`;
|
|
store.write(targetNamespace, payload, mode);
|
|
imported++;
|
|
importedNamespaces.push(targetNamespace);
|
|
}
|
|
|
|
return {
|
|
success: true,
|
|
output: [
|
|
`MinIO sync completed.`,
|
|
`Prefix: minio://${bucket}/${prefix}`,
|
|
`Scanned: ${selected.length} object(s)`,
|
|
`Imported: ${imported}`,
|
|
`Skipped: ${skipped}`,
|
|
importedNamespaces.length > 0 ? `Namespaces:\n- ${importedNamespaces.join('\n- ')}` : 'Namespaces:\n- (none)',
|
|
].join('\n'),
|
|
};
|
|
} catch (error) {
|
|
return {
|
|
success: false,
|
|
output: '',
|
|
error: error instanceof Error ? error.message : String(error),
|
|
};
|
|
}
|
|
},
|
|
};
|
|
}
|