Files
flynn/src/daemon/index.ts
T
William Valentin 35a0061de9 feat(01-02): extract channel adapter registration into src/daemon/channels.ts
- Move Telegram, Discord, Slack, WhatsApp, WebChat adapter setup to channels.ts
- Move CronScheduler, WebhookHandler, GmailWatcher registration to channels.ts
- Clean up index.ts imports (remove unused adapter value imports)
- index.ts calls registerChannels() and receives cronScheduler for tool wiring
2026-02-09 20:14:23 -08:00

386 lines
14 KiB
TypeScript

import { Lifecycle } from './lifecycle.js';
import { createModelRouter } from './models.js';
import { initMemory } from './memory.js';
import { createMessageRouter } from './routing.js';
import { initAgents } from './agents.js';
import { initTools } from './tools.js';
import { registerChannels } from './channels.js';
import type { Config } from '../config/index.js';
import type { AudioTranscriptionConfig } from '../models/media.js';
import { ModelRouter } from '../models/index.js';
import { AgentOrchestrator } from '../backends/index.js';
import { OutboundAttachmentCollector } from '../backends/native/attachments.js';
import { SessionStore, SessionManager, parseDuration } from '../session/index.js';
import { HookEngine } from '../hooks/index.js';
import type { ToolRegistry, ToolExecutor, BrowserManager } from '../tools/index.js';
import { createSessionTools, createAgentsListTool, createMessageSendTool, createCronTools } from '../tools/index.js';
import { GatewayServer } from '../gateway/index.js';
import { ChannelRegistry, PairingManager } from '../channels/index.js';
import type { CronScheduler } from '../automation/index.js';
import { HeartbeatMonitor } from '../automation/index.js';
import { McpManager } from '../mcp/index.js';
import { SkillRegistry, SkillInstaller, loadAllSkills } from '../skills/index.js';
import { assembleSystemPrompt } from '../prompt/index.js';
import type { AgentConfigRegistry, AgentRouter } from '../agents/index.js';
import type { SandboxManager } from '../sandbox/index.js';
import { resolve } from 'path';
import { homedir } from 'os';
import { mkdirSync } from 'fs';
/**
 * Handles to the long-lived subsystems created by {@link startDaemon}.
 *
 * Returned to the caller once startup has finished so that it can inspect or
 * drive the running daemon (for example by calling `lifecycle.shutdown()`).
 */
export interface DaemonContext {
  /** The configuration object the daemon was started with. */
  config: Config;
  /** Registry of shutdown callbacks for everything started by the daemon. */
  lifecycle: Lifecycle;
  /** Session store backed by `sessions.db` inside the data directory. */
  sessionStore: SessionStore;
  /** Session manager constructed on top of `sessionStore`. */
  sessionManager: SessionManager;
  /** Hook engine constructed from `config.hooks`. */
  hookEngine: HookEngine;
  /** Model router produced by `createModelRouter(config)`. */
  modelRouter: ModelRouter;
  /** Registry that tools (built-in, MCP, session, cron, ...) are registered into. */
  toolRegistry: ToolRegistry;
  /** Executor paired with `toolRegistry`; both come from `initTools`. */
  toolExecutor: ToolExecutor;
  /** Gateway server, already started when the context is returned. */
  gateway: GatewayServer;
  /** Registry of channel adapters, already started when the context is returned. */
  channelRegistry: ChannelRegistry;
  /** Manager for the MCP servers listed in `config.mcp.servers`. */
  mcpManager: McpManager;
  /** Registry holding every skill found by `loadAllSkills`. */
  skillRegistry: SkillRegistry;
  /** Installer targeting the managed skills directory. */
  skillInstaller: SkillInstaller;
  /** Agent configuration registry returned by `initAgents`. */
  agentConfigRegistry: AgentConfigRegistry;
  /** Agent router returned by `initAgents`. */
  agentRouter: AgentRouter;
  /** Sandbox manager from `initAgents`; may be absent. */
  sandboxManager?: SandboxManager;
  /** Browser manager from `initTools`; may be absent. */
  browserManager?: BrowserManager;
}
/**
 * Assemble the base system prompt via `assembleSystemPrompt`.
 *
 * The candidate directories are handed over in this order: the current
 * working directory, the directory two levels above this module, and finally
 * any entries from `config.prompt.search_dirs`.
 *
 * @param config - Daemon configuration; only `config.prompt` is read.
 * @returns The assembled prompt text.
 */
