Files
flynn/src/daemon/index.ts
T
William Valentin 73fc5d173d feat: add auto-login for GitHub Copilot when no token is available
GitHubModelsClient now lazily resolves tokens at first API call. If no
token exists (env var, stored OAuth, or config), it triggers the OAuth
device flow automatically via an onLoginRequired callback wired in both
the TUI and daemon entry points.
2026-02-06 22:33:48 -08:00

703 lines
25 KiB
TypeScript

import { Lifecycle } from './lifecycle.js';
import type { Config, ModelConfig } from '../config/index.js';
import { AnthropicClient, OpenAIClient, OllamaClient, LlamaCppClient, GeminiClient, BedrockClient, GitHubModelsClient, ModelRouter, DEFAULT_RETRY_CONFIG } from '../models/index.js';
import type { ModelClient, RetryConfig } from '../models/index.js';
import { AgentOrchestrator, type DelegationConfig } from '../backends/index.js';
import { SessionStore, SessionManager } from '../session/index.js';
import { HookEngine } from '../hooks/index.js';
import { ToolRegistry, ToolExecutor, ToolPolicy, allBuiltinTools, createWebSearchTools, createProcessTools, ProcessManager, BrowserManager, createBrowserTools } from '../tools/index.js';
import type { Tool } from '../tools/types.js';
import { MemoryStore } from '../memory/index.js';
import { createMemoryTools } from '../tools/builtin/index.js';
import { GatewayServer } from '../gateway/index.js';
import { ChannelRegistry, TelegramAdapter, WebChatAdapter, DiscordAdapter, SlackAdapter, WhatsAppAdapter } from '../channels/index.js';
import { CronScheduler } from '../automation/index.js';
import type { InboundMessage, OutboundMessage } from '../channels/index.js';
import { McpManager } from '../mcp/index.js';
import { SkillRegistry, SkillInstaller, loadAllSkills } from '../skills/index.js';
import { assembleSystemPrompt } from '../prompt/index.js';
import { AgentConfigRegistry, AgentRouter } from '../agents/index.js';
import { DockerSandbox, SandboxManager, createSandboxedShellTool, createSandboxedProcessStartTool } from '../sandbox/index.js';
import { resolve } from 'path';
import { homedir } from 'os';
import { mkdirSync } from 'fs';
/**
 * Handles to the subsystems that startDaemon constructs and returns to its caller.
 * The two optional members are only populated when the corresponding feature
 * was enabled (and, for the sandbox, available) at startup.
 */
export interface DaemonContext {
// The configuration object startDaemon was called with.
config: Config;
// Lifecycle on which startDaemon registers its shutdown callbacks.
lifecycle: Lifecycle;
// Session store opened on `sessions.db` inside the daemon data directory.
sessionStore: SessionStore;
// Session manager constructed on top of sessionStore.
sessionManager: SessionManager;
// Hook engine constructed from `config.hooks`.
hookEngine: HookEngine;
// Model router built from `config.models` / `config.retry`.
modelRouter: ModelRouter;
// Registry holding the builtin tools plus any optional tools enabled by config.
toolRegistry: ToolRegistry;
// Executor constructed with toolRegistry and hookEngine.
toolExecutor: ToolExecutor;
// Gateway server (HTTP + WS) started at the end of startDaemon.
gateway: GatewayServer;
// Registry of channel adapters that share one message handler.
channelRegistry: ChannelRegistry;
// MCP manager; configured MCP servers are started through it during startup.
mcpManager: McpManager;
// Registry of skills loaded at startup.
skillRegistry: SkillRegistry;
// Skill installer pointed at the managed skills directory.
skillInstaller: SkillInstaller;
// Registry of named agent configs loaded from `config.agent_configs`.
agentConfigRegistry: AgentConfigRegistry;
// Router constructed from `config.routing`.
agentRouter: AgentRouter;
// Set only when `config.sandbox.enabled` is true and Docker was detected at startup.
sandboxManager?: SandboxManager;
// Set only when `config.browser.enabled` is true.
browserManager?: BrowserManager;
}
/**
 * Build the daemon's system prompt by delegating to `assembleSystemPrompt`.
 *
 * The search directories handed over are, in this order: the current working
 * directory, the directory two levels above this module, and then any entries
 * from `config.prompt.search_dirs`. `config.prompt.extra_sections` is forwarded
 * unchanged. When at least one template file was loaded, their base names are
 * logged to the console.
 *
 * @param config - Daemon configuration; only `config.prompt` is read here.
 * @returns The assembled prompt text.
 */
