feat: add mp4 transcode job and variant record

This commit is contained in:
William Valentin
2026-02-01 15:48:01 -08:00
parent 0bf2f2d827
commit 4fecfd469f
5 changed files with 129 additions and 3 deletions

View File

@@ -0,0 +1,10 @@
import { test, expect } from "bun:test";
import { shouldTranscodeToMp4 } from "../transcode";
test("transcode runs for non-mp4 videos", () => {
expect(shouldTranscodeToMp4({ mimeType: "video/x-matroska" })).toBe(true);
});
test("transcode skips for mp4", () => {
expect(shouldTranscodeToMp4({ mimeType: "video/mp4" })).toBe(false);
});

View File

@@ -7,7 +7,8 @@ import { closeDb } from "@tline/db";
import {
handleCopyToCanonical,
handleProcessAsset,
handleScanMinioPrefix
handleScanMinioPrefix,
handleTranscodeVideoMp4
} from "./jobs";
console.log(`[${getAppName()}] worker boot`);
@@ -30,6 +31,7 @@ const worker = new Worker(
if (job.name === "scan_minio_prefix") return handleScanMinioPrefix(job.data);
if (job.name === "process_asset") return handleProcessAsset(job.data);
if (job.name === "copy_to_canonical") return handleCopyToCanonical(job.data);
if (job.name === "transcode_video_mp4") return handleTranscodeVideoMp4(job.data);
throw new Error(`Unknown job: ${job.name}`);
},

View File

@@ -27,10 +27,14 @@ import {
copyToCanonicalPayloadSchema,
enqueueCopyToCanonical,
enqueueProcessAsset,
enqueueTranscodeVideoMp4,
processAssetPayloadSchema,
scanMinioPrefixPayloadSchema,
transcodeVideoMp4PayloadSchema,
} from "@tline/queue";
import { shouldTranscodeToMp4 } from "./transcode";
const allowedScanPrefixes = ["originals/"] as const;
function assertAllowedScanPrefix(prefix: string) {
@@ -584,6 +588,10 @@ export async function handleProcessAsset(raw: unknown) {
where id = ${asset.id}
`;
if (asset.media_type === "video" && shouldTranscodeToMp4({ mimeType: asset.mime_type })) {
await enqueueTranscodeVideoMp4({ assetId: asset.id });
}
// Only uploads (staging/*) are copied into canonical by default.
if (asset.active_key.startsWith("staging/")) {
await enqueueCopyToCanonical({ assetId: asset.id });
@@ -606,6 +614,91 @@ export async function handleProcessAsset(raw: unknown) {
}
}
export async function handleTranscodeVideoMp4(raw: unknown) {
const payload = transcodeVideoMp4PayloadSchema.parse(raw);
const db = getDb();
const s3 = getMinioInternalClient();
const [asset] = await db<
{
id: string;
bucket: string;
active_key: string;
mime_type: string;
}[]
>`
select id, bucket, active_key, mime_type
from assets
where id = ${payload.assetId}
limit 1
`;
if (!asset) throw new Error(`Asset not found: ${payload.assetId}`);
if (!shouldTranscodeToMp4({ mimeType: asset.mime_type })) {
return { ok: true, assetId: asset.id, skipped: "already_mp4" };
}
const tempDir = await mkdtemp(join(tmpdir(), "tline-transcode-"));
try {
const containerExt = asset.mime_type.split("/")[1] ?? "bin";
const inputPath = join(tempDir, `input.${containerExt}`);
const getRes = await s3.send(
new GetObjectCommand({
Bucket: asset.bucket,
Key: asset.active_key,
}),
);
if (!getRes.Body) throw new Error("Empty response body from S3");
await streamToFile(getRes.Body as Readable, inputPath);
const outputPath = join(tempDir, "mp4_720p.mp4");
await runCommand("ffmpeg", [
"-i",
inputPath,
"-vf",
"scale=-2:720",
"-c:v",
"libx264",
"-preset",
"fast",
"-crf",
"23",
"-c:a",
"aac",
"-b:a",
"128k",
"-movflags",
"+faststart",
"-y",
outputPath,
]);
const derivedKey = `derived/video/${asset.id}/mp4_720p.mp4`;
await uploadObject({
bucket: asset.bucket,
key: derivedKey,
filePath: outputPath,
contentType: "video/mp4",
});
await upsertVariant({
assetId: asset.id,
kind: "video_mp4",
size: 720,
key: derivedKey,
mimeType: "video/mp4",
width: null,
height: 720,
});
return { ok: true, assetId: asset.id, key: derivedKey };
} finally {
await rm(tempDir, { recursive: true, force: true });
}
}
export async function handleCopyToCanonical(raw: unknown) {
const payload = copyToCanonicalPayloadSchema.parse(raw);

View File

@@ -0,0 +1,3 @@
export function shouldTranscodeToMp4(input: { mimeType: string }) {
return input.mimeType !== "video/mp4";
}

View File

@@ -11,7 +11,8 @@ const envSchema = z.object({
export const jobNameSchema = z.enum([
"scan_minio_prefix",
"process_asset",
"copy_to_canonical"
"copy_to_canonical",
"transcode_video_mp4"
]);
export type QueueJobName = z.infer<typeof jobNameSchema>;
@@ -36,15 +37,23 @@ export const copyToCanonicalPayloadSchema = z
})
.strict();
export const transcodeVideoMp4PayloadSchema = z
.object({
assetId: z.string().uuid()
})
.strict();
export const payloadByJobNameSchema = z.discriminatedUnion("name", [
z.object({ name: z.literal("scan_minio_prefix"), payload: scanMinioPrefixPayloadSchema }),
z.object({ name: z.literal("process_asset"), payload: processAssetPayloadSchema }),
z.object({ name: z.literal("copy_to_canonical"), payload: copyToCanonicalPayloadSchema })
z.object({ name: z.literal("copy_to_canonical"), payload: copyToCanonicalPayloadSchema }),
z.object({ name: z.literal("transcode_video_mp4"), payload: transcodeVideoMp4PayloadSchema })
]);
export type ScanMinioPrefixPayload = z.infer<typeof scanMinioPrefixPayloadSchema>;
export type ProcessAssetPayload = z.infer<typeof processAssetPayloadSchema>;
export type CopyToCanonicalPayload = z.infer<typeof copyToCanonicalPayloadSchema>;
export type TranscodeVideoMp4Payload = z.infer<typeof transcodeVideoMp4PayloadSchema>;
type QueueEnv = z.infer<typeof envSchema>;
@@ -126,3 +135,12 @@ export async function enqueueCopyToCanonical(input: CopyToCanonicalPayload) {
backoff: { type: "exponential", delay: 1000 }
});
}
export async function enqueueTranscodeVideoMp4(input: TranscodeVideoMp4Payload) {
const payload = transcodeVideoMp4PayloadSchema.parse(input);
const queue = getQueue();
return queue.add("transcode_video_mp4", payload, {
attempts: 3,
backoff: { type: "exponential", delay: 1000 }
});
}