// flynn/src/gateway/handlers/agent.ts

import type { GatewayRequest, GatewayAttachment, OutboundMessage } from '../protocol.js';
import type { SendFn } from '../router.js';
import { makeEvent, makeError, ErrorCode } from '../protocol.js';
import type { SessionBridge } from '../session-bridge.js';
import type { LaneQueue } from '../lane-queue.js';
import type { MetricsCollector } from '../metrics.js';
import type { Attachment } from '../../channels/types.js';
import type { SessionManager } from '../../session/manager.js';
import type { ModelTier } from '../../models/router.js';
import type { CommandRegistry } from '../../commands/index.js';
/**
 * Collaborators required by {@link createAgentHandlers}.
 *
 * Only `sessionBridge` and `laneQueue` are mandatory; the remaining members
 * are optional and the handlers skip the related behaviour when they are absent.
 */
export interface AgentHandlerDeps {
  /** Resolves the agent and session id for a connection; also tracks busy state, tool-use callbacks and cancellation. */
  sessionBridge: SessionBridge;
  /** Queue the handlers enqueue work on (keyed by lane id) and cancel pending work from. */
  laneQueue: LaneQueue;
  /** Optional collector for request lifecycle, message/error counters and error events. */
  metrics?: MetricsCollector;
  /** Optional store used to persist or delete the per-session `modelTier` setting. */
  sessionManager?: SessionManager;
  /** Optional registry consulted to detect and execute slash commands before the agent sees the input. */
  commandRegistry?: CommandRegistry;
}
/**
 * Builds the gateway handlers for agent interaction.
 *
 * Returned handlers:
 * - `agent.send`   — validates the request, then enqueues the work on the lane
 *   for the connection's session. Inside the lane it first tries to run the
 *   input as a command; otherwise it forwards the message (and attachments)
 *   to the agent and streams `tool_start` / `tool_end` / `done` / `error`
 *   events through `send`.
 * - `agent.cancel` — cancels queued lane work and asks the session bridge to
 *   cancel the active operation.
 *
 * @param deps Collaborators used by the handlers (see {@link AgentHandlerDeps}).
 * @returns A map from method name to handler function.
 */
