feat(routing): add pi canary guardrails and backend audit telemetry
This commit is contained in:
@@ -23,6 +23,7 @@ import type {
|
|||||||
UserActionEvent,
|
UserActionEvent,
|
||||||
QueuePreemptEvent,
|
QueuePreemptEvent,
|
||||||
BackendRouteEvent,
|
BackendRouteEvent,
|
||||||
|
BackendSuccessEvent,
|
||||||
BackendFallbackEvent,
|
BackendFallbackEvent,
|
||||||
CronTriggerEvent,
|
CronTriggerEvent,
|
||||||
WebhookReceiveEvent,
|
WebhookReceiveEvent,
|
||||||
@@ -215,6 +216,11 @@ export class AuditLogger {
|
|||||||
this.write({ level: 'info', event_type: 'backend.route', event: event as unknown as Record<string, unknown> });
|
this.write({ level: 'info', event_type: 'backend.route', event: event as unknown as Record<string, unknown> });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
backendSuccess(event: BackendSuccessEvent): void {
|
||||||
|
if (!this.shouldLog('sessions', 'info')) {return;}
|
||||||
|
this.write({ level: 'info', event_type: 'backend.success', event: event as unknown as Record<string, unknown> });
|
||||||
|
}
|
||||||
|
|
||||||
backendFallback(event: BackendFallbackEvent): void {
|
backendFallback(event: BackendFallbackEvent): void {
|
||||||
if (!this.shouldLog('sessions', 'warn')) {return;}
|
if (!this.shouldLog('sessions', 'warn')) {return;}
|
||||||
this.write({ level: 'warn', event_type: 'backend.fallback', event: event as unknown as Record<string, unknown> });
|
this.write({ level: 'warn', event_type: 'backend.fallback', event: event as unknown as Record<string, unknown> });
|
||||||
|
|||||||
+16
-5
@@ -12,7 +12,7 @@ export type AuditEventType =
|
|||||||
// Session lifecycle
|
// Session lifecycle
|
||||||
| 'session.create' | 'session.message' | 'session.delete' | 'session.transfer' | 'session.compact' | 'session.checkpoint' | 'session.auto_compact' | 'user.action'
|
| 'session.create' | 'session.message' | 'session.delete' | 'session.transfer' | 'session.compact' | 'session.checkpoint' | 'session.auto_compact' | 'user.action'
|
||||||
| 'queue.preempt'
|
| 'queue.preempt'
|
||||||
| 'backend.route' | 'backend.fallback'
|
| 'backend.route' | 'backend.success' | 'backend.fallback'
|
||||||
// Automation - Cron
|
// Automation - Cron
|
||||||
| 'cron.trigger' | 'cron.sent' | 'cron.add' | 'cron.remove'
|
| 'cron.trigger' | 'cron.sent' | 'cron.add' | 'cron.remove'
|
||||||
// Automation - Webhook
|
// Automation - Webhook
|
||||||
@@ -236,17 +236,28 @@ export interface BackendRouteEvent {
|
|||||||
session_id: string;
|
session_id: string;
|
||||||
channel: string;
|
channel: string;
|
||||||
sender: string;
|
sender: string;
|
||||||
selected_backend: 'native' | 'claude_code' | 'opencode' | 'codex' | 'gemini';
|
selected_backend: 'native' | 'claude_code' | 'opencode' | 'codex' | 'gemini' | 'pi_embedded';
|
||||||
source: 'agent_override' | 'default_external' | 'native';
|
source: 'agent_override' | 'default_external' | 'native' | 'forced_native_guard';
|
||||||
|
guard_reason?: 'capability_query' | 'pi_no_tools_mode' | 'attachments_present';
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface BackendSuccessEvent {
|
||||||
|
session_id: string;
|
||||||
|
channel: string;
|
||||||
|
sender: string;
|
||||||
|
backend: 'claude_code' | 'opencode' | 'codex' | 'gemini' | 'pi_embedded';
|
||||||
|
duration_ms: number;
|
||||||
|
response_length: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface BackendFallbackEvent {
|
export interface BackendFallbackEvent {
|
||||||
session_id: string;
|
session_id: string;
|
||||||
channel: string;
|
channel: string;
|
||||||
sender: string;
|
sender: string;
|
||||||
from_backend: 'claude_code' | 'opencode' | 'codex' | 'gemini';
|
from_backend: 'claude_code' | 'opencode' | 'codex' | 'gemini' | 'pi_embedded';
|
||||||
to_backend: 'native' | 'claude_code' | 'opencode' | 'codex' | 'gemini';
|
to_backend: 'native' | 'claude_code' | 'opencode' | 'codex' | 'gemini' | 'pi_embedded';
|
||||||
reason: string;
|
reason: string;
|
||||||
|
duration_ms?: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface CronTriggerEvent {
|
export interface CronTriggerEvent {
|
||||||
|
|||||||
@@ -1279,6 +1279,151 @@ describe('daemon external backend integration', () => {
|
|||||||
expect(processSpy).toHaveBeenCalled();
|
expect(processSpy).toHaveBeenCalled();
|
||||||
expect(reply).toHaveBeenCalledWith(expect.objectContaining({ text: 'native fallback response' }));
|
expect(reply).toHaveBeenCalledWith(expect.objectContaining({ text: 'native fallback response' }));
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('uses pi_embedded backend for plain text canary turns', async () => {
|
||||||
|
const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process');
|
||||||
|
const history: Array<{ role: 'user' | 'assistant'; content: string }> = [];
|
||||||
|
const session = {
|
||||||
|
id: 'telegram:pi-canary',
|
||||||
|
addMessage: vi.fn((msg: { role: 'user' | 'assistant'; content: string }) => {
|
||||||
|
history.push(msg);
|
||||||
|
return msg;
|
||||||
|
}),
|
||||||
|
getHistory: vi.fn(() => [...history]),
|
||||||
|
clear: vi.fn(),
|
||||||
|
replaceHistory: vi.fn(),
|
||||||
|
getConfig: vi.fn(() => undefined),
|
||||||
|
setConfig: vi.fn(),
|
||||||
|
deleteConfig: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
|
const piBackend = {
|
||||||
|
name: 'pi_embedded',
|
||||||
|
process: vi.fn(async () => 'pi embedded response'),
|
||||||
|
};
|
||||||
|
|
||||||
|
const router = createMessageRouter({
|
||||||
|
sessionManager: { getSession: vi.fn(() => session) } as unknown as MessageRouterDeps['sessionManager'],
|
||||||
|
modelRouter: {
|
||||||
|
getAvailableTiers: () => ['fast', 'default', 'complex', 'local'],
|
||||||
|
getAllLabels: () => ({ fast: 'fast', default: 'default', complex: 'complex', local: 'local' }),
|
||||||
|
getLabel: (tier: string) => tier,
|
||||||
|
} as unknown as MessageRouterDeps['modelRouter'],
|
||||||
|
systemPrompt: 'test prompt',
|
||||||
|
toolRegistry: {
|
||||||
|
clone() { return this; },
|
||||||
|
register: vi.fn(),
|
||||||
|
} as unknown as MessageRouterDeps['toolRegistry'],
|
||||||
|
toolExecutor: {} as unknown as MessageRouterDeps['toolExecutor'],
|
||||||
|
config: {
|
||||||
|
agents: {
|
||||||
|
primary_tier: 'default',
|
||||||
|
delegation: {
|
||||||
|
compaction: 'fast',
|
||||||
|
memory_extraction: 'fast',
|
||||||
|
classification: 'fast',
|
||||||
|
tool_summarisation: 'fast',
|
||||||
|
complex_reasoning: 'complex',
|
||||||
|
},
|
||||||
|
max_delegation_depth: 3,
|
||||||
|
max_iterations: 10,
|
||||||
|
},
|
||||||
|
backends: {
|
||||||
|
pi_embedded: { no_tools_mode: false },
|
||||||
|
},
|
||||||
|
compaction: { enabled: false },
|
||||||
|
models: { default: { provider: 'anthropic', model: 'claude' } },
|
||||||
|
} as unknown as MessageRouterDeps['config'],
|
||||||
|
externalBackends: { pi_embedded: piBackend } as unknown as MessageRouterDeps['externalBackends'],
|
||||||
|
defaultName: 'pi_embedded',
|
||||||
|
});
|
||||||
|
|
||||||
|
const reply = vi.fn(async (_message: OutboundMessage) => {});
|
||||||
|
await router.handler({
|
||||||
|
id: 'm-pi-canary',
|
||||||
|
channel: 'telegram',
|
||||||
|
senderId: 'pi-canary',
|
||||||
|
text: 'just chat with me',
|
||||||
|
timestamp: Date.now(),
|
||||||
|
} as MessageRouterInput, reply);
|
||||||
|
|
||||||
|
expect(piBackend.process).toHaveBeenCalled();
|
||||||
|
expect(processSpy).not.toHaveBeenCalled();
|
||||||
|
expect(reply).toHaveBeenCalledWith(expect.objectContaining({ text: 'pi embedded response' }));
|
||||||
|
});
|
||||||
|
|
||||||
|
it('forces native processing for pi_embedded no-tools mode when prompt appears tool-oriented', async () => {
|
||||||
|
const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process')
|
||||||
|
.mockResolvedValue('native guarded response');
|
||||||
|
const history: Array<{ role: 'user' | 'assistant'; content: string }> = [];
|
||||||
|
const session = {
|
||||||
|
id: 'telegram:pi-no-tools',
|
||||||
|
addMessage: vi.fn((msg: { role: 'user' | 'assistant'; content: string }) => {
|
||||||
|
history.push(msg);
|
||||||
|
return msg;
|
||||||
|
}),
|
||||||
|
getHistory: vi.fn(() => [...history]),
|
||||||
|
clear: vi.fn(),
|
||||||
|
replaceHistory: vi.fn(),
|
||||||
|
getConfig: vi.fn(() => undefined),
|
||||||
|
setConfig: vi.fn(),
|
||||||
|
deleteConfig: vi.fn(),
|
||||||
|
};
|
||||||
|
|
||||||
|
const piBackend = {
|
||||||
|
name: 'pi_embedded',
|
||||||
|
process: vi.fn(async () => 'pi embedded response'),
|
||||||
|
};
|
||||||
|
|
||||||
|
const router = createMessageRouter({
|
||||||
|
sessionManager: { getSession: vi.fn(() => session) } as unknown as MessageRouterDeps['sessionManager'],
|
||||||
|
modelRouter: {
|
||||||
|
getAvailableTiers: () => ['fast', 'default', 'complex', 'local'],
|
||||||
|
getAllLabels: () => ({ fast: 'fast', default: 'default', complex: 'complex', local: 'local' }),
|
||||||
|
getLabel: (tier: string) => tier,
|
||||||
|
} as unknown as MessageRouterDeps['modelRouter'],
|
||||||
|
systemPrompt: 'test prompt',
|
||||||
|
toolRegistry: {
|
||||||
|
clone() { return this; },
|
||||||
|
register: vi.fn(),
|
||||||
|
} as unknown as MessageRouterDeps['toolRegistry'],
|
||||||
|
toolExecutor: {} as unknown as MessageRouterDeps['toolExecutor'],
|
||||||
|
config: {
|
||||||
|
agents: {
|
||||||
|
primary_tier: 'default',
|
||||||
|
delegation: {
|
||||||
|
compaction: 'fast',
|
||||||
|
memory_extraction: 'fast',
|
||||||
|
classification: 'fast',
|
||||||
|
tool_summarisation: 'fast',
|
||||||
|
complex_reasoning: 'complex',
|
||||||
|
},
|
||||||
|
max_delegation_depth: 3,
|
||||||
|
max_iterations: 10,
|
||||||
|
},
|
||||||
|
backends: {
|
||||||
|
pi_embedded: { no_tools_mode: true },
|
||||||
|
},
|
||||||
|
compaction: { enabled: false },
|
||||||
|
models: { default: { provider: 'anthropic', model: 'claude' } },
|
||||||
|
} as unknown as MessageRouterDeps['config'],
|
||||||
|
externalBackends: { pi_embedded: piBackend } as unknown as MessageRouterDeps['externalBackends'],
|
||||||
|
defaultName: 'pi_embedded',
|
||||||
|
});
|
||||||
|
|
||||||
|
const reply = vi.fn(async (_message: OutboundMessage) => {});
|
||||||
|
await router.handler({
|
||||||
|
id: 'm-pi-no-tools',
|
||||||
|
channel: 'telegram',
|
||||||
|
senderId: 'pi-no-tools',
|
||||||
|
text: 'please read the file and run a shell command',
|
||||||
|
timestamp: Date.now(),
|
||||||
|
} as MessageRouterInput, reply);
|
||||||
|
|
||||||
|
expect(piBackend.process).not.toHaveBeenCalled();
|
||||||
|
expect(processSpy).toHaveBeenCalled();
|
||||||
|
expect(reply).toHaveBeenCalledWith(expect.objectContaining({ text: 'native guarded response' }));
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('daemon audio routing integration', () => {
|
describe('daemon audio routing integration', () => {
|
||||||
|
|||||||
+57
-8
@@ -164,6 +164,27 @@ function shouldForceNativeForCapabilityQuery(text: string): boolean {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function shouldForceNativeForPiNoTools(text: string): boolean {
|
||||||
|
const normalized = text.trim().toLowerCase();
|
||||||
|
if (!normalized) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
/`(?:shell\.exec|file\.(?:read|write|edit|patch|list)|web\.(?:fetch|search)|browser\.)/.test(normalized)
|
||||||
|
|| /\b(?:gmail|calendar|docs|drive|tasks|k8s|docker|minio)\b/.test(normalized)
|
||||||
|
) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return (
|
||||||
|
/\b(?:run|execute)\s+(?:a\s+)?(?:shell|bash|command)\b/.test(normalized)
|
||||||
|
|| /\b(?:read|open|show|edit|write|patch|delete|list)\s+(?:the\s+)?(?:file|files|directory|repo|code)\b/.test(normalized)
|
||||||
|
|| /\b(?:search|fetch|browse|scrape)\s+(?:the\s+)?(?:web|internet|url|site)\b/.test(normalized)
|
||||||
|
|| /\b(?:use|call)\s+(?:a\s+)?tool\b/.test(normalized)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
function providerAcceptsNativeAudioContentParts(provider: string): boolean {
|
function providerAcceptsNativeAudioContentParts(provider: string): boolean {
|
||||||
return (
|
return (
|
||||||
provider === 'openai'
|
provider === 'openai'
|
||||||
@@ -1390,11 +1411,26 @@ export function createMessageRouter(deps: {
|
|||||||
|
|
||||||
const requestedBackend = agentConfig?.backend ?? deps.defaultName;
|
const requestedBackend = agentConfig?.backend ?? deps.defaultName;
|
||||||
const forceNativeForCapabilityQuery = shouldForceNativeForCapabilityQuery(messageText);
|
const forceNativeForCapabilityQuery = shouldForceNativeForCapabilityQuery(messageText);
|
||||||
const sessionIdForAudit = `${msg.channel}:${msg.senderId}`;
|
const hasAttachmentsForExternalBackend = Boolean(attachments && attachments.length > 0);
|
||||||
const selectedBackend = requestedBackend && requestedBackend !== 'native'
|
const selectedBackend = requestedBackend && requestedBackend !== 'native'
|
||||||
? deps.externalBackends?.[requestedBackend]
|
? deps.externalBackends?.[requestedBackend]
|
||||||
: undefined;
|
: undefined;
|
||||||
const selectedBackendForAudit: 'native' | ExternalBackendName = selectedBackend && requestedBackend && !forceNativeForCapabilityQuery
|
const externalBackendRequested = Boolean(selectedBackend && requestedBackend && requestedBackend !== 'native');
|
||||||
|
const forceNativeForPiNoTools = requestedBackend === 'pi_embedded'
|
||||||
|
&& deps.config.backends.pi_embedded.no_tools_mode
|
||||||
|
&& shouldForceNativeForPiNoTools(messageText);
|
||||||
|
let forcedNativeGuardReason: 'capability_query' | 'pi_no_tools_mode' | 'attachments_present' | undefined;
|
||||||
|
if (externalBackendRequested) {
|
||||||
|
if (forceNativeForCapabilityQuery) {
|
||||||
|
forcedNativeGuardReason = 'capability_query';
|
||||||
|
} else if (forceNativeForPiNoTools) {
|
||||||
|
forcedNativeGuardReason = 'pi_no_tools_mode';
|
||||||
|
} else if (hasAttachmentsForExternalBackend) {
|
||||||
|
forcedNativeGuardReason = 'attachments_present';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const sessionIdForAudit = `${msg.channel}:${msg.senderId}`;
|
||||||
|
const selectedBackendForAudit: 'native' | ExternalBackendName = selectedBackend && requestedBackend && !forcedNativeGuardReason
|
||||||
? requestedBackend
|
? requestedBackend
|
||||||
: 'native';
|
: 'native';
|
||||||
|
|
||||||
@@ -1403,14 +1439,18 @@ export function createMessageRouter(deps: {
|
|||||||
channel: msg.channel,
|
channel: msg.channel,
|
||||||
sender: msg.senderId,
|
sender: msg.senderId,
|
||||||
selected_backend: selectedBackendForAudit,
|
selected_backend: selectedBackendForAudit,
|
||||||
source: agentConfig?.backend
|
source: forcedNativeGuardReason
|
||||||
? 'agent_override'
|
? 'forced_native_guard'
|
||||||
: selectedBackend
|
: agentConfig?.backend
|
||||||
? 'default_external'
|
? 'agent_override'
|
||||||
: 'native',
|
: selectedBackend
|
||||||
|
? 'default_external'
|
||||||
|
: 'native',
|
||||||
|
...(forcedNativeGuardReason ? { guard_reason: forcedNativeGuardReason } : {}),
|
||||||
});
|
});
|
||||||
|
|
||||||
if (selectedBackend && (!attachments || attachments.length === 0) && !forceNativeForCapabilityQuery) {
|
if (selectedBackend && !hasAttachmentsForExternalBackend && !forceNativeForCapabilityQuery && !forceNativeForPiNoTools) {
|
||||||
|
const backendStartedAt = Date.now();
|
||||||
try {
|
try {
|
||||||
const history = toExternalHistory(session.getHistory());
|
const history = toExternalHistory(session.getHistory());
|
||||||
session.addMessage({ role: 'user', content: messageText });
|
session.addMessage({ role: 'user', content: messageText });
|
||||||
@@ -1418,6 +1458,14 @@ export function createMessageRouter(deps: {
|
|||||||
prompt: messageText,
|
prompt: messageText,
|
||||||
history,
|
history,
|
||||||
});
|
});
|
||||||
|
auditLogger?.backendSuccess?.({
|
||||||
|
session_id: sessionIdForAudit,
|
||||||
|
channel: msg.channel,
|
||||||
|
sender: msg.senderId,
|
||||||
|
backend: selectedBackend.name,
|
||||||
|
duration_ms: Date.now() - backendStartedAt,
|
||||||
|
response_length: response.length,
|
||||||
|
});
|
||||||
session.addMessage({ role: 'assistant', content: response });
|
session.addMessage({ role: 'assistant', content: response });
|
||||||
const ttsAttachment = await maybeBuildTtsAttachment(response, msg.channel);
|
const ttsAttachment = await maybeBuildTtsAttachment(response, msg.channel);
|
||||||
await reply({
|
await reply({
|
||||||
@@ -1438,6 +1486,7 @@ export function createMessageRouter(deps: {
|
|||||||
: (selectedBackend.name as ExternalBackendName),
|
: (selectedBackend.name as ExternalBackendName),
|
||||||
to_backend: 'native',
|
to_backend: 'native',
|
||||||
reason: detail,
|
reason: detail,
|
||||||
|
duration_ms: Date.now() - backendStartedAt,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user