feat(audit): record user action events across gateway and channels

This commit is contained in:
William Valentin
2026-02-16 13:21:15 -08:00
parent 3f627cc1ad
commit 9b76c75e82
8 changed files with 192 additions and 1 deletions
+16
View File
@@ -822,6 +822,22 @@ When the selected backend is unavailable (for example embedding provider errors)
| `top_k` | no | Max QMD results returned by `memory.search` (default: `8`) |
| `min_score` | no | Minimum relevance score (0.0-1.0) for QMD matches (default: `0.15`) |
## Session End Summary
Optionally summarize conversations when a WebSocket session ends and append the summary to memory.
```yaml
sessions:
end_summary:
enabled: true
tier: fast
memory_namespace: session/summaries
```
## Audit Trail
Flynn writes structured audit events to `audit.path`, including tool execution, session lifecycle, and user actions (`user.action`) from both channel and gateway requests.
## Gateway Lock
Single-client mode for the WebSocket gateway. When enabled, only one WebSocket connection is allowed at a time. Additional connections are rejected with close code `4003`.
+33
View File
@@ -3,6 +3,39 @@
"updated_at": "2026-02-16",
"description": "Tracks the status of all Flynn plans and implementation phases",
"plans": {
"backup-session-summary-audit-trail": {
"status": "completed",
"date": "2026-02-16",
"updated": "2026-02-16",
"summary": "Implemented operator resilience features: backup snapshots with MinIO upload support (`flynn backup` + optional daemon scheduler), optional end-of-session summarization for WebSocket sessions with memory persistence, and explicit `user.action` audit events across channel and gateway message entry points.",
"files_modified": [
"src/config/schema.ts",
"src/config/schema.test.ts",
"src/config/index.ts",
"src/daemon/index.ts",
"src/daemon/routing.ts",
"src/daemon/routing.test.ts",
"src/gateway/session-bridge.ts",
"src/gateway/session-bridge.test.ts",
"src/gateway/handlers/agent.ts",
"src/gateway/handlers/agent.test.ts",
"src/audit/types.ts",
"src/audit/logger.ts",
"src/session/index.ts",
"src/session/endSummary.ts",
"src/session/endSummary.test.ts",
"src/backup/run.ts",
"src/backup/run.test.ts",
"src/backup/index.ts",
"src/cli/backup.ts",
"src/cli/index.ts",
"src/cli/index.test.ts",
"config/default.yaml",
"README.md",
"scripts/backup-to-minio.sh"
],
"test_status": "pnpm test:run src/backup/run.test.ts src/session/endSummary.test.ts src/config/schema.test.ts src/gateway/session-bridge.test.ts src/gateway/handlers/agent.test.ts src/daemon/routing.test.ts src/cli/index.test.ts + pnpm typecheck passing"
},
"docs-agent-oriented-diagrams-pass": {
"status": "completed",
"date": "2026-02-15",
+6
View File
@@ -16,6 +16,7 @@ import type {
SessionMessageEvent,
SessionDeleteEvent,
SessionCompactEvent,
UserActionEvent,
CronTriggerEvent,
WebhookReceiveEvent,
HeartbeatCycleEvent,
@@ -174,6 +175,11 @@ export class AuditLogger {
this.write({ level: 'debug', event_type: 'session.compact', event: event as unknown as Record<string, unknown> });
}
userAction(event: UserActionEvent): void {
if (!this.shouldLog('sessions', 'info')) {return;}
this.write({ level: 'info', event_type: 'user.action', event: event as unknown as Record<string, unknown> });
}
sessionTransfer(from: string, to: string, messageCount: number): void {
if (!this.shouldLog('sessions', 'debug')) {return;}
this.write({
+12 -1
View File
@@ -10,7 +10,7 @@ export type AuditEventType =
// Skills installer
| 'skills.installer.execution_blocked' | 'skills.installer.command_result' | 'skills.registry_install'
// Session lifecycle
| 'session.create' | 'session.message' | 'session.delete' | 'session.transfer' | 'session.compact'
| 'session.create' | 'session.message' | 'session.delete' | 'session.transfer' | 'session.compact' | 'user.action'
// Automation - Cron
| 'cron.trigger' | 'cron.sent' | 'cron.add' | 'cron.remove'
// Automation - Webhook
@@ -182,6 +182,17 @@ export interface SessionCompactEvent {
tokens_after: number;
}
export interface UserActionEvent {
session_id: string;
channel: string;
sender: string;
source: 'gateway' | 'channel';
action_type: 'message' | 'command';
content_length: number;
attachments_count?: number;
command?: string;
}
export interface CronTriggerEvent {
job_name: string;
schedule: string;
+72
View File
@@ -8,6 +8,7 @@ import { CommandRegistry, registerBuiltinCommands } from '../commands/index.js';
import { ComponentRegistry } from '../intents/index.js';
import { RoutingPolicy } from '../routing/index.js';
import type { OutboundMessage } from '../channels/index.js';
import { initAuditLogger } from '../audit/index.js';
type MessageRouterDeps = Parameters<typeof createMessageRouter>[0];
type MessageRouterInput = Parameters<ReturnType<typeof createMessageRouter>['handler']>[0];
@@ -144,6 +145,77 @@ describe('daemon command fast-path integration', () => {
expect(session.deleteConfig).toHaveBeenCalledWith('modelTier');
});
it('emits user.action audit events for channel messages', async () => {
const mockAuditLogger = {
userAction: vi.fn(),
};
initAuditLogger(mockAuditLogger as any);
const session = {
id: 'telegram:user-audit',
addMessage: vi.fn(),
getHistory: vi.fn(() => []),
clear: vi.fn(),
replaceHistory: vi.fn(),
getConfig: vi.fn(() => undefined),
setConfig: vi.fn(),
deleteConfig: vi.fn(),
};
const commandRegistry = new CommandRegistry();
registerBuiltinCommands(commandRegistry);
const router = createMessageRouter({
sessionManager: {
getSession: vi.fn(() => session),
} as unknown as MessageRouterDeps['sessionManager'],
modelRouter: {
getAvailableTiers: () => ['fast', 'default', 'complex', 'local'],
getAllLabels: () => ({ fast: 'fast', default: 'default', complex: 'complex', local: 'local' }),
getLabel: (tier: string) => tier,
} as unknown as MessageRouterDeps['modelRouter'],
systemPrompt: 'test prompt',
toolRegistry: {
clone() { return this; },
register: vi.fn(),
} as unknown as MessageRouterDeps['toolRegistry'],
toolExecutor: {} as unknown as MessageRouterDeps['toolExecutor'],
config: {
agents: {
primary_tier: 'default',
delegation: {
compaction: 'fast',
memory_extraction: 'fast',
classification: 'fast',
tool_summarisation: 'fast',
complex_reasoning: 'complex',
},
max_delegation_depth: 3,
max_iterations: 10,
},
compaction: { enabled: false },
models: { default: { provider: 'anthropic', model: 'claude' } },
} as unknown as MessageRouterDeps['config'],
commandRegistry,
});
await router.handler({
id: 'm-audit',
channel: 'telegram',
senderId: 'user-audit',
text: '/reset',
metadata: { isCommand: true, command: 'reset' },
} as unknown as MessageRouterInput, vi.fn(async (_: OutboundMessage) => {}));
expect(mockAuditLogger.userAction).toHaveBeenCalledWith(
expect.objectContaining({
source: 'channel',
action_type: 'command',
channel: 'telegram',
}),
);
});
it('handles model command via fast-path and persists tier override', async () => {
const processSpy = vi.spyOn(AgentOrchestrator.prototype, 'process');
const setModelTierSpy = vi.spyOn(AgentOrchestrator.prototype, 'setModelTier');
+14
View File
@@ -350,6 +350,20 @@ export function createMessageRouter(deps: {
const commandInput = msg.metadata?.isCommand && typeof msg.metadata.command === 'string'
? `/${msg.metadata.command}${msg.metadata.commandArgs ? ` ${msg.metadata.commandArgs}` : ''}`
: incomingText;
const metadataCommand = typeof msg.metadata?.command === 'string' ? msg.metadata.command : undefined;
const parsedCommand = msg.metadata?.isCommand
? metadataCommand
: commandInput.startsWith('/') ? commandInput.slice(1).split(/\s+/, 1)[0] : undefined;
auditLogger?.userAction({
session_id: `${msg.channel}:${msg.senderId}`,
channel: msg.channel,
sender: msg.senderId,
source: 'channel',
action_type: parsedCommand ? 'command' : 'message',
content_length: commandInput.length,
attachments_count: msg.attachments?.length ?? 0,
command: parsedCommand,
});
if (deps.commandRegistry && deps.commandRegistry.isCommand(commandInput)) {
const session = deps.sessionManager.getSession(msg.channel, msg.senderId);
+25
View File
@@ -4,6 +4,7 @@ import { LaneQueue, LaneQueueRejectedError } from '../lane-queue.js';
import { createAgentHandlers } from './agent.js';
import type { AgentHandlerDeps } from './agent.js';
import { CommandRegistry, registerBuiltinCommands } from '../../commands/index.js';
import { initAuditLogger } from '../../audit/index.js';
describe('createAgentHandlers command fast-path', () => {
const mockAgent = {
@@ -35,6 +36,9 @@ describe('createAgentHandlers command fast-path', () => {
const commandRegistry = new CommandRegistry();
registerBuiltinCommands(commandRegistry);
const mockAuditLogger = {
userAction: vi.fn(),
};
const handlers = createAgentHandlers({
sessionBridge: sessionBridge as unknown as AgentHandlerDeps['sessionBridge'],
@@ -45,6 +49,7 @@ describe('createAgentHandlers command fast-path', () => {
beforeEach(() => {
vi.clearAllMocks();
initAuditLogger(mockAuditLogger as any);
mockAgent.process.mockResolvedValue('agent response');
mockAgent.compact.mockResolvedValue(null);
});
@@ -125,6 +130,26 @@ describe('createAgentHandlers command fast-path', () => {
expect(((sent[0] as GatewayEvent).data as { content: string }).content).toBe('agent response');
});
it('emits user.action audit events for gateway requests', async () => {
const sent: OutboundMessage[] = [];
const send = vi.fn((msg: OutboundMessage) => sent.push(msg));
const req: GatewayRequest = {
id: 7,
method: 'agent.send',
params: { message: 'hello there', connectionId: 'conn-1' },
};
await handlers['agent.send'](req, send);
expect(mockAuditLogger.userAction).toHaveBeenCalledWith(
expect.objectContaining({
source: 'gateway',
action_type: 'message',
sender: 'conn-1',
}),
);
});
it('handles /queue command via fast-path and persists queue session config', async () => {
const sent: OutboundMessage[] = [];
const send = vi.fn((msg: OutboundMessage) => sent.push(msg));
+14
View File
@@ -72,6 +72,20 @@ export function createAgentHandlers(deps: AgentHandlerDeps) {
? `/${safeParams.metadata.command}${safeParams.metadata.commandArgs ? ` ${safeParams.metadata.commandArgs}` : ''}`
: (safeParams.message ?? '');
const parsedCommand = safeParams.metadata?.isCommand
? safeParams.metadata.command
: commandInput.startsWith('/') ? commandInput.slice(1).split(/\s+/, 1)[0] : undefined;
auditLogger?.userAction({
session_id: sessionId ?? `ws:${connectionId}`,
channel: 'ws',
sender: connectionId,
source: 'gateway',
action_type: parsedCommand ? 'command' : 'message',
content_length: commandInput.length,
attachments_count: safeParams.attachments?.length ?? 0,
command: parsedCommand,
});
if (commandInput && deps.commandRegistry?.isCommand(commandInput)) {
const sessionId = deps.sessionBridge.getSessionId(connectionId);
const commandResult = await deps.commandRegistry.execute(commandInput, {