From 9b76c75e820efca9fc2b5e1cefddf057c33d2b60 Mon Sep 17 00:00:00 2001 From: William Valentin Date: Mon, 16 Feb 2026 13:21:15 -0800 Subject: [PATCH] feat(audit): record user action events across gateway and channels --- README.md | 16 +++++++ docs/plans/state.json | 33 ++++++++++++++ src/audit/logger.ts | 6 +++ src/audit/types.ts | 13 +++++- src/daemon/routing.test.ts | 72 ++++++++++++++++++++++++++++++ src/daemon/routing.ts | 14 ++++++ src/gateway/handlers/agent.test.ts | 25 +++++++++++ src/gateway/handlers/agent.ts | 14 ++++++ 8 files changed, 192 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index a860931..ea9d906 100644 --- a/README.md +++ b/README.md @@ -822,6 +822,22 @@ When the selected backend is unavailable (for example embedding provider errors) | `top_k` | no | Max QMD results returned by `memory.search` (default: `8`) | | `min_score` | no | Minimum relevance score (0.0-1.0) for QMD matches (default: `0.15`) | +## Session End Summary + +Optionally summarize conversations when a WebSocket session ends and append the summary to memory. + +```yaml +sessions: + end_summary: + enabled: true + tier: fast + memory_namespace: session/summaries +``` + +## Audit Trail + +Flynn writes structured audit events to `audit.path`, including tool execution, session lifecycle, and user actions (`user.action`) from both channel and gateway requests. + ## Gateway Lock Single-client mode for the WebSocket gateway. When enabled, only one WebSocket connection is allowed at a time. Additional connections are rejected with close code `4003`. diff --git a/docs/plans/state.json b/docs/plans/state.json index a72fc94..8f19797 100644 --- a/docs/plans/state.json +++ b/docs/plans/state.json @@ -3,6 +3,39 @@ "updated_at": "2026-02-16", "description": "Tracks the status of all Flynn plans and implementation phases", "plans": { + "backup-session-summary-audit-trail": { + "status": "completed", + "date": "2026-02-16", + "updated": "2026-02-16", + "summary": "Implemented operator resilience features: backup snapshots with MinIO upload support (`flynn backup` + optional daemon scheduler), optional end-of-session summarization for WebSocket sessions with memory persistence, and explicit `user.action` audit events across channel and gateway message entry points.", + "files_modified": [ + "src/config/schema.ts", + "src/config/schema.test.ts", + "src/config/index.ts", + "src/daemon/index.ts", + "src/daemon/routing.ts", + "src/daemon/routing.test.ts", + "src/gateway/session-bridge.ts", + "src/gateway/session-bridge.test.ts", + "src/gateway/handlers/agent.ts", + "src/gateway/handlers/agent.test.ts", + "src/audit/types.ts", + "src/audit/logger.ts", + "src/session/index.ts", + "src/session/endSummary.ts", + "src/session/endSummary.test.ts", + "src/backup/run.ts", + "src/backup/run.test.ts", + "src/backup/index.ts", + "src/cli/backup.ts", + "src/cli/index.ts", + "src/cli/index.test.ts", + "config/default.yaml", + "README.md", + "scripts/backup-to-minio.sh" + ], + "test_status": "pnpm test:run src/backup/run.test.ts src/session/endSummary.test.ts src/config/schema.test.ts src/gateway/session-bridge.test.ts src/gateway/handlers/agent.test.ts src/daemon/routing.test.ts src/cli/index.test.ts + pnpm typecheck passing" + }, "docs-agent-oriented-diagrams-pass": { "status": "completed", "date": "2026-02-15", diff --git a/src/audit/logger.ts b/src/audit/logger.ts index 04d3791..0ac9d0e 100644 --- a/src/audit/logger.ts +++ b/src/audit/logger.ts @@ -16,6 +16,7 @@ import type { SessionMessageEvent, SessionDeleteEvent, SessionCompactEvent, + UserActionEvent, CronTriggerEvent, WebhookReceiveEvent, HeartbeatCycleEvent, @@ -174,6 +175,11 @@ export class AuditLogger { this.write({ level: 'debug', event_type: 'session.compact', event: event as unknown as Record }); } + userAction(event: UserActionEvent): void { + if (!this.shouldLog('sessions', 'info')) {return;} + this.write({ level: 'info', event_type: 'user.action', event: event as unknown as Record }); + } + sessionTransfer(from: string, to: string, messageCount: number): void { if (!this.shouldLog('sessions', 'debug')) {return;} this.write({ diff --git a/src/audit/types.ts b/src/audit/types.ts index 65ab319..d74bae6 100644 --- a/src/audit/types.ts +++ b/src/audit/types.ts @@ -10,7 +10,7 @@ export type AuditEventType = // Skills installer | 'skills.installer.execution_blocked' | 'skills.installer.command_result' | 'skills.registry_install' // Session lifecycle - | 'session.create' | 'session.message' | 'session.delete' | 'session.transfer' | 'session.compact' + | 'session.create' | 'session.message' | 'session.delete' | 'session.transfer' | 'session.compact' | 'user.action' // Automation - Cron | 'cron.trigger' | 'cron.sent' | 'cron.add' | 'cron.remove' // Automation - Webhook @@ -182,6 +182,17 @@ export interface SessionCompactEvent { tokens_after: number; } +export interface UserActionEvent { + session_id: string; + channel: string; + sender: string; + source: 'gateway' | 'channel'; + action_type: 'message' | 'command'; + content_length: number; + attachments_count?: number; + command?: string; +} + export interface CronTriggerEvent { job_name: string; schedule: string; diff --git a/src/daemon/routing.test.ts b/src/daemon/routing.test.ts index fea70ed..2214e48 100644 --- a/src/daemon/routing.test.ts +++ b/src/daemon/routing.test.ts @@ -8,6 +8,7 @@ import { CommandRegistry, registerBuiltinCommands } from '../commands/index.js'; import { ComponentRegistry } from '../intents/index.js'; import { RoutingPolicy } from '../routing/index.js'; import type { OutboundMessage } from '../channels/index.js'; +import { initAuditLogger } from '../audit/index.js'; type MessageRouterDeps = Parameters[0]; type MessageRouterInput = Parameters['handler']>[0]; @@ -144,6 +145,77 @@ describe('daemon command fast-path integration', () => { expect(session.deleteConfig).toHaveBeenCalledWith('modelTier'); }); + it('emits user.action audit events for channel messages', async () => { + const mockAuditLogger = { + userAction: vi.fn(), + }; + initAuditLogger(mockAuditLogger as any); + + const session = { + id: 'telegram:user-audit', + addMessage: vi.fn(), + getHistory: vi.fn(() => []), + clear: vi.fn(), + replaceHistory: vi.fn(), + getConfig: vi.fn(() => undefined), + setConfig: vi.fn(), + deleteConfig: vi.fn(), + }; + + const commandRegistry = new CommandRegistry(); + registerBuiltinCommands(commandRegistry); + + const router = createMessageRouter({ + sessionManager: { + getSession: vi.fn(() => session), + } as unknown as MessageRouterDeps['sessionManager'], + modelRouter: { + getAvailableTiers: () => ['fast', 'default', 'complex', 'local'], + getAllLabels: () => ({ fast: 'fast', default: 'default', complex: 'complex', local: 'local' }), + getLabel: (tier: string) => tier, + } as unknown as MessageRouterDeps['modelRouter'], + systemPrompt: 'test prompt', + toolRegistry: { + clone() { return this; }, + register: vi.fn(), + } as unknown as MessageRouterDeps['toolRegistry'], + toolExecutor: {} as unknown as MessageRouterDeps['toolExecutor'], + config: { + agents: { + primary_tier: 'default', + delegation: { + compaction: 'fast', + memory_extraction: 'fast', + classification: 'fast', + tool_summarisation: 'fast', + complex_reasoning: 'complex', + }, + max_delegation_depth: 3, + max_iterations: 10, + }, + compaction: { enabled: false }, + models: { default: { provider: 'anthropic', model: 'claude' } }, + } as unknown as MessageRouterDeps['config'], + commandRegistry, + }); + + await router.handler({ + id: 'm-audit', + channel: 'telegram', + senderId: 'user-audit', + text: '/reset', + metadata: { isCommand: true, command: 'reset' }, + } as unknown as MessageRouterInput, vi.fn(async (_: OutboundMessage) => {})); + + expect(mockAuditLogger.userAction).toHaveBeenCalledWith( + expect.objectContaining({ + source: 'channel', + action_type: 'command', + channel: 'telegram', + }), + ); + }); + it('handles model command via fast-path and persists tier override', async () => { const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process'); const setModelTierSpy = vi.spyOn(AgentOrchestrator.prototype, 'setModelTier'); diff --git a/src/daemon/routing.ts b/src/daemon/routing.ts index 39605cd..8632fda 100644 --- a/src/daemon/routing.ts +++ b/src/daemon/routing.ts @@ -350,6 +350,20 @@ export function createMessageRouter(deps: { const commandInput = msg.metadata?.isCommand && typeof msg.metadata.command === 'string' ? `/${msg.metadata.command}${msg.metadata.commandArgs ? ` ${msg.metadata.commandArgs}` : ''}` : incomingText; + const metadataCommand = typeof msg.metadata?.command === 'string' ? msg.metadata.command : undefined; + const parsedCommand = msg.metadata?.isCommand + ? metadataCommand + : commandInput.startsWith('/') ? commandInput.slice(1).split(/\s+/, 1)[0] : undefined; + auditLogger?.userAction({ + session_id: `${msg.channel}:${msg.senderId}`, + channel: msg.channel, + sender: msg.senderId, + source: 'channel', + action_type: parsedCommand ? 'command' : 'message', + content_length: commandInput.length, + attachments_count: msg.attachments?.length ?? 0, + command: parsedCommand, + }); if (deps.commandRegistry && deps.commandRegistry.isCommand(commandInput)) { const session = deps.sessionManager.getSession(msg.channel, msg.senderId); diff --git a/src/gateway/handlers/agent.test.ts b/src/gateway/handlers/agent.test.ts index dfdb462..96f8670 100644 --- a/src/gateway/handlers/agent.test.ts +++ b/src/gateway/handlers/agent.test.ts @@ -4,6 +4,7 @@ import { LaneQueue, LaneQueueRejectedError } from '../lane-queue.js'; import { createAgentHandlers } from './agent.js'; import type { AgentHandlerDeps } from './agent.js'; import { CommandRegistry, registerBuiltinCommands } from '../../commands/index.js'; +import { initAuditLogger } from '../../audit/index.js'; describe('createAgentHandlers command fast-path', () => { const mockAgent = { @@ -35,6 +36,9 @@ describe('createAgentHandlers command fast-path', () => { const commandRegistry = new CommandRegistry(); registerBuiltinCommands(commandRegistry); + const mockAuditLogger = { + userAction: vi.fn(), + }; const handlers = createAgentHandlers({ sessionBridge: sessionBridge as unknown as AgentHandlerDeps['sessionBridge'], @@ -45,6 +49,7 @@ describe('createAgentHandlers command fast-path', () => { beforeEach(() => { vi.clearAllMocks(); + initAuditLogger(mockAuditLogger as any); mockAgent.process.mockResolvedValue('agent response'); mockAgent.compact.mockResolvedValue(null); }); @@ -125,6 +130,26 @@ describe('createAgentHandlers command fast-path', () => { expect(((sent[0] as GatewayEvent).data as { content: string }).content).toBe('agent response'); }); + it('emits user.action audit events for gateway requests', async () => { + const sent: OutboundMessage[] = []; + const send = vi.fn((msg: OutboundMessage) => sent.push(msg)); + const req: GatewayRequest = { + id: 7, + method: 'agent.send', + params: { message: 'hello there', connectionId: 'conn-1' }, + }; + + await handlers['agent.send'](req, send); + + expect(mockAuditLogger.userAction).toHaveBeenCalledWith( + expect.objectContaining({ + source: 'gateway', + action_type: 'message', + sender: 'conn-1', + }), + ); + }); + it('handles /queue command via fast-path and persists queue session config', async () => { const sent: OutboundMessage[] = []; const send = vi.fn((msg: OutboundMessage) => sent.push(msg)); diff --git a/src/gateway/handlers/agent.ts b/src/gateway/handlers/agent.ts index 8a5bb77..c7bf68c 100644 --- a/src/gateway/handlers/agent.ts +++ b/src/gateway/handlers/agent.ts @@ -72,6 +72,20 @@ export function createAgentHandlers(deps: AgentHandlerDeps) { ? `/${safeParams.metadata.command}${safeParams.metadata.commandArgs ? ` ${safeParams.metadata.commandArgs}` : ''}` : (safeParams.message ?? ''); + const parsedCommand = safeParams.metadata?.isCommand + ? safeParams.metadata.command + : commandInput.startsWith('/') ? commandInput.slice(1).split(/\s+/, 1)[0] : undefined; + auditLogger?.userAction({ + session_id: sessionId ?? `ws:${connectionId}`, + channel: 'ws', + sender: connectionId, + source: 'gateway', + action_type: parsedCommand ? 'command' : 'message', + content_length: commandInput.length, + attachments_count: safeParams.attachments?.length ?? 0, + command: parsedCommand, + }); + if (commandInput && deps.commandRegistry?.isCommand(commandInput)) { const sessionId = deps.sessionBridge.getSessionId(connectionId); const commandResult = await deps.commandRegistry.execute(commandInput, {