Files
porthole/apps/worker/src/jobs.ts
T
2026-02-04 19:32:16 -08:00

893 lines
26 KiB
TypeScript

import { spawn } from "child_process";
import { mkdtemp, rm } from "fs/promises";
import { tmpdir } from "os";
import { join } from "path";
import { createWriteStream, createReadStream } from "fs";
import { Readable } from "stream";
import { pipeline } from "stream/promises";
import sharp from "sharp";
import {
  computeImageVariantPlan,
  computeVideoPosterPlan,
  pickSmallestVariantSize,
} from "./variants";
import {
  CopyObjectCommand,
  GetObjectCommand,
  HeadObjectCommand,
  ListObjectsV2Command,
  PutObjectCommand
} from "@aws-sdk/client-s3";
import { getDb } from "@tline/db";
import { getMinioInternalClient } from "@tline/minio";
import {
  copyToCanonicalPayloadSchema,
  enqueueCopyToCanonical,
  enqueueProcessAsset,
  enqueueTranscodeVideoMp4,
  processAssetPayloadSchema,
  scanMinioPrefixPayloadSchema,
  transcodeVideoMp4PayloadSchema,
} from "@tline/queue";
import { shouldTranscodeToMp4 } from "./transcode";
import { computeFileSha256 } from "./hash-utils";
const allowedScanPrefixes = ["originals/"] as const;

/**
 * Guard for scan jobs: the requested prefix must start with one of
 * `allowedScanPrefixes`.
 *
 * @param prefix - object-key prefix taken from the job payload
 * @throws Error naming the rejected prefix when no allowed prefix matches
 */
function assertAllowedScanPrefix(prefix: string) {
  const permitted = allowedScanPrefixes.find((root) => prefix.startsWith(root));
  if (permitted === undefined) {
    throw new Error(`scan prefix not allowed: ${prefix}`);
  }
}
/**
 * Lower-cased extension (without the dot) of the LAST path segment of an
 * object key, or "" when that segment contains no dot.
 *
 * Only text after the final "/" is considered. Otherwise a dot in a directory
 * name ("uploads/v1.2/IMG") would produce a bogus extension such as "2/img",
 * which inferExtFromKey would then splice into canonical object keys.
 */
function getExtensionLower(key: string) {
  const dot = key.lastIndexOf(".");
  const slash = key.lastIndexOf("/");
  if (dot === -1 || dot < slash) return "";
  return key.slice(dot + 1).toLowerCase();
}
/**
 * Classify an object key by its (case-insensitive) file extension.
 *
 * @returns the pipeline media kind plus the MIME type to record, or null when
 *   the extension is not one the worker ingests
 */
function inferMedia(
  key: string,
): { mediaType: "image" | "video"; mimeType: string } | null {
  switch (getExtensionLower(key)) {
    case "jpg":
    case "jpeg":
      return { mediaType: "image", mimeType: "image/jpeg" };
    case "png":
      return { mediaType: "image", mimeType: "image/png" };
    case "gif":
      return { mediaType: "image", mimeType: "image/gif" };
    case "webp":
      return { mediaType: "image", mimeType: "image/webp" };
    case "heic":
      return { mediaType: "image", mimeType: "image/heic" };
    case "heif":
      return { mediaType: "image", mimeType: "image/heif" };
    case "mov":
      return { mediaType: "video", mimeType: "video/quicktime" };
    case "mp4":
      return { mediaType: "video", mimeType: "video/mp4" };
    case "m4v":
      return { mediaType: "video", mimeType: "video/x-m4v" };
    case "mkv":
      return { mediaType: "video", mimeType: "video/x-matroska" };
    default:
      return null;
  }
}
/**
 * Collect every object key under `prefix` in `bucket`. Follows ListObjectsV2
 * continuation tokens until a page is not truncated. Listing entries without
 * a `Key` are dropped.
 */
async function listAllObjectKeys(input: { bucket: string; prefix: string }) {
  const client = getMinioInternalClient();
  const collected: string[] = [];
  let token: string | undefined;
  for (;;) {
    const page = await client.send(
      new ListObjectsV2Command({
        Bucket: input.bucket,
        Prefix: input.prefix,
        ContinuationToken: token,
      }),
    );
    for (const entry of page.Contents ?? []) {
      if (entry.Key) collected.push(entry.Key);
    }
    token = page.IsTruncated ? page.NextContinuationToken : undefined;
    if (!token) break;
  }
  return collected;
}
/**
 * Job handler: index every object under an allow-listed prefix.
 *
 * Keys ending in "/" and keys whose extension inferMedia does not recognise
 * are skipped. For every other key, an `assets` row is upserted on
 * (bucket, source_key). A process-asset job is enqueued when the row's
 * status is "new" or "failed". Keys are handled sequentially in listing
 * order.
 *
 * @param raw - untrusted job payload, validated with scanMinioPrefixPayloadSchema
 * @returns summary counters:
 *   - found: keys listed
 *   - processed: rows upserted
 *   - skipped: placeholders and unsupported types
 *   - enqueued: jobs queued
 * @throws when the payload is invalid or its prefix is not allow-listed
 */
