feat: compute asset sha256 for dedupe
This commit is contained in:
11
apps/worker/src/hash-utils.ts
Normal file
11
apps/worker/src/hash-utils.ts
Normal file
@@ -0,0 +1,11 @@
|
||||
import { createHash } from "node:crypto";
|
||||
import { promises as fs } from "fs";
|
||||
|
||||
export async function computeFileSha256(filePath: string): Promise<string> {
|
||||
const hash = createHash("sha256");
|
||||
const content = await fs.readFile(filePath);
|
||||
|
||||
hash.update(content);
|
||||
|
||||
return hash.digest("hex");
|
||||
}
|
||||
@@ -34,6 +34,7 @@ import {
|
||||
} from "@tline/queue";
|
||||
|
||||
import { shouldTranscodeToMp4 } from "./transcode";
|
||||
import { computeFileSha256 } from "./hash-utils";
|
||||
|
||||
const allowedScanPrefixes = ["originals/"] as const;
|
||||
|
||||
@@ -244,6 +245,16 @@ async function upsertVariant(input: {
|
||||
`;
|
||||
}
|
||||
|
||||
async function upsertAssetHash(input: { assetId: string; bucket: string; sha256: string }) {
|
||||
const db = getDb();
|
||||
await db`
|
||||
insert into asset_hashes (asset_id, bucket, sha256)
|
||||
values (${input.assetId}, ${input.bucket}, ${input.sha256})
|
||||
on conflict (asset_id)
|
||||
do update set sha256 = excluded.sha256, bucket = excluded.bucket
|
||||
`;
|
||||
}
|
||||
|
||||
async function getObjectLastModified(input: { bucket: string; key: string }): Promise<Date | null> {
|
||||
const s3 = getMinioInternalClient();
|
||||
const res = await s3.send(new HeadObjectCommand({ Bucket: input.bucket, Key: input.key }));
|
||||
@@ -440,6 +451,9 @@ export async function handleProcessAsset(raw: unknown) {
|
||||
if (!getRes.Body) throw new Error("Empty response body from S3");
|
||||
await streamToFile(getRes.Body as Readable, inputPath);
|
||||
|
||||
const sha256 = await computeFileSha256(inputPath);
|
||||
await upsertAssetHash({ assetId: asset.id, bucket: asset.bucket, sha256 });
|
||||
|
||||
const updates: Record<string, unknown> = {
|
||||
capture_ts_utc: null,
|
||||
date_confidence: null,
|
||||
@@ -766,6 +780,9 @@ export async function handleTranscodeVideoMp4(raw: unknown) {
|
||||
if (!getRes.Body) throw new Error("Empty response body from S3");
|
||||
await streamToFile(getRes.Body as Readable, inputPath);
|
||||
|
||||
const sha256 = await computeFileSha256(inputPath);
|
||||
await upsertAssetHash({ assetId: asset.id, bucket: asset.bucket, sha256 });
|
||||
|
||||
const outputPath = join(tempDir, "mp4_720p.mp4");
|
||||
await runCommand("ffmpeg", [
|
||||
"-i",
|
||||
|
||||
12
packages/db/migrations/0007_asset_hashes.sql
Normal file
12
packages/db/migrations/0007_asset_hashes.sql
Normal file
@@ -0,0 +1,12 @@
|
||||
CREATE TABLE IF NOT EXISTS asset_hashes (
|
||||
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
asset_id uuid NOT NULL REFERENCES assets(id) ON DELETE CASCADE,
|
||||
bucket text NOT NULL,
|
||||
sha256 text NOT NULL,
|
||||
created_at timestamptz NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS asset_hashes_asset_id_idx ON asset_hashes(asset_id);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS asset_hashes_bucket_sha256_idx
|
||||
ON asset_hashes(bucket, sha256) WHERE sha256 IS NOT NULL;
|
||||
Reference in New Issue
Block a user