Files
flynn/src/session/store.ts
T

353 lines
12 KiB
TypeScript

import Database from 'better-sqlite3';
import type { Message } from '../models/types.js';
import type { PairingStore, ApprovedSender } from '../channels/pairing.js';
import type { HistoryMetadata } from './indexer.js';
/** Parse a duration string like '30d', '7d', '12h' to milliseconds. Returns null if invalid or '0'. */
export function parseDuration(s: string): number | null {
if (s === '0' || s === 'false') {return null;}
const match = s.match(/^(\d+)(h|d)$/);
if (!match) {return null;}
const [, n, unit] = match;
return unit === 'h' ? Number(n) * 3600_000 : Number(n) * 86_400_000;
}
export interface SessionDailyAnalyticsRow {
day: string;
sessions: number;
messages: number;
}
export interface SessionTopAnalyticsRow {
sessionId: string;
messages: number;
lastActivity: number;
}
export interface SessionAnalyticsSnapshot {
daily: SessionDailyAnalyticsRow[];
topSessions: SessionTopAnalyticsRow[];
averageMessagesPerSession: number;
totalSessions: number;
totalMessages: number;
}
export class SessionStore {
private db: Database.Database;
constructor(dbPath: string) {
this.db = new Database(dbPath);
this.init();
}
private init(): void {
this.db.exec(`
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
created_at INTEGER NOT NULL DEFAULT (unixepoch())
);
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id);
CREATE TABLE IF NOT EXISTS pairing_approved (
channel TEXT NOT NULL,
sender_id TEXT NOT NULL,
approved_at INTEGER NOT NULL,
code_used TEXT NOT NULL,
PRIMARY KEY (channel, sender_id)
);
CREATE TABLE IF NOT EXISTS session_config (
session_id TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
PRIMARY KEY (session_id, key)
);
CREATE INDEX IF NOT EXISTS idx_session_config_session ON session_config(session_id);
`);
const messageColumns = this.db.prepare('PRAGMA table_info(messages)').all() as Array<{ name: string }>;
if (!messageColumns.some(column => column.name === 'metadata')) {
this.db.exec('ALTER TABLE messages ADD COLUMN metadata TEXT');
}
}
addMessage(sessionId: string, message: Message, metadata?: HistoryMetadata): void {
const createdAtSeconds = Math.floor((message.timestamp ?? Date.now()) / 1000);
const stmt = this.db.prepare(
'INSERT INTO messages (session_id, role, content, created_at, metadata) VALUES (?, ?, ?, ?, ?)',
);
stmt.run(sessionId, message.role, message.content, createdAtSeconds, metadata ? JSON.stringify(metadata) : null);
}
getMessages(sessionId: string): Message[] {
const stmt = this.db.prepare(
'SELECT role, content, created_at FROM messages WHERE session_id = ? ORDER BY id ASC',
);
const rows = stmt.all(sessionId) as Array<{ role: string; content: string; created_at: number }>;
return rows.map(row => ({
role: row.role as 'user' | 'assistant',
content: row.content,
timestamp: row.created_at * 1000,
}));
}
/**
* Atomically replace all messages for a session.
* Used by compaction to swap full history with a compacted version.
* Runs in a transaction: delete all → re-insert in order.
*/
replaceMessages(sessionId: string, messages: Message[]): void {
const transaction = this.db.transaction(() => {
// Delete existing messages
this.db.prepare('DELETE FROM messages WHERE session_id = ?').run(sessionId);
// Re-insert in order
const insert = this.db.prepare(
'INSERT INTO messages (session_id, role, content, created_at, metadata) VALUES (?, ?, ?, ?, ?)',
);
for (const msg of messages) {
const createdAtSeconds = Math.floor((msg.timestamp ?? Date.now()) / 1000);
insert.run(sessionId, msg.role, msg.content, createdAtSeconds, null);
}
});
transaction();
}
clearSession(sessionId: string): void {
const transaction = this.db.transaction(() => {
this.db.prepare('DELETE FROM messages WHERE session_id = ?').run(sessionId);
this.db.prepare('DELETE FROM session_config WHERE session_id = ?').run(sessionId);
});
transaction();
}
listSessions(): string[] {
const stmt = this.db.prepare('SELECT DISTINCT session_id FROM messages');
const rows = stmt.all() as Array<{ session_id: string }>;
return rows.map(row => row.session_id);
}
/** Delete all messages for sessions with no activity since the given timestamp. Returns pruned session IDs. */
pruneStale(beforeTimestamp: number): string[] {
const stale = this.db.prepare(`
SELECT session_id FROM messages
GROUP BY session_id
HAVING MAX(created_at) < ?
`).all(beforeTimestamp) as Array<{ session_id: string }>;
if (stale.length === 0) {return [];}
const deleteMessages = this.db.prepare('DELETE FROM messages WHERE session_id = ?');
const deleteConfig = this.db.prepare('DELETE FROM session_config WHERE session_id = ?');
const transaction = this.db.transaction(() => {
for (const { session_id } of stale) {
deleteMessages.run(session_id);
deleteConfig.run(session_id);
}
});
transaction();
return stale.map(r => r.session_id);
}
getPairingStore(): PairingStore {
return {
loadApproved: (): ApprovedSender[] => {
const rows = this.db.prepare(
'SELECT channel, sender_id, approved_at, code_used FROM pairing_approved',
).all() as Array<{ channel: string; sender_id: string; approved_at: number; code_used: string }>;
return rows.map(r => ({
channel: r.channel,
senderId: r.sender_id,
approvedAt: r.approved_at,
codeUsed: r.code_used,
}));
},
saveApproved: (sender: ApprovedSender): void => {
this.db.prepare(`
INSERT OR REPLACE INTO pairing_approved (channel, sender_id, approved_at, code_used)
VALUES (?, ?, ?, ?)
`).run(sender.channel, sender.senderId, sender.approvedAt, sender.codeUsed);
},
removeApproved: (channel: string, senderId: string): void => {
this.db.prepare(
'DELETE FROM pairing_approved WHERE channel = ? AND sender_id = ?',
).run(channel, senderId);
},
};
}
/** Get a single config value for a session. */
getSessionConfig(sessionId: string, key: string): string | undefined {
const stmt = this.db.prepare(
'SELECT value FROM session_config WHERE session_id = ? AND key = ?',
);
const row = stmt.get(sessionId, key) as { value: string } | undefined;
return row?.value;
}
/** Get all config values for a session. */
getAllSessionConfig(sessionId: string): Record<string, string> {
const stmt = this.db.prepare(
'SELECT key, value FROM session_config WHERE session_id = ?',
);
const rows = stmt.all(sessionId) as Array<{ key: string; value: string }>;
const result: Record<string, string> = {};
for (const row of rows) {
result[row.key] = row.value;
}
return result;
}
/** Set a config value for a session. Upserts (INSERT OR REPLACE). */
setSessionConfig(sessionId: string, key: string, value: string): void {
this.db.prepare(
'INSERT OR REPLACE INTO session_config (session_id, key, value) VALUES (?, ?, ?)',
).run(sessionId, key, value);
}
/** Delete a config value for a session. */
deleteSessionConfig(sessionId: string, key: string): void {
this.db.prepare(
'DELETE FROM session_config WHERE session_id = ? AND key = ?',
).run(sessionId, key);
}
/** Delete all config for a session (used when session is cleared/pruned). */
clearSessionConfig(sessionId: string): void {
this.db.prepare(
'DELETE FROM session_config WHERE session_id = ?',
).run(sessionId);
}
close(): void {
this.db.close();
}
getMessagesWithMetadata(sessionId: string): Array<{
id: number;
sessionId: string;
role: 'user' | 'assistant';
content: string;
createdAt: number;
metadata: HistoryMetadata | null;
}> {
const stmt = this.db.prepare(
'SELECT id, session_id, role, content, created_at, metadata FROM messages WHERE session_id = ? ORDER BY id ASC',
);
const rows = stmt.all(sessionId) as Array<{
id: number;
session_id: string;
role: string;
content: string;
created_at: number;
metadata: string | null;
}>;
return rows.map(row => ({
id: row.id,
sessionId: row.session_id,
role: row.role as 'user' | 'assistant',
content: row.content,
createdAt: row.created_at,
metadata: row.metadata ? JSON.parse(row.metadata) as HistoryMetadata : null,
}));
}
getAllMessagesWithMetadata(): Array<{
id: number;
sessionId: string;
role: 'user' | 'assistant';
content: string;
createdAt: number;
metadata: HistoryMetadata | null;
}> {
const stmt = this.db.prepare(
'SELECT id, session_id, role, content, created_at, metadata FROM messages ORDER BY id ASC',
);
const rows = stmt.all() as Array<{
id: number;
session_id: string;
role: string;
content: string;
created_at: number;
metadata: string | null;
}>;
return rows.map(row => ({
id: row.id,
sessionId: row.session_id,
role: row.role as 'user' | 'assistant',
content: row.content,
createdAt: row.created_at,
metadata: row.metadata ? JSON.parse(row.metadata) as HistoryMetadata : null,
}));
}
updateMessageMetadata(messageId: number, metadata: HistoryMetadata): void {
this.db.prepare('UPDATE messages SET metadata = ? WHERE id = ?').run(JSON.stringify(metadata), messageId);
}
getSessionAnalytics(opts: { sinceTimestamp: number; topLimit?: number }): SessionAnalyticsSnapshot {
const since = opts.sinceTimestamp;
const topLimit = opts.topLimit ?? 10;
const dailyRows = this.db.prepare(`
SELECT
date(created_at, 'unixepoch') AS day,
COUNT(DISTINCT session_id) AS sessions,
COUNT(*) AS messages
FROM messages
WHERE created_at >= ?
GROUP BY day
ORDER BY day DESC
`).all(since) as Array<{ day: string; sessions: number; messages: number }>;
const topRows = this.db.prepare(`
SELECT
session_id,
COUNT(*) AS messages,
MAX(created_at) AS last_activity
FROM messages
WHERE created_at >= ?
GROUP BY session_id
ORDER BY messages DESC, last_activity DESC
LIMIT ?
`).all(since, topLimit) as Array<{ session_id: string; messages: number; last_activity: number }>;
const totalMessagesRow = this.db.prepare(`
SELECT COUNT(*) AS total_messages
FROM messages
WHERE created_at >= ?
`).get(since) as { total_messages: number };
const totalSessionsRow = this.db.prepare(`
SELECT COUNT(DISTINCT session_id) AS total_sessions
FROM messages
WHERE created_at >= ?
`).get(since) as { total_sessions: number };
const totalMessages = totalMessagesRow.total_messages ?? 0;
const totalSessions = totalSessionsRow.total_sessions ?? 0;
const averageMessagesPerSession = totalSessions > 0
? Math.round((totalMessages / totalSessions) * 100) / 100
: 0;
return {
daily: dailyRows.map((row) => ({
day: row.day,
sessions: row.sessions,
messages: row.messages,
})),
topSessions: topRows.map((row) => ({
sessionId: row.session_id,
messages: row.messages,
lastActivity: row.last_activity,
})),
averageMessagesPerSession,
totalSessions,
totalMessages,
};
}
}