diff --git a/src/audit/logger.ts b/src/audit/logger.ts index a6d8f35..442cbb9 100644 --- a/src/audit/logger.ts +++ b/src/audit/logger.ts @@ -23,6 +23,7 @@ import type { UserActionEvent, QueuePreemptEvent, BackendRouteEvent, + BackendSuccessEvent, BackendFallbackEvent, CronTriggerEvent, WebhookReceiveEvent, @@ -215,6 +216,11 @@ export class AuditLogger { this.write({ level: 'info', event_type: 'backend.route', event: event as unknown as Record }); } + backendSuccess(event: BackendSuccessEvent): void { + if (!this.shouldLog('sessions', 'info')) {return;} + this.write({ level: 'info', event_type: 'backend.success', event: event as unknown as Record }); + } + backendFallback(event: BackendFallbackEvent): void { if (!this.shouldLog('sessions', 'warn')) {return;} this.write({ level: 'warn', event_type: 'backend.fallback', event: event as unknown as Record }); diff --git a/src/audit/types.ts b/src/audit/types.ts index d06df43..fab5008 100644 --- a/src/audit/types.ts +++ b/src/audit/types.ts @@ -12,7 +12,7 @@ export type AuditEventType = // Session lifecycle | 'session.create' | 'session.message' | 'session.delete' | 'session.transfer' | 'session.compact' | 'session.checkpoint' | 'session.auto_compact' | 'user.action' | 'queue.preempt' - | 'backend.route' | 'backend.fallback' + | 'backend.route' | 'backend.success' | 'backend.fallback' // Automation - Cron | 'cron.trigger' | 'cron.sent' | 'cron.add' | 'cron.remove' // Automation - Webhook @@ -236,17 +236,28 @@ export interface BackendRouteEvent { session_id: string; channel: string; sender: string; - selected_backend: 'native' | 'claude_code' | 'opencode' | 'codex' | 'gemini'; - source: 'agent_override' | 'default_external' | 'native'; + selected_backend: 'native' | 'claude_code' | 'opencode' | 'codex' | 'gemini' | 'pi_embedded'; + source: 'agent_override' | 'default_external' | 'native' | 'forced_native_guard'; + guard_reason?: 'capability_query' | 'pi_no_tools_mode' | 'attachments_present'; +} + +export interface BackendSuccessEvent { + session_id: string; + channel: string; + sender: string; + backend: 'claude_code' | 'opencode' | 'codex' | 'gemini' | 'pi_embedded'; + duration_ms: number; + response_length: number; } export interface BackendFallbackEvent { session_id: string; channel: string; sender: string; - from_backend: 'claude_code' | 'opencode' | 'codex' | 'gemini'; - to_backend: 'native' | 'claude_code' | 'opencode' | 'codex' | 'gemini'; + from_backend: 'claude_code' | 'opencode' | 'codex' | 'gemini' | 'pi_embedded'; + to_backend: 'native' | 'claude_code' | 'opencode' | 'codex' | 'gemini' | 'pi_embedded'; reason: string; + duration_ms?: number; } export interface CronTriggerEvent { diff --git a/src/daemon/routing.test.ts b/src/daemon/routing.test.ts index 51501d2..b688455 100644 --- a/src/daemon/routing.test.ts +++ b/src/daemon/routing.test.ts @@ -1279,6 +1279,151 @@ describe('daemon external backend integration', () => { expect(processSpy).toHaveBeenCalled(); expect(reply).toHaveBeenCalledWith(expect.objectContaining({ text: 'native fallback response' })); }); + + it('uses pi_embedded backend for plain text canary turns', async () => { + const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process'); + const history: Array<{ role: 'user' | 'assistant'; content: string }> = []; + const session = { + id: 'telegram:pi-canary', + addMessage: vi.fn((msg: { role: 'user' | 'assistant'; content: string }) => { + history.push(msg); + return msg; + }), + getHistory: vi.fn(() => [...history]), + clear: vi.fn(), + replaceHistory: vi.fn(), + getConfig: vi.fn(() => undefined), + setConfig: vi.fn(), + deleteConfig: vi.fn(), + }; + + const piBackend = { + name: 'pi_embedded', + process: vi.fn(async () => 'pi embedded response'), + }; + + const router = createMessageRouter({ + sessionManager: { getSession: vi.fn(() => session) } as unknown as MessageRouterDeps['sessionManager'], + modelRouter: { + getAvailableTiers: () => ['fast', 'default', 'complex', 'local'], + getAllLabels: () => ({ fast: 'fast', default: 'default', complex: 'complex', local: 'local' }), + getLabel: (tier: string) => tier, + } as unknown as MessageRouterDeps['modelRouter'], + systemPrompt: 'test prompt', + toolRegistry: { + clone() { return this; }, + register: vi.fn(), + } as unknown as MessageRouterDeps['toolRegistry'], + toolExecutor: {} as unknown as MessageRouterDeps['toolExecutor'], + config: { + agents: { + primary_tier: 'default', + delegation: { + compaction: 'fast', + memory_extraction: 'fast', + classification: 'fast', + tool_summarisation: 'fast', + complex_reasoning: 'complex', + }, + max_delegation_depth: 3, + max_iterations: 10, + }, + backends: { + pi_embedded: { no_tools_mode: false }, + }, + compaction: { enabled: false }, + models: { default: { provider: 'anthropic', model: 'claude' } }, + } as unknown as MessageRouterDeps['config'], + externalBackends: { pi_embedded: piBackend } as unknown as MessageRouterDeps['externalBackends'], + defaultName: 'pi_embedded', + }); + + const reply = vi.fn(async (_message: OutboundMessage) => {}); + await router.handler({ + id: 'm-pi-canary', + channel: 'telegram', + senderId: 'pi-canary', + text: 'just chat with me', + timestamp: Date.now(), + } as MessageRouterInput, reply); + + expect(piBackend.process).toHaveBeenCalled(); + expect(processSpy).not.toHaveBeenCalled(); + expect(reply).toHaveBeenCalledWith(expect.objectContaining({ text: 'pi embedded response' })); + }); + + it('forces native processing for pi_embedded no-tools mode when prompt appears tool-oriented', async () => { + const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process') + .mockResolvedValue('native guarded response'); + const history: Array<{ role: 'user' | 'assistant'; content: string }> = []; + const session = { + id: 'telegram:pi-no-tools', + addMessage: vi.fn((msg: { role: 'user' | 'assistant'; content: string }) => { + history.push(msg); + return msg; + }), + getHistory: vi.fn(() => [...history]), + clear: vi.fn(), + replaceHistory: vi.fn(), + getConfig: vi.fn(() => undefined), + setConfig: vi.fn(), + deleteConfig: vi.fn(), + }; + + const piBackend = { + name: 'pi_embedded', + process: vi.fn(async () => 'pi embedded response'), + }; + + const router = createMessageRouter({ + sessionManager: { getSession: vi.fn(() => session) } as unknown as MessageRouterDeps['sessionManager'], + modelRouter: { + getAvailableTiers: () => ['fast', 'default', 'complex', 'local'], + getAllLabels: () => ({ fast: 'fast', default: 'default', complex: 'complex', local: 'local' }), + getLabel: (tier: string) => tier, + } as unknown as MessageRouterDeps['modelRouter'], + systemPrompt: 'test prompt', + toolRegistry: { + clone() { return this; }, + register: vi.fn(), + } as unknown as MessageRouterDeps['toolRegistry'], + toolExecutor: {} as unknown as MessageRouterDeps['toolExecutor'], + config: { + agents: { + primary_tier: 'default', + delegation: { + compaction: 'fast', + memory_extraction: 'fast', + classification: 'fast', + tool_summarisation: 'fast', + complex_reasoning: 'complex', + }, + max_delegation_depth: 3, + max_iterations: 10, + }, + backends: { + pi_embedded: { no_tools_mode: true }, + }, + compaction: { enabled: false }, + models: { default: { provider: 'anthropic', model: 'claude' } }, + } as unknown as MessageRouterDeps['config'], + externalBackends: { pi_embedded: piBackend } as unknown as MessageRouterDeps['externalBackends'], + defaultName: 'pi_embedded', + }); + + const reply = vi.fn(async (_message: OutboundMessage) => {}); + await router.handler({ + id: 'm-pi-no-tools', + channel: 'telegram', + senderId: 'pi-no-tools', + text: 'please read the file and run a shell command', + timestamp: Date.now(), + } as MessageRouterInput, reply); + + expect(piBackend.process).not.toHaveBeenCalled(); + expect(processSpy).toHaveBeenCalled(); + expect(reply).toHaveBeenCalledWith(expect.objectContaining({ text: 'native guarded response' })); + }); }); describe('daemon audio routing integration', () => { diff --git a/src/daemon/routing.ts b/src/daemon/routing.ts index 0b103e2..c2bd1dd 100644 --- a/src/daemon/routing.ts +++ b/src/daemon/routing.ts @@ -164,6 +164,27 @@ function shouldForceNativeForCapabilityQuery(text: string): boolean { ); } +function shouldForceNativeForPiNoTools(text: string): boolean { + const normalized = text.trim().toLowerCase(); + if (!normalized) { + return false; + } + + if ( + /`(?:shell\.exec|file\.(?:read|write|edit|patch|list)|web\.(?:fetch|search)|browser\.)/.test(normalized) + || /\b(?:gmail|calendar|docs|drive|tasks|k8s|docker|minio)\b/.test(normalized) + ) { + return true; + } + + return ( + /\b(?:run|execute)\s+(?:a\s+)?(?:shell|bash|command)\b/.test(normalized) + || /\b(?:read|open|show|edit|write|patch|delete|list)\s+(?:the\s+)?(?:file|files|directory|repo|code)\b/.test(normalized) + || /\b(?:search|fetch|browse|scrape)\s+(?:the\s+)?(?:web|internet|url|site)\b/.test(normalized) + || /\b(?:use|call)\s+(?:a\s+)?tool\b/.test(normalized) + ); +} + function providerAcceptsNativeAudioContentParts(provider: string): boolean { return ( provider === 'openai' @@ -1390,11 +1411,26 @@ export function createMessageRouter(deps: { const requestedBackend = agentConfig?.backend ?? deps.defaultName; const forceNativeForCapabilityQuery = shouldForceNativeForCapabilityQuery(messageText); - const sessionIdForAudit = `${msg.channel}:${msg.senderId}`; + const hasAttachmentsForExternalBackend = Boolean(attachments && attachments.length > 0); const selectedBackend = requestedBackend && requestedBackend !== 'native' ? deps.externalBackends?.[requestedBackend] : undefined; - const selectedBackendForAudit: 'native' | ExternalBackendName = selectedBackend && requestedBackend && !forceNativeForCapabilityQuery + const externalBackendRequested = Boolean(selectedBackend && requestedBackend && requestedBackend !== 'native'); + const forceNativeForPiNoTools = requestedBackend === 'pi_embedded' + && deps.config.backends.pi_embedded.no_tools_mode + && shouldForceNativeForPiNoTools(messageText); + let forcedNativeGuardReason: 'capability_query' | 'pi_no_tools_mode' | 'attachments_present' | undefined; + if (externalBackendRequested) { + if (forceNativeForCapabilityQuery) { + forcedNativeGuardReason = 'capability_query'; + } else if (forceNativeForPiNoTools) { + forcedNativeGuardReason = 'pi_no_tools_mode'; + } else if (hasAttachmentsForExternalBackend) { + forcedNativeGuardReason = 'attachments_present'; + } + } + const sessionIdForAudit = `${msg.channel}:${msg.senderId}`; + const selectedBackendForAudit: 'native' | ExternalBackendName = selectedBackend && requestedBackend && !forcedNativeGuardReason ? requestedBackend : 'native'; @@ -1403,14 +1439,18 @@ export function createMessageRouter(deps: { channel: msg.channel, sender: msg.senderId, selected_backend: selectedBackendForAudit, - source: agentConfig?.backend - ? 'agent_override' - : selectedBackend - ? 'default_external' - : 'native', + source: forcedNativeGuardReason + ? 'forced_native_guard' + : agentConfig?.backend + ? 'agent_override' + : selectedBackend + ? 'default_external' + : 'native', + ...(forcedNativeGuardReason ? { guard_reason: forcedNativeGuardReason } : {}), }); - if (selectedBackend && (!attachments || attachments.length === 0) && !forceNativeForCapabilityQuery) { + if (selectedBackend && !hasAttachmentsForExternalBackend && !forceNativeForCapabilityQuery && !forceNativeForPiNoTools) { + const backendStartedAt = Date.now(); try { const history = toExternalHistory(session.getHistory()); session.addMessage({ role: 'user', content: messageText }); @@ -1418,6 +1458,14 @@ export function createMessageRouter(deps: { prompt: messageText, history, }); + auditLogger?.backendSuccess?.({ + session_id: sessionIdForAudit, + channel: msg.channel, + sender: msg.senderId, + backend: selectedBackend.name, + duration_ms: Date.now() - backendStartedAt, + response_length: response.length, + }); session.addMessage({ role: 'assistant', content: response }); const ttsAttachment = await maybeBuildTtsAttachment(response, msg.channel); await reply({ @@ -1438,6 +1486,7 @@ export function createMessageRouter(deps: { : (selectedBackend.name as ExternalBackendName), to_backend: 'native', reason: detail, + duration_ms: Date.now() - backendStartedAt, }); } }