export async function handleScanMinioPrefix(raw: unknown) {
  const payload = scanMinioPrefixPayloadSchema.parse(raw);
  assertAllowedScanPrefix(payload.prefix);
  const { bucket, prefix } = payload;
  const keys = await listAllObjectKeys({ bucket, prefix });
  const db = getDb();
  const tally = { processed: 0, skipped: 0, enqueued: 0 };
  for (const key of keys) {
    const media = key.endsWith("/") ? null : inferMedia(key);
    if (media === null) {
      tally.skipped += 1;
      continue;
    }
    const upserted = await db<
      {
        id: string;
        status: "new" | "processing" | "ready" | "failed";
      }[]
    >`
insert into assets (bucket, media_type, mime_type, source_key, active_key)
values (${bucket}, ${media.mediaType}, ${media.mimeType}, ${key}, ${key})
on conflict (bucket, source_key)
do update
set media_type = excluded.media_type,
mime_type = excluded.mime_type,
active_key = excluded.active_key
returning id, status
`;
    tally.processed += 1;
    const asset = upserted[0];
    if (asset === undefined) continue;
    // Only rows that have never succeeded (or last failed) need a processing job.
    if (asset.status !== "new" && asset.status !== "failed") continue;
    await enqueueProcessAsset({ assetId: asset.id });
    tally.enqueued += 1;
  }
  return {
    ok: true,
    importId: payload.importId,
    bucket,
    scannedPrefix: prefix,
    found: keys.length,
    ...tally,
  };
}
/**
 * Write a readable stream (e.g. an S3 GetObject body) to `filePath`.
 *
 * `stream/promises.pipeline` rejects on an error from EITHER stream and
 * destroys both. Bare `pipe()` does not forward errors from the source, so a
 * read failure mid-download would either surface as an unhandled 'error'
 * event or leave the promise pending forever with the file handle open.
 *
 * @param stream - source to drain completely
 * @param filePath - destination path; created or truncated
 * @returns resolves once all data has been flushed to the file
 * @throws the first error raised by the source or the file write
 */
async function streamToFile(stream: Readable, filePath: string): Promise<void> {
  await pipeline(stream, createWriteStream(filePath));
}
/**
 * Run an external tool (exiftool, ffprobe, ffmpeg, ...) and resolve with its
 * stdout decoded as UTF-8.
 *
 * - Output is buffered as raw bytes and decoded once at the end. Decoding
 *   each chunk separately corrupts multi-byte UTF-8 characters that straddle
 *   a chunk boundary (e.g. non-ASCII text inside exiftool's JSON).
 * - stdin is not connected. ffmpeg, for one, polls stdin for interactive
 *   commands unless told otherwise.
 * - The promise settles exactly once, even if both "error" and "close" fire.
 * - Arguments are passed directly to the executable (no shell).
 *
 * @param cmd - executable name (resolved via PATH) or path
 * @param args - argument vector
 * @returns the process's complete stdout
 * @throws the spawn error when the process cannot start, or an Error with
 *   the exit code (and signal, if any) plus captured stderr on failure
 */
function runCommand(cmd: string, args: string[]): Promise<string> {
  return new Promise((resolve, reject) => {
    const proc = spawn(cmd, args, { stdio: ["ignore", "pipe", "pipe"] });
    const stdoutChunks: Buffer[] = [];
    const stderrChunks: Buffer[] = [];
    let settled = false;
    proc.stdout.on("data", (chunk: Buffer) => {
      stdoutChunks.push(chunk);
    });
    proc.stderr.on("data", (chunk: Buffer) => {
      stderrChunks.push(chunk);
    });
    proc.on("error", (err) => {
      if (settled) return;
      settled = true;
      reject(err);
    });
    proc.on("close", (code, signal) => {
      if (settled) return;
      settled = true;
      if (code === 0) {
        resolve(Buffer.concat(stdoutChunks).toString("utf8"));
        return;
      }
      const stderr = Buffer.concat(stderrChunks).toString("utf8");
      const how = signal ? `code ${code} (signal ${signal})` : `code ${code}`;
      reject(new Error(`${cmd} failed with ${how}: ${stderr}`));
    });
  });
}
/**
 * Upload a local file to `bucket/key` with a single PutObject request.
 *
 * The file ReadStream is destroyed in `finally`. If the request fails
 * before the body is fully consumed, the stream may otherwise never
 * reach end-of-file and its file descriptor stays open. After a successful
 * upload the stream has already closed, so destroy() is a no-op.
 *
 * @param input.contentType - optional Content-Type to store with the object
 * @throws whatever the S3 client raises for the PutObject request
 */
async function uploadObject(input: {
  bucket: string;
  key: string;
  filePath: string;
  contentType?: string;
}): Promise<void> {
  const s3 = getMinioInternalClient();
  const body = createReadStream(input.filePath);
  try {
    await s3.send(
      new PutObjectCommand({
        Bucket: input.bucket,
        Key: input.key,
        Body: body,
        ContentType: input.contentType,
      }),
    );
  } finally {
    body.destroy();
  }
}
/**
 * Insert or refresh the `asset_variants` row identified by (asset_id, kind, size).
 * On conflict, key, mime_type, width and height are overwritten. Omitted
 * width/height are stored as NULL.
 */
