Initial commit
This commit is contained in:
17
packages/queue/package.json
Normal file
17
packages/queue/package.json
Normal file
@@ -0,0 +1,17 @@
|
||||
{
|
||||
"name": "@tline/queue",
|
||||
"version": "0.0.0",
|
||||
"private": true,
|
||||
"type": "module",
|
||||
"exports": {
|
||||
".": {
|
||||
"types": "./src/index.ts",
|
||||
"default": "./src/index.ts"
|
||||
}
|
||||
},
|
||||
"dependencies": {
|
||||
"bullmq": "^5.61.0",
|
||||
"ioredis": "^5.8.0",
|
||||
"zod": "^4.2.1"
|
||||
}
|
||||
}
|
||||
128
packages/queue/src/index.ts
Normal file
128
packages/queue/src/index.ts
Normal file
@@ -0,0 +1,128 @@
|
||||
import { z } from "zod";
|
||||
|
||||
import { Queue } from "bullmq";
|
||||
import IORedis from "ioredis";
|
||||
|
||||
const envSchema = z.object({
|
||||
REDIS_URL: z.string().min(1).default("redis://localhost:6379"),
|
||||
QUEUE_NAME: z.string().min(1).default("tline")
|
||||
});
|
||||
|
||||
export const jobNameSchema = z.enum([
|
||||
"scan_minio_prefix",
|
||||
"process_asset",
|
||||
"copy_to_canonical"
|
||||
]);
|
||||
|
||||
export type QueueJobName = z.infer<typeof jobNameSchema>;
|
||||
|
||||
export const scanMinioPrefixPayloadSchema = z
|
||||
.object({
|
||||
importId: z.string().uuid(),
|
||||
bucket: z.string().min(1),
|
||||
prefix: z.string().min(1)
|
||||
})
|
||||
.strict();
|
||||
|
||||
export const processAssetPayloadSchema = z
|
||||
.object({
|
||||
assetId: z.string().uuid()
|
||||
})
|
||||
.strict();
|
||||
|
||||
export const copyToCanonicalPayloadSchema = z
|
||||
.object({
|
||||
assetId: z.string().uuid()
|
||||
})
|
||||
.strict();
|
||||
|
||||
export const payloadByJobNameSchema = z.discriminatedUnion("name", [
|
||||
z.object({ name: z.literal("scan_minio_prefix"), payload: scanMinioPrefixPayloadSchema }),
|
||||
z.object({ name: z.literal("process_asset"), payload: processAssetPayloadSchema }),
|
||||
z.object({ name: z.literal("copy_to_canonical"), payload: copyToCanonicalPayloadSchema })
|
||||
]);
|
||||
|
||||
export type ScanMinioPrefixPayload = z.infer<typeof scanMinioPrefixPayloadSchema>;
|
||||
export type ProcessAssetPayload = z.infer<typeof processAssetPayloadSchema>;
|
||||
export type CopyToCanonicalPayload = z.infer<typeof copyToCanonicalPayloadSchema>;
|
||||
|
||||
type QueueEnv = z.infer<typeof envSchema>;
|
||||
|
||||
let cachedEnv: QueueEnv | undefined;
|
||||
let cachedRedis: IORedis | undefined;
|
||||
let cachedQueue: Queue | undefined;
|
||||
|
||||
export function getQueueEnv(): QueueEnv {
|
||||
if (cachedEnv) return cachedEnv;
|
||||
|
||||
const parsed = envSchema.safeParse(process.env);
|
||||
if (!parsed.success) {
|
||||
throw new Error(`Invalid queue env: ${parsed.error.message}`);
|
||||
}
|
||||
|
||||
cachedEnv = parsed.data;
|
||||
return cachedEnv;
|
||||
}
|
||||
|
||||
export function getQueueName() {
|
||||
return getQueueEnv().QUEUE_NAME;
|
||||
}
|
||||
|
||||
export function getRedis() {
|
||||
if (cachedRedis) return cachedRedis;
|
||||
const env = getQueueEnv();
|
||||
cachedRedis = new IORedis(env.REDIS_URL, {
|
||||
lazyConnect: true,
|
||||
maxRetriesPerRequest: null
|
||||
});
|
||||
|
||||
cachedRedis.on("error", () => {});
|
||||
|
||||
return cachedRedis;
|
||||
}
|
||||
|
||||
export function getQueue() {
|
||||
if (cachedQueue) return cachedQueue;
|
||||
getQueueEnv();
|
||||
|
||||
cachedQueue = new Queue(getQueueName(), {
|
||||
connection: getRedis()
|
||||
});
|
||||
return cachedQueue;
|
||||
}
|
||||
|
||||
export async function closeQueue() {
|
||||
await Promise.all([
|
||||
cachedQueue?.close(),
|
||||
cachedRedis?.quit().catch(() => cachedRedis?.disconnect())
|
||||
]);
|
||||
cachedQueue = undefined;
|
||||
cachedRedis = undefined;
|
||||
}
|
||||
|
||||
export async function enqueueScanMinioPrefix(input: ScanMinioPrefixPayload) {
|
||||
const payload = scanMinioPrefixPayloadSchema.parse(input);
|
||||
const queue = getQueue();
|
||||
return queue.add("scan_minio_prefix", payload, {
|
||||
attempts: 3,
|
||||
backoff: { type: "exponential", delay: 1000 }
|
||||
});
|
||||
}
|
||||
|
||||
export async function enqueueProcessAsset(input: ProcessAssetPayload) {
|
||||
const payload = processAssetPayloadSchema.parse(input);
|
||||
const queue = getQueue();
|
||||
return queue.add("process_asset", payload, {
|
||||
attempts: 3,
|
||||
backoff: { type: "exponential", delay: 1000 }
|
||||
});
|
||||
}
|
||||
|
||||
export async function enqueueCopyToCanonical(input: CopyToCanonicalPayload) {
|
||||
const payload = copyToCanonicalPayloadSchema.parse(input);
|
||||
const queue = getQueue();
|
||||
return queue.add("copy_to_canonical", payload, {
|
||||
attempts: 3,
|
||||
backoff: { type: "exponential", delay: 1000 }
|
||||
});
|
||||
}
|
||||
7
packages/queue/tsconfig.json
Normal file
7
packages/queue/tsconfig.json
Normal file
@@ -0,0 +1,7 @@
|
||||
{
|
||||
"extends": "../../tsconfig.base.json",
|
||||
"compilerOptions": {
|
||||
"types": ["bun-types"]
|
||||
},
|
||||
"include": ["src/**/*.ts"]
|
||||
}
|
||||
Reference in New Issue
Block a user