Files
flynn/src/gateway/lane-queue.ts
T
William Valentin 6090508bad style: auto-fix ESLint issues (curly braces and formatting)
- Add curly braces to all if/else/for/while statements
- Fix indentation and trailing spaces
- Auto-fixed 372 linting errors using eslint --fix
- Remaining issues are warnings only (non-null assertions, explicit any types)
2026-02-11 10:30:24 -08:00

124 lines
3.3 KiB
TypeScript

/**
* LaneQueue — per-lane FIFO queue for serialising async work.
*
* Each "lane" (keyed by session ID) processes work items one at a time.
* If a lane is idle, work starts immediately. If it's busy, the work
* is queued and the returned promise settles with the work's result
* once the entry has reached the front of the lane and finished running.
*
* Independent lanes run in parallel — only items within the same lane
* are serialised.
*/
/**
 * A unit of work parked in a lane, bundled with the settle callbacks of the
 * promise that was handed back to the caller who enqueued it.
 */
interface QueueEntry<TResult = unknown> {
  /** Starts the work; called only once the entry is taken off the queue. */
  work: () => Promise<TResult>;
  /** Fulfils the caller's promise with the value produced by `work`. */
  resolve: (value: TResult) => void;
  /** Rejects the caller's promise (work failure or lane cancellation). */
  reject: (reason: unknown) => void;
}
/** Bookkeeping for one lane: its backlog and whether something is running. */
interface Lane {
  /** Entries waiting for their turn, oldest first. */
  queue: QueueEntry[];
  /** True while a work item belonging to this lane is executing. */
  active: boolean;
}
export class LaneQueue {
  /** Per-lane state keyed by lane ID. Idle lanes with an empty queue are removed. */
  private readonly lanes = new Map<string, Lane>();

  /**
   * Enqueue a unit of work for the given lane.
   *
   * If the lane is idle the work is started synchronously, before this method
   * returns its promise. Otherwise the work is appended to the lane's FIFO
   * queue and started once everything ahead of it has settled.
   *
   * Note: awaiting an `enqueue` on the same lane from inside that lane's
   * running work cannot complete, because the inner item only starts after
   * the outer one finishes.
   *
   * @param laneId - Key of the lane that serialises this work.
   * @param work - Function that performs the work. If it throws synchronously
   *   the error is reported as a rejection of the returned promise.
   * @returns A promise that settles with the outcome of `work`, or rejects
   *   with `Error('Lane cancelled')` if the entry is cancelled while queued.
   */
  async enqueue<T>(laneId: string, work: () => Promise<T>): Promise<T> {
    const lane = this.laneFor(laneId);

    // If nothing is running on this lane, execute immediately.
    if (!lane.active) {
      lane.active = true;
      try {
        return await work();
      } finally {
        // Release the lane and hand it to the next queued entry (if any),
        // whether the work succeeded or failed.
        lane.active = false;
        this.processNext(laneId);
      }
    }

    // Otherwise, queue the work and return a deferred promise that is settled
    // by processNext() or cancel().
    return new Promise<T>((resolve, reject) => {
      lane.queue.push({
        // The queue stores type-erased entries; T is restored via this promise.
        work: work as () => Promise<unknown>,
        resolve: resolve as (value: unknown) => void,
        reject,
      });
    });
  }

  /** Check if a lane currently has active work executing. */
  isProcessing(laneId: string): boolean {
    return this.lanes.get(laneId)?.active ?? false;
  }

  /** Get the number of pending (not yet started) items in a lane. */
  queueLength(laneId: string): number {
    return this.lanes.get(laneId)?.queue.length ?? 0;
  }

  /** Get the total number of pending items across all lanes. */
  totalPending(): number {
    let total = 0;
    for (const lane of this.lanes.values()) {
      total += lane.queue.length;
    }
    return total;
  }

  /**
   * Cancel all pending entries in a lane.
   * Active work is NOT interrupted — only queued items are rejected.
   * Rejected promises receive an Error with message "Lane cancelled".
   * Unknown lane IDs are ignored.
   */
  cancel(laneId: string): void {
    const lane = this.lanes.get(laneId);
    if (!lane) {
      return;
    }

    // Detach the backlog first so the lane is already empty while rejecting.
    const pending = lane.queue.splice(0);
    for (const entry of pending) {
      entry.reject(new Error('Lane cancelled'));
    }

    // Clean up empty idle lanes. An active lane is removed later by
    // processNext() once its running work settles.
    if (!lane.active && lane.queue.length === 0) {
      this.lanes.delete(laneId);
    }
  }

  /** Return the lane registered under `laneId`, creating an idle one if absent. */
  private laneFor(laneId: string): Lane {
    let lane = this.lanes.get(laneId);
    if (!lane) {
      lane = { active: false, queue: [] };
      this.lanes.set(laneId, lane);
    }
    return lane;
  }

  /**
   * Start the next queued entry for a lane (called after current work settles).
   *
   * The next work function is invoked synchronously; only its completion is
   * handled asynchronously. When the queue is empty the lane is removed from
   * the map. This method never throws: a synchronous exception from the work
   * function is converted into a rejection of that entry's own promise so the
   * lane keeps draining and the previous caller's result is not overwritten.
   */
  private processNext(laneId: string): void {
    const lane = this.lanes.get(laneId);
    if (!lane) {
      return;
    }

    const entry = lane.queue.shift();
    if (!entry) {
      // Lane is empty — clean up
      this.lanes.delete(laneId);
      return;
    }

    // Mark the lane busy before invoking the work so any enqueue() made while
    // the work starts is queued rather than run concurrently.
    lane.active = true;

    let running: Promise<unknown>;
    try {
      running = Promise.resolve(entry.work());
    } catch (err) {
      // Synchronous throw: treat it exactly like an asynchronous failure.
      running = Promise.reject(err);
    }

    void running
      .then((value) => entry.resolve(value))
      .catch((err: unknown) => entry.reject(err))
      .finally(() => {
        lane.active = false;
        this.processNext(laneId);
      });
  }
}