function loadSystemPrompt(config: Config): string {
  const { prompt, loadedFiles } = assembleSystemPrompt({
    searchDirs: [
      process.cwd(),
      resolve(import.meta.dirname, '../..'),
      ...(config.prompt.search_dirs ?? []),
    ],
    extraSections: config.prompt.extra_sections,
  });

  // Report which template files contributed, by file name only.
  if (loadedFiles.length > 0) {
    const fileNames = loadedFiles.map(file => file.split('/').pop());
    console.log(`Loaded prompt templates: ${fileNames.join(', ')}`);
  }

  return prompt;
}
/** Pairing-code lifetime used when `pairing.code_ttl` cannot be parsed (5 minutes). */
const DEFAULT_PAIRING_CODE_TTL_MS = 5 * 60_000;

/**
 * Convert a pairing `code_ttl` string (`'30s'`, `'5m'`, `'1h'`) to milliseconds.
 *
 * Only a positive integer followed by `s`, `m` or `h` is accepted. Anything
 * else falls back to {@link DEFAULT_PAIRING_CODE_TTL_MS} and emits a warning,
 * so a typo in the configuration does not go unnoticed.
 */
function parsePairingCodeTtl(codeTtl: string): number {
  const match = /^(\d+)(s|m|h)$/.exec(codeTtl);
  if (!match) {
    console.warn(`Invalid pairing code_ttl "${codeTtl}" — falling back to 5 minutes`);
    return DEFAULT_PAIRING_CODE_TTL_MS;
  }
  const unitMs = { s: 1000, m: 60_000, h: 3_600_000 }[match[2] as 's' | 'm' | 'h'];
  return Number(match[1]) * unitMs;
}

/**
 * Construct, wire together and start every daemon subsystem.
 *
 * In order: data directory and session store, session pruning timer, hooks,
 * tools, memory, MCP servers, skills, agents, model router, system prompt,
 * pairing, gateway, message routing, channel adapters, agent tools, signal
 * handlers, Tailscale Serve (optional) and the heartbeat monitor. Each
 * subsystem registers its own cleanup with the returned `lifecycle`.
 *
 * Side effects: creates the data directory, installs `SIGINT`/`SIGTERM`
 * handlers on the process, and starts the channel adapters and gateway.
 *
 * @param config - Fully loaded daemon configuration.
 * @returns Handles to the running subsystems.
 * @throws Any error raised by a subsystem during startup is propagated,
 *   except a Tailscale Serve failure, which is only logged.
 */
