Files
flynn/src/automation/cron.ts
T
2026-02-18 10:14:45 -08:00

225 lines
6.5 KiB
TypeScript

import { Cron } from 'croner';
import type { CronJobConfig } from '../config/schema.js';
import type { ChannelAdapter, ChannelStatus, InboundMessage, OutboundMessage } from '../channels/types.js';
import { auditLogger } from '../audit/index.js';
import { randomUUID } from 'crypto';
/** Minimal interface for the parts of ChannelRegistry we need. */
interface ChannelLookup {
get(name: string): { send(peerId: string, message: OutboundMessage): Promise<void> } | undefined;
}
type DeliveryMode = 'shared_session' | 'isolated_job' | 'announce';
export class CronScheduler implements ChannelAdapter {
readonly name = 'cron';
private _status: ChannelStatus = 'disconnected';
private messageHandler?: (msg: InboundMessage) => void;
private cronInstances: Map<string, Cron> = new Map();
private jobs: Map<string, CronJobConfig> = new Map();
private lastTriggeredLocalDateByJob: Map<string, string> = new Map();
constructor(
private readonly jobConfigs: CronJobConfig[],
private readonly channelLookup: ChannelLookup,
private readonly deliveryMode: DeliveryMode = 'shared_session',
) {
for (const job of jobConfigs) {
this.jobs.set(job.name, job);
}
}
get status(): ChannelStatus {
return this._status;
}
async connect(): Promise<void> {
this._status = 'connected';
for (const job of this.jobConfigs) {
if (!job.enabled) {continue;}
const cronInstance = new Cron(job.schedule, {
timezone: job.timezone,
paused: false,
}, () => {
this.triggerJob(job.name);
});
this.cronInstances.set(job.name, cronInstance);
}
const enabledCount = this.jobConfigs.filter(j => j.enabled).length;
if (enabledCount > 0) {
console.log(`CronScheduler: ${enabledCount} job(s) scheduled`);
auditLogger?.systemStart('CronScheduler', { jobs_enabled: enabledCount });
}
}
async disconnect(): Promise<void> {
for (const [, cron] of this.cronInstances) {
cron.stop();
}
this.cronInstances.clear();
this._status = 'disconnected';
auditLogger?.systemStop('CronScheduler');
}
async send(peerId: string, message: OutboundMessage): Promise<void> {
// peerId is the cron job name — look up its output config
const job = this.jobs.get(peerId);
if (!job) {
console.warn(`No cron job found for '${peerId}'`);
return;
}
const outputAdapter = this.channelLookup.get(job.output.channel);
if (!outputAdapter) {
console.warn(`Output channel '${job.output.channel}' not found for cron job '${peerId}'`);
return;
}
await outputAdapter.send(job.output.peer, message);
}
onMessage(handler: (msg: InboundMessage) => void): void {
this.messageHandler = handler;
}
/** Manually trigger a job (also called by cron on schedule). */
triggerJob(jobName: string): void {
const job = this.jobs.get(jobName);
if (!job) {return;}
if (job.once_per_local_day) {
const dayKey = this.localDayKey(Date.now(), job.timezone);
const lastDayKey = this.lastTriggeredLocalDateByJob.get(jobName);
if (lastDayKey === dayKey) {
return;
}
this.lastTriggeredLocalDateByJob.set(jobName, dayKey);
}
const runId = `run-${randomUUID()}`;
const senderId = this.deliveryMode === 'shared_session'
? jobName
: this.deliveryMode === 'announce'
? `${jobName}:announce:${runId}`
: `${jobName}:${runId}`;
const msg: InboundMessage = {
id: `cron-${jobName}-${Date.now()}`,
channel: 'cron',
senderId,
senderName: `cron:${jobName}`,
text: job.message,
timestamp: Date.now(),
metadata: {
cronJob: jobName,
scheduled: true,
modelTier: job.model_tier,
deliveryMode: this.deliveryMode,
announce: this.deliveryMode === 'announce',
runId,
replyPeerId: jobName,
},
};
auditLogger?.cronTrigger({
job_name: jobName,
schedule: job.schedule,
message: job.message,
output_channel: job.output.channel,
output_peer: job.output.peer,
});
this.messageHandler?.(msg);
}
/** Get list of all job names (enabled and disabled). */
getJobNames(): string[] {
return Array.from(this.jobs.keys());
}
/** Get a job's config by name. */
getJob(name: string): CronJobConfig | undefined {
return this.jobs.get(name);
}
/**
* Add a new cron job at runtime and start it immediately.
* The job is ephemeral — it persists only until the daemon restarts.
* Returns true if the job was created, false if a job with that name already exists.
*/
addJob(config: CronJobConfig): boolean {
if (this.jobs.has(config.name)) {
return false;
}
this.jobs.set(config.name, config);
auditLogger?.cronAdd(config.name, config.schedule);
if (config.enabled && this._status === 'connected') {
const cronInstance = new Cron(config.schedule, {
timezone: config.timezone,
paused: false,
}, () => {
this.triggerJob(config.name);
});
this.cronInstances.set(config.name, cronInstance);
}
return true;
}
/**
* Remove a cron job by name. Stops the cron instance if running.
* Returns true if the job was found and removed, false otherwise.
*/
removeJob(name: string): boolean {
if (!this.jobs.has(name)) {
return false;
}
const cronInstance = this.cronInstances.get(name);
if (cronInstance) {
cronInstance.stop();
this.cronInstances.delete(name);
}
this.jobs.delete(name);
this.lastTriggeredLocalDateByJob.delete(name);
auditLogger?.cronRemove(name);
return true;
}
private localDayKey(timestamp: number, timezone?: string): string {
const date = new Date(timestamp);
const format = (timeZoneValue?: string): string => {
const parts = new Intl.DateTimeFormat('en-US', {
timeZone: timeZoneValue,
year: 'numeric',
month: '2-digit',
day: '2-digit',
}).formatToParts(date);
const year = parts.find((part) => part.type === 'year')?.value ?? '0000';
const month = parts.find((part) => part.type === 'month')?.value ?? '00';
const day = parts.find((part) => part.type === 'day')?.value ?? '00';
return `${year}-${month}-${day}`;
};
if (!timezone) {
return format();
}
try {
return format(timezone);
} catch {
console.warn(`CronScheduler: invalid timezone '${timezone}' for job local-day dedupe; falling back to system timezone`);
return format();
}
}
}