async function upsertVariant(input: {
  assetId: string;
  kind: "thumb" | "poster" | "video_mp4";
  size: number;
  key: string;
  mimeType: string;
  width?: number | null;
  height?: number | null;
}) {
  const { assetId, kind, size, key, mimeType } = input;
  const width = input.width ?? null;
  const height = input.height ?? null;
  const sql = getDb();
  await sql`
insert into asset_variants (asset_id, kind, size, key, mime_type, width, height)
values (
${assetId},
${kind},
${size},
${key},
${mimeType},
${width},
${height}
)
on conflict (asset_id, kind, size)
do update set key = excluded.key,
mime_type = excluded.mime_type,
width = excluded.width,
height = excluded.height
`;
}
/**
 * Store the content SHA-256 for an asset (one row per asset_id). An existing
 * row has its sha256 and bucket replaced.
 */
async function upsertAssetHash(input: { assetId: string; bucket: string; sha256: string }) {
  const { assetId, bucket, sha256 } = input;
  const sql = getDb();
  await sql`
insert into asset_hashes (asset_id, bucket, sha256)
values (${assetId}, ${bucket}, ${sha256})
on conflict (asset_id)
do update set sha256 = excluded.sha256, bucket = excluded.bucket
`;
}
/**
 * HEAD an object and return its LastModified timestamp. Returns null when the
 * response omits it. Client errors (e.g. a missing object) propagate.
 */
async function getObjectLastModified(input: { bucket: string; key: string }): Promise<Date | null> {
  const { LastModified } = await getMinioInternalClient().send(
    new HeadObjectCommand({ Bucket: input.bucket, Key: input.key }),
  );
  return LastModified ?? null;
}
/**
 * Parse an ExifTool-style timestamp: "YYYY:MM:DD HH:MM:SS", optionally with
 * fractional seconds and a "Z" / "±HH:MM" zone.
 *
 * A zone-less ExifTool timestamp is read as UTC, so the result does not depend
 * on the worker's local time zone. Any other text falls back to `new Date(text)`.
 *
 * @returns a valid Date, or null for empty/unparseable input
 */
function parseExifDate(dateStr: string | undefined): Date | null {
  if (!dateStr) return null;
  const text = dateStr.trim();
  const exifMatch = text.match(
    /^(\d{4}):(\d{2}):(\d{2})[ T](\d{2}):(\d{2}):(\d{2})(\.\d+)?(?:\s*(Z|[+-]\d{2}:\d{2}))?$/,
  );
  let candidate: Date;
  if (exifMatch) {
    const [, year, month, day, hour, minute, second, fraction = "", zone = "Z"] = exifMatch;
    candidate = new Date(`${year}-${month}-${day}T${hour}:${minute}:${second}${fraction}${zone}`);
  } else {
    candidate = new Date(text);
  }
  return Number.isNaN(candidate.getTime()) ? null : candidate;
}
/**
 * Combine [degrees, minutes?, seconds?] into decimal degrees.
 *
 * The sign comes from the degrees component only; minutes and seconds are
 * used as magnitudes. A degrees value of -0 counts as negative, so "-0 7 40"
 * (a point just west/south of 0°) keeps its sign. `deg < 0` alone is false
 * for -0 and would silently flip the coordinate positive.
 *
 * @returns decimal degrees, or null when the degrees component is missing or non-finite
 */
function parseGpsParts(parts: number[]): number | null {
  const [deg, min, sec] = parts;
  if (deg === undefined || !Number.isFinite(deg)) return null;
  const negative = deg < 0 || Object.is(deg, -0);
  let magnitude = Math.abs(deg);
  if (min !== undefined && Number.isFinite(min)) magnitude += Math.abs(min) / 60;
  if (sec !== undefined && Number.isFinite(sec)) magnitude += Math.abs(sec) / 3600;
  return negative ? -magnitude : magnitude;
}
/**
 * Parse a rational string such as "123/4" or "-10 / 4". Only the numerator
 * may be negative.
 *
 * @returns numerator / denominator, or null for non-rational text or a zero denominator
 */
function parseGpsFraction(input: string): number | null {
  const match = /^(-?\d+(?:\.\d+)?)\s*\/\s*(\d+(?:\.\d+)?)$/.exec(input.trim());
  if (match === null) return null;
  const [, top, bottom] = match;
  const numerator = Number(top);
  const denominator = Number(bottom);
  const usable =
    Number.isFinite(numerator) && Number.isFinite(denominator) && denominator !== 0;
  return usable ? numerator / denominator : null;
}
/**
 * Coerce one GPS coordinate, in any of the shapes below, into decimal degrees.
 * The hemisphere reference is NOT applied here.
 *
 * Accepted shapes:
 *  - finite number                         -> as-is
 *  - numeric string ("-33.5")              -> parsed; non-finite ("Infinity", "1e999") -> null
 *  - rational string ("123/4")             -> numerator / denominator
 *  - DMS-like string (`37 deg 46' 29.64"`) -> the numbers found, combined via parseGpsParts
 *  - array of numbers, rational strings or {numerator, denominator} objects -> parseGpsParts
 *
 * Non-finite results are rejected in every branch. Previously the string
 * branch let ±Infinity through while the number branch rejected it.
 *
 * @returns decimal degrees, or null when nothing usable can be extracted
 */
