import { getAdminToken, isAdminRequest } from "@tline/config"; import { getDb } from "@tline/db"; import { z } from "zod"; import type { ReadableStream as NodeReadableStream } from "node:stream/web"; const ADMIN_HEADER = "X-Porthole-Admin-Token"; const createImportBodySchema = z .object({ type: z.enum(["upload", "minio_scan"]).default("upload"), }) .strict(); const uploadParamsSchema = z.object({ id: z.string().uuid() }); const scanParamsSchema = z.object({ id: z.string().uuid() }); const scanBodySchema = z .object({ bucket: z.string().min(1).optional(), prefix: z.string().min(1).default("originals/"), }) .strict(); const contentTypeMediaMap: Array<{ match: (ct: string) => boolean; mediaType: "image" | "video"; }> = [ { match: (ct) => ct.startsWith("image/"), mediaType: "image" }, { match: (ct) => ct.startsWith("video/"), mediaType: "video" }, ]; function inferMediaTypeFromContentType(ct: string): "image" | "video" | null { const found = contentTypeMediaMap.find((m) => m.match(ct)); return found?.mediaType ?? null; } function inferExtFromContentType(ct: string): string { const parts = ct.split("/"); const ext = parts[1] ?? "bin"; return ext.replace(/[^a-zA-Z0-9]+/g, "").toLowerCase() || "bin"; } export function getAdminOk(headers: Headers) { const headerToken = headers.get(ADMIN_HEADER); return isAdminRequest({ adminToken: getAdminToken() }, { headerToken }); } export async function handleCreateImport(input: { adminOk: boolean; body: unknown; }): Promise<{ status: number; body: unknown }> { if (!input.adminOk) { return { status: 401, body: { error: "admin_required" } }; } const body = createImportBodySchema.parse(input.body ?? {}); const db = getDb(); const rows = await db< { id: string; type: "upload" | "minio_scan"; status: string; created_at: string; }[] >` insert into imports (type, status) values (${body.type}, 'new') returning id, type, status, created_at `; const created = rows[0]; if (!created) { return { status: 500, body: { error: "insert_failed" } }; } return { status: 200, body: created }; } export async function handleUploadImport(input: { adminOk: boolean; params: { id: string }; request: Request; }): Promise<{ status: number; body: unknown }> { if (!input.adminOk) { return { status: 401, body: { error: "admin_required" } }; } const paramsParsed = uploadParamsSchema.safeParse(input.params); if (!paramsParsed.success) { return { status: 400, body: { error: "invalid_params", issues: paramsParsed.error.issues }, }; } const params = paramsParsed.data; const { randomUUID } = await import("crypto"); const { Readable } = await import("stream"); const { PutObjectCommand } = await import("@aws-sdk/client-s3"); const { getMinioBucket, getMinioInternalClient } = await import("@tline/minio"); const { enqueueProcessAsset } = await import("@tline/queue"); const contentType = input.request.headers.get("content-type") ?? "application/octet-stream"; const mediaType = inferMediaTypeFromContentType(contentType); if (!mediaType) { return { status: 400, body: { error: "unsupported_content_type", contentType } }; } const bucket = getMinioBucket(); const ext = inferExtFromContentType(contentType); const objectId = randomUUID(); const key = `staging/${params.id}/${objectId}.${ext}`; const db = getDb(); const [imp] = await db<{ id: string }[]>` select id from imports where id = ${params.id} limit 1 `; if (!imp) { return { status: 404, body: { error: "import_not_found" } }; } if (!input.request.body) { return { status: 400, body: { error: "missing_body" } }; } const s3 = getMinioInternalClient(); const bodyStream = Readable.fromWeb(input.request.body as unknown as NodeReadableStream); await s3.send( new PutObjectCommand({ Bucket: bucket, Key: key, Body: bodyStream, ContentType: contentType, }), ); const rows = await db< { id: string; status: "new" | "processing" | "ready" | "failed"; }[] >` insert into assets (bucket, media_type, mime_type, source_key, active_key) values (${bucket}, ${mediaType}, ${contentType}, ${key}, ${key}) on conflict (bucket, source_key) do update set active_key = excluded.active_key returning id, status `; const asset = rows[0]; if (!asset) { return { status: 500, body: { error: "asset_insert_failed" } }; } await enqueueProcessAsset({ assetId: asset.id }); return { status: 200, body: { ok: true, importId: imp.id, assetId: asset.id, bucket, key }, }; } export async function handleScanMinioImport(input: { adminOk: boolean; params: { id: string }; body: unknown; }): Promise<{ status: number; body: unknown }> { if (!input.adminOk) { return { status: 401, body: { error: "admin_required" } }; } const paramsParsed = scanParamsSchema.safeParse(input.params); if (!paramsParsed.success) { return { status: 400, body: { error: "invalid_params", issues: paramsParsed.error.issues }, }; } const params = paramsParsed.data; const body = scanBodySchema.parse(input.body ?? {}); const { getMinioBucket } = await import("@tline/minio"); const { enqueueScanMinioPrefix } = await import("@tline/queue"); const bucket = body.bucket ?? getMinioBucket(); const db = getDb(); const rows = await db<{ id: string }[]>` select id from imports where id = ${params.id} limit 1 `; const imp = rows[0]; if (!imp) { return { status: 404, body: { error: "not_found" } }; } await enqueueScanMinioPrefix({ importId: imp.id, bucket, prefix: body.prefix, }); await db` update imports set status = 'queued' where id = ${imp.id} `; return { status: 200, body: { ok: true, importId: imp.id, bucket, prefix: body.prefix }, }; }