294 lines
8.3 KiB
TypeScript
294 lines
8.3 KiB
TypeScript
/**
|
|
* LaneQueue — per-lane FIFO queue for serialising async work.
|
|
*
|
|
* Each "lane" (keyed by session ID) processes work items one at a time.
|
|
* If a lane is idle, work starts immediately. If it's busy, the work
|
|
* is queued and a promise is returned that resolves when it's this
|
|
* entry's turn to execute.
|
|
*
|
|
* Independent lanes run in parallel — only items within the same lane
|
|
* are serialised.
|
|
*/
|
|
|
|
interface QueueEntry<T = unknown> {
|
|
work: () => Promise<T>;
|
|
resolve: (value: T) => void;
|
|
reject: (reason: unknown) => void;
|
|
policy: LaneQueueConfig;
|
|
metadata?: LaneQueueEnqueueMetadata;
|
|
}
|
|
|
|
interface Lane {
|
|
active: boolean;
|
|
queue: QueueEntry[];
|
|
debounceTimer?: ReturnType<typeof setTimeout>;
|
|
}
|
|
|
|
export type LaneQueueMode = 'collect' | 'followup' | 'steer' | 'steer_backlog' | 'interrupt';
|
|
export type LaneQueueOverflow = 'drop_old' | 'drop_new';
|
|
|
|
export interface LaneQueueConfig {
|
|
mode: LaneQueueMode;
|
|
cap: number;
|
|
overflow: LaneQueueOverflow;
|
|
debounceMs: number;
|
|
summarizeOverflow: boolean;
|
|
}
|
|
|
|
export interface LaneQueueEnqueueMetadata {
|
|
requestId?: string;
|
|
label?: string;
|
|
}
|
|
|
|
export interface LaneQueueEnqueueOptions {
|
|
policy?: Partial<LaneQueueConfig>;
|
|
metadata?: LaneQueueEnqueueMetadata;
|
|
}
|
|
|
|
export type LaneQueueRejectCode = 'superseded' | 'overflow' | 'cancelled';
|
|
|
|
export interface LaneQueueRejectDetails {
|
|
code: LaneQueueRejectCode;
|
|
laneId: string;
|
|
mode: LaneQueueMode;
|
|
overflow?: LaneQueueOverflow;
|
|
droppedCount?: number;
|
|
message: string;
|
|
}
|
|
|
|
export class LaneQueueRejectedError extends Error {
|
|
readonly details: LaneQueueRejectDetails;
|
|
|
|
constructor(details: LaneQueueRejectDetails) {
|
|
super(details.message);
|
|
this.name = 'LaneQueueRejectedError';
|
|
this.details = details;
|
|
}
|
|
}
|
|
|
|
export class LaneQueue {
|
|
private lanes: Map<string, Lane> = new Map();
|
|
private config: LaneQueueConfig;
|
|
|
|
constructor(config?: Partial<LaneQueueConfig>) {
|
|
this.config = {
|
|
mode: config?.mode ?? 'collect',
|
|
cap: Math.max(1, config?.cap ?? 50),
|
|
overflow: config?.overflow ?? 'drop_old',
|
|
debounceMs: Math.max(0, config?.debounceMs ?? 0),
|
|
summarizeOverflow: config?.summarizeOverflow ?? true,
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Enqueue a unit of work for the given lane.
|
|
* Returns a promise that resolves with the work's return value
|
|
* once it has been executed (which may be immediately if the lane is idle).
|
|
*/
|
|
async enqueue<T>(
|
|
laneId: string,
|
|
work: () => Promise<T>,
|
|
policyOrOptions?: Partial<LaneQueueConfig> | LaneQueueEnqueueOptions,
|
|
): Promise<T> {
|
|
const options = this.normalizeEnqueueOptions(policyOrOptions);
|
|
const effective = this.resolvePolicy(options.policy);
|
|
let lane = this.lanes.get(laneId);
|
|
if (!lane) {
|
|
lane = { active: false, queue: [] };
|
|
this.lanes.set(laneId, lane);
|
|
}
|
|
|
|
// If nothing is running on this lane, execute immediately
|
|
if (!lane.active && !lane.debounceTimer) {
|
|
lane.active = true;
|
|
try {
|
|
return await work();
|
|
} finally {
|
|
lane.active = false;
|
|
this.processNext(laneId);
|
|
}
|
|
}
|
|
|
|
if (effective.mode === 'steer' || effective.mode === 'steer_backlog' || effective.mode === 'interrupt') {
|
|
this.rejectPending(laneId, lane, {
|
|
code: 'superseded',
|
|
laneId,
|
|
mode: effective.mode,
|
|
message: 'Superseded by newer request',
|
|
});
|
|
} else if (effective.mode === 'followup' && lane.queue.length > 0) {
|
|
this.rejectPending(laneId, lane, {
|
|
code: 'superseded',
|
|
laneId,
|
|
mode: effective.mode,
|
|
message: 'Superseded by newer follow-up request',
|
|
});
|
|
}
|
|
|
|
if (lane.queue.length >= effective.cap) {
|
|
if (effective.overflow === 'drop_new') {
|
|
return Promise.reject(
|
|
new LaneQueueRejectedError({
|
|
code: 'overflow',
|
|
laneId,
|
|
mode: effective.mode,
|
|
overflow: 'drop_new',
|
|
droppedCount: 1,
|
|
message: effective.summarizeOverflow
|
|
? `Lane queue full (drop_new): request rejected with ${lane.queue.length} pending`
|
|
: 'Lane queue full (drop_new)',
|
|
}),
|
|
);
|
|
}
|
|
// drop_old
|
|
const dropped = lane.queue.shift();
|
|
dropped?.reject(
|
|
new LaneQueueRejectedError({
|
|
code: 'overflow',
|
|
laneId,
|
|
mode: effective.mode,
|
|
overflow: 'drop_old',
|
|
droppedCount: 1,
|
|
message: effective.summarizeOverflow
|
|
? 'Lane queue overflow (drop_old): oldest pending request dropped'
|
|
: 'Lane queue overflow (drop_old)',
|
|
}),
|
|
);
|
|
}
|
|
|
|
// Otherwise, queue the work and return a deferred promise
|
|
return new Promise<T>((resolve, reject) => {
|
|
lane.queue.push({
|
|
work: work as () => Promise<unknown>,
|
|
resolve: resolve as (value: unknown) => void,
|
|
reject,
|
|
policy: effective,
|
|
metadata: options.metadata,
|
|
});
|
|
});
|
|
}
|
|
|
|
/** Check if a lane currently has active work executing. */
|
|
isProcessing(laneId: string): boolean {
|
|
const lane = this.lanes.get(laneId);
|
|
return (lane?.active ?? false) || Boolean(lane?.debounceTimer);
|
|
}
|
|
|
|
/** Get the number of pending (not yet started) items in a lane. */
|
|
queueLength(laneId: string): number {
|
|
return this.lanes.get(laneId)?.queue.length ?? 0;
|
|
}
|
|
|
|
/** Get the total number of pending items across all lanes. */
|
|
totalPending(): number {
|
|
let total = 0;
|
|
for (const lane of this.lanes.values()) {
|
|
total += lane.queue.length;
|
|
}
|
|
return total;
|
|
}
|
|
|
|
/**
|
|
* Cancel all pending entries in a lane.
|
|
* Active work is NOT interrupted — only queued items are rejected.
|
|
* Rejected promises receive an Error with message "Lane cancelled".
|
|
*/
|
|
cancel(laneId: string): void {
|
|
const lane = this.lanes.get(laneId);
|
|
if (!lane) {return;}
|
|
|
|
if (lane.debounceTimer) {
|
|
clearTimeout(lane.debounceTimer);
|
|
lane.debounceTimer = undefined;
|
|
}
|
|
|
|
this.rejectPending(laneId, lane, {
|
|
code: 'cancelled',
|
|
laneId,
|
|
mode: this.config.mode,
|
|
message: 'Lane cancelled',
|
|
});
|
|
|
|
// Clean up empty idle lanes
|
|
if (!lane.active && lane.queue.length === 0 && !lane.debounceTimer) {
|
|
this.lanes.delete(laneId);
|
|
}
|
|
}
|
|
|
|
private rejectPending(laneId: string, lane: Lane, details: LaneQueueRejectDetails): void {
|
|
const pending = lane.queue.splice(0);
|
|
for (const entry of pending) {
|
|
entry.reject(new LaneQueueRejectedError({ ...details, laneId, mode: entry.policy.mode }));
|
|
}
|
|
}
|
|
|
|
private resolvePolicy(policy?: Partial<LaneQueueConfig>): LaneQueueConfig {
|
|
return {
|
|
mode: policy?.mode ?? this.config.mode,
|
|
cap: Math.max(1, policy?.cap ?? this.config.cap),
|
|
overflow: policy?.overflow ?? this.config.overflow,
|
|
debounceMs: Math.max(0, policy?.debounceMs ?? this.config.debounceMs),
|
|
summarizeOverflow: policy?.summarizeOverflow ?? this.config.summarizeOverflow,
|
|
};
|
|
}
|
|
|
|
/**
|
|
* Process the next queued entry for a lane (called after current work finishes).
|
|
* Runs asynchronously so the caller's finally block completes first.
|
|
*/
|
|
private processNext(laneId: string, skipDebounce = false): void {
|
|
const lane = this.lanes.get(laneId);
|
|
if (!lane) {return;}
|
|
|
|
if (lane.active || lane.debounceTimer) {
|
|
return;
|
|
}
|
|
|
|
const next = lane.queue[0];
|
|
if (!next) {
|
|
this.lanes.delete(laneId);
|
|
return;
|
|
}
|
|
|
|
if (!skipDebounce && next.policy.debounceMs > 0) {
|
|
lane.debounceTimer = setTimeout(() => {
|
|
lane.debounceTimer = undefined;
|
|
this.processNext(laneId, true);
|
|
}, next.policy.debounceMs);
|
|
return;
|
|
}
|
|
|
|
const entry = lane.queue.shift();
|
|
if (!entry) {
|
|
// Lane is empty — clean up
|
|
this.lanes.delete(laneId);
|
|
return;
|
|
}
|
|
|
|
lane.active = true;
|
|
entry.work()
|
|
.then((value) => entry.resolve(value))
|
|
.catch((err) => entry.reject(err))
|
|
.finally(() => {
|
|
lane.active = false;
|
|
this.processNext(laneId);
|
|
});
|
|
}
|
|
|
|
private normalizeEnqueueOptions(
|
|
policyOrOptions?: Partial<LaneQueueConfig> | LaneQueueEnqueueOptions,
|
|
): LaneQueueEnqueueOptions {
|
|
if (!policyOrOptions) {
|
|
return {};
|
|
}
|
|
|
|
if ('policy' in policyOrOptions || 'metadata' in policyOrOptions) {
|
|
return policyOrOptions as LaneQueueEnqueueOptions;
|
|
}
|
|
|
|
return {
|
|
policy: policyOrOptions as Partial<LaneQueueConfig>,
|
|
};
|
|
}
|
|
}
|