function parseGpsValue(value: unknown): number | null {
  if (typeof value === "number") {
    return Number.isFinite(value) ? value : null;
  }
  if (typeof value === "string") {
    const trimmed = value.trim();
    if (!trimmed) return null;
    const direct = Number(trimmed);
    // A string that parses as a number is final; reject ±Infinity like the number branch.
    if (!Number.isNaN(direct)) return Number.isFinite(direct) ? direct : null;
    const fraction = parseGpsFraction(trimmed);
    if (fraction !== null) return fraction;
    const parts = trimmed.match(/-?\d+(?:\.\d+)?/g);
    if (!parts) return null;
    return parseGpsParts(parts.map((part) => Number(part)).filter(Number.isFinite));
  }
  if (Array.isArray(value)) {
    const parts = value
      .map((part) => {
        if (typeof part === "number") return part;
        if (typeof part === "string") {
          const fraction = parseGpsFraction(part);
          if (fraction !== null) return fraction;
          return Number(part);
        }
        if (typeof part === "object" && part !== null) {
          // EXIF rationals may arrive as { numerator, denominator } objects.
          const candidate = part as Record<string, unknown>;
          const numerator = Number(candidate.numerator);
          const denominator = Number(candidate.denominator);
          if (Number.isFinite(numerator) && Number.isFinite(denominator) && denominator !== 0) {
            return numerator / denominator;
          }
        }
        return NaN;
      })
      .filter(Number.isFinite);
    return parseGpsParts(parts);
  }
  return null;
}
/**
 * Apply a hemisphere reference to a parsed coordinate.
 *
 * The hemisphere comes from `ref` (e.g. GPSLatitudeRef) when its first
 * letter is N/S/E/W. ExifTool's JSON output spells these as
 * "North"/"South"/"East"/"West", so only the first letter is significant.
 * Otherwise the last standalone N/S/E/W token in the raw coordinate string
 * is used, e.g. the "W" in `122 deg 25' 9.84" W`. Letters inside words, such
 * as the "e" in "deg", are ignored.
 *
 * S/W force a negative result and N/E a positive one. With no usable
 * reference, the value is returned unchanged.
 */
function applyRefSign(value: number, ref: unknown, valueRaw: unknown): number {
  const refLetter = typeof ref === "string" ? ref.trim().charAt(0).toUpperCase() : "";
  let rawLetter = "";
  if (typeof valueRaw === "string") {
    const tokens = valueRaw.match(/(?<![A-Za-z])[NSEW](?![A-Za-z])/gi);
    if (tokens && tokens.length > 0) {
      rawLetter = (tokens[tokens.length - 1] ?? "").toUpperCase();
    }
  }
  const refIsDirection =
    refLetter === "N" || refLetter === "S" || refLetter === "E" || refLetter === "W";
  const hemisphere = refIsDirection ? refLetter : rawLetter;
  if (hemisphere === "S" || hemisphere === "W") return -Math.abs(value);
  if (hemisphere === "N" || hemisphere === "E") return Math.abs(value);
  return value;
}
/**
 * Parse one coordinate, apply its hemisphere reference, and range-check it
 * (|lat| <= 90, |lon| <= 180).
 *
 * @returns signed decimal degrees, or null when unparseable or out of range
 */
function parseGpsCoord(
  value: unknown,
  ref: unknown,
  kind: "lat" | "lon",
): number | null {
  const magnitude = parseGpsValue(value);
  if (magnitude === null) return null;
  const coord = applyRefSign(magnitude, ref, value);
  const limit = kind === "lat" ? 90 : 180;
  if (!Number.isFinite(coord) || Math.abs(coord) > limit) return null;
  return coord;
}
/**
 * Read GPSLatitude/GPSLongitude (plus their *Ref tags) from an ExifTool tag map.
 *
 * @returns both coordinates, or null unless latitude AND longitude are valid
 */
function extractGps(tags: Record<string, unknown>) {
  const lat = parseGpsCoord(tags.GPSLatitude, tags.GPSLatitudeRef, "lat");
  const lon = parseGpsCoord(tags.GPSLongitude, tags.GPSLongitudeRef, "lon");
  return lat !== null && lon !== null ? { lat, lon } : null;
}
/**
 * Sanity-check a candidate capture time. Rejects:
 *  - invalid dates;
 *  - anything before 1971 UTC (epoch-style container defaults);
 *  - anything more than 24 hours in the future.
 */