function loadSystemPrompt(config: Config): string {
  const twoLevelsUp = resolve(import.meta.dirname, '../..');
  const searchDirs = [process.cwd(), twoLevelsUp, ...(config.prompt.search_dirs ?? [])];

  const { prompt, loadedFiles } = assembleSystemPrompt({
    searchDirs,
    extraSections: config.prompt.extra_sections,
  });

  if (loadedFiles.length > 0) {
    // Log only the last '/'-separated segment of each loaded path.
    const templateNames = loadedFiles.map(file => file.split('/').pop());
    console.log(`Loaded prompt templates: ${templateNames.join(', ')}`);
  }

  return prompt;
}
/**
 * Build the concrete ModelClient for a single model configuration entry.
 *
 * The entry's `provider` field selects which client class is instantiated, so
 * tier entries and fallback entries all go through the same dispatch.
 * Provider-specific mappings worth knowing:
 *  - `llamacpp` defaults its endpoint to `http://localhost:8080`.
 *  - `openrouter` reuses OpenAIClient, falling back to the
 *    `OPENROUTER_API_KEY` environment variable and the OpenRouter base URL.
 *  - `bedrock` passes `endpoint` as the region, `api_key` as the access key id
 *    and `auth_token` as the secret access key.
 *  - `github` wires an `onLoginRequired` callback that lazily imports the auth
 *    module and prints the verification URL and user code to the console.
 *
 * @param cfg - Model configuration entry to instantiate a client for.
 * @returns A client for the configured provider.
 * @throws Error when `cfg.provider` is not one of the providers handled here.
 */
export function createClientFromConfig(cfg: ModelConfig): ModelClient {
  if (cfg.provider === 'anthropic') {
    return new AnthropicClient({ model: cfg.model, apiKey: cfg.api_key, authToken: cfg.auth_token });
  }

  if (cfg.provider === 'openai') {
    return new OpenAIClient({ model: cfg.model, apiKey: cfg.api_key });
  }

  if (cfg.provider === 'ollama') {
    return new OllamaClient({ model: cfg.model, host: cfg.endpoint, numGpu: cfg.num_gpu });
  }

  if (cfg.provider === 'llamacpp') {
    const endpoint = cfg.endpoint ?? 'http://localhost:8080';
    return new LlamaCppClient({ endpoint, model: cfg.model, authToken: cfg.auth_token });
  }

  if (cfg.provider === 'gemini') {
    return new GeminiClient({ model: cfg.model, apiKey: cfg.api_key });
  }

  if (cfg.provider === 'openrouter') {
    // OpenRouter is reached through the OpenAI client with a different base URL.
    const apiKey = cfg.api_key ?? process.env.OPENROUTER_API_KEY;
    const baseURL = cfg.endpoint ?? 'https://openrouter.ai/api/v1';
    return new OpenAIClient({ model: cfg.model, apiKey, baseURL });
  }

  if (cfg.provider === 'bedrock') {
    return new BedrockClient({
      model: cfg.model,
      region: cfg.endpoint,
      accessKeyId: cfg.api_key,
      secretAccessKey: cfg.auth_token,
    });
  }

  if (cfg.provider === 'github') {
    // The auth module is imported only when the client actually asks for a login.
    const onLoginRequired = async () => {
      const auth = await import('../auth/index.js');
      return auth.loginGitHub((userCode, verificationUri) => {
        console.log(`GitHub login required. Visit: ${verificationUri}`);
        console.log(`Enter code: ${userCode}`);
      });
    };
    return new GitHubModelsClient({
      model: cfg.model,
      apiKey: cfg.api_key,
      endpoint: cfg.endpoint,
      onLoginRequired,
    });
  }

  throw new Error(`Unknown model provider: ${(cfg as Record<string, unknown>).provider}`);
}
/**
 * Assemble the ModelRouter from the `models` and `retry` sections of the config.
 *
 * A client is always created for the `default` tier; `fast`, `complex` and
 * `local` clients are created only when configured. Each `fallback_chain`
 * entry is resolved first against those tier names (reusing the tier's client
 * instance) and otherwise against the `local_providers` map (creating a new
 * client). Entries that resolve to nothing are skipped with a console warning.
 * A retry config is passed to the router only when `config.retry.enabled` is
 * truthy; its non-retryable patterns always come from DEFAULT_RETRY_CONFIG.
 *
 * @param config - Daemon configuration; `config.models` and `config.retry` are read.
 * @returns The configured router.
 */