export async function startDaemon(config: Config): Promise<DaemonContext> {
  const lifecycle = new Lifecycle();

  // Ensure data directory exists (FLYNN_DATA_DIR overrides default for Docker/custom deployments)
  const dataDir = process.env.FLYNN_DATA_DIR ?? resolve(homedir(), '.local/share/flynn');
  mkdirSync(dataDir, { recursive: true });

  // Initialize session store and manager
  const sessionStore = new SessionStore(resolve(dataDir, 'sessions.db'));
  const sessionManager = new SessionManager(sessionStore);
  lifecycle.onShutdown(async () => {
    sessionStore.close();
    console.log('Session store closed');
  });

  // Session pruning timer (TTL-based cleanup)
  const ttlMs = parseDuration(config.sessions?.ttl ?? '30d');
  if (ttlMs) {
    const pruneInterval = setInterval(() => {
      const cutoff = Math.floor((Date.now() - ttlMs) / 1000); // created_at is unix seconds
      const pruned = sessionStore.pruneStale(cutoff);
      if (pruned.length > 0) {
        // Drop the pruned sessions from the manager as well as the store.
        sessionManager.evictSessions(pruned);
        console.log(`Pruned ${pruned.length} stale session(s) (TTL: ${config.sessions?.ttl ?? '30d'})`);
      }
    }, 3_600_000); // every hour
    lifecycle.onShutdown(async () => {
      clearInterval(pruneInterval);
    });
  }

  // Initialize hook engine
  const hookEngine = new HookEngine(config.hooks);

  // Initialize tool registry, executor, web search, process tools, browser tools, and tool policy
  const { toolRegistry, toolExecutor, browserManager } = initTools({ config, lifecycle, hookEngine });

  // Initialize memory store, vector search, and memory tools
  const { memoryStore, hybridSearch, memoryDir } = await initMemory({ config, dataDir, lifecycle, toolRegistry });

  // Initialize MCP manager and start configured servers
  const mcpManager = new McpManager(toolRegistry);
  if (config.mcp.servers.length > 0) {
    console.log(`Starting ${config.mcp.servers.length} MCP server(s)...`);
    await mcpManager.startAll(config.mcp.servers);
  }
  lifecycle.onShutdown(async () => {
    await mcpManager.stopAll();
    console.log('MCP servers stopped');
  });

  // Initialize skills system
  const defaultManagedDir = resolve(homedir(), '.flynn/workspace/skills');
  const skillRegistry = new SkillRegistry();
  const skillInstaller = new SkillInstaller(config.skills.managed_dir ?? defaultManagedDir);
  const skills = loadAllSkills({
    bundledDir: config.skills.bundled_dir,
    managedDir: config.skills.managed_dir ?? defaultManagedDir,
    workspaceDir: config.skills.workspace_dir,
  });
  for (const skill of skills) {
    skillRegistry.register(skill);
  }
  if (skills.length > 0) {
    const available = skillRegistry.listAvailable().length;
    console.log(`Loaded ${skills.length} skill(s) (${available} available)`);
  }

  // Initialize agent config registry, router, and sandbox manager
  const { agentConfigRegistry, agentRouter, sandboxManager } = await initAgents({ config, lifecycle });

  // Initialize audio transcription config
  const audioConfig: AudioTranscriptionConfig = {
    endpoint: config.audio.transcription_endpoint,
    apiKey: config.audio.transcription_api_key,
    model: config.audio.transcription_model,
  };

  // Initialize model router
  const modelRouter = createModelRouter(config);

  // Load system prompt and append skill instructions
  let systemPrompt = loadSystemPrompt(config);
  const skillAdditions = skillRegistry.getSystemPromptAdditions();
  if (skillAdditions) {
    systemPrompt = `${systemPrompt}\n\n# Available Skills\n\n${skillAdditions}`;
  }

  // Initialize channel registry (created early so the gateway can reference it)
  const channelRegistry = new ChannelRegistry();

  // Create PairingManager if pairing is enabled
  let pairingManager: PairingManager | undefined;
  if (config.pairing.enabled) {
    // code_ttl supports '5m', '1h', '30s'; an invalid value warns and uses the default.
    pairingManager = new PairingManager({
      enabled: true,
      codeTtl: parsePairingCodeTtl(config.pairing.code_ttl),
      codeLength: config.pairing.code_length,
    });
    console.log(`Pairing codes enabled (TTL: ${config.pairing.code_ttl}, length: ${config.pairing.code_length})`);
  }

  // Mutable reference to channel agents map — set after createMessageRouter() below.
  // This allows the gateway's getTokenUsage callback to access channel agent usage data.
  let channelAgents: Map<string, { orchestrator: AgentOrchestrator; collector: OutboundAttachmentCollector }> | null = null;

  // Initialize gateway WebSocket server
  const gateway = new GatewayServer({
    port: config.server.port,
    host: config.server.localhost ? '127.0.0.1' : '0.0.0.0',
    sessionManager,
    modelClient: modelRouter,
    systemPrompt,
    toolRegistry,
    toolExecutor,
    auth: {
      token: config.server.token,
      tailscaleIdentity: config.server.tailscale_identity,
    },
    authHttp: config.server.auth_http,
    lock: config.server.lock,
    uiDir: resolve(import.meta.dirname, '../gateway/ui'),
    config,
    channelRegistry,
    pairingManager,
    restart: async () => {
      console.log('Restart requested via gateway');
      await lifecycle.shutdown();
      // Exit with code 75 (EX_TEMPFAIL) — process supervisor should restart
      process.exit(75);
    },
    getTokenUsage: () => {
      const results: Array<{
        sessionId: string;
        primary: { inputTokens: number; outputTokens: number; calls: number };
        delegation: Record<string, { inputTokens: number; outputTokens: number; calls: number }>;
        total: { inputTokens: number; outputTokens: number; calls: number; estimatedCost: number };
      }> = [];

      // Collect usage from gateway WebSocket sessions (NativeAgent-based)
      const sessionBridge = gateway.getSessionBridge();
      for (const entry of sessionBridge.getAllUsage()) {
        results.push(entry);
      }

      // Collect usage from channel agents (AgentOrchestrator-based, has full delegation data)
      if (channelAgents) {
        for (const [sessionId, { orchestrator }] of channelAgents) {
          const usage = orchestrator.getUsage();
          results.push({
            sessionId,
            primary: usage.primary,
            delegation: usage.delegation,
            total: usage.total,
          });
        }
      }

      return results;
    },
  });

  if (config.server.token) {
    console.log(`Gateway auth: token required${config.server.tailscale_identity ? ' + Tailscale identity' : ''}`);
  }

  // ── Channel Registry ──────────────────────────────────────────
  // Set up the unified message handler
  const messageRouter = createMessageRouter({
    sessionManager,
    modelRouter,
    systemPrompt,
    toolRegistry,
    toolExecutor,
    config,
    memoryStore,
    agentConfigRegistry,
    agentRouter,
    sandboxManager,
    audioConfig,
  });
  channelRegistry.setMessageHandler(messageRouter.handler);

  // Wire channel agents into the getTokenUsage callback (late binding)
  channelAgents = messageRouter.agents;

  // Register all channel adapters (Telegram, Discord, Slack, WhatsApp, WebChat, cron, webhooks, Gmail)
  const { cronScheduler } = registerChannels({ config, channelRegistry, hookEngine, pairingManager, gateway });

  // ── Register Tier 1 agent tools ─────────────────────────────
  // Session management tools (list, history, create, delete)
  for (const tool of createSessionTools(sessionManager)) {
    toolRegistry.register(tool);
  }

  // Agent discovery tool
  toolRegistry.register(createAgentsListTool(agentConfigRegistry));

  // Cross-channel messaging tool
  toolRegistry.register(createMessageSendTool(channelRegistry));

  // Cron management tools (if scheduler is active)
  if (cronScheduler) {
    for (const tool of createCronTools(cronScheduler)) {
      toolRegistry.register(tool);
    }
  }

  // ── Signal Handlers ───────────────────────────────────────────
  const signalHandler = () => {
    lifecycle
      .shutdown()
      .then(() => process.exit(0))
      .catch((err: unknown) => {
        // A failing shutdown handler must not surface as an unhandled
        // rejection; report it and exit with a failure code instead.
        console.error('Error during shutdown:', err);
        process.exit(1);
      });
  };
  process.on('SIGINT', signalHandler);
  process.on('SIGTERM', signalHandler);
  lifecycle.onShutdown(async () => {
    process.off('SIGINT', signalHandler);
    process.off('SIGTERM', signalHandler);
  });

  // ── Start Services ────────────────────────────────────────────
  // Register shutdown handler for channels (stops Telegram bot etc.)
  lifecycle.onShutdown(async () => {
    await channelRegistry.stopAll();
    console.log('Channel adapters stopped');
  });

  // Start all channel adapters (Telegram long polling, WebChat status)
  await channelRegistry.startAll();

  // Start gateway (HTTP + WS server); cleanup is registered before starting.
  lifecycle.onShutdown(async () => {
    await gateway.stop();
    console.log('Gateway server stopped');
  });
  await gateway.start();

  // ── Tailscale Serve ────────────────────────────────────────────
  if (config.server.tailscale?.serve) {
    // Loaded lazily so the module is only imported when the feature is enabled.
    const { startTailscaleServe, stopTailscaleServe } = await import('../gateway/tailscale.js');
    const tsConfig = {
      localPort: config.server.port,
      servePort: config.server.tailscale.port,
      hostname: config.server.tailscale.hostname,
    };
    try {
      await startTailscaleServe(tsConfig);
      lifecycle.onShutdown(async () => {
        await stopTailscaleServe(tsConfig);
      });
    } catch (err: unknown) {
      // Best-effort feature: keep the daemon running, but surface the cause.
      console.warn('Tailscale Serve failed to start — gateway still accessible on local port', err);
    }
  }

  // ── Heartbeat Monitor ──────────────────────────────────────────
  const heartbeatMonitor = new HeartbeatMonitor({
    config: config.automation.heartbeat,
    getGatewayPort: () => config.server.port,
    modelRouter,
    channelLister: channelRegistry,
    memoryDir: config.memory.enabled ? memoryDir : undefined,
    dataDir,
    channelLookup: channelRegistry,
  });
  heartbeatMonitor.start();
  lifecycle.onShutdown(async () => {
    heartbeatMonitor.stop();
    console.log('Heartbeat monitor stopped');
  });

  console.log('Flynn daemon started');

  return {
    config,
    lifecycle,
    sessionStore,
    sessionManager,
    hookEngine,
    modelRouter,
    toolRegistry,
    toolExecutor,
    gateway,
    channelRegistry,
    mcpManager,
    skillRegistry,
    skillInstaller,
    agentConfigRegistry,
    agentRouter,
    sandboxManager,
    browserManager,
  };
}
export { Lifecycle } from './lifecycle.js';
export { createClientFromConfig, anthropicToGitHubModel, createAutoFallbackClient, createModelRouter } from './models.js';