feat: require admin token for ingestion endpoints

This commit is contained in:
William Valentin
2026-02-01 03:08:15 -08:00
parent 50aa6008e3
commit 7c8406c7cc
6 changed files with 249 additions and 185 deletions

View File

@@ -1,66 +1,17 @@
import { z } from "zod";
import { getDb } from "@tline/db";
import { getMinioBucket } from "@tline/minio";
import { enqueueScanMinioPrefix } from "@tline/queue";
import { getAdminOk, handleScanMinioImport } from "../handlers";
export const runtime = "nodejs";
const paramsSchema = z.object({ id: z.string().uuid() });
const bodySchema = z
.object({
bucket: z.string().min(1).optional(),
prefix: z.string().min(1).default("originals/"),
})
.strict();
export async function POST(
request: Request,
context: { params: Promise<{ id: string }> },
): Promise<Response> {
const rawParams = await context.params;
const paramsParsed = paramsSchema.safeParse(rawParams);
if (!paramsParsed.success) {
return Response.json(
{ error: "invalid_params", issues: paramsParsed.error.issues },
{ status: 400 },
);
}
const params = paramsParsed.data;
const bodyJson = await request.json().catch(() => ({}));
const body = bodySchema.parse(bodyJson);
const bucket = body.bucket ?? getMinioBucket();
const db = getDb();
const rows = await db<
{
id: string;
}[]
>`
select id
from imports
where id = ${params.id}
limit 1
`;
const imp = rows[0];
if (!imp) {
return Response.json({ error: "not_found" }, { status: 404 });
}
await enqueueScanMinioPrefix({
importId: imp.id,
bucket,
prefix: body.prefix,
const res = await handleScanMinioImport({
adminOk: getAdminOk(request.headers),
params: rawParams,
body: bodyJson,
});
await db`
update imports
set status = 'queued'
where id = ${imp.id}
`;
return Response.json({ ok: true, importId: imp.id, bucket, prefix: body.prefix });
return Response.json(res.body, { status: res.status });
}

View File

@@ -1,108 +1,16 @@
import { randomUUID } from "crypto";
import { Readable } from "stream";
import type { ReadableStream as NodeReadableStream } from "node:stream/web";
import { PutObjectCommand } from "@aws-sdk/client-s3";
import { z } from "zod";
import { getDb } from "@tline/db";
import { getMinioBucket, getMinioInternalClient } from "@tline/minio";
import { enqueueProcessAsset } from "@tline/queue";
import { getAdminOk, handleUploadImport } from "../handlers";
export const runtime = "nodejs";
const paramsSchema = z.object({ id: z.string().uuid() });
const contentTypeMediaMap: Array<{
match: (ct: string) => boolean;
mediaType: "image" | "video";
}> = [
{ match: (ct) => ct.startsWith("image/"), mediaType: "image" },
{ match: (ct) => ct.startsWith("video/"), mediaType: "video" },
];
function inferMediaTypeFromContentType(ct: string): "image" | "video" | null {
const found = contentTypeMediaMap.find((m) => m.match(ct));
return found?.mediaType ?? null;
}
function inferExtFromContentType(ct: string): string {
const parts = ct.split("/");
const ext = parts[1] ?? "bin";
return ext.replace(/[^a-zA-Z0-9]+/g, "").toLowerCase() || "bin";
}
export async function POST(
request: Request,
context: { params: Promise<{ id: string }> },
): Promise<Response> {
const rawParams = await context.params;
const paramsParsed = paramsSchema.safeParse(rawParams);
if (!paramsParsed.success) {
return Response.json(
{ error: "invalid_params", issues: paramsParsed.error.issues },
{ status: 400 },
);
}
const params = paramsParsed.data;
const contentType = request.headers.get("content-type") ?? "application/octet-stream";
const mediaType = inferMediaTypeFromContentType(contentType);
if (!mediaType) {
return Response.json({ error: "unsupported_content_type", contentType }, { status: 400 });
}
const bucket = getMinioBucket();
const ext = inferExtFromContentType(contentType);
const objectId = randomUUID();
const key = `staging/${params.id}/${objectId}.${ext}`;
const db = getDb();
const [imp] = await db<{ id: string }[]>`
select id
from imports
where id = ${params.id}
limit 1
`;
if (!imp) {
return Response.json({ error: "import_not_found" }, { status: 404 });
}
if (!request.body) {
return Response.json({ error: "missing_body" }, { status: 400 });
}
const s3 = getMinioInternalClient();
const bodyStream = Readable.fromWeb(request.body as unknown as NodeReadableStream);
await s3.send(
new PutObjectCommand({
Bucket: bucket,
Key: key,
Body: bodyStream,
ContentType: contentType,
}),
);
const rows = await db<
{
id: string;
status: "new" | "processing" | "ready" | "failed";
}[]
>`
insert into assets (bucket, media_type, mime_type, source_key, active_key)
values (${bucket}, ${mediaType}, ${contentType}, ${key}, ${key})
on conflict (bucket, source_key)
do update set active_key = excluded.active_key
returning id, status
`;
const asset = rows[0];
if (!asset) {
return Response.json({ error: "asset_insert_failed" }, { status: 500 });
}
await enqueueProcessAsset({ assetId: asset.id });
return Response.json({ ok: true, importId: imp.id, assetId: asset.id, bucket, key });
const res = await handleUploadImport({
adminOk: getAdminOk(request.headers),
params: rawParams,
request,
});
return Response.json(res.body, { status: res.status });
}

View File

@@ -0,0 +1,220 @@
import { getAdminToken, isAdminRequest } from "@tline/config";
import { getDb } from "@tline/db";
import { z } from "zod";
import type { ReadableStream as NodeReadableStream } from "node:stream/web";
const ADMIN_HEADER = "X-Porthole-Admin-Token";
const createImportBodySchema = z
.object({
type: z.enum(["upload", "minio_scan"]).default("upload"),
})
.strict();
const uploadParamsSchema = z.object({ id: z.string().uuid() });
const scanParamsSchema = z.object({ id: z.string().uuid() });
const scanBodySchema = z
.object({
bucket: z.string().min(1).optional(),
prefix: z.string().min(1).default("originals/"),
})
.strict();
const contentTypeMediaMap: Array<{
match: (ct: string) => boolean;
mediaType: "image" | "video";
}> = [
{ match: (ct) => ct.startsWith("image/"), mediaType: "image" },
{ match: (ct) => ct.startsWith("video/"), mediaType: "video" },
];
function inferMediaTypeFromContentType(ct: string): "image" | "video" | null {
const found = contentTypeMediaMap.find((m) => m.match(ct));
return found?.mediaType ?? null;
}
function inferExtFromContentType(ct: string): string {
const parts = ct.split("/");
const ext = parts[1] ?? "bin";
return ext.replace(/[^a-zA-Z0-9]+/g, "").toLowerCase() || "bin";
}
export function getAdminOk(headers: Headers) {
const headerToken = headers.get(ADMIN_HEADER);
return isAdminRequest({ adminToken: getAdminToken() }, { headerToken });
}
export async function handleCreateImport(input: {
adminOk: boolean;
body: unknown;
}): Promise<{ status: number; body: unknown }> {
if (!input.adminOk) {
return { status: 401, body: { error: "admin_required" } };
}
const body = createImportBodySchema.parse(input.body ?? {});
const db = getDb();
const rows = await db<
{
id: string;
type: "upload" | "minio_scan";
status: string;
created_at: string;
}[]
>`
insert into imports (type, status)
values (${body.type}, 'new')
returning id, type, status, created_at
`;
const created = rows[0];
if (!created) {
return { status: 500, body: { error: "insert_failed" } };
}
return { status: 200, body: created };
}
export async function handleUploadImport(input: {
adminOk: boolean;
params: { id: string };
request: Request;
}): Promise<{ status: number; body: unknown }> {
if (!input.adminOk) {
return { status: 401, body: { error: "admin_required" } };
}
const paramsParsed = uploadParamsSchema.safeParse(input.params);
if (!paramsParsed.success) {
return {
status: 400,
body: { error: "invalid_params", issues: paramsParsed.error.issues },
};
}
const params = paramsParsed.data;
const { randomUUID } = await import("crypto");
const { Readable } = await import("stream");
const { PutObjectCommand } = await import("@aws-sdk/client-s3");
const { getMinioBucket, getMinioInternalClient } = await import("@tline/minio");
const { enqueueProcessAsset } = await import("@tline/queue");
const contentType = input.request.headers.get("content-type") ?? "application/octet-stream";
const mediaType = inferMediaTypeFromContentType(contentType);
if (!mediaType) {
return { status: 400, body: { error: "unsupported_content_type", contentType } };
}
const bucket = getMinioBucket();
const ext = inferExtFromContentType(contentType);
const objectId = randomUUID();
const key = `staging/${params.id}/${objectId}.${ext}`;
const db = getDb();
const [imp] = await db<{ id: string }[]>`
select id
from imports
where id = ${params.id}
limit 1
`;
if (!imp) {
return { status: 404, body: { error: "import_not_found" } };
}
if (!input.request.body) {
return { status: 400, body: { error: "missing_body" } };
}
const s3 = getMinioInternalClient();
const bodyStream = Readable.fromWeb(input.request.body as unknown as NodeReadableStream);
await s3.send(
new PutObjectCommand({
Bucket: bucket,
Key: key,
Body: bodyStream,
ContentType: contentType,
}),
);
const rows = await db<
{
id: string;
status: "new" | "processing" | "ready" | "failed";
}[]
>`
insert into assets (bucket, media_type, mime_type, source_key, active_key)
values (${bucket}, ${mediaType}, ${contentType}, ${key}, ${key})
on conflict (bucket, source_key)
do update set active_key = excluded.active_key
returning id, status
`;
const asset = rows[0];
if (!asset) {
return { status: 500, body: { error: "asset_insert_failed" } };
}
await enqueueProcessAsset({ assetId: asset.id });
return {
status: 200,
body: { ok: true, importId: imp.id, assetId: asset.id, bucket, key },
};
}
export async function handleScanMinioImport(input: {
adminOk: boolean;
params: { id: string };
body: unknown;
}): Promise<{ status: number; body: unknown }> {
if (!input.adminOk) {
return { status: 401, body: { error: "admin_required" } };
}
const paramsParsed = scanParamsSchema.safeParse(input.params);
if (!paramsParsed.success) {
return {
status: 400,
body: { error: "invalid_params", issues: paramsParsed.error.issues },
};
}
const params = paramsParsed.data;
const body = scanBodySchema.parse(input.body ?? {});
const { getMinioBucket } = await import("@tline/minio");
const { enqueueScanMinioPrefix } = await import("@tline/queue");
const bucket = body.bucket ?? getMinioBucket();
const db = getDb();
const rows = await db<{ id: string }[]>`
select id
from imports
where id = ${params.id}
limit 1
`;
const imp = rows[0];
if (!imp) {
return { status: 404, body: { error: "not_found" } };
}
await enqueueScanMinioPrefix({
importId: imp.id,
bucket,
prefix: body.prefix,
});
await db`
update imports
set status = 'queued'
where id = ${imp.id}
`;
return {
status: 200,
body: { ok: true, importId: imp.id, bucket, prefix: body.prefix },
};
}

View File

@@ -1,37 +1,12 @@
import { z } from "zod";
import { getDb } from "@tline/db";
import { getAdminOk, handleCreateImport } from "./handlers";
export const runtime = "nodejs";
const bodySchema = z
.object({
type: z.enum(["upload", "minio_scan"]).default("upload"),
})
.strict();
export async function POST(request: Request): Promise<Response> {
const bodyJson = await request.json().catch(() => ({}));
const body = bodySchema.parse(bodyJson);
const db = getDb();
const rows = await db<
{
id: string;
type: "upload" | "minio_scan";
status: string;
created_at: string;
}[]
>`
insert into imports (type, status)
values (${body.type}, 'new')
returning id, type, status, created_at
`;
const created = rows[0];
if (!created) {
return Response.json({ error: "insert_failed" }, { status: 500 });
}
return Response.json(created);
const res = await handleCreateImport({
adminOk: getAdminOk(request.headers),
body: bodyJson,
});
return Response.json(res.body, { status: res.status });
}

View File

@@ -0,0 +1,8 @@
import { test, expect } from "bun:test";
test("imports POST rejects when missing admin token", async () => {
const { handleCreateImport } = await import("../../app/api/imports/handlers");
const res = await handleCreateImport({ adminOk: false, body: {} });
expect(res.status).toBe(401);
expect(res.body).toEqual({ error: "admin_required" });
});