import type { GatewayRequest, GatewayAttachment, OutboundMessage } from '../protocol.js'; import type { SendFn } from '../router.js'; import { makeEvent, makeError, ErrorCode } from '../protocol.js'; import type { SessionBridge } from '../session-bridge.js'; import type { LaneQueue } from '../lane-queue.js'; import type { MetricsCollector } from '../metrics.js'; import type { Attachment } from '../../channels/types.js'; import type { SessionManager } from '../../session/manager.js'; import type { ModelTier } from '../../models/router.js'; import type { CommandRegistry } from '../../commands/index.js'; export interface AgentHandlerDeps { sessionBridge: SessionBridge; laneQueue: LaneQueue; metrics?: MetricsCollector; sessionManager?: SessionManager; commandRegistry?: CommandRegistry; } export function createAgentHandlers(deps: AgentHandlerDeps) { return { 'agent.send': async (request: GatewayRequest, send: SendFn): Promise => { const params = request.params as { message?: string; connectionId?: string; attachments?: GatewayAttachment[]; metadata?: { isCommand?: boolean; command?: string; commandArgs?: string } } | undefined; if (!params?.message && !params?.metadata?.isCommand) { return makeError(request.id, ErrorCode.InvalidRequest, 'message is required'); } const connectionId = params.connectionId as string; if (!connectionId) { return makeError(request.id, ErrorCode.InvalidRequest, 'connectionId is required (set by server)'); } const agent = deps.sessionBridge.getAgent(connectionId); if (!agent) { return makeError(request.id, ErrorCode.SessionNotFound, 'No agent for this connection'); } // Queue by session ID so multiple connections sharing a session are serialised. // Falls back to connectionId if session lookup fails (shouldn't happen). const sessionId = deps.sessionBridge.getSessionId(connectionId); const laneId = sessionId ?? connectionId; // Enqueue the work — if the lane is idle it runs immediately, // otherwise it waits for earlier requests on the same session to finish. const requestId = request.id.toString(); deps.metrics?.startRequest(requestId, { sessionId: laneId, channel: 'ws' }); return deps.laneQueue.enqueue(laneId, async () => { deps.sessionBridge.setBusy(connectionId, true); const commandInput = params.metadata?.isCommand && typeof params.metadata.command === 'string' ? `/${params.metadata.command}${params.metadata.commandArgs ? ` ${params.metadata.commandArgs}` : ''}` : params.message; if (commandInput && deps.commandRegistry?.isCommand(commandInput)) { const sessionId = deps.sessionBridge.getSessionId(connectionId); const commandResult = await deps.commandRegistry.execute(commandInput, { channel: 'ws', senderId: connectionId, sessionId: sessionId ?? `ws:${connectionId}`, rawInput: commandInput, services: { getStatus: () => `Gateway session active. Current model tier: ${agent.getModelTier()}`, getUsage: () => { const usage = agent.getUsage(); const lines = [ '**Token Usage**', '', `Primary: ${usage.primary.inputTokens.toLocaleString()} in / ${usage.primary.outputTokens.toLocaleString()} out (${usage.primary.calls} calls)`, ]; const delegationEntries = Object.entries(usage.delegation); if (delegationEntries.length > 0) { lines.push(''); lines.push('Delegation:'); for (const [tier, stats] of delegationEntries) { lines.push(` ${tier}: ${stats.inputTokens.toLocaleString()} in / ${stats.outputTokens.toLocaleString()} out (${stats.calls} calls)`); } } lines.push(''); lines.push(`**Total:** ${usage.total.inputTokens.toLocaleString()} in / ${usage.total.outputTokens.toLocaleString()} out (${usage.total.calls} calls)`); if (usage.total.estimatedCost > 0) { lines.push(`**Estimated cost:** $${usage.total.estimatedCost.toFixed(4)}`); } return lines.join('\n'); }, getModel: () => `Current model tier: ${agent.getModelTier()}`, setModel: (tier) => { const validTiers: ModelTier[] = ['fast', 'default', 'complex', 'local']; const modelTier = tier as ModelTier; if (!validTiers.includes(modelTier)) { return `Invalid tier: ${tier}. Available: ${validTiers.join(', ')}`; } agent.setModelTier(modelTier); if (sessionId && deps.sessionManager) { deps.sessionManager.setSessionConfig('ws', sessionId, 'modelTier', modelTier); } return `Switched to model tier: ${modelTier}`; }, compact: async () => { const result = await agent.compact(); if (result && result.compactedCount > 0) { return `Compacted ${result.compactedCount} messages: ${result.tokensBefore} → ${result.tokensAfter} tokens`; } return 'Nothing to compact.'; }, reset: () => { agent.reset(); if (sessionId && deps.sessionManager) { deps.sessionManager.deleteSessionConfig('ws', sessionId, 'modelTier'); } return 'Session reset.'; }, }, }); if (commandResult.handled) { send(makeEvent(request.id, 'done', { content: commandResult.text })); return; } } // Set up tool use callback to emit streaming events deps.sessionBridge.setOnToolUse(connectionId, (event) => { if (event.type === 'start') { send(makeEvent(request.id, 'tool_start', { tool: event.tool, args: event.args })); } else if (event.type === 'end') { send(makeEvent(request.id, 'tool_end', { tool: event.tool, result: event.result ? { success: event.result.success, output: event.result.output, error: event.result.error, } : undefined, })); // Record tool failures as error events if (event.result && !event.result.success) { deps.metrics?.incrementErrors(); deps.metrics?.recordEvent({ timestamp: Date.now(), level: 'error', source: 'tool', message: `Tool '${event.tool}' failed: ${event.result.error ?? 'unknown error'}`, context: { sessionId: laneId, tool: event.tool }, }); } } }); try { // Convert gateway attachments to channel attachments const attachments: Attachment[] | undefined = params.attachments?.map(a => ({ mimeType: a.mimeType, data: a.data, url: a.url, filename: a.filename, })); const response = await agent.process(params.message!, attachments); deps.metrics?.incrementMessages(); send(makeEvent(request.id, 'done', { content: response })); } catch (err) { const message = err instanceof Error ? err.message : 'Unknown error'; deps.metrics?.incrementErrors(); deps.metrics?.recordEvent({ timestamp: Date.now(), level: 'error', source: 'agent.send', message, context: { sessionId: laneId }, }); send(makeEvent(request.id, 'error', { code: ErrorCode.InternalError, message })); } finally { deps.sessionBridge.setBusy(connectionId, false); deps.sessionBridge.setOnToolUse(connectionId, undefined); deps.metrics?.endRequest(requestId); } }); }, 'agent.cancel': async (request: GatewayRequest): Promise => { // Cancel is a placeholder — proper cancellation requires abort controller support in NativeAgent. // For now, just report whether the agent was busy. const params = request.params as { connectionId?: string } | undefined; const connectionId = params?.connectionId as string; if (!connectionId) { return makeError(request.id, ErrorCode.InvalidRequest, 'connectionId is required'); } const wasBusy = deps.sessionBridge.isBusy(connectionId); // TODO: Wire AbortController into NativeAgent for actual cancellation return { id: request.id, result: { cancelled: wasBusy } }; }, }; }