function isPlausibleCaptureTs(date: Date) {
  const millis = date.getTime();
  const futureSlackMs = 24 * 60 * 60 * 1000;
  if (!Number.isFinite(millis)) return false;
  if (date.getUTCFullYear() < 1971) return false;
  return millis <= Date.now() + futureSlackMs;
}
/** Extension to use when re-keying an object; "bin" when the key has none. */
function inferExtFromKey(key: string): string {
  return getExtensionLower(key) || "bin";
}
/** Left-pad with "0" to at least two characters ("7" -> "07"); longer values are unchanged. */
function pad2(n: number) {
  return `${n}`.padStart(2, "0");
}
/** UTC calendar components of `date`: full year, 1-based month, day of month. */
function utcDateParts(date: Date) {
  return {
    y: date.getUTCFullYear(),
    m: date.getUTCMonth() + 1,
    d: date.getUTCDate(),
  };
}
/**
 * Job handler: (re)process one asset end to end.
 *
 * Steps, in order:
 *  1. Claim the row (new/failed -> processing).
 *  2. Download the active object to a temp dir and record its SHA-256.
 *  3. Extract metadata: exiftool for all media, plus ffprobe for video.
 *  4. Render and upload derivatives: thumbnails for images (sharp), posters
 *     for videos (ffmpeg).
 *  5. Write all metadata columns with status = 'ready'.
 *  6. Enqueue follow-ups: an MP4 transcode when shouldTranscodeToMp4 allows
 *     it, and copy-to-canonical for keys under "staging/".
 *
 * Capture-time precedence (dateConfidence):
 *  - "camera": EXIF/QuickTime date tags
 *  - "container": ffprobe creation_time, video only
 *  - "object_mtime": the object's LastModified from a HEAD request
 *  - "import_time": assets.created_at
 *
 * @param raw - untrusted job payload, validated with processAssetPayloadSchema
 * @returns `{ ok: true }` on success
 * @throws Errors raised after the claim are handled this way: the row is
 *   marked 'failed' with the error message ("unknown_error" for non-Error
 *   throws), then the error is rethrown. Errors from payload validation or
 *   from the claim UPDATE itself propagate without touching the row.
 */