function createModelRouter(config: Config): ModelRouter {
  const { models, retry } = config;

  const defaultClient = createClientFromConfig(models.default);
  const fastClient = models.fast ? createClientFromConfig(models.fast) : undefined;
  const complexClient = models.complex ? createClientFromConfig(models.complex) : undefined;
  const localClient = models.local ? createClientFromConfig(models.local) : undefined;

  // Tier name -> already-built client (undefined when that tier is not configured).
  const tierClients = new Map<string, ModelClient | undefined>([
    ['local', localClient],
    ['default', defaultClient],
    ['fast', fastClient],
    ['complex', complexClient],
  ]);

  const fallbackChain: ModelClient[] = [];
  for (const entry of models.fallback_chain) {
    const tierClient = tierClients.get(entry);
    if (tierClient) {
      // Listing 'default' here allows re-trying the default provider in the chain.
      fallbackChain.push(tierClient);
      continue;
    }

    // Not a configured tier: try the named providers from the local_providers map.
    const namedProvider = models.local_providers?.[entry];
    if (namedProvider) {
      fallbackChain.push(createClientFromConfig(namedProvider));
      continue;
    }

    console.warn(`Fallback chain entry "${entry}" not found — skipping`);
  }

  console.log(`Model router: default=${models.default.provider}/${models.default.model}, fallback=[${models.fallback_chain.join(', ')}]`);

  // Retry settings are only forwarded when retrying is switched on.
  let retryConfig: RetryConfig | undefined;
  if (retry.enabled) {
    retryConfig = {
      maxRetries: retry.max_retries,
      initialDelayMs: retry.initial_delay_ms,
      backoffMultiplier: retry.backoff_multiplier,
      maxDelayMs: retry.max_delay_ms,
      nonRetryablePatterns: DEFAULT_RETRY_CONFIG.nonRetryablePatterns,
    };
    console.log(`Retry policy: max_retries=${retryConfig.maxRetries}, initial_delay=${retryConfig.initialDelayMs}ms`);
  }

  return new ModelRouter({
    default: defaultClient,
    fast: fastClient,
    complex: complexClient,
    local: localClient,
    fallbackChain,
    retryConfig,
  });
}
/**
 * Create the unified message handler for the channel registry.
 *
 * Each channel+sender pair (plus the routed agent config name, when routing
 * resolves one) gets its own AgentOrchestrator backed by the session returned
 * by the session manager. Orchestrators are created on first use and cached
 * in memory for the lifetime of the returned handler.
 *
 * The returned handler:
 *  - `reset` command: resets the agent and returns without sending a reply.
 *  - `compact` command: compacts the agent and replies with the message count
 *    and the token totals before and after, or "Nothing to compact.".
 *  - `usage` command: replies with a token usage report.
 *  - anything else (including unrecognised commands): passes the text and
 *    attachments to the agent and replies with its response. Processing errors
 *    are logged and answered with a generic apology instead of being rethrown.
 *
 * @param deps - Shared services and configuration used to build each agent.
 *   `agentRouter` / `agentConfigRegistry` enable per-sender or per-channel agent
 *   configs; `sandboxManager` enables sandboxed shell/process tools for agent
 *   configs that request a sandbox.
 * @returns An async handler taking the inbound message and a reply callback.
 */
