Files
flynn/src/gateway/lane-queue.ts
T
2026-02-16 12:04:33 -08:00

294 lines
8.3 KiB
TypeScript

/**
* LaneQueue — per-lane FIFO queue for serialising async work.
*
* Each "lane" (keyed by session ID) processes work items one at a time.
* If a lane is idle, work starts immediately. If it's busy, the work
* is queued and a promise is returned that resolves when it's this
* entry's turn to execute.
*
* Independent lanes run in parallel — only items within the same lane
* are serialised.
*/
interface QueueEntry<T = unknown> {
work: () => Promise<T>;
resolve: (value: T) => void;
reject: (reason: unknown) => void;
policy: LaneQueueConfig;
metadata?: LaneQueueEnqueueMetadata;
}
interface Lane {
active: boolean;
queue: QueueEntry[];
debounceTimer?: ReturnType<typeof setTimeout>;
}
export type LaneQueueMode = 'collect' | 'followup' | 'steer' | 'steer_backlog' | 'interrupt';
export type LaneQueueOverflow = 'drop_old' | 'drop_new';
export interface LaneQueueConfig {
mode: LaneQueueMode;
cap: number;
overflow: LaneQueueOverflow;
debounceMs: number;
summarizeOverflow: boolean;
}
export interface LaneQueueEnqueueMetadata {
requestId?: string;
label?: string;
}
export interface LaneQueueEnqueueOptions {
policy?: Partial<LaneQueueConfig>;
metadata?: LaneQueueEnqueueMetadata;
}
export type LaneQueueRejectCode = 'superseded' | 'overflow' | 'cancelled';
export interface LaneQueueRejectDetails {
code: LaneQueueRejectCode;
laneId: string;
mode: LaneQueueMode;
overflow?: LaneQueueOverflow;
droppedCount?: number;
message: string;
}
export class LaneQueueRejectedError extends Error {
readonly details: LaneQueueRejectDetails;
constructor(details: LaneQueueRejectDetails) {
super(details.message);
this.name = 'LaneQueueRejectedError';
this.details = details;
}
}
export class LaneQueue {
private lanes: Map<string, Lane> = new Map();
private config: LaneQueueConfig;
constructor(config?: Partial<LaneQueueConfig>) {
this.config = {
mode: config?.mode ?? 'collect',
cap: Math.max(1, config?.cap ?? 50),
overflow: config?.overflow ?? 'drop_old',
debounceMs: Math.max(0, config?.debounceMs ?? 0),
summarizeOverflow: config?.summarizeOverflow ?? true,
};
}
/**
* Enqueue a unit of work for the given lane.
* Returns a promise that resolves with the work's return value
* once it has been executed (which may be immediately if the lane is idle).
*/
async enqueue<T>(
laneId: string,
work: () => Promise<T>,
policyOrOptions?: Partial<LaneQueueConfig> | LaneQueueEnqueueOptions,
): Promise<T> {
const options = this.normalizeEnqueueOptions(policyOrOptions);
const effective = this.resolvePolicy(options.policy);
let lane = this.lanes.get(laneId);
if (!lane) {
lane = { active: false, queue: [] };
this.lanes.set(laneId, lane);
}
// If nothing is running on this lane, execute immediately
if (!lane.active && !lane.debounceTimer) {
lane.active = true;
try {
return await work();
} finally {
lane.active = false;
this.processNext(laneId);
}
}
if (effective.mode === 'steer' || effective.mode === 'steer_backlog' || effective.mode === 'interrupt') {
this.rejectPending(laneId, lane, {
code: 'superseded',
laneId,
mode: effective.mode,
message: 'Superseded by newer request',
});
} else if (effective.mode === 'followup' && lane.queue.length > 0) {
this.rejectPending(laneId, lane, {
code: 'superseded',
laneId,
mode: effective.mode,
message: 'Superseded by newer follow-up request',
});
}
if (lane.queue.length >= effective.cap) {
if (effective.overflow === 'drop_new') {
return Promise.reject(
new LaneQueueRejectedError({
code: 'overflow',
laneId,
mode: effective.mode,
overflow: 'drop_new',
droppedCount: 1,
message: effective.summarizeOverflow
? `Lane queue full (drop_new): request rejected with ${lane.queue.length} pending`
: 'Lane queue full (drop_new)',
}),
);
}
// drop_old
const dropped = lane.queue.shift();
dropped?.reject(
new LaneQueueRejectedError({
code: 'overflow',
laneId,
mode: effective.mode,
overflow: 'drop_old',
droppedCount: 1,
message: effective.summarizeOverflow
? 'Lane queue overflow (drop_old): oldest pending request dropped'
: 'Lane queue overflow (drop_old)',
}),
);
}
// Otherwise, queue the work and return a deferred promise
return new Promise<T>((resolve, reject) => {
lane.queue.push({
work: work as () => Promise<unknown>,
resolve: resolve as (value: unknown) => void,
reject,
policy: effective,
metadata: options.metadata,
});
});
}
/** Check if a lane currently has active work executing. */
isProcessing(laneId: string): boolean {
const lane = this.lanes.get(laneId);
return (lane?.active ?? false) || Boolean(lane?.debounceTimer);
}
/** Get the number of pending (not yet started) items in a lane. */
queueLength(laneId: string): number {
return this.lanes.get(laneId)?.queue.length ?? 0;
}
/** Get the total number of pending items across all lanes. */
totalPending(): number {
let total = 0;
for (const lane of this.lanes.values()) {
total += lane.queue.length;
}
return total;
}
/**
* Cancel all pending entries in a lane.
* Active work is NOT interrupted — only queued items are rejected.
* Rejected promises receive an Error with message "Lane cancelled".
*/
cancel(laneId: string): void {
const lane = this.lanes.get(laneId);
if (!lane) {return;}
if (lane.debounceTimer) {
clearTimeout(lane.debounceTimer);
lane.debounceTimer = undefined;
}
this.rejectPending(laneId, lane, {
code: 'cancelled',
laneId,
mode: this.config.mode,
message: 'Lane cancelled',
});
// Clean up empty idle lanes
if (!lane.active && lane.queue.length === 0 && !lane.debounceTimer) {
this.lanes.delete(laneId);
}
}
private rejectPending(laneId: string, lane: Lane, details: LaneQueueRejectDetails): void {
const pending = lane.queue.splice(0);
for (const entry of pending) {
entry.reject(new LaneQueueRejectedError({ ...details, laneId, mode: entry.policy.mode }));
}
}
private resolvePolicy(policy?: Partial<LaneQueueConfig>): LaneQueueConfig {
return {
mode: policy?.mode ?? this.config.mode,
cap: Math.max(1, policy?.cap ?? this.config.cap),
overflow: policy?.overflow ?? this.config.overflow,
debounceMs: Math.max(0, policy?.debounceMs ?? this.config.debounceMs),
summarizeOverflow: policy?.summarizeOverflow ?? this.config.summarizeOverflow,
};
}
/**
* Process the next queued entry for a lane (called after current work finishes).
* Runs asynchronously so the caller's finally block completes first.
*/
private processNext(laneId: string, skipDebounce = false): void {
const lane = this.lanes.get(laneId);
if (!lane) {return;}
if (lane.active || lane.debounceTimer) {
return;
}
const next = lane.queue[0];
if (!next) {
this.lanes.delete(laneId);
return;
}
if (!skipDebounce && next.policy.debounceMs > 0) {
lane.debounceTimer = setTimeout(() => {
lane.debounceTimer = undefined;
this.processNext(laneId, true);
}, next.policy.debounceMs);
return;
}
const entry = lane.queue.shift();
if (!entry) {
// Lane is empty — clean up
this.lanes.delete(laneId);
return;
}
lane.active = true;
entry.work()
.then((value) => entry.resolve(value))
.catch((err) => entry.reject(err))
.finally(() => {
lane.active = false;
this.processNext(laneId);
});
}
private normalizeEnqueueOptions(
policyOrOptions?: Partial<LaneQueueConfig> | LaneQueueEnqueueOptions,
): LaneQueueEnqueueOptions {
if (!policyOrOptions) {
return {};
}
if ('policy' in policyOrOptions || 'metadata' in policyOrOptions) {
return policyOrOptions as LaneQueueEnqueueOptions;
}
return {
policy: policyOrOptions as Partial<LaneQueueConfig>,
};
}
}