export async function handleProcessAsset(raw: unknown) {
const payload = processAssetPayloadSchema.parse(raw);
const db = getDb();
const s3 = getMinioInternalClient();
// Claim the asset for processing.
// NOTE(review): the affected-row count is not checked, so an asset that is
// already 'processing' or 'ready' is still processed below. Confirm that
// duplicate/concurrent jobs for one asset are acceptable.
await db`
update assets
set status = 'processing', error_message = null
where id = ${payload.assetId}
and status in ('new', 'failed')
`;
try {
const [asset] = await db<
{
id: string;
bucket: string;
active_key: string;
media_type: "image" | "video";
mime_type: string;
created_at: Date;
}[]
>`
select id, bucket, active_key, media_type, mime_type, created_at
from assets
where id = ${payload.assetId}
`;
if (!asset) {
throw new Error(`Asset not found: ${payload.assetId}`);
}
// Scratch space for the download and rendered derivatives; removed in the inner finally.
const tempDir = await mkdtemp(join(tmpdir(), "tline-process-"));
try {
// Temp-file extension is the MIME subtype (e.g. "jpeg", "quicktime", "x-matroska").
// NOTE(review): presumably exiftool/ffprobe/sharp detect the format from content
// rather than this extension. TODO confirm.
const containerExt = asset.mime_type.split("/")[1] ?? "bin";
const inputPath = join(tempDir, `input.${containerExt}`);
const getRes = await s3.send(
new GetObjectCommand({
Bucket: asset.bucket,
Key: asset.active_key,
}),
);
if (!getRes.Body) throw new Error("Empty response body from S3");
await streamToFile(getRes.Body as Readable, inputPath);
// The content hash is stored before any metadata work, so it is kept even if a later step fails.
const sha256 = await computeFileSha256(inputPath);
await upsertAssetHash({ assetId: asset.id, bucket: asset.bucket, sha256 });
// Every column written by the final UPDATE starts as null, so values from a
// previous run are cleared unless they are re-derived below.
const updates: Record<string, unknown> = {
capture_ts_utc: null,
date_confidence: null,
width: null,
height: null,
rotation: null,
duration_seconds: null,
thumb_small_key: null,
thumb_med_key: null,
poster_key: null,
raw_tags_json: null,
gps_lat: null,
gps_lon: null
};
let rawTags: Record<string, unknown> = {};
let captureTs: Date | null = null;
let dateConfidence:
| "camera"
| "container"
| "object_mtime"
| "import_time"
| null = null;
// Best-effort: an exiftool failure or unparseable output becomes
// { exiftool_error } in the raw tags instead of failing the job.
async function tryReadExifTags(): Promise<Record<string, unknown>> {
try {
const exifOutput = await runCommand("exiftool", ["-j", inputPath]);
const exifData = JSON.parse(exifOutput);
if (Array.isArray(exifData) && exifData.length > 0) {
const first = exifData[0];
if (first && typeof first === "object") {
return first as Record<string, unknown>;
}
}
return {};
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
return { exiftool_error: message };
}
}
// Sets captureTs from the first listed tag that parses and passes
// isPlausibleCaptureTs. No-op once captureTs is already set.
function maybeSetCaptureDateFromTags(tags: Record<string, unknown>) {
if (captureTs) return;
// ExifTool uses different fields across image/video vendors.
const dateFields = [
"DateTimeOriginal",
"CreateDate",
"ModifyDate",
"MediaCreateDate",
"TrackCreateDate",
"CreationDate",
"GPSDateTime",
] as const;
for (const field of dateFields) {
// NOTE(review): the cast is unchecked; a truthy non-string tag value would
// make parseExifDate's .trim() throw and fail the job.
const val = tags[field] as string | undefined;
if (!val) continue;
const parsed = parseExifDate(val);
if (parsed && isPlausibleCaptureTs(parsed)) {
captureTs = parsed;
dateConfidence = "camera";
return;
}
}
}
// Falls back to the object's LastModified. A HEAD failure is recorded in
// rawTags rather than thrown.
async function applyObjectMtimeFallback() {
if (captureTs) return;
try {
const mtime = await getObjectLastModified({
bucket: asset.bucket,
key: asset.active_key,
});
if (!mtime) return;
if (!isPlausibleCaptureTs(mtime)) return;
captureTs = mtime;
dateConfidence = "object_mtime";
rawTags = { ...rawTags, object_last_modified: mtime.toISOString() };
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
rawTags = { ...rawTags, object_last_modified_error: message };
}
}
if (asset.media_type === "image") {
rawTags = await tryReadExifTags();
maybeSetCaptureDateFromTags(rawTags);
const gps = extractGps(rawTags);
if (gps) {
updates.gps_lat = gps.lat;
updates.gps_lon = gps.lon;
}
await applyObjectMtimeFallback();
// EXIF dimensions/rotation take priority; sharp's metadata only fills gaps.
if (rawTags.ImageWidth !== undefined) updates.width = Number(rawTags.ImageWidth);
if (rawTags.ImageHeight !== undefined) updates.height = Number(rawTags.ImageHeight);
if (rawTags.Rotation !== undefined) updates.rotation = Number(rawTags.Rotation);
const imgMeta = await sharp(inputPath).metadata();
if (updates.width === null && imgMeta.width) updates.width = imgMeta.width;
if (updates.height === null && imgMeta.height) updates.height = imgMeta.height;
const imagePlan = computeImageVariantPlan();
const thumbKeys: Record<number, string> = {};
for (const item of imagePlan) {
const size = item.size;
const thumbPath = join(tempDir, `thumb_${size}.jpg`);
// .rotate() with no argument applies the EXIF orientation before resizing
// into a size x size box (never upscaling).
await sharp(inputPath)
.rotate()
.resize(size, size, { fit: "inside", withoutEnlargement: true })
.jpeg({ quality: 80 })
.toFile(thumbPath);
const thumbKey = `thumbs/${asset.id}/image_${size}.jpg`;
await uploadObject({
bucket: asset.bucket,
key: thumbKey,
filePath: thumbPath,
contentType: "image/jpeg",
});
// NOTE(review): width/height here are the SOURCE image's dimensions, not the
// rendered thumbnail's. Confirm this matches the asset_variants column semantics.
await upsertVariant({
assetId: asset.id,
kind: "thumb",
size,
key: thumbKey,
mimeType: "image/jpeg",
width: typeof updates.width === "number" ? updates.width : null,
height: typeof updates.height === "number" ? updates.height : null,
});
thumbKeys[size] = thumbKey;
}
// Presumably the plan includes sizes 256 and 768; if not, the check after
// this branch throws "thumb generation did not produce output".
updates.thumb_small_key = thumbKeys[256] ?? null;
updates.thumb_med_key = thumbKeys[768] ?? null;
} else if (asset.media_type === "video") {
rawTags = await tryReadExifTags();
maybeSetCaptureDateFromTags(rawTags);
const gps = extractGps(rawTags);
if (gps) {
updates.gps_lat = gps.lat;
updates.gps_lon = gps.lon;
}
// Probe the first video stream's size/duration and the container's creation_time.
const ffprobeOutput = await runCommand("ffprobe", [
"-v",
"error",
"-select_streams",
"v:0",
"-show_entries",
"stream=width,height,duration",
"-show_entries",
"format_tags=creation_time",
"-of",
"json",
inputPath
]);
const ffprobeData = JSON.parse(ffprobeOutput);
// Container creation_time ranks below camera tags but above the object mtime.
if (!captureTs && ffprobeData.format?.tags?.creation_time) {
const ts = new Date(ffprobeData.format.tags.creation_time);
if (!isNaN(ts.getTime()) && isPlausibleCaptureTs(ts)) {
captureTs = ts;
dateConfidence = "container";
}
}
await applyObjectMtimeFallback();
if (ffprobeData.streams?.[0]) {
const stream = ffprobeData.streams[0];
if (stream.width) updates.width = Number(stream.width);
if (stream.height) updates.height = Number(stream.height);
// Duration is stored rounded to whole seconds.
if (stream.duration)
updates.duration_seconds = Math.round(Number(stream.duration));
}
rawTags = { ...rawTags, ffprobe: ffprobeData };
const posterPlan = computeVideoPosterPlan();
const posterSmallest = pickSmallestVariantSize(posterPlan);
const posterKeys: Record<number, string> = {};
for (const item of posterPlan) {
const size = item.size;
const posterPath = join(tempDir, `poster_${size}.jpg`);
// Grab the first frame, scaled to fit within size x size, as a JPEG.
await runCommand("ffmpeg", [
"-i",
inputPath,
"-vf",
`scale=${size}:${size}:force_original_aspect_ratio=decrease`,
"-vframes",
"1",
"-q:v",
"2",
"-y",
posterPath
]);
const posterKey = `thumbs/${asset.id}/poster_${size}.jpg`;
await uploadObject({
bucket: asset.bucket,
key: posterKey,
filePath: posterPath,
contentType: "image/jpeg",
});
// NOTE(review): as with image thumbs, these are the source video's dimensions.
await upsertVariant({
assetId: asset.id,
kind: "poster",
size,
key: posterKey,
mimeType: "image/jpeg",
width: typeof updates.width === "number" ? updates.width : null,
height: typeof updates.height === "number" ? updates.height : null,
});
posterKeys[size] = posterKey;
}
// The asset's primary poster is the smallest planned size.
updates.poster_key = posterSmallest ? posterKeys[posterSmallest] ?? null : null;
}
// Refuse to mark the asset ready without its required derivatives.
if (asset.media_type === "video" && typeof updates.poster_key !== "string") {
throw new Error("poster generation did not produce output");
}
if (
asset.media_type === "image" &&
(typeof updates.thumb_small_key !== "string" || typeof updates.thumb_med_key !== "string")
) {
throw new Error("thumb generation did not produce output");
}
// Last resort: the time the asset row was created.
if (!captureTs) {
captureTs = new Date(asset.created_at);
dateConfidence = "import_time";
rawTags = {
...rawTags,
capture_date_fallback: "import_time",
};
}
updates.capture_ts_utc = captureTs;
updates.date_confidence = dateConfidence;
updates.raw_tags_json = rawTags;
// Single UPDATE writing exactly the listed columns and flipping the row to 'ready'.
await db`
update assets
set ${db(
updates,
"capture_ts_utc",
"date_confidence",
"width",
"height",
"rotation",
"duration_seconds",
"thumb_small_key",
"thumb_med_key",
"poster_key",
"raw_tags_json",
"gps_lat",
"gps_lon"
)}, status = 'ready', error_message = null
where id = ${asset.id}
`;
// Follow-up jobs run only after the asset is marked ready.
if (asset.media_type === "video" && shouldTranscodeToMp4({ mimeType: asset.mime_type })) {
await enqueueTranscodeVideoMp4({ assetId: asset.id });
}
// Only uploads (staging/*) are copied into canonical by default.
if (asset.active_key.startsWith("staging/")) {
await enqueueCopyToCanonical({ assetId: asset.id });
}
return { ok: true };
} finally {
await rm(tempDir, { recursive: true, force: true });
}
} catch (err) {
// Record the failure on the row, then rethrow to the caller (presumably the queue runner).
const message = err instanceof Error ? err.message : "unknown_error";
await db`
update assets
set status = 'failed', error_message = ${message}
where id = ${payload.assetId}
`;
throw err;
}
}
/**
 * Job handler: transcode a video asset into a 720p H.264/AAC MP4.
 *
 * The output is stored at `derived/video/<assetId>/mp4_720p.mp4` and
 * registered as the asset's "video_mp4" variant (size 720). The source
 * object's SHA-256 is refreshed along the way.
 *
 * `-pix_fmt yuv420p` is forced because libx264 otherwise keeps a compatible
 * source pixel format. 10-bit or 4:2:2 inputs (e.g. HDR HEVC phone
 * recordings) would then yield High 10 / High 4:2:2 H.264, which most
 * browsers cannot decode.
 *
 * @param raw - untrusted job payload, validated with transcodeVideoMp4PayloadSchema
 * @returns one of:
 *   - `{ ok, assetId, key }` on success;
 *   - `{ ok, assetId, skipped: "already_mp4" }` when shouldTranscodeToMp4
 *     declines the asset's MIME type.
 * @throws when the asset row is missing, the download is empty, or ffmpeg/S3 fail
 */
