From a133afad06d9c0faf55a9ae12cc7589c0470f201 Mon Sep 17 00:00:00 2001
From: William Valentin
Date: Wed, 4 Feb 2026 19:32:16 -0800
Subject: [PATCH] feat: compute asset sha256 for dedupe

---
Notes:
- computeFileSha256 streams the file into the hash chunk by chunk instead of
  buffering it whole, so hashing a large video input does not load the entire
  file into memory.
- asset_hashes(asset_id) is a UNIQUE index: the worker upserts with
  `ON CONFLICT (asset_id)`, and PostgreSQL rejects that statement unless a
  unique index or constraint on exactly those columns exists.
- asset_hashes(bucket, sha256) is a plain lookup index rather than a unique
  one; a duplicate asset already exists by the time it is hashed, so a
  uniqueness violation there would only make its processing job fail.
- The jobs.ts hunks no longer re-indent the untouched `getRes.Body` /
  `streamToFile` / `updates` / `outputPath` lines.
- NOTE(review): indentation and the generic type arguments on `Promise` /
  `Record` in the jobs.ts context lines could not be verified against the
  tree -- TODO confirm (or regenerate the context) before applying.

 apps/worker/src/hash-utils.ts                | 22 ++++++++++++++++++++++
 apps/worker/src/jobs.ts                      | 23 +++++++++++++++++++++++
 packages/db/migrations/0007_asset_hashes.sql | 18 ++++++++++++++++++
 3 files changed, 63 insertions(+)
 create mode 100644 apps/worker/src/hash-utils.ts
 create mode 100644 packages/db/migrations/0007_asset_hashes.sql

diff --git a/apps/worker/src/hash-utils.ts b/apps/worker/src/hash-utils.ts
new file mode 100644
--- /dev/null
+++ b/apps/worker/src/hash-utils.ts
@@ -0,0 +1,22 @@
+import { createHash } from "node:crypto";
+import { createReadStream } from "node:fs";
+
+/**
+ * Computes the SHA-256 digest of a file's contents.
+ *
+ * The file is read as a stream and fed to the hash chunk by chunk, so the
+ * whole file is never held in memory at once.
+ *
+ * @param filePath - Path of the file to hash.
+ * @returns The digest encoded as a hex string.
+ * @throws Rejects if the file cannot be opened or read.
+ */
+export async function computeFileSha256(filePath: string): Promise<string> {
+  const hash = createHash("sha256");
+
+  for await (const chunk of createReadStream(filePath)) {
+    hash.update(chunk as Buffer);
+  }
+
+  return hash.digest("hex");
+}
diff --git a/apps/worker/src/jobs.ts b/apps/worker/src/jobs.ts
--- a/apps/worker/src/jobs.ts
+++ b/apps/worker/src/jobs.ts
@@ -34,6 +34,7 @@ import {
 } from "@tline/queue";
 
 import { shouldTranscodeToMp4 } from "./transcode";
+import { computeFileSha256 } from "./hash-utils";
 
 const allowedScanPrefixes = ["originals/"] as const;
 
@@ -244,6 +245,22 @@ async function upsertVariant(input: {
   `;
 }
 
+/**
+ * Inserts the SHA-256 recorded for an asset, or overwrites the stored hash
+ * and bucket when a row for that asset already exists.
+ *
+ * The upsert is keyed on `asset_id`, which must be backed by a unique index.
+ */
+async function upsertAssetHash(input: { assetId: string; bucket: string; sha256: string }): Promise<void> {
+  const db = getDb();
+  await db`
+    insert into asset_hashes (asset_id, bucket, sha256)
+    values (${input.assetId}, ${input.bucket}, ${input.sha256})
+    on conflict (asset_id)
+    do update set sha256 = excluded.sha256, bucket = excluded.bucket
+  `;
+}
+
 async function getObjectLastModified(input: { bucket: string; key: string }): Promise {
   const s3 = getMinioInternalClient();
   const res = await s3.send(new HeadObjectCommand({ Bucket: input.bucket, Key: input.key }));
@@ -437,10 +454,13 @@ export async function handleProcessAsset(raw: unknown) {
       Key: asset.active_key,
     }),
   );
   if (!getRes.Body) throw new Error("Empty response body from S3");
   await streamToFile(getRes.Body as Readable, inputPath);
 
+  const sha256 = await computeFileSha256(inputPath);
+  await upsertAssetHash({ assetId: asset.id, bucket: asset.bucket, sha256 });
+
   const updates: Record = {
     capture_ts_utc: null,
     date_confidence: null,
     width: null,
@@ -763,10 +783,13 @@ export async function handleTranscodeVideoMp4(raw: unknown) {
       Key: asset.active_key,
     }),
   );
   if (!getRes.Body) throw new Error("Empty response body from S3");
   await streamToFile(getRes.Body as Readable, inputPath);
 
+  const sha256 = await computeFileSha256(inputPath);
+  await upsertAssetHash({ assetId: asset.id, bucket: asset.bucket, sha256 });
+
   const outputPath = join(tempDir, "mp4_720p.mp4");
   await runCommand("ffmpeg", [
     "-i",
     inputPath,
diff --git a/packages/db/migrations/0007_asset_hashes.sql b/packages/db/migrations/0007_asset_hashes.sql
new file mode 100644
--- /dev/null
+++ b/packages/db/migrations/0007_asset_hashes.sql
@@ -0,0 +1,18 @@
+-- Content hashes of stored assets, keyed by asset.
+CREATE TABLE IF NOT EXISTS asset_hashes (
+  id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
+  asset_id uuid NOT NULL REFERENCES assets(id) ON DELETE CASCADE,
+  bucket text NOT NULL,
+  sha256 text NOT NULL,
+  created_at timestamptz NOT NULL DEFAULT now()
+);
+
+-- One hash row per asset. This index must be UNIQUE: the worker upserts with
+-- ON CONFLICT (asset_id), which needs a unique index on asset_id to resolve.
+CREATE UNIQUE INDEX IF NOT EXISTS asset_hashes_asset_id_idx ON asset_hashes(asset_id);
+
+-- Lookup index for finding assets with identical content. Deliberately not
+-- UNIQUE: a duplicate is already stored as its own asset by the time it is
+-- hashed, so rejecting its hash row would only fail that asset's processing.
+CREATE INDEX IF NOT EXISTS asset_hashes_bucket_sha256_idx
+ON asset_hashes(bucket, sha256);