340 lines
11 KiB
TypeScript
340 lines
11 KiB
TypeScript
import { describe, it, expect } from 'vitest';
|
|
import { LaneQueue, LaneQueueRejectedError } from './lane-queue.js';
|
|
|
|
describe('LaneQueue', () => {
|
|
it('executes a single item immediately', async () => {
|
|
const queue = new LaneQueue();
|
|
const result = await queue.enqueue('lane-a', async () => 42);
|
|
expect(result).toBe(42);
|
|
});
|
|
|
|
it('serialises items within the same lane', async () => {
|
|
const queue = new LaneQueue();
|
|
const order: number[] = [];
|
|
|
|
// Create a deferred to control timing
|
|
let resolveFirst!: () => void;
|
|
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
|
|
|
|
const p1 = queue.enqueue('lane-a', async () => {
|
|
order.push(1);
|
|
await firstBlocks;
|
|
order.push(2);
|
|
return 'first';
|
|
});
|
|
|
|
const p2 = queue.enqueue('lane-a', async () => {
|
|
order.push(3);
|
|
return 'second';
|
|
});
|
|
|
|
const p3 = queue.enqueue('lane-a', async () => {
|
|
order.push(4);
|
|
return 'third';
|
|
});
|
|
|
|
// Only item 1 should have started
|
|
expect(order).toEqual([1]);
|
|
expect(queue.queueLength('lane-a')).toBe(2);
|
|
expect(queue.isProcessing('lane-a')).toBe(true);
|
|
|
|
// Release the first item
|
|
resolveFirst();
|
|
const results = await Promise.all([p1, p2, p3]);
|
|
|
|
expect(results).toEqual(['first', 'second', 'third']);
|
|
expect(order).toEqual([1, 2, 3, 4]);
|
|
});
|
|
|
|
it('runs independent lanes in parallel', async () => {
|
|
const queue = new LaneQueue();
|
|
const running: string[] = [];
|
|
|
|
let resolveA!: () => void;
|
|
const blocksA = new Promise<void>((r) => { resolveA = r; });
|
|
let resolveB!: () => void;
|
|
const blocksB = new Promise<void>((r) => { resolveB = r; });
|
|
|
|
const pA = queue.enqueue('lane-a', async () => {
|
|
running.push('a-start');
|
|
await blocksA;
|
|
running.push('a-end');
|
|
return 'A';
|
|
});
|
|
|
|
const pB = queue.enqueue('lane-b', async () => {
|
|
running.push('b-start');
|
|
await blocksB;
|
|
running.push('b-end');
|
|
return 'B';
|
|
});
|
|
|
|
// Both should have started concurrently
|
|
// Wait a tick for async execution
|
|
await new Promise<void>((r) => queueMicrotask(r));
|
|
expect(running).toContain('a-start');
|
|
expect(running).toContain('b-start');
|
|
|
|
resolveA();
|
|
resolveB();
|
|
const [rA, rB] = await Promise.all([pA, pB]);
|
|
|
|
expect(rA).toBe('A');
|
|
expect(rB).toBe('B');
|
|
});
|
|
|
|
it('error in one item does not block the next', async () => {
|
|
const queue = new LaneQueue();
|
|
|
|
let resolveFirst!: () => void;
|
|
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
|
|
|
|
const p1 = queue.enqueue('lane-a', async () => {
|
|
await firstBlocks;
|
|
throw new Error('boom');
|
|
});
|
|
|
|
const p2 = queue.enqueue('lane-a', async () => 'recovered');
|
|
|
|
resolveFirst();
|
|
|
|
await expect(p1).rejects.toThrow('boom');
|
|
const result = await p2;
|
|
expect(result).toBe('recovered');
|
|
});
|
|
|
|
it('cancel rejects pending items but does not affect active', async () => {
|
|
const queue = new LaneQueue();
|
|
|
|
let resolveFirst!: () => void;
|
|
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
|
|
|
|
const p1 = queue.enqueue('lane-a', async () => {
|
|
await firstBlocks;
|
|
return 'active';
|
|
});
|
|
|
|
const p2 = queue.enqueue('lane-a', async () => 'pending-1');
|
|
const p3 = queue.enqueue('lane-a', async () => 'pending-2');
|
|
|
|
expect(queue.queueLength('lane-a')).toBe(2);
|
|
|
|
// Cancel pending items
|
|
queue.cancel('lane-a');
|
|
expect(queue.queueLength('lane-a')).toBe(0);
|
|
|
|
// Active work should still complete
|
|
resolveFirst();
|
|
const result = await p1;
|
|
expect(result).toBe('active');
|
|
|
|
// Pending items should have been rejected
|
|
await expect(p2).rejects.toThrow('Lane cancelled');
|
|
await expect(p3).rejects.toThrow('Lane cancelled');
|
|
});
|
|
|
|
it('reports queue length correctly', async () => {
|
|
const queue = new LaneQueue();
|
|
|
|
expect(queue.queueLength('lane-a')).toBe(0);
|
|
expect(queue.isProcessing('lane-a')).toBe(false);
|
|
|
|
let resolveFirst!: () => void;
|
|
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
|
|
|
|
const p1 = queue.enqueue('lane-a', async () => {
|
|
await firstBlocks;
|
|
return 'done';
|
|
});
|
|
|
|
// Active work, no pending
|
|
expect(queue.isProcessing('lane-a')).toBe(true);
|
|
expect(queue.queueLength('lane-a')).toBe(0);
|
|
|
|
const p2 = queue.enqueue('lane-a', async () => 'queued-1');
|
|
expect(queue.queueLength('lane-a')).toBe(1);
|
|
|
|
const p3 = queue.enqueue('lane-a', async () => 'queued-2');
|
|
expect(queue.queueLength('lane-a')).toBe(2);
|
|
|
|
resolveFirst();
|
|
await Promise.all([p1, p2, p3]);
|
|
|
|
// After all done, lane should be cleaned up
|
|
expect(queue.isProcessing('lane-a')).toBe(false);
|
|
expect(queue.queueLength('lane-a')).toBe(0);
|
|
});
|
|
|
|
it('cleans up empty lanes after completion', async () => {
|
|
const queue = new LaneQueue();
|
|
|
|
await queue.enqueue('lane-a', async () => 'done');
|
|
|
|
// Lane should be cleaned up (isProcessing returns false, queueLength 0)
|
|
expect(queue.isProcessing('lane-a')).toBe(false);
|
|
expect(queue.queueLength('lane-a')).toBe(0);
|
|
});
|
|
|
|
it('cancel on non-existent lane is a no-op', () => {
|
|
const queue = new LaneQueue();
|
|
// Should not throw
|
|
queue.cancel('no-such-lane');
|
|
expect(queue.queueLength('no-such-lane')).toBe(0);
|
|
});
|
|
|
|
it('can enqueue new work after a lane completes', async () => {
|
|
const queue = new LaneQueue();
|
|
|
|
const r1 = await queue.enqueue('lane-a', async () => 'first');
|
|
expect(r1).toBe('first');
|
|
|
|
const r2 = await queue.enqueue('lane-a', async () => 'second');
|
|
expect(r2).toBe('second');
|
|
});
|
|
|
|
it('followup mode keeps only the most recent pending request', async () => {
|
|
const queue = new LaneQueue({ mode: 'followup' });
|
|
let resolveFirst!: () => void;
|
|
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
|
|
|
|
const p1 = queue.enqueue('lane-a', async () => {
|
|
await firstBlocks;
|
|
return 'active';
|
|
});
|
|
const p2 = queue.enqueue('lane-a', async () => 'old-pending');
|
|
const p3 = queue.enqueue('lane-a', async () => 'latest-pending');
|
|
|
|
await expect(p2).rejects.toThrow('Superseded by newer follow-up request');
|
|
resolveFirst();
|
|
|
|
await expect(p1).resolves.toBe('active');
|
|
await expect(p3).resolves.toBe('latest-pending');
|
|
});
|
|
|
|
it('steer_backlog mode replaces existing pending backlog with latest request', async () => {
|
|
const queue = new LaneQueue({ mode: 'steer_backlog' });
|
|
let resolveFirst!: () => void;
|
|
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
|
|
|
|
const p1 = queue.enqueue('lane-a', async () => {
|
|
await firstBlocks;
|
|
return 'active';
|
|
});
|
|
const p2 = queue.enqueue('lane-a', async () => 'old-pending');
|
|
const p3 = queue.enqueue('lane-a', async () => 'new-pending');
|
|
|
|
await expect(p2).rejects.toThrow('Superseded by newer request');
|
|
resolveFirst();
|
|
|
|
await expect(p1).resolves.toBe('active');
|
|
await expect(p3).resolves.toBe('new-pending');
|
|
});
|
|
|
|
it('drop_new overflow rejects newest request when cap is reached', async () => {
|
|
const queue = new LaneQueue({ cap: 1, overflow: 'drop_new' });
|
|
let resolveFirst!: () => void;
|
|
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
|
|
|
|
const p1 = queue.enqueue('lane-a', async () => {
|
|
await firstBlocks;
|
|
return 'active';
|
|
});
|
|
const p2 = queue.enqueue('lane-a', async () => 'pending-1');
|
|
const p3 = queue.enqueue('lane-a', async () => 'pending-2');
|
|
|
|
await expect(p3).rejects.toThrow('Lane queue full (drop_new): request rejected with 1 pending');
|
|
resolveFirst();
|
|
|
|
await expect(p1).resolves.toBe('active');
|
|
await expect(p2).resolves.toBe('pending-1');
|
|
});
|
|
|
|
it('drop_old overflow evicts oldest pending request when cap is reached', async () => {
|
|
const queue = new LaneQueue({ cap: 1, overflow: 'drop_old' });
|
|
let resolveFirst!: () => void;
|
|
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
|
|
|
|
const p1 = queue.enqueue('lane-a', async () => {
|
|
await firstBlocks;
|
|
return 'active';
|
|
});
|
|
const p2 = queue.enqueue('lane-a', async () => 'old-pending');
|
|
const p3 = queue.enqueue('lane-a', async () => 'new-pending');
|
|
|
|
await expect(p2).rejects.toThrow('Lane queue overflow (drop_old): oldest pending request dropped');
|
|
resolveFirst();
|
|
|
|
await expect(p1).resolves.toBe('active');
|
|
await expect(p3).resolves.toBe('new-pending');
|
|
});
|
|
|
|
it('supports per-enqueue policy overrides', async () => {
|
|
const queue = new LaneQueue({ mode: 'collect', cap: 10, overflow: 'drop_old' });
|
|
let resolveFirst!: () => void;
|
|
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
|
|
|
|
const p1 = queue.enqueue('lane-a', async () => {
|
|
await firstBlocks;
|
|
return 'active';
|
|
});
|
|
const p2 = queue.enqueue('lane-a', async () => 'old-pending', { mode: 'steer' });
|
|
const p3 = queue.enqueue('lane-a', async () => 'latest-pending', { mode: 'steer' });
|
|
|
|
await expect(p2).rejects.toThrow('Superseded by newer request');
|
|
resolveFirst();
|
|
|
|
await expect(p1).resolves.toBe('active');
|
|
await expect(p3).resolves.toBe('latest-pending');
|
|
});
|
|
|
|
it('applies debounce before starting queued work', async () => {
|
|
const queue = new LaneQueue({ debounceMs: 25 });
|
|
const events: string[] = [];
|
|
let resolveFirst!: () => void;
|
|
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
|
|
|
|
const p1 = queue.enqueue('lane-a', async () => {
|
|
events.push('active:start');
|
|
await firstBlocks;
|
|
events.push('active:end');
|
|
return 'active';
|
|
});
|
|
|
|
const p2 = queue.enqueue('lane-a', async () => {
|
|
events.push('next:start');
|
|
return 'next';
|
|
});
|
|
|
|
resolveFirst();
|
|
await p1;
|
|
expect(queue.isProcessing('lane-a')).toBe(true);
|
|
expect(events).toEqual(['active:start', 'active:end']);
|
|
await new Promise((r) => setTimeout(r, 40));
|
|
await expect(p2).resolves.toBe('next');
|
|
expect(events).toEqual(['active:start', 'active:end', 'next:start']);
|
|
});
|
|
|
|
it('returns structured queue rejection errors', async () => {
|
|
const queue = new LaneQueue({ cap: 1, overflow: 'drop_new' });
|
|
let resolveFirst!: () => void;
|
|
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
|
|
|
|
const p1 = queue.enqueue('lane-a', async () => {
|
|
await firstBlocks;
|
|
return 'active';
|
|
});
|
|
const p2 = queue.enqueue('lane-a', async () => 'pending');
|
|
const p3 = queue.enqueue('lane-a', async () => 'dropped');
|
|
|
|
const err = await p3.catch((e) => e) as LaneQueueRejectedError;
|
|
expect(err).toBeInstanceOf(LaneQueueRejectedError);
|
|
expect(err.details.code).toBe('overflow');
|
|
expect(err.details.overflow).toBe('drop_new');
|
|
expect(err.details.laneId).toBe('lane-a');
|
|
|
|
resolveFirst();
|
|
await expect(p1).resolves.toBe('active');
|
|
await expect(p2).resolves.toBe('pending');
|
|
});
|
|
});
|