export function createAgentHandlers(deps: AgentHandlerDeps) {
  return {
    /**
     * Handles a user message or command for the connection named in
     * `params.connectionId`.
     *
     * Validation failures are returned as error responses; everything that
     * happens inside the lane is reported through `send` events.
     */
    'agent.send': async (request: GatewayRequest, send: SendFn): Promise<OutboundMessage | void> => {
      const params = request.params as { message?: string; connectionId?: string; attachments?: GatewayAttachment[]; metadata?: { isCommand?: boolean; command?: string; commandArgs?: string } } | undefined;
      if (!params?.message && !params?.metadata?.isCommand) {
        return makeError(request.id, ErrorCode.InvalidRequest, 'message is required');
      }

      const connectionId = params.connectionId;
      if (!connectionId) {
        return makeError(request.id, ErrorCode.InvalidRequest, 'connectionId is required (set by server)');
      }

      const agent = deps.sessionBridge.getAgent(connectionId);
      if (!agent) {
        return makeError(request.id, ErrorCode.SessionNotFound, 'No agent for this connection');
      }

      // Queue by session ID so multiple connections sharing a session are serialised.
      // Falls back to connectionId if session lookup fails (shouldn't happen).
      const sessionId = deps.sessionBridge.getSessionId(connectionId);
      const laneId = sessionId ?? connectionId;

      // Enqueue the work — if the lane is idle it runs immediately,
      // otherwise it waits for earlier requests on the same session to finish.
      const requestId = request.id.toString();
      deps.metrics?.startRequest(requestId, { sessionId: laneId, channel: 'ws' });

      return deps.laneQueue.enqueue(laneId, async () => {
        deps.sessionBridge.setBusy(connectionId, true);

        // The outer try/finally guarantees that the busy flag, the tool-use
        // callback and the metrics request are released on EVERY exit path,
        // including a handled command and a throwing command executor.
        try {
          const commandInput = params.metadata?.isCommand && typeof params.metadata.command === 'string'
            ? `/${params.metadata.command}${params.metadata.commandArgs ? ` ${params.metadata.commandArgs}` : ''}`
            : params.message;

          if (commandInput && deps.commandRegistry?.isCommand(commandInput)) {
            // Looked up again here (not reused from enqueue time) because the
            // task may have waited in the queue before running.
            const commandSessionId = deps.sessionBridge.getSessionId(connectionId);
            const commandResult = await deps.commandRegistry.execute(commandInput, {
              channel: 'ws',
              senderId: connectionId,
              sessionId: commandSessionId ?? `ws:${connectionId}`,
              rawInput: commandInput,
              services: {
                getStatus: () => `Gateway session active. Current model tier: ${agent.getModelTier()}`,
                getUsage: () => {
                  const usage = agent.getUsage();
                  const lines = [
                    '**Token Usage**',
                    '',
                    `Primary: ${usage.primary.inputTokens.toLocaleString()} in / ${usage.primary.outputTokens.toLocaleString()} out (${usage.primary.calls} calls)`,
                  ];
                  const delegationEntries = Object.entries(usage.delegation);
                  if (delegationEntries.length > 0) {
                    lines.push('');
                    lines.push('Delegation:');
                    for (const [tier, stats] of delegationEntries) {
                      lines.push(` ${tier}: ${stats.inputTokens.toLocaleString()} in / ${stats.outputTokens.toLocaleString()} out (${stats.calls} calls)`);
                    }
                  }
                  lines.push('');
                  lines.push(`**Total:** ${usage.total.inputTokens.toLocaleString()} in / ${usage.total.outputTokens.toLocaleString()} out (${usage.total.calls} calls)`);
                  if (usage.total.estimatedCost > 0) {
                    lines.push(`**Estimated cost:** $${usage.total.estimatedCost.toFixed(4)}`);
                  }
                  return lines.join('\n');
                },
                getModel: () => `Current model tier: ${agent.getModelTier()}`,
                setModel: (tier) => {
                  const validTiers: ModelTier[] = ['fast', 'default', 'complex', 'local'];
                  const modelTier = tier as ModelTier;
                  if (!validTiers.includes(modelTier)) {
                    return `Invalid tier: ${tier}. Available: ${validTiers.join(', ')}`;
                  }
                  agent.setModelTier(modelTier);
                  // Persist the choice only when a session and a manager are available.
                  if (commandSessionId && deps.sessionManager) {
                    deps.sessionManager.setSessionConfig('ws', commandSessionId, 'modelTier', modelTier);
                  }
                  return `Switched to model tier: ${modelTier}`;
                },
                compact: async () => {
                  const result = await agent.compact();
                  if (result && result.compactedCount > 0) {
                    return `Compacted ${result.compactedCount} messages: ${result.tokensBefore} → ${result.tokensAfter} tokens`;
                  }
                  return 'Nothing to compact.';
                },
                reset: () => {
                  agent.reset();
                  if (commandSessionId && deps.sessionManager) {
                    deps.sessionManager.deleteSessionConfig('ws', commandSessionId, 'modelTier');
                  }
                  return 'Session reset.';
                },
              },
            });
            if (commandResult.handled) {
              send(makeEvent(request.id, 'done', { content: commandResult.text }));
              return;
            }
          }

          // A command-only request (metadata.isCommand without a message) that was
          // not handled above has nothing to hand to the agent.
          const userMessage = params.message;
          if (!userMessage) {
            send(makeEvent(request.id, 'error', { code: ErrorCode.InvalidRequest, message: 'message is required' }));
            return;
          }

          // Set up tool use callback to emit streaming events
          deps.sessionBridge.setOnToolUse(connectionId, (event) => {
            if (event.type === 'start') {
              send(makeEvent(request.id, 'tool_start', { tool: event.tool, args: event.args }));
            } else if (event.type === 'end') {
              send(makeEvent(request.id, 'tool_end', {
                tool: event.tool,
                result: event.result ? {
                  success: event.result.success,
                  output: event.result.output,
                  error: event.result.error,
                } : undefined,
              }));
              // Record tool failures as error events
              if (event.result && !event.result.success) {
                deps.metrics?.incrementErrors();
                deps.metrics?.recordEvent({
                  timestamp: Date.now(),
                  level: 'error',
                  source: 'tool',
                  message: `Tool '${event.tool}' failed: ${event.result.error ?? 'unknown error'}`,
                  context: { sessionId: laneId, tool: event.tool },
                });
              }
            }
          });

          try {
            // Convert gateway attachments to channel attachments
            const attachments: Attachment[] | undefined = params.attachments?.map(a => ({
              mimeType: a.mimeType,
              data: a.data,
              url: a.url,
              filename: a.filename,
            }));
            const response = await agent.process(userMessage, attachments);
            deps.metrics?.incrementMessages();
            send(makeEvent(request.id, 'done', { content: response }));
          } catch (err) {
            // Agent failures are reported to the client as an event rather than rethrown.
            const message = err instanceof Error ? err.message : 'Unknown error';
            deps.metrics?.incrementErrors();
            deps.metrics?.recordEvent({
              timestamp: Date.now(),
              level: 'error',
              source: 'agent.send',
              message,
              context: { sessionId: laneId },
            });
            send(makeEvent(request.id, 'error', { code: ErrorCode.InternalError, message }));
          }
        } finally {
          deps.sessionBridge.setBusy(connectionId, false);
          deps.sessionBridge.setOnToolUse(connectionId, undefined);
          deps.metrics?.endRequest(requestId);
        }
      });
    },

    /**
     * Cancels work for the connection named in `params.connectionId`:
     * queued lane work is cancelled first, then the session bridge is asked
     * to cancel the active operation. The response reports whether the bridge
     * accepted the cancellation.
     */
    'agent.cancel': async (request: GatewayRequest): Promise<OutboundMessage> => {
      const params = request.params as { connectionId?: string } | undefined;
      const connectionId = params?.connectionId;
      if (!connectionId) {
        return makeError(request.id, ErrorCode.InvalidRequest, 'connectionId is required');
      }

      // Same lane key derivation as agent.send so both address the same queue.
      const sessionId = deps.sessionBridge.getSessionId(connectionId);
      const laneId = sessionId ?? connectionId;

      // Clear any queued (not-yet-started) work first.
      deps.laneQueue.cancel(laneId);
      const cancelled = deps.sessionBridge.cancel(connectionId);

      return {
        id: request.id,
        result: {
          cancelled,
          message: cancelled
            ? 'Cancellation requested. The active operation will stop at the next safe point.'
            : 'No active operation to cancel.',
        },
      };
    },
  };
}