export async function handleTranscodeVideoMp4(raw: unknown) {
  const payload = transcodeVideoMp4PayloadSchema.parse(raw);
  const db = getDb();
  const s3 = getMinioInternalClient();
  const [asset] = await db<
    {
      id: string;
      bucket: string;
      active_key: string;
      mime_type: string;
    }[]
  >`
select id, bucket, active_key, mime_type
from assets
where id = ${payload.assetId}
limit 1
`;
  if (!asset) throw new Error(`Asset not found: ${payload.assetId}`);
  if (!shouldTranscodeToMp4({ mimeType: asset.mime_type })) {
    return { ok: true, assetId: asset.id, skipped: "already_mp4" };
  }
  const tempDir = await mkdtemp(join(tmpdir(), "tline-transcode-"));
  try {
    const containerExt = asset.mime_type.split("/")[1] ?? "bin";
    const inputPath = join(tempDir, `input.${containerExt}`);
    const getRes = await s3.send(
      new GetObjectCommand({
        Bucket: asset.bucket,
        Key: asset.active_key,
      }),
    );
    if (!getRes.Body) throw new Error("Empty response body from S3");
    await streamToFile(getRes.Body as Readable, inputPath);
    const sha256 = await computeFileSha256(inputPath);
    await upsertAssetHash({ assetId: asset.id, bucket: asset.bucket, sha256 });
    const outputPath = join(tempDir, "mp4_720p.mp4");
    await runCommand("ffmpeg", [
      "-i",
      inputPath,
      // Height 720, width rounded to an even number to keep the aspect ratio.
      "-vf",
      "scale=-2:720",
      "-c:v",
      "libx264",
      "-preset",
      "fast",
      "-crf",
      "23",
      // 8-bit 4:2:0 is the only H.264 flavour browsers reliably decode.
      "-pix_fmt",
      "yuv420p",
      "-c:a",
      "aac",
      "-b:a",
      "128k",
      // Move the moov atom to the front so playback can start before the download completes.
      "-movflags",
      "+faststart",
      "-y",
      outputPath,
    ]);
    const derivedKey = `derived/video/${asset.id}/mp4_720p.mp4`;
    await uploadObject({
      bucket: asset.bucket,
      key: derivedKey,
      filePath: outputPath,
      contentType: "video/mp4",
    });
    await upsertVariant({
      assetId: asset.id,
      kind: "video_mp4",
      size: 720,
      key: derivedKey,
      mimeType: "video/mp4",
      width: null,
      height: 720,
    });
    return { ok: true, assetId: asset.id, key: derivedKey };
  } finally {
    await rm(tempDir, { recursive: true, force: true });
  }
}
/**
 * Job handler: copy an uploaded asset to its date-based canonical key and
 * repoint the row's canonical_key and active_key at the copy.
 *
 * The canonical key is `canonical/originals/YYYY/MM/DD/<assetId>.<ext>`,
 * using the UTC capture date. The source object is not deleted.
 *
 * Returns early, without copying, when:
 *  - there is no capture timestamp yet (`skipped: "missing_capture_ts"`);
 *  - the source lives under "originals/" (`skipped: "external_archive"`);
 *  - both keys already equal the target (`already: true`).
 *
 * @param raw - untrusted job payload, validated with copyToCanonicalPayloadSchema
 * @throws when the asset row is missing or the S3 copy fails
 */
