diff --git a/apps/worker/src/__tests__/transcode-plan.test.ts b/apps/worker/src/__tests__/transcode-plan.test.ts new file mode 100644 index 0000000..c6c1d55 --- /dev/null +++ b/apps/worker/src/__tests__/transcode-plan.test.ts @@ -0,0 +1,10 @@ +import { test, expect } from "bun:test"; +import { shouldTranscodeToMp4 } from "../transcode"; + +test("transcode runs for non-mp4 videos", () => { + expect(shouldTranscodeToMp4({ mimeType: "video/x-matroska" })).toBe(true); +}); + +test("transcode skips for mp4", () => { + expect(shouldTranscodeToMp4({ mimeType: "video/mp4" })).toBe(false); +}); diff --git a/apps/worker/src/index.ts b/apps/worker/src/index.ts index 7f8bfed..729fac8 100644 --- a/apps/worker/src/index.ts +++ b/apps/worker/src/index.ts @@ -7,7 +7,8 @@ import { closeDb } from "@tline/db"; import { handleCopyToCanonical, handleProcessAsset, - handleScanMinioPrefix + handleScanMinioPrefix, + handleTranscodeVideoMp4 } from "./jobs"; console.log(`[${getAppName()}] worker boot`); @@ -30,6 +31,7 @@ const worker = new Worker( if (job.name === "scan_minio_prefix") return handleScanMinioPrefix(job.data); if (job.name === "process_asset") return handleProcessAsset(job.data); if (job.name === "copy_to_canonical") return handleCopyToCanonical(job.data); + if (job.name === "transcode_video_mp4") return handleTranscodeVideoMp4(job.data); throw new Error(`Unknown job: ${job.name}`); }, diff --git a/apps/worker/src/jobs.ts b/apps/worker/src/jobs.ts index 2f4465e..6fc1631 100644 --- a/apps/worker/src/jobs.ts +++ b/apps/worker/src/jobs.ts @@ -27,10 +27,14 @@ import { copyToCanonicalPayloadSchema, enqueueCopyToCanonical, enqueueProcessAsset, + enqueueTranscodeVideoMp4, processAssetPayloadSchema, scanMinioPrefixPayloadSchema, + transcodeVideoMp4PayloadSchema, } from "@tline/queue"; +import { shouldTranscodeToMp4 } from "./transcode"; + const allowedScanPrefixes = ["originals/"] as const; function assertAllowedScanPrefix(prefix: string) { @@ -584,6 +588,10 @@ export async function handleProcessAsset(raw: unknown) { where id = ${asset.id} `; + if (asset.media_type === "video" && shouldTranscodeToMp4({ mimeType: asset.mime_type })) { + await enqueueTranscodeVideoMp4({ assetId: asset.id }); + } + // Only uploads (staging/*) are copied into canonical by default. if (asset.active_key.startsWith("staging/")) { await enqueueCopyToCanonical({ assetId: asset.id }); @@ -606,6 +614,91 @@ export async function handleProcessAsset(raw: unknown) { } } +export async function handleTranscodeVideoMp4(raw: unknown) { + const payload = transcodeVideoMp4PayloadSchema.parse(raw); + const db = getDb(); + const s3 = getMinioInternalClient(); + + const [asset] = await db< + { + id: string; + bucket: string; + active_key: string; + mime_type: string; + }[] + >` + select id, bucket, active_key, mime_type + from assets + where id = ${payload.assetId} + limit 1 + `; + + if (!asset) throw new Error(`Asset not found: ${payload.assetId}`); + + if (!shouldTranscodeToMp4({ mimeType: asset.mime_type })) { + return { ok: true, assetId: asset.id, skipped: "already_mp4" }; + } + + const tempDir = await mkdtemp(join(tmpdir(), "tline-transcode-")); + + try { + const containerExt = asset.mime_type.split("/")[1] ?? "bin"; + const inputPath = join(tempDir, `input.${containerExt}`); + const getRes = await s3.send( + new GetObjectCommand({ + Bucket: asset.bucket, + Key: asset.active_key, + }), + ); + if (!getRes.Body) throw new Error("Empty response body from S3"); + await streamToFile(getRes.Body as Readable, inputPath); + + const outputPath = join(tempDir, "mp4_720p.mp4"); + await runCommand("ffmpeg", [ + "-i", + inputPath, + "-vf", + "scale=-2:720", + "-c:v", + "libx264", + "-preset", + "fast", + "-crf", + "23", + "-c:a", + "aac", + "-b:a", + "128k", + "-movflags", + "+faststart", + "-y", + outputPath, + ]); + + const derivedKey = `derived/video/${asset.id}/mp4_720p.mp4`; + await uploadObject({ + bucket: asset.bucket, + key: derivedKey, + filePath: outputPath, + contentType: "video/mp4", + }); + + await upsertVariant({ + assetId: asset.id, + kind: "video_mp4", + size: 720, + key: derivedKey, + mimeType: "video/mp4", + width: null, + height: 720, + }); + + return { ok: true, assetId: asset.id, key: derivedKey }; + } finally { + await rm(tempDir, { recursive: true, force: true }); + } +} + export async function handleCopyToCanonical(raw: unknown) { const payload = copyToCanonicalPayloadSchema.parse(raw); diff --git a/apps/worker/src/transcode.ts b/apps/worker/src/transcode.ts new file mode 100644 index 0000000..0edd549 --- /dev/null +++ b/apps/worker/src/transcode.ts @@ -0,0 +1,3 @@ +export function shouldTranscodeToMp4(input: { mimeType: string }) { + return input.mimeType !== "video/mp4"; +} diff --git a/packages/queue/src/index.ts b/packages/queue/src/index.ts index f969488..408a676 100644 --- a/packages/queue/src/index.ts +++ b/packages/queue/src/index.ts @@ -11,7 +11,8 @@ const envSchema = z.object({ export const jobNameSchema = z.enum([ "scan_minio_prefix", "process_asset", - "copy_to_canonical" + "copy_to_canonical", + "transcode_video_mp4" ]); export type QueueJobName = z.infer; @@ -36,15 +37,23 @@ export const copyToCanonicalPayloadSchema = z }) .strict(); +export const transcodeVideoMp4PayloadSchema = z + .object({ + assetId: z.string().uuid() + }) + .strict(); + export const payloadByJobNameSchema = z.discriminatedUnion("name", [ z.object({ name: z.literal("scan_minio_prefix"), payload: scanMinioPrefixPayloadSchema }), z.object({ name: z.literal("process_asset"), payload: processAssetPayloadSchema }), - z.object({ name: z.literal("copy_to_canonical"), payload: copyToCanonicalPayloadSchema }) + z.object({ name: z.literal("copy_to_canonical"), payload: copyToCanonicalPayloadSchema }), + z.object({ name: z.literal("transcode_video_mp4"), payload: transcodeVideoMp4PayloadSchema }) ]); export type ScanMinioPrefixPayload = z.infer; export type ProcessAssetPayload = z.infer; export type CopyToCanonicalPayload = z.infer; +export type TranscodeVideoMp4Payload = z.infer; type QueueEnv = z.infer; @@ -126,3 +135,12 @@ export async function enqueueCopyToCanonical(input: CopyToCanonicalPayload) { backoff: { type: "exponential", delay: 1000 } }); } + +export async function enqueueTranscodeVideoMp4(input: TranscodeVideoMp4Payload) { + const payload = transcodeVideoMp4PayloadSchema.parse(input); + const queue = getQueue(); + return queue.add("transcode_video_mp4", payload, { + attempts: 3, + backoff: { type: "exponential", delay: 1000 } + }); +}