Files
porthole/apps/web/app/api/imports/handlers.ts
2026-02-01 03:08:15 -08:00

221 lines
5.9 KiB
TypeScript

import { getAdminToken, isAdminRequest } from "@tline/config";
import { getDb } from "@tline/db";
import { z } from "zod";
import type { ReadableStream as NodeReadableStream } from "node:stream/web";
const ADMIN_HEADER = "X-Porthole-Admin-Token";
const createImportBodySchema = z
.object({
type: z.enum(["upload", "minio_scan"]).default("upload"),
})
.strict();
const uploadParamsSchema = z.object({ id: z.string().uuid() });
const scanParamsSchema = z.object({ id: z.string().uuid() });
const scanBodySchema = z
.object({
bucket: z.string().min(1).optional(),
prefix: z.string().min(1).default("originals/"),
})
.strict();
const contentTypeMediaMap: Array<{
match: (ct: string) => boolean;
mediaType: "image" | "video";
}> = [
{ match: (ct) => ct.startsWith("image/"), mediaType: "image" },
{ match: (ct) => ct.startsWith("video/"), mediaType: "video" },
];
function inferMediaTypeFromContentType(ct: string): "image" | "video" | null {
const found = contentTypeMediaMap.find((m) => m.match(ct));
return found?.mediaType ?? null;
}
function inferExtFromContentType(ct: string): string {
const parts = ct.split("/");
const ext = parts[1] ?? "bin";
return ext.replace(/[^a-zA-Z0-9]+/g, "").toLowerCase() || "bin";
}
export function getAdminOk(headers: Headers) {
const headerToken = headers.get(ADMIN_HEADER);
return isAdminRequest({ adminToken: getAdminToken() }, { headerToken });
}
export async function handleCreateImport(input: {
adminOk: boolean;
body: unknown;
}): Promise<{ status: number; body: unknown }> {
if (!input.adminOk) {
return { status: 401, body: { error: "admin_required" } };
}
const body = createImportBodySchema.parse(input.body ?? {});
const db = getDb();
const rows = await db<
{
id: string;
type: "upload" | "minio_scan";
status: string;
created_at: string;
}[]
>`
insert into imports (type, status)
values (${body.type}, 'new')
returning id, type, status, created_at
`;
const created = rows[0];
if (!created) {
return { status: 500, body: { error: "insert_failed" } };
}
return { status: 200, body: created };
}
export async function handleUploadImport(input: {
adminOk: boolean;
params: { id: string };
request: Request;
}): Promise<{ status: number; body: unknown }> {
if (!input.adminOk) {
return { status: 401, body: { error: "admin_required" } };
}
const paramsParsed = uploadParamsSchema.safeParse(input.params);
if (!paramsParsed.success) {
return {
status: 400,
body: { error: "invalid_params", issues: paramsParsed.error.issues },
};
}
const params = paramsParsed.data;
const { randomUUID } = await import("crypto");
const { Readable } = await import("stream");
const { PutObjectCommand } = await import("@aws-sdk/client-s3");
const { getMinioBucket, getMinioInternalClient } = await import("@tline/minio");
const { enqueueProcessAsset } = await import("@tline/queue");
const contentType = input.request.headers.get("content-type") ?? "application/octet-stream";
const mediaType = inferMediaTypeFromContentType(contentType);
if (!mediaType) {
return { status: 400, body: { error: "unsupported_content_type", contentType } };
}
const bucket = getMinioBucket();
const ext = inferExtFromContentType(contentType);
const objectId = randomUUID();
const key = `staging/${params.id}/${objectId}.${ext}`;
const db = getDb();
const [imp] = await db<{ id: string }[]>`
select id
from imports
where id = ${params.id}
limit 1
`;
if (!imp) {
return { status: 404, body: { error: "import_not_found" } };
}
if (!input.request.body) {
return { status: 400, body: { error: "missing_body" } };
}
const s3 = getMinioInternalClient();
const bodyStream = Readable.fromWeb(input.request.body as unknown as NodeReadableStream);
await s3.send(
new PutObjectCommand({
Bucket: bucket,
Key: key,
Body: bodyStream,
ContentType: contentType,
}),
);
const rows = await db<
{
id: string;
status: "new" | "processing" | "ready" | "failed";
}[]
>`
insert into assets (bucket, media_type, mime_type, source_key, active_key)
values (${bucket}, ${mediaType}, ${contentType}, ${key}, ${key})
on conflict (bucket, source_key)
do update set active_key = excluded.active_key
returning id, status
`;
const asset = rows[0];
if (!asset) {
return { status: 500, body: { error: "asset_insert_failed" } };
}
await enqueueProcessAsset({ assetId: asset.id });
return {
status: 200,
body: { ok: true, importId: imp.id, assetId: asset.id, bucket, key },
};
}
export async function handleScanMinioImport(input: {
adminOk: boolean;
params: { id: string };
body: unknown;
}): Promise<{ status: number; body: unknown }> {
if (!input.adminOk) {
return { status: 401, body: { error: "admin_required" } };
}
const paramsParsed = scanParamsSchema.safeParse(input.params);
if (!paramsParsed.success) {
return {
status: 400,
body: { error: "invalid_params", issues: paramsParsed.error.issues },
};
}
const params = paramsParsed.data;
const body = scanBodySchema.parse(input.body ?? {});
const { getMinioBucket } = await import("@tline/minio");
const { enqueueScanMinioPrefix } = await import("@tline/queue");
const bucket = body.bucket ?? getMinioBucket();
const db = getDb();
const rows = await db<{ id: string }[]>`
select id
from imports
where id = ${params.id}
limit 1
`;
const imp = rows[0];
if (!imp) {
return { status: 404, body: { error: "not_found" } };
}
await enqueueScanMinioPrefix({
importId: imp.id,
bucket,
prefix: body.prefix,
});
await db`
update imports
set status = 'queued'
where id = ${imp.id}
`;
return {
status: 200,
body: { ok: true, importId: imp.id, bucket, prefix: body.prefix },
};
}