function createMessageRouter(deps: {
  sessionManager: SessionManager;
  modelRouter: ModelRouter;
  systemPrompt: string;
  toolRegistry: ToolRegistry;
  toolExecutor: ToolExecutor;
  config: Config;
  memoryStore?: MemoryStore;
  agentConfigRegistry?: AgentConfigRegistry;
  agentRouter?: AgentRouter;
  sandboxManager?: SandboxManager;
}) {
  // Cache agents by session ID + agent config name to avoid recreating on every message
  const agents = new Map<string, AgentOrchestrator>();

  /** Return the cached orchestrator for this channel/sender, creating it on first use. */
  function getOrCreateAgent(channel: string, senderId: string): AgentOrchestrator {
    // Resolve agent config name via routing (sender → channel → default fallback)
    const agentConfigName = deps.agentRouter?.resolve(channel, senderId);
    const agentConfig = agentConfigName ? deps.agentConfigRegistry?.get(agentConfigName) : undefined;

    // Include agent config name in cache key so different agents aren't shared
    const sessionId = agentConfigName
      ? `${channel}:${senderId}:${agentConfigName}`
      : `${channel}:${senderId}`;

    let agent = agents.get(sessionId);
    if (!agent) {
      const session = deps.sessionManager.getSession(channel, senderId);

      // Use agent config overrides where available, falling back to global config
      const effectiveSystemPrompt = agentConfig?.systemPrompt ?? deps.systemPrompt;
      const effectiveTier = agentConfig?.modelTier ?? deps.config.agents.primary_tier ?? 'default';
      const effectiveProvider = deps.config.models.default.provider;

      const delegationConfig: DelegationConfig = {
        compaction: deps.config.agents.delegation.compaction ?? 'fast',
        memory_extraction: deps.config.agents.delegation.memory_extraction ?? 'fast',
        classification: deps.config.agents.delegation.classification ?? 'fast',
        tool_summarisation: deps.config.agents.delegation.tool_summarisation ?? 'fast',
        complex_reasoning: deps.config.agents.delegation.complex_reasoning ?? 'complex',
      };

      // Clone the tool registry and replace shell tools with sandboxed versions if configured
      let effectiveToolRegistry = deps.toolRegistry;
      if (agentConfig?.sandbox && deps.sandboxManager && deps.config.sandbox.enabled) {
        effectiveToolRegistry = deps.toolRegistry.clone();

        // Lazy sandbox: create the sandboxed tools with a deferred sandbox reference
        // The sandbox is created on first use via SandboxManager.getOrCreate()
        const sandboxSessionId = sessionId;
        const sandboxManager = deps.sandboxManager;

        // Create a proxy sandbox that lazily initializes
        const lazySandboxShell: Tool = {
          name: 'shell.exec',
          description: 'Execute a shell command inside a sandboxed container and return stdout/stderr.',
          inputSchema: {
            type: 'object',
            properties: {
              command: { type: 'string', description: 'The shell command to execute' },
              cwd: { type: 'string', description: 'Working directory inside the container (optional)' },
              timeout: { type: 'number', description: 'Timeout in milliseconds (default 30000)' },
            },
            required: ['command'],
          },
          execute: async (rawArgs: unknown) => {
            const sandbox = await sandboxManager.getOrCreate(sandboxSessionId);
            const tool = createSandboxedShellTool(sandbox);
            return tool.execute(rawArgs);
          },
        };

        const lazySandboxProcess: Tool = {
          name: 'process.start',
          description: 'Start a command in the background inside a sandboxed container.',
          inputSchema: {
            type: 'object',
            properties: {
              command: { type: 'string', description: 'The shell command to run in the background' },
              cwd: { type: 'string', description: 'Working directory inside the container (optional)' },
            },
            required: ['command'],
          },
          execute: async (rawArgs: unknown) => {
            const sandbox = await sandboxManager.getOrCreate(sandboxSessionId);
            const tool = createSandboxedProcessStartTool(sandbox);
            return tool.execute(rawArgs);
          },
        };

        effectiveToolRegistry.replace(lazySandboxShell);
        effectiveToolRegistry.replace(lazySandboxProcess);
      }

      agent = new AgentOrchestrator({
        modelRouter: deps.modelRouter,
        systemPrompt: effectiveSystemPrompt,
        session,
        toolRegistry: effectiveToolRegistry,
        toolExecutor: deps.toolExecutor,
        primaryTier: effectiveTier,
        delegation: delegationConfig,
        maxDelegationDepth: deps.config.agents.max_delegation_depth ?? 3,
        compaction: deps.config.compaction.enabled ? {
          thresholdPct: deps.config.compaction.threshold_pct,
          keepTurns: deps.config.compaction.keep_turns,
          summaryMaxTokens: deps.config.compaction.summary_max_tokens,
        } : undefined,
        // Model name and context window always come from the default model entry.
        modelName: deps.config.models.default.model,
        contextWindow: deps.config.models.default.context_window,
        memoryStore: deps.memoryStore,
        toolPolicyContext: {
          agent: effectiveTier,
          provider: effectiveProvider,
        },
      });
      agents.set(sessionId, agent);
    }
    return agent;
  }

  return async (msg: InboundMessage, reply: (response: OutboundMessage) => Promise<void>): Promise<void> => {
    const agent = getOrCreateAgent(msg.channel, msg.senderId);

    // Handle special commands
    if (msg.metadata?.isCommand) {
      if (msg.metadata.command === 'reset') {
        // No reply is sent from here for a reset.
        agent.reset();
        return;
      }

      if (msg.metadata.command === 'compact') {
        const result = await agent.compact();
        if (result && result.compactedCount > 0) {
          await reply({
            // Separate the before/after totals; without a separator the two
            // numbers run together into a single unreadable figure.
            text: `Compacted ${result.compactedCount} messages: ${result.tokensBefore} → ${result.tokensAfter} tokens`,
            replyTo: msg.id,
          });
        } else {
          await reply({
            text: 'Nothing to compact.',
            replyTo: msg.id,
          });
        }
        return;
      }

      if (msg.metadata.command === 'usage') {
        const usage = agent.getUsage();
        const lines = [
          '**Token Usage**',
          '',
          `Primary: ${usage.primary.inputTokens.toLocaleString()} in / ${usage.primary.outputTokens.toLocaleString()} out (${usage.primary.calls} calls)`,
        ];

        const delegationEntries = Object.entries(usage.delegation);
        if (delegationEntries.length > 0) {
          lines.push('');
          lines.push('Delegation:');
          for (const [tier, stats] of delegationEntries) {
            lines.push(` ${tier}: ${stats.inputTokens.toLocaleString()} in / ${stats.outputTokens.toLocaleString()} out (${stats.calls} calls)`);
          }
        }

        lines.push('');
        lines.push(`**Total:** ${usage.total.inputTokens.toLocaleString()} in / ${usage.total.outputTokens.toLocaleString()} out (${usage.total.calls} calls)`);
        if (usage.total.estimatedCost > 0) {
          lines.push(`**Estimated cost:** $${usage.total.estimatedCost.toFixed(4)}`);
        }

        await reply({ text: lines.join('\n'), replyTo: msg.id });
        return;
      }
      // Any other command falls through and is processed like a normal message.
    }

    try {
      const response = await agent.process(msg.text, msg.attachments);
      await reply({ text: response, replyTo: msg.id });
    } catch (error) {
      // Log the details locally; the sender only sees a generic failure message.
      console.error(`Error processing message from ${msg.channel}:${msg.senderId}:`, error);
      await reply({
        text: 'Sorry, an error occurred while processing your message.',
        replyTo: msg.id,
      });
    }
  };
}
/**
 * Construct and start every daemon subsystem described by `config`.
 *
 * Order of work as written below: data and memory directories, session store,
 * hook engine, tool registry (builtin, memory, web search, process and browser
 * tools), tool executor and policy, MCP servers, skills, agent configs and
 * routing, optional Docker sandbox manager, model router, system prompt,
 * gateway, channel adapters, signal handlers. Channel adapters are started
 * first and the gateway last.
 *
 * Cleanup callbacks are registered on the Lifecycle as each resource is
 * created; SIGINT/SIGTERM call `lifecycle.shutdown()` and then exit with 0.
 * NOTE(review): the order in which Lifecycle runs its callbacks is not visible
 * from this file — confirm in lifecycle.ts before relying on it.
 * NOTE(review): there is no try/catch here, so if a later step throws, this
 * function does not itself call `lifecycle.shutdown()` for what was already
 * created — confirm the caller handles that.
 *
 * @param config - Daemon configuration.
 * @returns Handles to the constructed subsystems once the gateway has started.
 */
