feat: implement Tier 3 features — lane queue, credential redaction, token dashboard, xAI, Voyage AI

- Lane Queue: per-session FIFO queue in gateway replacing reject-when-busy (9 tests)
- Credential Redaction: redactConfig() expanded to cover 18+ secret fields (16 tests)
- Web UI Token Dashboard: system.tokenUsage endpoint + Usage page with summary cards
- xAI (Grok) Provider: OpenAI-compatible client with model pricing
- Voyage AI Embeddings: new embedding provider with configurable dimensions (5 tests)
- Update gap analysis: 90→95 match (70%→74%), Tier 3 section marked DONE
- Update state.json: test count 1001→1034, add tier3_completion entry

Total: 1034 tests passing across 85 files, typecheck clean
This commit is contained in:
William Valentin
2026-02-09 10:32:57 -08:00
parent 1d126cddfb
commit 9be8f76bc7
26 changed files with 1395 additions and 105 deletions
-8
View File
@@ -107,10 +107,6 @@ export class NativeAgent {
this._totalUsage.outputTokens += response.usage.outputTokens;
this._callCount++;
if (response.fallback) {
console.warn(`[Flynn] ${response.fallbackReason}`);
}
// Prepend thinking content if present
let finalContent = response.content;
if (response.thinkingContent) {
@@ -149,10 +145,6 @@ export class NativeAgent {
this._totalUsage.outputTokens += response.usage.outputTokens;
this._callCount++;
if (response.fallback) {
console.warn(`[Flynn] ${response.fallbackReason}`);
}
// If the model didn't request tool use, we're done
if (response.stopReason !== 'tool_use' || !response.toolCalls?.length) {
let finalContent = response.content;
+2 -2
View File
@@ -19,7 +19,7 @@ const serverSchema = z.object({
});
const modelConfigBaseSchema = z.object({
provider: z.enum(['anthropic', 'openai', 'gemini', 'ollama', 'llamacpp', 'openrouter', 'bedrock', 'github', 'zhipuai']),
provider: z.enum(['anthropic', 'openai', 'gemini', 'ollama', 'llamacpp', 'openrouter', 'bedrock', 'github', 'zhipuai', 'xai']),
model: z.string(),
endpoint: z.string().optional(),
api_key: z.string().optional(),
@@ -173,7 +173,7 @@ const agentsSchema = z.object({
max_delegation_depth: z.number().min(1).max(10).default(3),
}).default({});
const embeddingProviderSchema = z.enum(['openai', 'gemini', 'ollama', 'llamacpp']);
const embeddingProviderSchema = z.enum(['openai', 'gemini', 'ollama', 'llamacpp', 'voyage']);
const embeddingSchema = z.object({
enabled: z.boolean().default(false),
+54 -4
View File
@@ -114,6 +114,12 @@ export function createClientFromConfig(cfg: ModelConfig): ModelClient {
apiKey: cfg.api_key ?? process.env.ZHIPUAI_API_KEY,
baseURL: cfg.endpoint ?? 'https://api.z.ai/api/paas/v4',
});
case 'xai':
return new OpenAIClient({
model: cfg.model,
apiKey: cfg.api_key ?? process.env.XAI_API_KEY,
baseURL: cfg.endpoint ?? 'https://api.x.ai/v1',
});
case 'bedrock':
return new BedrockClient({
model: cfg.model,
@@ -313,6 +319,8 @@ export function createModelRouter(config: Config): ModelRouter {
* Create the unified message handler for the channel registry.
* Each channel+sender pair gets its own AgentOrchestrator backed by a persistent session.
* The orchestrator wraps a NativeAgent and adds delegation to different model tiers.
*
* Returns both the message handler function and the agents map for usage tracking.
*/
function createMessageRouter(deps: {
sessionManager: SessionManager;
@@ -326,7 +334,10 @@ function createMessageRouter(deps: {
agentRouter?: AgentRouter;
sandboxManager?: SandboxManager;
audioConfig?: AudioTranscriptionConfig;
}) {
}): {
handler: (msg: InboundMessage, reply: (response: OutboundMessage) => Promise<void>) => Promise<void>;
agents: Map<string, { orchestrator: AgentOrchestrator; collector: OutboundAttachmentCollector }>;
} {
// Cache agents by session ID + agent config name to avoid recreating on every message
const agents = new Map<string, { orchestrator: AgentOrchestrator; collector: OutboundAttachmentCollector }>();
@@ -444,7 +455,7 @@ function createMessageRouter(deps: {
return entry;
}
return async (msg: InboundMessage, reply: (response: OutboundMessage) => Promise<void>): Promise<void> => {
const handler = async (msg: InboundMessage, reply: (response: OutboundMessage) => Promise<void>): Promise<void> => {
const { orchestrator: agent, collector } = getOrCreateAgent(msg.channel, msg.senderId);
// Handle special commands
@@ -524,6 +535,8 @@ function createMessageRouter(deps: {
});
}
};
return { handler, agents };
}
export async function startDaemon(config: Config): Promise<DaemonContext> {
@@ -780,6 +793,10 @@ export async function startDaemon(config: Config): Promise<DaemonContext> {
// Initialize channel registry (created early so the gateway can reference it)
const channelRegistry = new ChannelRegistry();
// Mutable reference to channel agents map — set after createMessageRouter() below.
// This allows the gateway's getTokenUsage callback to access channel agent usage data.
let channelAgents: Map<string, { orchestrator: AgentOrchestrator; collector: OutboundAttachmentCollector }> | null = null;
// Initialize gateway WebSocket server
const gateway = new GatewayServer({
port: config.server.port,
@@ -803,6 +820,35 @@ export async function startDaemon(config: Config): Promise<DaemonContext> {
// Exit with code 75 (EX_TEMPFAIL) — process supervisor should restart
process.exit(75);
},
getTokenUsage: () => {
const results: Array<{
sessionId: string;
primary: { inputTokens: number; outputTokens: number; calls: number };
delegation: Record<string, { inputTokens: number; outputTokens: number; calls: number }>;
total: { inputTokens: number; outputTokens: number; calls: number; estimatedCost: number };
}> = [];
// Collect usage from gateway WebSocket sessions (NativeAgent-based)
const sessionBridge = gateway.getSessionBridge();
for (const entry of sessionBridge.getAllUsage()) {
results.push(entry);
}
// Collect usage from channel agents (AgentOrchestrator-based, has full delegation data)
if (channelAgents) {
for (const [sessionId, { orchestrator }] of channelAgents) {
const usage = orchestrator.getUsage();
results.push({
sessionId,
primary: usage.primary,
delegation: usage.delegation,
total: usage.total,
});
}
}
return results;
},
});
if (config.server.token) {
@@ -812,7 +858,7 @@ export async function startDaemon(config: Config): Promise<DaemonContext> {
// ── Channel Registry ──────────────────────────────────────────
// Set up the unified message handler
channelRegistry.setMessageHandler(createMessageRouter({
const messageRouter = createMessageRouter({
sessionManager,
modelRouter,
systemPrompt,
@@ -824,7 +870,11 @@ export async function startDaemon(config: Config): Promise<DaemonContext> {
agentRouter,
sandboxManager,
audioConfig,
}));
});
channelRegistry.setMessageHandler(messageRouter.handler);
// Wire channel agents into the getTokenUsage callback (late binding)
channelAgents = messageRouter.agents;
// Register Telegram adapter
const telegramAdapter = new TelegramAdapter({
+1 -1
View File
@@ -381,7 +381,7 @@ export class MinimalTui {
fullContent += event.content;
}
if (event.type === 'fallback_warning' && event.fallbackReason) {
console.warn(`\n⚠ ${event.fallbackReason}`);
console.warn(`\n⚠ Using fallback model`);
}
if (event.type === 'done' && event.usage) {
this.totalUsage.inputTokens += event.usage.inputTokens;
+43 -36
View File
@@ -2,10 +2,12 @@ import type { GatewayRequest, GatewayAttachment, OutboundMessage } from '../prot
import type { SendFn } from '../router.js';
import { makeEvent, makeError, ErrorCode } from '../protocol.js';
import type { SessionBridge } from '../session-bridge.js';
import type { LaneQueue } from '../lane-queue.js';
import type { Attachment } from '../../channels/types.js';
export interface AgentHandlerDeps {
sessionBridge: SessionBridge;
laneQueue: LaneQueue;
}
export function createAgentHandlers(deps: AgentHandlerDeps) {
@@ -21,51 +23,56 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
return makeError(request.id, ErrorCode.InvalidRequest, 'connectionId is required (set by server)');
}
if (deps.sessionBridge.isBusy(connectionId)) {
return makeError(request.id, ErrorCode.AgentBusy, 'Agent is already processing a request');
}
const agent = deps.sessionBridge.getAgent(connectionId);
if (!agent) {
return makeError(request.id, ErrorCode.SessionNotFound, 'No agent for this connection');
}
deps.sessionBridge.setBusy(connectionId, true);
// Queue by session ID so multiple connections sharing a session are serialised.
// Falls back to connectionId if session lookup fails (shouldn't happen).
const sessionId = deps.sessionBridge.getSessionId(connectionId);
const laneId = sessionId ?? connectionId;
// Set up tool use callback to emit streaming events
deps.sessionBridge.setOnToolUse(connectionId, (event) => {
if (event.type === 'start') {
send(makeEvent(request.id, 'tool_start', { tool: event.tool, args: event.args }));
} else if (event.type === 'end') {
send(makeEvent(request.id, 'tool_end', {
tool: event.tool,
result: event.result ? {
success: event.result.success,
output: event.result.output,
error: event.result.error,
} : undefined,
// Enqueue the work — if the lane is idle it runs immediately,
// otherwise it waits for earlier requests on the same session to finish.
return deps.laneQueue.enqueue(laneId, async () => {
deps.sessionBridge.setBusy(connectionId, true);
// Set up tool use callback to emit streaming events
deps.sessionBridge.setOnToolUse(connectionId, (event) => {
if (event.type === 'start') {
send(makeEvent(request.id, 'tool_start', { tool: event.tool, args: event.args }));
} else if (event.type === 'end') {
send(makeEvent(request.id, 'tool_end', {
tool: event.tool,
result: event.result ? {
success: event.result.success,
output: event.result.output,
error: event.result.error,
} : undefined,
}));
}
});
try {
// Convert gateway attachments to channel attachments
const attachments: Attachment[] | undefined = params.attachments?.map(a => ({
mimeType: a.mimeType,
data: a.data,
url: a.url,
filename: a.filename,
}));
const response = await agent.process(params.message!, attachments);
send(makeEvent(request.id, 'done', { content: response }));
} catch (err) {
const message = err instanceof Error ? err.message : 'Unknown error';
send(makeEvent(request.id, 'error', { code: ErrorCode.InternalError, message }));
} finally {
deps.sessionBridge.setBusy(connectionId, false);
deps.sessionBridge.setOnToolUse(connectionId, undefined);
}
});
try {
// Convert gateway attachments to channel attachments
const attachments: Attachment[] | undefined = params.attachments?.map(a => ({
mimeType: a.mimeType,
data: a.data,
url: a.url,
filename: a.filename,
}));
const response = await agent.process(params.message, attachments);
send(makeEvent(request.id, 'done', { content: response }));
} catch (err) {
const message = err instanceof Error ? err.message : 'Unknown error';
send(makeEvent(request.id, 'error', { code: ErrorCode.InternalError, message }));
} finally {
deps.sessionBridge.setBusy(connectionId, false);
deps.sessionBridge.setOnToolUse(connectionId, undefined);
}
},
'agent.cancel': async (request: GatewayRequest): Promise<OutboundMessage> => {
+73 -13
View File
@@ -8,30 +8,90 @@ export interface ConfigHandlerDeps {
/**
* Redact sensitive values from config before returning.
* Replaces API keys, tokens, and passwords with "***".
* Replaces API keys, tokens, passwords, and other credentials with "***".
*
* Covers: telegram, discord, slack, server, models (tiers + fallbacks + local_providers),
* web_search, audio, memory.embedding, automation (webhooks + gmail), and mcp server env vars.
*/
function redactConfig(config: Config): Record<string, unknown> {
export function redactConfig(config: Config): Record<string, unknown> {
const raw = JSON.parse(JSON.stringify(config)) as Record<string, unknown>;
// Redact telegram bot token
const telegram = raw.telegram as Record<string, unknown> | undefined;
if (telegram?.bot_token) {
telegram.bot_token = '***';
}
// Helper: redact specified keys on an object if they exist and are non-nullish
const redact = (obj: Record<string, unknown> | undefined, ...keys: string[]) => {
if (!obj) return;
for (const key of keys) {
if (obj[key] !== undefined && obj[key] !== null) obj[key] = '***';
}
};
// Redact model keys/tokens
// Telegram
redact(raw.telegram as Record<string, unknown>, 'bot_token');
// Discord
redact(raw.discord as Record<string, unknown>, 'bot_token');
// Slack
redact(raw.slack as Record<string, unknown>, 'bot_token', 'app_token', 'signing_secret');
// Server (gateway bearer token)
redact(raw.server as Record<string, unknown>, 'token');
// Models — tiers, their fallbacks, and local_providers (+ their fallbacks)
const models = raw.models as Record<string, unknown> | undefined;
if (models) {
for (const tier of ['default', 'fast', 'complex', 'local'] as const) {
for (const tier of ['default', 'fast', 'complex', 'local']) {
const m = models[tier] as Record<string, unknown> | undefined;
if (m?.api_key) m.api_key = '***';
if (m?.auth_token) m.auth_token = '***';
redact(m, 'api_key', 'auth_token');
const fb = m?.fallback as Record<string, unknown> | undefined;
redact(fb, 'api_key', 'auth_token');
}
const localProviders = models.local_providers as Record<string, Record<string, unknown>> | undefined;
if (localProviders) {
for (const provider of Object.values(localProviders)) {
if (provider.api_key) provider.api_key = '***';
if (provider.auth_token) provider.auth_token = '***';
redact(provider, 'api_key', 'auth_token');
const fb = provider.fallback as Record<string, unknown> | undefined;
redact(fb, 'api_key', 'auth_token');
}
}
}
// Web search
redact(raw.web_search as Record<string, unknown>, 'api_key');
// Audio
redact(raw.audio as Record<string, unknown>, 'transcription_api_key');
// Memory → embedding
const memory = raw.memory as Record<string, unknown> | undefined;
if (memory) {
redact(memory.embedding as Record<string, unknown>, 'api_key');
}
// Automation — webhook HMAC secrets and gmail credential paths
const automation = raw.automation as Record<string, unknown> | undefined;
if (automation) {
const webhooks = automation.webhooks as Record<string, unknown>[] | undefined;
if (webhooks) {
for (const wh of webhooks) {
redact(wh, 'secret');
}
}
const gmail = automation.gmail as Record<string, unknown> | undefined;
redact(gmail, 'credentials_file', 'token_file');
}
// MCP server env vars (may contain API keys or other secrets)
const mcp = raw.mcp as Record<string, unknown> | undefined;
if (mcp) {
const servers = mcp.servers as Record<string, unknown>[] | undefined;
if (servers) {
for (const srv of servers) {
if (srv.env && typeof srv.env === 'object') {
const env = srv.env as Record<string, unknown>;
for (const key of Object.keys(env)) {
env[key] = '***';
}
}
}
}
}
+302 -7
View File
@@ -1,9 +1,11 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { createSystemHandlers } from './system.js';
import type { TokenUsageEntry } from './system.js';
import { createSessionHandlers } from './sessions.js';
import { createToolHandlers } from './tools.js';
import { createAgentHandlers } from './agent.js';
import { createConfigHandlers } from './config.js';
import { createConfigHandlers, redactConfig } from './config.js';
import { LaneQueue } from '../lane-queue.js';
import { ErrorCode } from '../protocol.js';
import type { GatewayRequest, GatewayResponse, GatewayError, GatewayEvent, OutboundMessage } from '../protocol.js';
@@ -33,6 +35,64 @@ describe('system handlers', () => {
});
});
describe('system.tokenUsage handler', () => {
it('returns empty sessions when no getTokenUsage provided', async () => {
const handlers = createSystemHandlers({
startTime: Date.now(),
version: '0.1.0',
getSessionCount: () => 0,
getToolCount: () => 0,
getConnectionCount: () => 0,
});
const req: GatewayRequest = { id: 1, method: 'system.tokenUsage' };
const result = await handlers['system.tokenUsage'](req) as GatewayResponse;
expect(result.id).toBe(1);
const r = result.result as { sessions: unknown[] };
expect(r.sessions).toEqual([]);
});
it('returns session usage data from getTokenUsage callback', async () => {
const mockUsage: TokenUsageEntry[] = [
{
sessionId: 'telegram:user1',
primary: { inputTokens: 1000, outputTokens: 500, calls: 3 },
delegation: { fast: { inputTokens: 200, outputTokens: 100, calls: 1 } },
total: { inputTokens: 1200, outputTokens: 600, calls: 4, estimatedCost: 0.0234 },
},
{
sessionId: 'ws:abc-123',
primary: { inputTokens: 50, outputTokens: 25, calls: 1 },
delegation: {},
total: { inputTokens: 50, outputTokens: 25, calls: 1, estimatedCost: 0 },
},
];
const handlers = createSystemHandlers({
startTime: Date.now(),
version: '0.1.0',
getSessionCount: () => 2,
getToolCount: () => 0,
getConnectionCount: () => 1,
getTokenUsage: () => mockUsage,
});
const req: GatewayRequest = { id: 2, method: 'system.tokenUsage' };
const result = await handlers['system.tokenUsage'](req) as GatewayResponse;
expect(result.id).toBe(2);
const r = result.result as { sessions: typeof mockUsage };
expect(r.sessions).toHaveLength(2);
expect(r.sessions[0].sessionId).toBe('telegram:user1');
expect(r.sessions[0].total.inputTokens).toBe(1200);
expect(r.sessions[0].total.estimatedCost).toBe(0.0234);
expect(r.sessions[0].delegation.fast.inputTokens).toBe(200);
expect(r.sessions[1].sessionId).toBe('ws:abc-123');
expect(r.sessions[1].total.calls).toBe(1);
});
});
describe('session handlers', () => {
const mockHistory = [
{ role: 'user' as const, content: 'hello' },
@@ -188,8 +248,11 @@ describe('agent handlers', () => {
setOnToolUse: vi.fn(),
};
const laneQueue = new LaneQueue();
const handlers = createAgentHandlers({
sessionBridge: mockBridge as any,
laneQueue,
});
beforeEach(() => {
@@ -260,13 +323,38 @@ describe('agent handlers', () => {
expect(result.error.message).toContain('message');
});
it('agent.send rejects when busy', async () => {
mockBridge.isBusy.mockReturnValue(true);
const req: GatewayRequest = { id: 3, method: 'agent.send', params: { message: 'hi', connectionId: 'conn-1' } };
const send = vi.fn();
const result = await handlers['agent.send'](req, send) as GatewayError;
it('agent.send queues concurrent requests instead of rejecting', async () => {
// Simulate the first request blocking
let resolveFirst!: () => void;
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
let callCount = 0;
mockAgent.process.mockImplementation(async () => {
callCount++;
if (callCount === 1) {
await firstBlocks;
return 'first response';
}
return 'second response';
});
expect(result.error.code).toBe(ErrorCode.AgentBusy);
const req1: GatewayRequest = { id: 3, method: 'agent.send', params: { message: 'first', connectionId: 'conn-1' } };
const req2: GatewayRequest = { id: 4, method: 'agent.send', params: { message: 'second', connectionId: 'conn-1' } };
const sent1: OutboundMessage[] = [];
const sent2: OutboundMessage[] = [];
const p1 = handlers['agent.send'](req1, vi.fn((msg: OutboundMessage) => sent1.push(msg)));
const p2 = handlers['agent.send'](req2, vi.fn((msg: OutboundMessage) => sent2.push(msg)));
// Release the first request
resolveFirst();
await Promise.all([p1, p2]);
// Both should have completed — no AgentBusy error
expect(sent1).toHaveLength(1);
expect((sent1[0] as GatewayEvent).event).toBe('done');
expect(sent2).toHaveLength(1);
expect((sent2[0] as GatewayEvent).event).toBe('done');
expect(mockAgent.process).toHaveBeenCalledTimes(2);
});
it('agent.send handles errors gracefully', async () => {
@@ -452,3 +540,210 @@ describe('config handlers', () => {
expect(result.error.code).toBe(ErrorCode.InvalidRequest);
});
});
describe('redactConfig comprehensive credential redaction', () => {
/**
* Build a full config object with secrets in every possible location.
* Optional sections (discord, slack, etc.) are included to test redaction.
*/
function makeFullConfig() {
return {
telegram: { bot_token: 'tg-secret', allowed_chat_ids: [1], require_mention: true },
discord: { bot_token: 'dc-secret', allowed_guild_ids: ['g1'], allowed_channel_ids: [], require_mention: true },
slack: { bot_token: 'sl-bot', app_token: 'sl-app', signing_secret: 'sl-sign', allowed_channel_ids: [], require_mention: false },
server: { tailscale_only: true, localhost: true, port: 18800, token: 'bearer-secret', tailscale_identity: false, auth_http: true },
models: {
default: { provider: 'anthropic' as const, model: 'claude', api_key: 'sk-def', auth_token: 'at-def',
fallback: { provider: 'openai' as const, model: 'gpt-4', api_key: 'sk-def-fb', auth_token: 'at-def-fb' },
},
fast: { provider: 'openai' as const, model: 'gpt-4o-mini', api_key: 'sk-fast',
fallback: { provider: 'gemini' as const, model: 'gemini-flash', api_key: 'sk-fast-fb' },
},
complex: { provider: 'anthropic' as const, model: 'claude-opus', auth_token: 'at-complex' },
local: { provider: 'ollama' as const, model: 'llama3' },
fallback_chain: ['anthropic'],
local_providers: {
ollama: { provider: 'ollama' as const, model: 'llama3', api_key: 'lp-key', auth_token: 'lp-token',
fallback: { provider: 'llamacpp' as const, model: 'llama', api_key: 'lp-fb-key' },
},
},
thinking: { anthropic: { budgetTokens: 4096 }, openai: { reasoningEffort: 'medium' as const }, gemini: { budgetTokens: 4096 } },
},
web_search: { provider: 'brave' as const, api_key: 'brave-key', endpoint: 'https://api.brave.com', max_results: 5 },
audio: { transcription_endpoint: 'https://api.openai.com', transcription_api_key: 'audio-key', transcription_model: 'whisper-1' },
memory: {
enabled: true, auto_extract: true, max_context_tokens: 2000,
embedding: { enabled: true, provider: 'openai' as const, model: 'text-embedding-3-small', api_key: 'embed-key', dimensions: 1536, chunk_size: 512, chunk_overlap: 50, top_k: 5, hybrid_weight: 0.7 },
},
automation: {
cron: [],
webhooks: [
{ name: 'github', secret: 'wh-secret-1', message: '{{body}}', output: { channel: 'telegram', peer: '123' }, enabled: true },
{ name: 'gitlab', secret: 'wh-secret-2', message: '{{body}}', output: { channel: 'telegram', peer: '456' }, enabled: true },
{ name: 'no-secret', message: '{{body}}', output: { channel: 'telegram', peer: '789' }, enabled: true },
],
gmail: { enabled: true, credentials_file: '/path/to/creds.json', token_file: '/path/to/token.json', watch_labels: ['INBOX'], poll_interval: '60s', output: { channel: 'telegram', peer: '123' }, message: 'new email' },
heartbeat: { enabled: false, interval: '5m', checks: ['gateway'], failure_threshold: 2, disk_threshold_mb: 100 },
},
mcp: {
servers: [
{ name: 'my-server', command: 'node', args: ['server.js'], env: { API_KEY: 'mcp-api-key', DATABASE_URL: 'postgres://secret@host/db' } },
{ name: 'no-env', command: 'python', args: ['app.py'] },
],
},
hooks: { confirm: ['shell.exec'], log: [], silent: [] },
backends: { claude_code: { enabled: false }, opencode: { enabled: false }, native: { enabled: true } },
};
}
it('redacts telegram.bot_token', () => {
const result = redactConfig(makeFullConfig() as any);
expect((result.telegram as any).bot_token).toBe('***');
});
it('redacts discord.bot_token', () => {
const result = redactConfig(makeFullConfig() as any);
expect((result.discord as any).bot_token).toBe('***');
});
it('redacts slack.bot_token, app_token, and signing_secret', () => {
const result = redactConfig(makeFullConfig() as any);
const slack = result.slack as any;
expect(slack.bot_token).toBe('***');
expect(slack.app_token).toBe('***');
expect(slack.signing_secret).toBe('***');
});
it('redacts server.token', () => {
const result = redactConfig(makeFullConfig() as any);
expect((result.server as any).token).toBe('***');
});
it('redacts model api_key and auth_token for all tiers', () => {
const result = redactConfig(makeFullConfig() as any);
const models = result.models as any;
expect(models.default.api_key).toBe('***');
expect(models.default.auth_token).toBe('***');
expect(models.fast.api_key).toBe('***');
expect(models.complex.auth_token).toBe('***');
// local has no keys — should remain unchanged
expect(models.local.api_key).toBeUndefined();
});
it('redacts model fallback api_key and auth_token', () => {
const result = redactConfig(makeFullConfig() as any);
const models = result.models as any;
expect(models.default.fallback.api_key).toBe('***');
expect(models.default.fallback.auth_token).toBe('***');
expect(models.fast.fallback.api_key).toBe('***');
});
it('redacts local_providers api_key, auth_token, and their fallbacks', () => {
const result = redactConfig(makeFullConfig() as any);
const ollama = (result.models as any).local_providers.ollama;
expect(ollama.api_key).toBe('***');
expect(ollama.auth_token).toBe('***');
expect(ollama.fallback.api_key).toBe('***');
});
it('redacts web_search.api_key', () => {
const result = redactConfig(makeFullConfig() as any);
expect((result.web_search as any).api_key).toBe('***');
});
it('redacts audio.transcription_api_key', () => {
const result = redactConfig(makeFullConfig() as any);
expect((result.audio as any).transcription_api_key).toBe('***');
});
it('redacts memory.embedding.api_key', () => {
const result = redactConfig(makeFullConfig() as any);
expect((result.memory as any).embedding.api_key).toBe('***');
});
it('redacts automation webhook secrets', () => {
const result = redactConfig(makeFullConfig() as any);
const webhooks = (result.automation as any).webhooks;
expect(webhooks[0].secret).toBe('***');
expect(webhooks[1].secret).toBe('***');
// Webhook without a secret should remain unaffected
expect(webhooks[2].secret).toBeUndefined();
});
it('redacts automation gmail credentials_file and token_file', () => {
const result = redactConfig(makeFullConfig() as any);
const gmail = (result.automation as any).gmail;
expect(gmail.credentials_file).toBe('***');
expect(gmail.token_file).toBe('***');
});
it('redacts all MCP server env vars', () => {
const result = redactConfig(makeFullConfig() as any);
const servers = (result.mcp as any).servers;
expect(servers[0].env.API_KEY).toBe('***');
expect(servers[0].env.DATABASE_URL).toBe('***');
// Server without env should be unaffected
expect(servers[1].env).toBeUndefined();
});
it('preserves non-secret fields', () => {
const result = redactConfig(makeFullConfig() as any);
// telegram
expect((result.telegram as any).allowed_chat_ids).toEqual([1]);
expect((result.telegram as any).require_mention).toBe(true);
// discord
expect((result.discord as any).allowed_guild_ids).toEqual(['g1']);
// slack
expect((result.slack as any).allowed_channel_ids).toEqual([]);
// server
expect((result.server as any).port).toBe(18800);
expect((result.server as any).tailscale_only).toBe(true);
// models
expect((result.models as any).default.provider).toBe('anthropic');
expect((result.models as any).default.model).toBe('claude');
expect((result.models as any).fallback_chain).toEqual(['anthropic']);
// web_search
expect((result.web_search as any).provider).toBe('brave');
expect((result.web_search as any).max_results).toBe(5);
// audio
expect((result.audio as any).transcription_model).toBe('whisper-1');
// memory
expect((result.memory as any).embedding.model).toBe('text-embedding-3-small');
// hooks
expect((result.hooks as any).confirm).toEqual(['shell.exec']);
// mcp
expect((result.mcp as any).servers[0].name).toBe('my-server');
expect((result.mcp as any).servers[0].command).toBe('node');
});
it('handles missing optional sections gracefully', () => {
const minimal = {
telegram: { bot_token: 'tok', allowed_chat_ids: [1] },
models: { default: { provider: 'anthropic' as const, model: 'claude' }, fallback_chain: [] },
server: { port: 18800 },
hooks: { confirm: [], log: [], silent: [] },
};
// Should not throw even when discord, slack, automation, mcp, etc. are absent
const result = redactConfig(minimal as any);
expect((result.telegram as any).bot_token).toBe('***');
expect(result.discord).toBeUndefined();
expect(result.slack).toBeUndefined();
expect(result.automation).toBeUndefined();
});
it('does not mutate the original config object', () => {
const config = makeFullConfig();
redactConfig(config as any);
// Original secrets should still be intact
expect(config.telegram.bot_token).toBe('tg-secret');
expect(config.models.default.api_key).toBe('sk-def');
expect(config.server.token).toBe('bearer-secret');
});
});
+1 -1
View File
@@ -1,5 +1,5 @@
export { createSystemHandlers } from './system.js';
export type { SystemHandlerDeps } from './system.js';
export type { SystemHandlerDeps, TokenUsageEntry } from './system.js';
export { createSessionHandlers } from './sessions.js';
export type { SessionHandlerDeps } from './sessions.js';
export { createToolHandlers } from './tools.js';
+15
View File
@@ -1,6 +1,14 @@
import type { GatewayRequest, OutboundMessage } from '../protocol.js';
import { makeResponse, makeError, ErrorCode } from '../protocol.js';
/** Per-session token usage report returned by system.tokenUsage. */
export interface TokenUsageEntry {
sessionId: string;
primary: { inputTokens: number; outputTokens: number; calls: number };
delegation: Record<string, { inputTokens: number; outputTokens: number; calls: number }>;
total: { inputTokens: number; outputTokens: number; calls: number; estimatedCost: number };
}
export interface SystemHandlerDeps {
startTime: number;
version: string;
@@ -11,6 +19,8 @@ export interface SystemHandlerDeps {
restart?: () => Promise<void>;
getChannels?: () => Array<{ name: string; status: string }>;
getUsage?: () => { totalSessions: number; activeConnections: number };
/** Optional callback to retrieve per-session token usage data. */
getTokenUsage?: () => TokenUsageEntry[];
}
export function createSystemHandlers(deps: SystemHandlerDeps) {
@@ -60,5 +70,10 @@ export function createSystemHandlers(deps: SystemHandlerDeps) {
tools: deps.getToolCount(),
});
},
'system.tokenUsage': async (request: GatewayRequest): Promise<OutboundMessage> => {
const sessions = deps.getTokenUsage?.() ?? [];
return makeResponse(request.id, { sessions });
},
};
}
+1
View File
@@ -4,6 +4,7 @@ export { Router } from './router.js';
export type { HandlerFn, SendFn } from './router.js';
export { SessionBridge } from './session-bridge.js';
export type { SessionBridgeConfig } from './session-bridge.js';
export { LaneQueue } from './lane-queue.js';
export { authenticateRequest } from './auth.js';
export type { AuthConfig, AuthResult } from './auth.js';
export { serveStatic } from './static.js';
+194
View File
@@ -0,0 +1,194 @@
import { describe, it, expect } from 'vitest';
import { LaneQueue } from './lane-queue.js';
describe('LaneQueue', () => {
it('executes a single item immediately', async () => {
const queue = new LaneQueue();
const result = await queue.enqueue('lane-a', async () => 42);
expect(result).toBe(42);
});
it('serialises items within the same lane', async () => {
const queue = new LaneQueue();
const order: number[] = [];
// Create a deferred to control timing
let resolveFirst!: () => void;
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
const p1 = queue.enqueue('lane-a', async () => {
order.push(1);
await firstBlocks;
order.push(2);
return 'first';
});
const p2 = queue.enqueue('lane-a', async () => {
order.push(3);
return 'second';
});
const p3 = queue.enqueue('lane-a', async () => {
order.push(4);
return 'third';
});
// Only item 1 should have started
expect(order).toEqual([1]);
expect(queue.queueLength('lane-a')).toBe(2);
expect(queue.isProcessing('lane-a')).toBe(true);
// Release the first item
resolveFirst();
const results = await Promise.all([p1, p2, p3]);
expect(results).toEqual(['first', 'second', 'third']);
expect(order).toEqual([1, 2, 3, 4]);
});
it('runs independent lanes in parallel', async () => {
const queue = new LaneQueue();
const running: string[] = [];
let resolveA!: () => void;
const blocksA = new Promise<void>((r) => { resolveA = r; });
let resolveB!: () => void;
const blocksB = new Promise<void>((r) => { resolveB = r; });
const pA = queue.enqueue('lane-a', async () => {
running.push('a-start');
await blocksA;
running.push('a-end');
return 'A';
});
const pB = queue.enqueue('lane-b', async () => {
running.push('b-start');
await blocksB;
running.push('b-end');
return 'B';
});
// Both should have started concurrently
// Wait a tick for async execution
await new Promise<void>((r) => queueMicrotask(r));
expect(running).toContain('a-start');
expect(running).toContain('b-start');
resolveA();
resolveB();
const [rA, rB] = await Promise.all([pA, pB]);
expect(rA).toBe('A');
expect(rB).toBe('B');
});
it('error in one item does not block the next', async () => {
const queue = new LaneQueue();
let resolveFirst!: () => void;
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
const p1 = queue.enqueue('lane-a', async () => {
await firstBlocks;
throw new Error('boom');
});
const p2 = queue.enqueue('lane-a', async () => 'recovered');
resolveFirst();
await expect(p1).rejects.toThrow('boom');
const result = await p2;
expect(result).toBe('recovered');
});
it('cancel rejects pending items but does not affect active', async () => {
const queue = new LaneQueue();
let resolveFirst!: () => void;
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
const p1 = queue.enqueue('lane-a', async () => {
await firstBlocks;
return 'active';
});
const p2 = queue.enqueue('lane-a', async () => 'pending-1');
const p3 = queue.enqueue('lane-a', async () => 'pending-2');
expect(queue.queueLength('lane-a')).toBe(2);
// Cancel pending items
queue.cancel('lane-a');
expect(queue.queueLength('lane-a')).toBe(0);
// Active work should still complete
resolveFirst();
const result = await p1;
expect(result).toBe('active');
// Pending items should have been rejected
await expect(p2).rejects.toThrow('Lane cancelled');
await expect(p3).rejects.toThrow('Lane cancelled');
});
it('reports queue length correctly', async () => {
const queue = new LaneQueue();
expect(queue.queueLength('lane-a')).toBe(0);
expect(queue.isProcessing('lane-a')).toBe(false);
let resolveFirst!: () => void;
const firstBlocks = new Promise<void>((r) => { resolveFirst = r; });
const p1 = queue.enqueue('lane-a', async () => {
await firstBlocks;
return 'done';
});
// Active work, no pending
expect(queue.isProcessing('lane-a')).toBe(true);
expect(queue.queueLength('lane-a')).toBe(0);
const p2 = queue.enqueue('lane-a', async () => 'queued-1');
expect(queue.queueLength('lane-a')).toBe(1);
const p3 = queue.enqueue('lane-a', async () => 'queued-2');
expect(queue.queueLength('lane-a')).toBe(2);
resolveFirst();
await Promise.all([p1, p2, p3]);
// After all done, lane should be cleaned up
expect(queue.isProcessing('lane-a')).toBe(false);
expect(queue.queueLength('lane-a')).toBe(0);
});
it('cleans up empty lanes after completion', async () => {
const queue = new LaneQueue();
await queue.enqueue('lane-a', async () => 'done');
// Lane should be cleaned up (isProcessing returns false, queueLength 0)
expect(queue.isProcessing('lane-a')).toBe(false);
expect(queue.queueLength('lane-a')).toBe(0);
});
it('cancel on non-existent lane is a no-op', () => {
const queue = new LaneQueue();
// Should not throw
queue.cancel('no-such-lane');
expect(queue.queueLength('no-such-lane')).toBe(0);
});
it('can enqueue new work after a lane completes', async () => {
const queue = new LaneQueue();
const r1 = await queue.enqueue('lane-a', async () => 'first');
expect(r1).toBe('first');
const r2 = await queue.enqueue('lane-a', async () => 'second');
expect(r2).toBe('second');
});
});
+114
View File
@@ -0,0 +1,114 @@
/**
* LaneQueue — per-lane FIFO queue for serialising async work.
*
* Each "lane" (keyed by session ID) processes work items one at a time.
* If a lane is idle, work starts immediately. If it's busy, the work
* is queued and a promise is returned that resolves when it's this
* entry's turn to execute.
*
* Independent lanes run in parallel — only items within the same lane
* are serialised.
*/
interface QueueEntry<T = unknown> {
work: () => Promise<T>;
resolve: (value: T) => void;
reject: (reason: unknown) => void;
}
interface Lane {
active: boolean;
queue: QueueEntry[];
}
export class LaneQueue {
private lanes: Map<string, Lane> = new Map();
/**
* Enqueue a unit of work for the given lane.
* Returns a promise that resolves with the work's return value
* once it has been executed (which may be immediately if the lane is idle).
*/
async enqueue<T>(laneId: string, work: () => Promise<T>): Promise<T> {
let lane = this.lanes.get(laneId);
if (!lane) {
lane = { active: false, queue: [] };
this.lanes.set(laneId, lane);
}
// If nothing is running on this lane, execute immediately
if (!lane.active) {
lane.active = true;
try {
return await work();
} finally {
lane.active = false;
this.processNext(laneId);
}
}
// Otherwise, queue the work and return a deferred promise
return new Promise<T>((resolve, reject) => {
lane!.queue.push({
work: work as () => Promise<unknown>,
resolve: resolve as (value: unknown) => void,
reject,
});
});
}
/** Check if a lane currently has active work executing. */
isProcessing(laneId: string): boolean {
return this.lanes.get(laneId)?.active ?? false;
}
/** Get the number of pending (not yet started) items in a lane. */
queueLength(laneId: string): number {
return this.lanes.get(laneId)?.queue.length ?? 0;
}
/**
* Cancel all pending entries in a lane.
* Active work is NOT interrupted — only queued items are rejected.
* Rejected promises receive an Error with message "Lane cancelled".
*/
cancel(laneId: string): void {
const lane = this.lanes.get(laneId);
if (!lane) return;
const pending = lane.queue.splice(0);
for (const entry of pending) {
entry.reject(new Error('Lane cancelled'));
}
// Clean up empty idle lanes
if (!lane.active && lane.queue.length === 0) {
this.lanes.delete(laneId);
}
}
/**
* Process the next queued entry for a lane (called after current work finishes).
* Runs asynchronously so the caller's finally block completes first.
*/
private processNext(laneId: string): void {
const lane = this.lanes.get(laneId);
if (!lane) return;
const entry = lane.queue.shift();
if (!entry) {
// Lane is empty — clean up
this.lanes.delete(laneId);
return;
}
lane.active = true;
entry.work()
.then((value) => entry.resolve(value))
.catch((err) => entry.reject(err))
.finally(() => {
lane.active = false;
this.processNext(laneId);
});
}
}
+8
View File
@@ -5,6 +5,7 @@ import { Router } from './router.js';
import { serveStatic } from './static.js';
import { SessionBridge } from './session-bridge.js';
import type { SessionBridgeConfig } from './session-bridge.js';
import { LaneQueue } from './lane-queue.js';
import { authenticateRequest } from './auth.js';
import type { AuthConfig } from './auth.js';
import {
@@ -20,6 +21,7 @@ import {
createAgentHandlers,
createConfigHandlers,
} from './handlers/index.js';
import type { TokenUsageEntry } from './handlers/system.js';
import type { SessionManager } from '../session/manager.js';
import type { Config } from '../config/index.js';
import type { ToolRegistry } from '../tools/registry.js';
@@ -48,6 +50,8 @@ export interface GatewayServerConfig {
webhookHandler?: WebhookHandler;
/** Optional Gmail handler for Pub/Sub push notifications. */
gmailHandler?: GmailWatcher;
/** Optional callback to retrieve per-session token usage data for the dashboard. */
getTokenUsage?: () => TokenUsageEntry[];
}
export class GatewayServer {
@@ -55,6 +59,7 @@ export class GatewayServer {
private httpServer: HttpServer | null = null;
private router: Router;
private sessionBridge: SessionBridge;
private laneQueue: LaneQueue;
private connectionMap: Map<WebSocket, string> = new Map();
private config: GatewayServerConfig;
private startTime: number = Date.now();
@@ -70,6 +75,7 @@ export class GatewayServer {
toolExecutor: config.toolExecutor,
});
this.laneQueue = new LaneQueue();
this.router = new Router();
this.registerHandlers();
}
@@ -89,6 +95,7 @@ export class GatewayServer {
totalSessions: this.config.sessionManager.listSessions().length,
activeConnections: this.sessionBridge.connectionCount,
}),
getTokenUsage: this.config.getTokenUsage,
});
const sessionHandlers = createSessionHandlers({
@@ -103,6 +110,7 @@ export class GatewayServer {
const agentHandlers = createAgentHandlers({
sessionBridge: this.sessionBridge,
laneQueue: this.laneQueue,
});
// Config handlers (only if config object is provided)
+44
View File
@@ -117,6 +117,50 @@ export class SessionBridge {
return this.clients.size;
}
/** Get usage stats for a specific connection's agent. */
getUsage(connectionId: string): { inputTokens: number; outputTokens: number; calls: number } | undefined {
const agent = this.clients.get(connectionId)?.agent;
return agent?.getUsage();
}
/** Get usage stats for all active sessions. Returns an array of per-session usage entries. */
getAllUsage(): Array<{
sessionId: string;
primary: { inputTokens: number; outputTokens: number; calls: number };
delegation: Record<string, { inputTokens: number; outputTokens: number; calls: number }>;
total: { inputTokens: number; outputTokens: number; calls: number; estimatedCost: number };
}> {
const results: Array<{
sessionId: string;
primary: { inputTokens: number; outputTokens: number; calls: number };
delegation: Record<string, { inputTokens: number; outputTokens: number; calls: number }>;
total: { inputTokens: number; outputTokens: number; calls: number; estimatedCost: number };
}> = [];
// De-duplicate by sessionId (multiple connections may share a session)
const seen = new Set<string>();
for (const client of this.clients.values()) {
if (seen.has(client.sessionId)) continue;
seen.add(client.sessionId);
const usage = client.agent.getUsage();
results.push({
sessionId: client.sessionId,
primary: { inputTokens: usage.inputTokens, outputTokens: usage.outputTokens, calls: usage.calls },
delegation: {} as Record<string, { inputTokens: number; outputTokens: number; calls: number }>,
total: {
inputTokens: usage.inputTokens,
outputTokens: usage.outputTokens,
calls: usage.calls,
estimatedCost: 0, // NativeAgent doesn't track cost; only AgentOrchestrator does
},
});
}
return results;
}
private getOrCreateAgent(sessionId: string): NativeAgent {
let agent = this.agents.get(sessionId);
if (!agent) {
+5
View File
@@ -25,6 +25,9 @@
<a href="#/sessions" class="nav-link" data-page="sessions">
<span class="nav-icon">&#9776;</span> Sessions
</a>
<a href="#/usage" class="nav-link" data-page="usage">
<span class="nav-icon">&#9733;</span> Usage
</a>
<a href="#/settings" class="nav-link" data-page="settings">
<span class="nav-icon">&#9881;</span> Settings
</a>
@@ -42,11 +45,13 @@
import { DashboardPage } from './pages/dashboard.js';
import { ChatPage } from './pages/chat.js';
import { SessionsPage } from './pages/sessions.js';
import { UsagePage } from './pages/usage.js';
import { SettingsPage } from './pages/settings.js';
registerPage('/', DashboardPage);
registerPage('/chat', ChatPage);
registerPage('/sessions', SessionsPage);
registerPage('/usage', UsagePage);
registerPage('/settings', SettingsPage);
initStatusIndicator();
+170
View File
@@ -0,0 +1,170 @@
/**
* Flynn Token Usage Page
*
* Shows per-session token usage breakdown including input/output tokens,
* API calls, estimated cost, and delegation details.
* Auto-refreshes every 30 seconds.
*/
let _timer = null;
function formatNumber(n) {
return (n ?? 0).toLocaleString();
}
function formatCost(n) {
if (!n || n === 0) return '$0.00';
if (n < 0.01) return `$${n.toFixed(4)}`;
return `$${n.toFixed(2)}`;
}
function truncateId(id) {
if (!id) return '-';
if (id.length <= 24) return id;
return id.slice(0, 24) + '\u2026';
}
async function loadUsage(el, client) {
let data;
try {
data = await client.call('system.tokenUsage');
} catch (err) {
el.innerHTML = `<div class="empty-state">Failed to load usage: ${err.message}</div>`;
return;
}
const sessions = data?.sessions ?? [];
// Compute totals across all sessions
let totalInput = 0;
let totalOutput = 0;
let totalCalls = 0;
let totalCost = 0;
for (const s of sessions) {
totalInput += s.total?.inputTokens ?? 0;
totalOutput += s.total?.outputTokens ?? 0;
totalCalls += s.total?.calls ?? 0;
totalCost += s.total?.estimatedCost ?? 0;
}
// Summary cards
const summaryHtml = `
<div class="stats-grid">
<div class="stat-card">
<div class="stat-label">Total Input Tokens</div>
<div class="stat-value">${formatNumber(totalInput)}</div>
</div>
<div class="stat-card">
<div class="stat-label">Total Output Tokens</div>
<div class="stat-value">${formatNumber(totalOutput)}</div>
</div>
<div class="stat-card">
<div class="stat-label">Total Tokens</div>
<div class="stat-value">${formatNumber(totalInput + totalOutput)}</div>
</div>
<div class="stat-card">
<div class="stat-label">API Calls</div>
<div class="stat-value">${formatNumber(totalCalls)}</div>
</div>
<div class="stat-card">
<div class="stat-label">Estimated Cost</div>
<div class="stat-value">${formatCost(totalCost)}</div>
</div>
<div class="stat-card">
<div class="stat-label">Active Sessions</div>
<div class="stat-value">${sessions.length}</div>
</div>
</div>
`;
// Per-session table
let tableHtml = '';
if (sessions.length === 0) {
tableHtml = '<div class="empty-state">No active sessions with usage data</div>';
} else {
const rows = sessions.map(s => {
const inTok = s.total?.inputTokens ?? 0;
const outTok = s.total?.outputTokens ?? 0;
const calls = s.total?.calls ?? 0;
const cost = s.total?.estimatedCost ?? 0;
// Build delegation breakdown if present
const delegationEntries = Object.entries(s.delegation ?? {});
let delegationCell = '<span class="text-muted">-</span>';
if (delegationEntries.length > 0) {
delegationCell = delegationEntries.map(([tier, stats]) =>
`<span class="badge ok">${tier}</span> ${formatNumber(stats.inputTokens)}/${formatNumber(stats.outputTokens)}`
).join('<br>');
}
return `
<tr>
<td title="${s.sessionId}">${truncateId(s.sessionId)}</td>
<td>${formatNumber(inTok)}</td>
<td>${formatNumber(outTok)}</td>
<td>${formatNumber(inTok + outTok)}</td>
<td>${formatNumber(calls)}</td>
<td>${formatCost(cost)}</td>
<td>${delegationCell}</td>
</tr>
`;
}).join('');
tableHtml = `
<table>
<thead>
<tr>
<th>Session</th>
<th>Input</th>
<th>Output</th>
<th>Total</th>
<th>Calls</th>
<th>Cost</th>
<th>Delegation</th>
</tr>
</thead>
<tbody>
${rows}
</tbody>
</table>
`;
}
el.innerHTML = `
<div class="usage-header">
<h1 class="page-title">Token Usage</h1>
<button class="btn btn-secondary" id="usage-refresh-btn">Refresh</button>
</div>
${summaryHtml}
<h2 class="section-title">Per-Session Breakdown</h2>
${tableHtml}
`;
// Wire up refresh button
const refreshBtn = el.querySelector('#usage-refresh-btn');
if (refreshBtn) {
refreshBtn.addEventListener('click', () => {
loadUsage(el, client).catch(() => {});
});
}
}
export const UsagePage = {
async render(el, client) {
await loadUsage(el, client);
// Auto-refresh every 30 seconds
_timer = setInterval(() => {
loadUsage(el, client).catch(() => {});
}, 30000);
},
teardown() {
if (_timer) {
clearInterval(_timer);
_timer = null;
}
},
};
+19
View File
@@ -741,6 +741,25 @@ header #status.status-ok {
margin-top: 24px;
}
/* ── Usage Page Header ─────────────────────────────────────── */
.usage-header {
display: flex;
align-items: center;
justify-content: space-between;
}
.usage-header .page-title {
margin-bottom: 0;
padding-bottom: 0;
border-bottom: none;
flex: 1;
}
.usage-header .btn {
flex-shrink: 0;
}
/* ── Data Tables ────────────────────────────────────────────── */
table {
+70
View File
@@ -5,6 +5,7 @@ import {
GeminiEmbeddingProvider,
OllamaEmbeddingProvider,
LlamaCppEmbeddingProvider,
VoyageAIEmbeddingProvider,
} from './embeddings.js';
import type { EmbeddingConfig } from '../config/schema.js';
@@ -39,6 +40,11 @@ describe('createEmbeddingProvider', () => {
expect(provider).toBeInstanceOf(LlamaCppEmbeddingProvider);
});
it('creates Voyage provider', () => {
const provider = createEmbeddingProvider({ ...baseConfig, provider: 'voyage' });
expect(provider).toBeInstanceOf(VoyageAIEmbeddingProvider);
});
it('throws on unknown provider', () => {
expect(() => createEmbeddingProvider({ ...baseConfig, provider: 'unknown' as never })).toThrow('Unknown embedding provider');
});
@@ -157,3 +163,67 @@ describe('LlamaCppEmbeddingProvider', () => {
expect(provider.dimensions).toBe(768);
});
});
describe('VoyageAIEmbeddingProvider', () => {
it('defaults to 1024 dimensions', () => {
const config: EmbeddingConfig = {
enabled: true,
provider: 'voyage',
model: 'voyage-3',
chunk_size: 512,
chunk_overlap: 50,
top_k: 5,
hybrid_weight: 0.7,
};
const provider = new VoyageAIEmbeddingProvider(config);
expect(provider.dimensions).toBe(1024);
});
it('reports configured dimensions', () => {
const config: EmbeddingConfig = {
enabled: true,
provider: 'voyage',
model: 'voyage-3-lite',
dimensions: 512,
chunk_size: 512,
chunk_overlap: 50,
top_k: 5,
hybrid_weight: 0.7,
};
const provider = new VoyageAIEmbeddingProvider(config);
expect(provider.dimensions).toBe(512);
});
it('uses custom endpoint if provided', () => {
const config: EmbeddingConfig = {
enabled: true,
provider: 'voyage',
model: 'voyage-3',
endpoint: 'https://custom.proxy.example.com/v1',
api_key: 'test-key',
chunk_size: 512,
chunk_overlap: 50,
top_k: 5,
hybrid_weight: 0.7,
};
// Should not throw when constructing with custom endpoint
const provider = new VoyageAIEmbeddingProvider(config);
expect(provider.dimensions).toBe(1024);
});
it('uses api_key from config', () => {
const config: EmbeddingConfig = {
enabled: true,
provider: 'voyage',
model: 'voyage-3',
api_key: 'voy-test-key-123',
chunk_size: 512,
chunk_overlap: 50,
top_k: 5,
hybrid_weight: 0.7,
};
// Should construct without error when api_key is provided
const provider = new VoyageAIEmbeddingProvider(config);
expect(provider.dimensions).toBe(1024);
});
});
+44
View File
@@ -159,6 +159,48 @@ export class LlamaCppEmbeddingProvider implements EmbeddingProvider {
}
}
// ---------------------------------------------------------------------------
// Voyage AI
// ---------------------------------------------------------------------------
export class VoyageAIEmbeddingProvider implements EmbeddingProvider {
private _model: string;
private _dimensions: number;
private _apiKey: string;
private _endpoint: string;
constructor(config: EmbeddingConfig) {
this._model = config.model;
this._dimensions = config.dimensions ?? 1024;
this._apiKey = config.api_key ?? process.env.VOYAGE_API_KEY ?? '';
this._endpoint = config.endpoint ?? 'https://api.voyageai.com/v1';
}
get dimensions(): number {
return this._dimensions;
}
async embed(texts: string[]): Promise<number[][]> {
// Voyage AI's API is OpenAI-compatible for embeddings
const { default: OpenAI } = await import('openai');
const client = new OpenAI({
apiKey: this._apiKey,
baseURL: this._endpoint,
});
const response = await client.embeddings.create({
model: this._model,
input: texts,
// Note: Voyage AI does not support the `dimensions` parameter.
// Dimensions are model-dependent (voyage-3: 1024, voyage-3-lite: 512, voyage-code-3: 1024).
});
// Sort by index to ensure order matches input
const sorted = response.data.sort((a, b) => a.index - b.index);
return sorted.map((item) => item.embedding);
}
}
// ---------------------------------------------------------------------------
// Factory
// ---------------------------------------------------------------------------
@@ -176,6 +218,8 @@ export function createEmbeddingProvider(config: EmbeddingConfig): EmbeddingProvi
return new OllamaEmbeddingProvider(config);
case 'llamacpp':
return new LlamaCppEmbeddingProvider(config);
case 'voyage':
return new VoyageAIEmbeddingProvider(config);
default:
throw new Error(`Unknown embedding provider: ${(config as Record<string, unknown>).provider}`);
}
+6
View File
@@ -21,6 +21,12 @@ export const MODEL_COSTS_PER_MILLION: Record<string, { input: number; output: nu
'claude-haiku-4': { input: 0, output: 0 },
// Local / unknown models
'default': { input: 0, output: 0 },
// xAI (Grok)
'grok-3': { input: 3, output: 15 },
'grok-3-mini': { input: 0.30, output: 0.50 },
'grok-2': { input: 2, output: 10 },
'grok-2-mini': { input: 0.10, output: 0.25 },
'grok-3-fast': { input: 5, output: 25 },
// Bedrock (Meta Llama)
'meta.llama3-1-70b-instruct-v1:0': { input: 0.72, output: 0.72 },
'meta.llama3-1-8b-instruct-v1:0': { input: 0.22, output: 0.22 },
+1 -1
View File
@@ -60,7 +60,7 @@ export async function withRetry<T>(
const delay = Math.min(baseDelay, config.maxDelayMs);
const jitter = delay * (0.5 + Math.random() * 0.5); // 50-100% of delay
console.warn(
console.debug(
`[retry] ${label ?? 'operation'} attempt ${attempt + 1}/${config.maxRetries} failed: ${lastError.message}. Retrying in ${Math.round(jitter)}ms...`,
);
+10 -10
View File
@@ -76,7 +76,7 @@ export class ModelRouter implements ModelClient {
return await primaryClient.chat(request);
} catch (error) {
errors.push(error instanceof Error ? error : new Error(String(error)));
console.warn(`Primary model failed: ${errors[0].message}`);
console.debug(`Primary model failed: ${errors[0].message}`);
}
// Try tier-specific fallbacks first
@@ -84,12 +84,12 @@ export class ModelRouter implements ModelClient {
for (let i = 0; i < tierFallbackList.length; i++) {
try {
const reason = `Primary model failed (${errors[0].message}), using tier fallback #${i + 1}`;
console.warn(reason);
console.debug(reason);
const response = await tierFallbackList[i].chat(request);
return { ...response, fallback: true, fallbackReason: reason };
} catch (error) {
errors.push(error instanceof Error ? error : new Error(String(error)));
console.warn(`Tier fallback #${i + 1} failed: ${errors[errors.length - 1].message}`);
console.debug(`Tier fallback #${i + 1} failed: ${errors[errors.length - 1].message}`);
}
}
@@ -98,12 +98,12 @@ export class ModelRouter implements ModelClient {
const fallbackClient = this.fallbackChain[i];
try {
const reason = `Primary model failed (${errors[0].message}), using global fallback #${i + 1}`;
console.warn(reason);
console.debug(reason);
const response = await fallbackClient.chat(request);
return { ...response, fallback: true, fallbackReason: reason };
} catch (error) {
errors.push(error instanceof Error ? error : new Error(String(error)));
console.warn(`Global fallback #${i + 1} failed: ${errors[errors.length - 1].message}`);
console.debug(`Global fallback #${i + 1} failed: ${errors[errors.length - 1].message}`);
}
}
@@ -121,7 +121,7 @@ export class ModelRouter implements ModelClient {
if (event.type === 'error') {
hasError = true;
primaryError = event.error?.message ?? 'Unknown error';
console.warn(`Primary stream failed: ${primaryError}`);
console.debug(`Primary stream failed: ${primaryError}`);
break;
}
yield event;
@@ -139,14 +139,14 @@ export class ModelRouter implements ModelClient {
if (!fallbackClient.chatStream) continue;
const reason = `Primary model failed (${primaryError}), using tier fallback #${i + 1}`;
console.warn(reason);
console.debug(reason);
yield { type: 'fallback_warning', fallbackReason: reason };
let hasError = false;
for await (const event of fallbackClient.chatStream(request)) {
if (event.type === 'error') {
hasError = true;
console.warn(`Tier fallback stream #${i + 1} failed: ${event.error?.message}`);
console.debug(`Tier fallback stream #${i + 1} failed: ${event.error?.message}`);
break;
}
yield event;
@@ -161,14 +161,14 @@ export class ModelRouter implements ModelClient {
if (!fallbackClient.chatStream) continue;
const reason = `Primary model failed (${primaryError}), using global fallback #${i + 1}`;
console.warn(reason);
console.debug(reason);
yield { type: 'fallback_warning', fallbackReason: reason };
let hasError = false;
for await (const event of fallbackClient.chatStream(request)) {
if (event.type === 'error') {
hasError = true;
console.warn(`Global fallback stream #${i + 1} failed: ${event.error?.message}`);
console.debug(`Global fallback stream #${i + 1} failed: ${event.error?.message}`);
break;
}
yield event;