export async function handleCopyToCanonical(raw: unknown) {
  const payload = copyToCanonicalPayloadSchema.parse(raw);
  const db = getDb();
  const s3 = getMinioInternalClient();
  const [asset] = await db<
    {
      id: string;
      bucket: string;
      source_key: string;
      active_key: string;
      canonical_key: string | null;
      capture_ts_utc: Date | null;
    }[]
  >`
select id, bucket, source_key, active_key, canonical_key, capture_ts_utc
from assets
where id = ${payload.assetId}
limit 1
`;
  if (!asset) throw new Error(`Asset not found: ${payload.assetId}`);
  // Canonical layout is date-based; if we don't have a date yet, do nothing.
  // This job can be retried later after metadata extraction improves.
  if (!asset.capture_ts_utc) {
    return { ok: true, assetId: asset.id, skipped: "missing_capture_ts" };
  }
  // Never copy external archive originals by default.
  if (asset.source_key.startsWith("originals/")) {
    return { ok: true, assetId: asset.id, skipped: "external_archive" };
  }
  const ext = inferExtFromKey(asset.source_key);
  const { y, m, d } = utcDateParts(new Date(asset.capture_ts_utc));
  const canonicalKey = `canonical/originals/${y}/${pad2(m)}/${pad2(d)}/${asset.id}.${ext}`;
  // Idempotency: if already canonicalized, don't redo work.
  if (asset.canonical_key === canonicalKey && asset.active_key === canonicalKey) {
    return { ok: true, assetId: asset.id, canonicalKey };
  }
  // S3 CopyObject requires the copy source to be URL-encoded, and the value is
  // passed through as given. Encode each key segment but keep the "/"
  // separators, so keys with spaces, "+", "%", "#", "?" or non-ASCII
  // characters resolve to the right object.
  const encodedSourceKey = asset.active_key
    .split("/")
    .map((segment) => encodeURIComponent(segment))
    .join("/");
  await s3.send(
    new CopyObjectCommand({
      Bucket: asset.bucket,
      Key: canonicalKey,
      CopySource: `${asset.bucket}/${encodedSourceKey}`,
      MetadataDirective: "COPY",
    }),
  );
  await db`
update assets
set canonical_key = ${canonicalKey}, active_key = ${canonicalKey}
where id = ${asset.id}
`;
  return { ok: true, assetId: asset.id, canonicalKey };
}