221 lines
5.9 KiB
TypeScript
221 lines
5.9 KiB
TypeScript
import { getAdminToken, isAdminRequest } from "@tline/config";
|
|
import { getDb } from "@tline/db";
|
|
|
|
import { z } from "zod";
|
|
import type { ReadableStream as NodeReadableStream } from "node:stream/web";
|
|
|
|
const ADMIN_HEADER = "X-Porthole-Admin-Token";
|
|
|
|
const createImportBodySchema = z
|
|
.object({
|
|
type: z.enum(["upload", "minio_scan"]).default("upload"),
|
|
})
|
|
.strict();
|
|
|
|
const uploadParamsSchema = z.object({ id: z.string().uuid() });
|
|
|
|
const scanParamsSchema = z.object({ id: z.string().uuid() });
|
|
const scanBodySchema = z
|
|
.object({
|
|
bucket: z.string().min(1).optional(),
|
|
prefix: z.string().min(1).default("originals/"),
|
|
})
|
|
.strict();
|
|
|
|
const contentTypeMediaMap: Array<{
|
|
match: (ct: string) => boolean;
|
|
mediaType: "image" | "video";
|
|
}> = [
|
|
{ match: (ct) => ct.startsWith("image/"), mediaType: "image" },
|
|
{ match: (ct) => ct.startsWith("video/"), mediaType: "video" },
|
|
];
|
|
|
|
function inferMediaTypeFromContentType(ct: string): "image" | "video" | null {
|
|
const found = contentTypeMediaMap.find((m) => m.match(ct));
|
|
return found?.mediaType ?? null;
|
|
}
|
|
|
|
function inferExtFromContentType(ct: string): string {
|
|
const parts = ct.split("/");
|
|
const ext = parts[1] ?? "bin";
|
|
return ext.replace(/[^a-zA-Z0-9]+/g, "").toLowerCase() || "bin";
|
|
}
|
|
|
|
export function getAdminOk(headers: Headers) {
|
|
const headerToken = headers.get(ADMIN_HEADER);
|
|
return isAdminRequest({ adminToken: getAdminToken() }, { headerToken });
|
|
}
|
|
|
|
export async function handleCreateImport(input: {
|
|
adminOk: boolean;
|
|
body: unknown;
|
|
}): Promise<{ status: number; body: unknown }> {
|
|
if (!input.adminOk) {
|
|
return { status: 401, body: { error: "admin_required" } };
|
|
}
|
|
|
|
const body = createImportBodySchema.parse(input.body ?? {});
|
|
const db = getDb();
|
|
const rows = await db<
|
|
{
|
|
id: string;
|
|
type: "upload" | "minio_scan";
|
|
status: string;
|
|
created_at: string;
|
|
}[]
|
|
>`
|
|
insert into imports (type, status)
|
|
values (${body.type}, 'new')
|
|
returning id, type, status, created_at
|
|
`;
|
|
|
|
const created = rows[0];
|
|
if (!created) {
|
|
return { status: 500, body: { error: "insert_failed" } };
|
|
}
|
|
|
|
return { status: 200, body: created };
|
|
}
|
|
|
|
export async function handleUploadImport(input: {
|
|
adminOk: boolean;
|
|
params: { id: string };
|
|
request: Request;
|
|
}): Promise<{ status: number; body: unknown }> {
|
|
if (!input.adminOk) {
|
|
return { status: 401, body: { error: "admin_required" } };
|
|
}
|
|
|
|
const paramsParsed = uploadParamsSchema.safeParse(input.params);
|
|
if (!paramsParsed.success) {
|
|
return {
|
|
status: 400,
|
|
body: { error: "invalid_params", issues: paramsParsed.error.issues },
|
|
};
|
|
}
|
|
const params = paramsParsed.data;
|
|
|
|
const { randomUUID } = await import("crypto");
|
|
const { Readable } = await import("stream");
|
|
const { PutObjectCommand } = await import("@aws-sdk/client-s3");
|
|
const { getMinioBucket, getMinioInternalClient } = await import("@tline/minio");
|
|
const { enqueueProcessAsset } = await import("@tline/queue");
|
|
|
|
const contentType = input.request.headers.get("content-type") ?? "application/octet-stream";
|
|
const mediaType = inferMediaTypeFromContentType(contentType);
|
|
if (!mediaType) {
|
|
return { status: 400, body: { error: "unsupported_content_type", contentType } };
|
|
}
|
|
|
|
const bucket = getMinioBucket();
|
|
const ext = inferExtFromContentType(contentType);
|
|
const objectId = randomUUID();
|
|
const key = `staging/${params.id}/${objectId}.${ext}`;
|
|
|
|
const db = getDb();
|
|
const [imp] = await db<{ id: string }[]>`
|
|
select id
|
|
from imports
|
|
where id = ${params.id}
|
|
limit 1
|
|
`;
|
|
|
|
if (!imp) {
|
|
return { status: 404, body: { error: "import_not_found" } };
|
|
}
|
|
|
|
if (!input.request.body) {
|
|
return { status: 400, body: { error: "missing_body" } };
|
|
}
|
|
|
|
const s3 = getMinioInternalClient();
|
|
const bodyStream = Readable.fromWeb(input.request.body as unknown as NodeReadableStream);
|
|
await s3.send(
|
|
new PutObjectCommand({
|
|
Bucket: bucket,
|
|
Key: key,
|
|
Body: bodyStream,
|
|
ContentType: contentType,
|
|
}),
|
|
);
|
|
|
|
const rows = await db<
|
|
{
|
|
id: string;
|
|
status: "new" | "processing" | "ready" | "failed";
|
|
}[]
|
|
>`
|
|
insert into assets (bucket, media_type, mime_type, source_key, active_key)
|
|
values (${bucket}, ${mediaType}, ${contentType}, ${key}, ${key})
|
|
on conflict (bucket, source_key)
|
|
do update set active_key = excluded.active_key
|
|
returning id, status
|
|
`;
|
|
|
|
const asset = rows[0];
|
|
if (!asset) {
|
|
return { status: 500, body: { error: "asset_insert_failed" } };
|
|
}
|
|
|
|
await enqueueProcessAsset({ assetId: asset.id });
|
|
|
|
return {
|
|
status: 200,
|
|
body: { ok: true, importId: imp.id, assetId: asset.id, bucket, key },
|
|
};
|
|
}
|
|
|
|
export async function handleScanMinioImport(input: {
|
|
adminOk: boolean;
|
|
params: { id: string };
|
|
body: unknown;
|
|
}): Promise<{ status: number; body: unknown }> {
|
|
if (!input.adminOk) {
|
|
return { status: 401, body: { error: "admin_required" } };
|
|
}
|
|
|
|
const paramsParsed = scanParamsSchema.safeParse(input.params);
|
|
if (!paramsParsed.success) {
|
|
return {
|
|
status: 400,
|
|
body: { error: "invalid_params", issues: paramsParsed.error.issues },
|
|
};
|
|
}
|
|
const params = paramsParsed.data;
|
|
const body = scanBodySchema.parse(input.body ?? {});
|
|
|
|
const { getMinioBucket } = await import("@tline/minio");
|
|
const { enqueueScanMinioPrefix } = await import("@tline/queue");
|
|
|
|
const bucket = body.bucket ?? getMinioBucket();
|
|
const db = getDb();
|
|
const rows = await db<{ id: string }[]>`
|
|
select id
|
|
from imports
|
|
where id = ${params.id}
|
|
limit 1
|
|
`;
|
|
|
|
const imp = rows[0];
|
|
if (!imp) {
|
|
return { status: 404, body: { error: "not_found" } };
|
|
}
|
|
|
|
await enqueueScanMinioPrefix({
|
|
importId: imp.id,
|
|
bucket,
|
|
prefix: body.prefix,
|
|
});
|
|
|
|
await db`
|
|
update imports
|
|
set status = 'queued'
|
|
where id = ${imp.id}
|
|
`;
|
|
|
|
return {
|
|
status: 200,
|
|
body: { ok: true, importId: imp.id, bucket, prefix: body.prefix },
|
|
};
|
|
}
|