export async function startDaemon(config: Config): Promise<DaemonContext> {
const lifecycle = new Lifecycle();
// Ensure data directory exists
const dataDir = resolve(homedir(), '.local/share/flynn');
mkdirSync(dataDir, { recursive: true });
// Initialize memory store
// The memory directory is created even when memory is disabled; only the
// MemoryStore itself is conditional on config.memory.enabled.
const memoryDir = config.memory.dir ?? resolve(dataDir, 'memory');
mkdirSync(memoryDir, { recursive: true });
const memoryStore = config.memory.enabled
? new MemoryStore({ dir: memoryDir, maxContextTokens: config.memory.max_context_tokens })
: undefined;
// Initialize session store and manager
const sessionStore = new SessionStore(resolve(dataDir, 'sessions.db'));
const sessionManager = new SessionManager(sessionStore);
lifecycle.onShutdown(async () => {
sessionStore.close();
console.log('Session store closed');
});
// Initialize hook engine
const hookEngine = new HookEngine(config.hooks);
// Initialize tool registry and executor
const toolRegistry = new ToolRegistry();
for (const tool of allBuiltinTools) {
toolRegistry.register(tool);
}
// Register memory tools if memory is enabled
if (memoryStore) {
for (const tool of createMemoryTools(memoryStore)) {
toolRegistry.register(tool);
}
}
// Register web search tool if configured with credentials
// Either an API key or a custom endpoint is enough to enable it.
if (config.web_search.api_key || config.web_search.endpoint) {
for (const tool of createWebSearchTools({
provider: config.web_search.provider,
apiKey: config.web_search.api_key,
endpoint: config.web_search.endpoint,
maxResults: config.web_search.max_results,
})) {
toolRegistry.register(tool);
}
}
// Initialize process manager and register process tools
const processManager = new ProcessManager({
maxConcurrent: config.process.max_concurrent,
maxRuntimeMinutes: config.process.max_runtime_minutes,
bufferSize: config.process.buffer_size,
});
for (const tool of createProcessTools(processManager)) {
toolRegistry.register(tool);
}
lifecycle.onShutdown(async () => {
await processManager.shutdown();
console.log('Process manager stopped');
});
// Initialize browser manager and register browser tools (if enabled)
let browserManager: BrowserManager | undefined;
if (config.browser?.enabled) {
browserManager = new BrowserManager({
executablePath: config.browser.executable_path,
wsEndpoint: config.browser.ws_endpoint,
headless: config.browser.headless,
maxPages: config.browser.max_pages,
defaultTimeout: config.browser.default_timeout,
});
for (const tool of createBrowserTools(browserManager)) {
toolRegistry.register(tool);
}
console.log(`Browser tools enabled (headless=${config.browser.headless})`);
lifecycle.onShutdown(async () => {
// Non-null assertion: browserManager was assigned a few lines above in this branch.
await browserManager!.shutdown();
console.log('Browser manager stopped');
});
}
const toolExecutor = new ToolExecutor(toolRegistry, hookEngine);
// Initialize tool policy from config
// The policy is attached after the builtin/optional tools above were registered
// and before MCP servers add theirs.
const toolPolicy = new ToolPolicy(config.tools);
toolRegistry.setPolicy(toolPolicy);
const effectiveProfile = toolPolicy.getEffectiveProfile();
if (effectiveProfile !== 'full') {
console.log(`Tool policy: profile=${effectiveProfile}, deny=[${config.tools.deny.join(', ')}]`);
}
// Initialize MCP manager and start configured servers
const mcpManager = new McpManager(toolRegistry);
if (config.mcp.servers.length > 0) {
console.log(`Starting ${config.mcp.servers.length} MCP server(s)...`);
// Awaited: startup does not continue until startAll() settles.
await mcpManager.startAll(config.mcp.servers);
}
lifecycle.onShutdown(async () => {
await mcpManager.stopAll();
console.log('MCP servers stopped');
});
// Initialize skills system
const defaultManagedDir = resolve(homedir(), '.flynn/workspace/skills');
const skillRegistry = new SkillRegistry();
const skillInstaller = new SkillInstaller(config.skills.managed_dir ?? defaultManagedDir);
const skills = loadAllSkills({
bundledDir: config.skills.bundled_dir,
managedDir: config.skills.managed_dir ?? defaultManagedDir,
workspaceDir: config.skills.workspace_dir,
});
for (const skill of skills) {
skillRegistry.register(skill);
}
if (skills.length > 0) {
const available = skillRegistry.listAvailable().length;
console.log(`Loaded ${skills.length} skill(s) (${available} available)`);
}
// Initialize agent config registry and router
const agentConfigRegistry = new AgentConfigRegistry();
if (config.agent_configs && Object.keys(config.agent_configs).length > 0) {
agentConfigRegistry.loadFromConfig(config.agent_configs);
console.log(`Loaded ${Object.keys(config.agent_configs).length} agent config(s)`);
}
const agentRouter = new AgentRouter(config.routing);
// Initialize sandbox manager if Docker is available
// sandboxManager stays undefined when sandboxing is disabled or Docker is missing.
let sandboxManager: SandboxManager | undefined;
if (config.sandbox.enabled) {
const dockerAvailable = await DockerSandbox.isAvailable();
if (dockerAvailable) {
sandboxManager = new SandboxManager(config.sandbox);
console.log(`Docker sandbox enabled (image=${config.sandbox.image}, network=${config.sandbox.network})`);
} else {
console.warn('Docker sandbox enabled but Docker not available — falling back to host execution');
}
}
if (sandboxManager) {
lifecycle.onShutdown(async () => {
// Non-null assertion: this callback is only registered when sandboxManager is set.
await sandboxManager!.destroyAll();
console.log('Docker sandboxes destroyed');
});
}
// Initialize model router
const modelRouter = createModelRouter(config);
// Load system prompt and append skill instructions
let systemPrompt = loadSystemPrompt(config);
const skillAdditions = skillRegistry.getSystemPromptAdditions();
if (skillAdditions) {
systemPrompt = `${systemPrompt}\n\n# Available Skills\n\n${skillAdditions}`;
}
// Initialize gateway WebSocket server
const gateway = new GatewayServer({
port: config.server.port,
// Bind to loopback only when config.server.localhost is set; otherwise all interfaces.
host: config.server.localhost ? '127.0.0.1' : '0.0.0.0',
sessionManager,
modelClient: modelRouter,
systemPrompt,
toolRegistry,
toolExecutor,
auth: {
token: config.server.token,
tailscaleIdentity: config.server.tailscale_identity,
},
authHttp: config.server.auth_http,
uiDir: resolve(import.meta.dirname, '../gateway/ui'),
config,
restart: async () => {
console.log('Restart requested via gateway');
await lifecycle.shutdown();
// Exit with code 75 (EX_TEMPFAIL) — process supervisor should restart
process.exit(75);
},
});
if (config.server.token) {
console.log(`Gateway auth: token required${config.server.tailscale_identity ? ' + Tailscale identity' : ''}`);
}
// ── Channel Registry ──────────────────────────────────────────
const channelRegistry = new ChannelRegistry();
// Set up the unified message handler
channelRegistry.setMessageHandler(createMessageRouter({
sessionManager,
modelRouter,
systemPrompt,
toolRegistry,
toolExecutor,
config,
memoryStore,
agentConfigRegistry,
agentRouter,
sandboxManager,
}));
// Register Telegram adapter
// Unlike Discord/Slack/WhatsApp below, this adapter is registered unconditionally.
const telegramAdapter = new TelegramAdapter({
botToken: config.telegram.bot_token,
allowedChatIds: config.telegram.allowed_chat_ids,
requireMention: config.telegram.require_mention,
hookEngine,
});
channelRegistry.register(telegramAdapter);
// Register Discord adapter (if configured)
// Empty allow-lists are passed as undefined rather than as empty arrays.
if (config.discord) {
const discordAdapter = new DiscordAdapter({
botToken: config.discord.bot_token,
allowedGuildIds: config.discord.allowed_guild_ids.length > 0 ? config.discord.allowed_guild_ids : undefined,
allowedChannelIds: config.discord.allowed_channel_ids.length > 0 ? config.discord.allowed_channel_ids : undefined,
requireMention: config.discord.require_mention,
});
channelRegistry.register(discordAdapter);
}
// Register Slack adapter (if configured)
if (config.slack) {
const slackAdapter = new SlackAdapter({
botToken: config.slack.bot_token,
appToken: config.slack.app_token,
signingSecret: config.slack.signing_secret,
allowedChannelIds: config.slack.allowed_channel_ids.length > 0 ? config.slack.allowed_channel_ids : undefined,
requireMention: config.slack.require_mention,
});
channelRegistry.register(slackAdapter);
}
// Register WhatsApp adapter (if configured)
if (config.whatsapp) {
const whatsappAdapter = new WhatsAppAdapter({
allowedNumbers: config.whatsapp.allowed_numbers.length > 0 ? config.whatsapp.allowed_numbers : undefined,
allowedGroupIds: config.whatsapp.allowed_group_ids.length > 0 ? config.whatsapp.allowed_group_ids : undefined,
requireMention: config.whatsapp.require_mention,
dataDir: config.whatsapp.data_dir,
});
channelRegistry.register(whatsappAdapter);
}
// Register WebChat adapter (wraps the gateway)
const webChatAdapter = new WebChatAdapter({ gateway });
channelRegistry.register(webChatAdapter);
// Register cron scheduler adapter (if any cron jobs configured)
if (config.automation.cron.length > 0) {
const cronScheduler = new CronScheduler(config.automation.cron, channelRegistry);
channelRegistry.register(cronScheduler);
console.log(`Registered ${config.automation.cron.length} cron job(s)`);
}
// ── Signal Handlers ───────────────────────────────────────────
// NOTE(review): a rejection from lifecycle.shutdown() is not caught here, so
// process.exit(0) would not be reached in that case.
const signalHandler = () => {
lifecycle.shutdown().then(() => process.exit(0));
};
process.on('SIGINT', signalHandler);
process.on('SIGTERM', signalHandler);
// The signal listeners are removed again as part of shutdown.
lifecycle.onShutdown(async () => {
process.off('SIGINT', signalHandler);
process.off('SIGTERM', signalHandler);
});
// ── Start Services ────────────────────────────────────────────
// Register shutdown handler for channels (stops Telegram bot etc.)
lifecycle.onShutdown(async () => {
await channelRegistry.stopAll();
console.log('Channel adapters stopped');
});
// Start all channel adapters (Telegram long polling, WebChat status)
await channelRegistry.startAll();
// Start gateway (HTTP + WS server)
// The stop callback is registered before start() is awaited.
lifecycle.onShutdown(async () => {
await gateway.stop();
console.log('Gateway server stopped');
});
await gateway.start();
console.log('Flynn daemon started');
return {
config,
lifecycle,
sessionStore,
sessionManager,
hookEngine,
modelRouter,
toolRegistry,
toolExecutor,
gateway,
channelRegistry,
mcpManager,
skillRegistry,
skillInstaller,
agentConfigRegistry,
agentRouter,
sandboxManager,
browserManager,
};
}
export { Lifecycle } from './lifecycle.js';