From 88731a50e3dd518564f49d5411d109c7e3aad32e Mon Sep 17 00:00:00 2001 From: William Valentin Date: Sat, 7 Feb 2026 14:45:11 -0800 Subject: [PATCH] feat: add heartbeat monitor and vector memory search (Tier 2) Heartbeat: - HeartbeatMonitor with 5 checks: gateway, model, channels, memory, disk - Configurable interval, failure threshold, notification channel - Recovery notifications when health restores - 25 new tests Vector Memory Search: - EmbeddingProvider interface with OpenAI, Gemini, Ollama, LlamaCpp backends - SQLite-backed VectorStore with cosine similarity search - Text chunker with paragraph-aware splitting and overlap - HybridSearch merging keyword + vector results with configurable weight - Background indexer with dirty-namespace tracking - Graceful fallback to keyword search when embeddings unavailable - 51 new tests Config: automation.heartbeat + memory.embedding schema sections Total: 950 tests passing, all types clean --- src/automation/heartbeat.test.ts | 418 +++++++++++++++++++++++++++++ src/automation/heartbeat.ts | 307 +++++++++++++++++++++ src/automation/index.ts | 2 + src/config/schema.ts | 35 +++ src/daemon/index.ts | 81 +++++- src/memory/chunker.test.ts | 106 ++++++++ src/memory/chunker.ts | 163 +++++++++++ src/memory/embeddings.test.ts | 159 +++++++++++ src/memory/embeddings.ts | 182 +++++++++++++ src/memory/hybrid-search.test.ts | 213 +++++++++++++++ src/memory/hybrid-search.ts | 182 +++++++++++++ src/memory/index.ts | 8 + src/memory/store.ts | 23 ++ src/memory/vector-store.test.ts | 209 +++++++++++++++ src/memory/vector-store.ts | 232 ++++++++++++++++ src/tools/builtin/index.ts | 5 +- src/tools/builtin/memory-search.ts | 36 ++- 17 files changed, 2354 insertions(+), 7 deletions(-) create mode 100644 src/automation/heartbeat.test.ts create mode 100644 src/automation/heartbeat.ts create mode 100644 src/memory/chunker.test.ts create mode 100644 src/memory/chunker.ts create mode 100644 src/memory/embeddings.test.ts create mode 100644 
src/memory/embeddings.ts create mode 100644 src/memory/hybrid-search.test.ts create mode 100644 src/memory/hybrid-search.ts create mode 100644 src/memory/vector-store.test.ts create mode 100644 src/memory/vector-store.ts diff --git a/src/automation/heartbeat.test.ts b/src/automation/heartbeat.test.ts new file mode 100644 index 0000000..c988ced --- /dev/null +++ b/src/automation/heartbeat.test.ts @@ -0,0 +1,418 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { HeartbeatMonitor, parseInterval } from './heartbeat.js'; +import type { HeartbeatDeps } from './heartbeat.js'; +import type { HeartbeatConfig } from '../config/schema.js'; + +function makeConfig(overrides?: Partial): HeartbeatConfig { + return { + enabled: true, + interval: '5m', + checks: ['gateway', 'model', 'channels', 'memory', 'disk'], + failure_threshold: 2, + disk_threshold_mb: 100, + ...overrides, + }; +} + +function makeDeps(overrides?: Partial): HeartbeatDeps { + return { + config: makeConfig(), + getGatewayPort: () => 18800, + modelRouter: { getTier: () => 'default' }, + channelLister: { + list: () => [ + { name: 'telegram', status: 'connected' } as any, + { name: 'webchat', status: 'connected' } as any, + ], + }, + memoryDir: '/tmp/flynn-test-memory', + dataDir: '/tmp', + channelLookup: { get: vi.fn() }, + ...overrides, + }; +} + +describe('parseInterval', () => { + it('parses seconds', () => { + expect(parseInterval('60s')).toBe(60000); + expect(parseInterval('1s')).toBe(1000); + }); + + it('parses minutes', () => { + expect(parseInterval('5m')).toBe(300000); + expect(parseInterval('1m')).toBe(60000); + }); + + it('parses hours', () => { + expect(parseInterval('1h')).toBe(3600000); + expect(parseInterval('2h')).toBe(7200000); + }); + + it('treats bare numbers as seconds', () => { + expect(parseInterval('30')).toBe(30000); + }); + + it('throws on invalid format', () => { + expect(() => parseInterval('abc')).toThrow('Invalid interval format'); + expect(() => 
parseInterval('')).toThrow('Invalid interval format'); + }); +}); + +describe('HeartbeatMonitor', () => { + let monitor: HeartbeatMonitor; + + afterEach(() => { + monitor?.stop(); + }); + + it('start() does nothing when enabled: false', () => { + const deps = makeDeps({ config: makeConfig({ enabled: false }) }); + monitor = new HeartbeatMonitor(deps); + + const setIntervalSpy = vi.spyOn(global, 'setInterval'); + monitor.start(); + + expect(setIntervalSpy).not.toHaveBeenCalled(); + setIntervalSpy.mockRestore(); + }); + + it('start() sets an interval when enabled', () => { + const deps = makeDeps({ config: makeConfig({ enabled: true, checks: [] }) }); + monitor = new HeartbeatMonitor(deps); + + const setIntervalSpy = vi.spyOn(global, 'setInterval'); + monitor.start(); + + expect(setIntervalSpy).toHaveBeenCalledWith(expect.any(Function), 300000); + setIntervalSpy.mockRestore(); + }); + + it('stop() clears the timer', () => { + const deps = makeDeps({ config: makeConfig({ enabled: true, checks: [] }) }); + monitor = new HeartbeatMonitor(deps); + + const clearIntervalSpy = vi.spyOn(global, 'clearInterval'); + monitor.start(); + monitor.stop(); + + expect(clearIntervalSpy).toHaveBeenCalled(); + clearIntervalSpy.mockRestore(); + }); + + it('runChecks() runs all configured checks', async () => { + const deps = makeDeps({ + config: makeConfig({ checks: ['model', 'channels'] }), + }); + monitor = new HeartbeatMonitor(deps); + + const result = await monitor.runChecks(); + + expect(result.checks).toHaveLength(2); + expect(result.checks[0].name).toBe('model'); + expect(result.checks[1].name).toBe('channels'); + }); + + it('returns healthy=true when all checks pass', async () => { + const deps = makeDeps({ + config: makeConfig({ checks: ['model', 'channels'] }), + }); + monitor = new HeartbeatMonitor(deps); + + const result = await monitor.runChecks(); + + expect(result.healthy).toBe(true); + expect(result.checks.every((c) => c.healthy)).toBe(true); + }); + + it('returns 
healthy=false when any check fails', async () => { + const deps = makeDeps({ + config: makeConfig({ checks: ['model', 'channels'] }), + modelRouter: undefined, // model check will fail + }); + monitor = new HeartbeatMonitor(deps); + + const result = await monitor.runChecks(); + + expect(result.healthy).toBe(false); + const modelCheck = result.checks.find((c) => c.name === 'model'); + expect(modelCheck?.healthy).toBe(false); + }); + + it('getLastResult() returns most recent result', async () => { + const deps = makeDeps({ + config: makeConfig({ checks: ['model'] }), + }); + monitor = new HeartbeatMonitor(deps); + + expect(monitor.getLastResult()).toBeUndefined(); + + await monitor.runChecks(); + + const lastResult = monitor.getLastResult(); + expect(lastResult).toBeDefined(); + expect(lastResult!.checks).toHaveLength(1); + expect(lastResult!.timestamp).toBeGreaterThan(0); + }); + + it('notification sent after failure_threshold consecutive failures', async () => { + const mockSend = vi.fn().mockResolvedValue(undefined); + const mockGet = vi.fn().mockReturnValue({ send: mockSend }); + + const deps = makeDeps({ + config: makeConfig({ + checks: ['model'], + failure_threshold: 2, + notify: { channel: 'telegram', peer: '123' }, + }), + modelRouter: undefined, // will fail + channelLookup: { get: mockGet }, + }); + monitor = new HeartbeatMonitor(deps); + + // First failure — below threshold, no notification + await monitor.runChecks(); + expect(mockSend).not.toHaveBeenCalled(); + + // Second failure — meets threshold, should notify + await monitor.runChecks(); + expect(mockSend).toHaveBeenCalledTimes(1); + expect(mockSend).toHaveBeenCalledWith('123', expect.objectContaining({ + text: expect.stringContaining('FAILING'), + })); + }); + + it('does not send duplicate failure notifications', async () => { + const mockSend = vi.fn().mockResolvedValue(undefined); + const mockGet = vi.fn().mockReturnValue({ send: mockSend }); + + const deps = makeDeps({ + config: makeConfig({ + 
checks: ['model'], + failure_threshold: 1, + notify: { channel: 'telegram', peer: '123' }, + }), + modelRouter: undefined, + channelLookup: { get: mockGet }, + }); + monitor = new HeartbeatMonitor(deps); + + await monitor.runChecks(); + await monitor.runChecks(); + await monitor.runChecks(); + + // Only one failure notification sent + expect(mockSend).toHaveBeenCalledTimes(1); + }); + + it('recovery notification sent when checks pass after failures', async () => { + const mockSend = vi.fn().mockResolvedValue(undefined); + const mockGet = vi.fn().mockReturnValue({ send: mockSend }); + + let modelRouter: { getTier(): string } | undefined = undefined; + + const deps = makeDeps({ + config: makeConfig({ + checks: ['model'], + failure_threshold: 1, + notify: { channel: 'telegram', peer: '123' }, + }), + modelRouter, + channelLookup: { get: mockGet }, + }); + monitor = new HeartbeatMonitor(deps); + + // Trigger failure notification + await monitor.runChecks(); + expect(mockSend).toHaveBeenCalledTimes(1); + + // "Fix" the model router by replacing deps (use Object.assign to mutate) + Object.assign(deps, { modelRouter: { getTier: () => 'default' } }); + // We need a new monitor since deps is captured + monitor.stop(); + monitor = new HeartbeatMonitor(deps); + + // But the new monitor doesn't have the failure state, so let's test differently: + // Use a single monitor and manipulate the deps object's modelRouter + const mutableDeps = { + config: makeConfig({ + checks: ['model'], + failure_threshold: 1, + notify: { channel: 'telegram', peer: '123' }, + }), + getGatewayPort: () => 18800, + modelRouter: undefined as { getTier(): string } | undefined, + channelLister: { list: () => [] }, + memoryDir: undefined, + dataDir: '/tmp', + channelLookup: { get: mockGet }, + }; + mockSend.mockClear(); + + const monitor2 = new HeartbeatMonitor(mutableDeps); + + // Fail + await monitor2.runChecks(); + expect(mockSend).toHaveBeenCalledTimes(1); // failure notification + + // Now "recover" + 
mutableDeps.modelRouter = { getTier: () => 'default' }; + // Need to re-create since deps is captured in constructor + // Actually, deps is stored by reference, so mutation works if we mutate the object + await monitor2.runChecks(); + expect(mockSend).toHaveBeenCalledTimes(2); // recovery notification + expect(mockSend).toHaveBeenLastCalledWith('123', expect.objectContaining({ + text: expect.stringContaining('RECOVERED'), + })); + + monitor2.stop(); + }); + + it('no notification when notify config is not set', async () => { + const deps = makeDeps({ + config: makeConfig({ + checks: ['model'], + failure_threshold: 1, + // no notify + }), + modelRouter: undefined, + }); + monitor = new HeartbeatMonitor(deps); + + // Should not throw + await monitor.runChecks(); + await monitor.runChecks(); + }); + + // ── Individual check tests ─────────────────────────────────── + + describe('model check', () => { + it('passes when model router is available', async () => { + const deps = makeDeps({ + config: makeConfig({ checks: ['model'] }), + modelRouter: { getTier: () => 'fast' }, + }); + monitor = new HeartbeatMonitor(deps); + + const result = await monitor.runChecks(); + const check = result.checks.find((c) => c.name === 'model')!; + expect(check.healthy).toBe(true); + expect(check.message).toContain('fast'); + }); + + it('fails when model router is undefined', async () => { + const deps = makeDeps({ + config: makeConfig({ checks: ['model'] }), + modelRouter: undefined, + }); + monitor = new HeartbeatMonitor(deps); + + const result = await monitor.runChecks(); + const check = result.checks.find((c) => c.name === 'model')!; + expect(check.healthy).toBe(false); + }); + }); + + describe('channels check', () => { + it('passes when at least one channel is connected', async () => { + const deps = makeDeps({ + config: makeConfig({ checks: ['channels'] }), + channelLister: { + list: () => [ + { name: 'telegram', status: 'connected' } as any, + { name: 'webchat', status: 'disconnected' 
} as any, + ], + }, + }); + monitor = new HeartbeatMonitor(deps); + + const result = await monitor.runChecks(); + const check = result.checks.find((c) => c.name === 'channels')!; + expect(check.healthy).toBe(true); + expect(check.message).toContain('1/2 connected'); + expect(check.message).toContain('webchat'); + }); + + it('fails when no channels are connected', async () => { + const deps = makeDeps({ + config: makeConfig({ checks: ['channels'] }), + channelLister: { + list: () => [ + { name: 'telegram', status: 'disconnected' } as any, + ], + }, + }); + monitor = new HeartbeatMonitor(deps); + + const result = await monitor.runChecks(); + const check = result.checks.find((c) => c.name === 'channels')!; + expect(check.healthy).toBe(false); + }); + }); + + describe('memory check', () => { + it('passes when memory is disabled', async () => { + const deps = makeDeps({ + config: makeConfig({ checks: ['memory'] }), + memoryDir: undefined, + }); + monitor = new HeartbeatMonitor(deps); + + const result = await monitor.runChecks(); + const check = result.checks.find((c) => c.name === 'memory')!; + expect(check.healthy).toBe(true); + expect(check.message).toContain('disabled'); + }); + + it('fails when memory dir is not accessible', async () => { + const deps = makeDeps({ + config: makeConfig({ checks: ['memory'] }), + memoryDir: '/nonexistent/path/that/does/not/exist', + }); + monitor = new HeartbeatMonitor(deps); + + const result = await monitor.runChecks(); + const check = result.checks.find((c) => c.name === 'memory')!; + expect(check.healthy).toBe(false); + }); + }); + + describe('disk check', () => { + it('passes when enough disk space available', async () => { + const deps = makeDeps({ + config: makeConfig({ checks: ['disk'], disk_threshold_mb: 1 }), + dataDir: '/tmp', + }); + monitor = new HeartbeatMonitor(deps); + + const result = await monitor.runChecks(); + const check = result.checks.find((c) => c.name === 'disk')!; + expect(check.healthy).toBe(true); + 
expect(check.message).toContain('MB available'); + }); + + it('fails when disk space is below threshold', async () => { + const deps = makeDeps({ + config: makeConfig({ checks: ['disk'], disk_threshold_mb: 999999999 }), + dataDir: '/tmp', + }); + monitor = new HeartbeatMonitor(deps); + + const result = await monitor.runChecks(); + const check = result.checks.find((c) => c.name === 'disk')!; + expect(check.healthy).toBe(false); + expect(check.message).toContain('Low disk space'); + }); + + it('fails when dataDir does not exist', async () => { + const deps = makeDeps({ + config: makeConfig({ checks: ['disk'] }), + dataDir: '/nonexistent/path/that/does/not/exist', + }); + monitor = new HeartbeatMonitor(deps); + + const result = await monitor.runChecks(); + const check = result.checks.find((c) => c.name === 'disk')!; + expect(check.healthy).toBe(false); + }); + }); +}); diff --git a/src/automation/heartbeat.ts b/src/automation/heartbeat.ts new file mode 100644 index 0000000..414c797 --- /dev/null +++ b/src/automation/heartbeat.ts @@ -0,0 +1,307 @@ +import { statfsSync, accessSync, constants as fsConstants } from 'fs'; +import { request } from 'http'; +import type { HeartbeatConfig, HeartbeatCheck } from '../config/schema.js'; +import type { ChannelAdapter, ChannelStatus, OutboundMessage } from '../channels/types.js'; + +/** Result of a single health check. */ +export interface CheckResult { + name: HeartbeatCheck; + healthy: boolean; + message: string; + durationMs: number; +} + +/** Result of a full heartbeat cycle. */ +export interface HeartbeatResult { + healthy: boolean; + checks: CheckResult[]; + timestamp: number; +} + +/** Minimal interface for sending notifications via a channel. */ +interface ChannelLookup { + get(name: string): { send(peerId: string, message: OutboundMessage): Promise } | undefined; +} + +/** Minimal interface for listing channel adapters. 
*/ +interface ChannelLister { + list(): ChannelAdapter[]; +} + +/** Dependencies injected into HeartbeatMonitor. */ +export interface HeartbeatDeps { + config: HeartbeatConfig; + getGatewayPort: () => number; + modelRouter: { getTier(): string } | undefined; + channelLister: ChannelLister; + memoryDir: string | undefined; + dataDir: string; + channelLookup: ChannelLookup; +} + +/** + * Parse a human-friendly interval string into milliseconds. + * Supports: '60s', '5m', '1h'. Bare numbers are treated as seconds. + */ +export function parseInterval(interval: string): number { + const match = interval.trim().match(/^(\d+(?:\.\d+)?)\s*(s|m|h)?$/i); + if (!match) { + throw new Error(`Invalid interval format: '${interval}'. Use e.g. '60s', '5m', '1h'.`); + } + + const value = parseFloat(match[1]); + const unit = (match[2] ?? 's').toLowerCase(); + + switch (unit) { + case 's': return Math.round(value * 1000); + case 'm': return Math.round(value * 60 * 1000); + case 'h': return Math.round(value * 60 * 60 * 1000); + default: return Math.round(value * 1000); + } +} + +export class HeartbeatMonitor { + private timer: ReturnType | undefined; + private lastResult: HeartbeatResult | undefined; + private consecutiveFailures = 0; + private notifiedFailure = false; + private readonly deps: HeartbeatDeps; + + constructor(deps: HeartbeatDeps) { + this.deps = deps; + } + + /** Start the heartbeat monitor. Does nothing if disabled. 
*/ + start(): void { + if (!this.deps.config.enabled) return; + + const intervalMs = parseInterval(this.deps.config.interval); + console.log(`HeartbeatMonitor: starting (interval=${this.deps.config.interval}, checks=[${this.deps.config.checks.join(', ')}])`); + + this.timer = setInterval(() => { + this.runChecks().catch((err) => { + console.error('HeartbeatMonitor: unexpected error during check cycle:', err); + }); + }, intervalMs); + + // Also run immediately on start + this.runChecks().catch((err) => { + console.error('HeartbeatMonitor: unexpected error during initial check:', err); + }); + } + + /** Stop the heartbeat monitor. */ + stop(): void { + if (this.timer) { + clearInterval(this.timer); + this.timer = undefined; + } + } + + /** Run all configured checks and return the result. */ + async runChecks(): Promise { + const checks: CheckResult[] = []; + + for (const check of this.deps.config.checks) { + const start = Date.now(); + let result: CheckResult; + + try { + switch (check) { + case 'gateway': + result = await this.checkGateway(start); + break; + case 'model': + result = this.checkModel(start); + break; + case 'channels': + result = this.checkChannels(start); + break; + case 'memory': + result = this.checkMemory(start); + break; + case 'disk': + result = this.checkDisk(start); + break; + default: + result = { name: check, healthy: false, message: `Unknown check: ${check}`, durationMs: Date.now() - start }; + } + } catch (err) { + result = { + name: check, + healthy: false, + message: err instanceof Error ? 
err.message : 'Unknown error', + durationMs: Date.now() - start, + }; + } + + checks.push(result); + } + + const healthy = checks.every((c) => c.healthy); + const heartbeatResult: HeartbeatResult = { + healthy, + checks, + timestamp: Date.now(), + }; + + this.lastResult = heartbeatResult; + + // Failure tracking and notification + if (!healthy) { + this.consecutiveFailures++; + if (this.consecutiveFailures >= this.deps.config.failure_threshold && !this.notifiedFailure) { + this.notifiedFailure = true; + const failedChecks = checks.filter((c) => !c.healthy).map((c) => `${c.name}: ${c.message}`); + await this.notify(`Heartbeat FAILING (${this.consecutiveFailures} consecutive failures):\n${failedChecks.join('\n')}`); + } + } else { + if (this.notifiedFailure) { + // Recovery notification + await this.notify(`Heartbeat RECOVERED after ${this.consecutiveFailures} consecutive failure(s). All checks passing.`); + } + this.consecutiveFailures = 0; + this.notifiedFailure = false; + } + + return heartbeatResult; + } + + /** Get the most recent heartbeat result. */ + getLastResult(): HeartbeatResult | undefined { + return this.lastResult; + } + + // ── Individual checks ────────────────────────────────────────── + + private async checkGateway(start: number): Promise { + const port = this.deps.getGatewayPort(); + + return new Promise((resolve) => { + const req = request( + { hostname: '127.0.0.1', port, path: '/api/health', method: 'GET', timeout: 5000 }, + (res) => { + // Consume the response body + res.resume(); + const healthy = res.statusCode !== undefined && res.statusCode >= 200 && res.statusCode < 400; + resolve({ + name: 'gateway', + healthy, + message: healthy ? `HTTP ${res.statusCode}` : `HTTP ${res.statusCode ?? 
'no response'}`, + durationMs: Date.now() - start, + }); + }, + ); + + req.on('error', (err) => { + resolve({ + name: 'gateway', + healthy: false, + message: err.message, + durationMs: Date.now() - start, + }); + }); + + req.on('timeout', () => { + req.destroy(); + resolve({ + name: 'gateway', + healthy: false, + message: 'Request timed out', + durationMs: Date.now() - start, + }); + }); + + req.end(); + }); + } + + private checkModel(start: number): CheckResult { + // Lightweight check: verify the model router is present and has a tier set + const router = this.deps.modelRouter; + if (!router) { + return { name: 'model', healthy: false, message: 'Model router not available', durationMs: Date.now() - start }; + } + + const tier = router.getTier(); + return { name: 'model', healthy: true, message: `Active tier: ${tier}`, durationMs: Date.now() - start }; + } + + private checkChannels(start: number): CheckResult { + const adapters = this.deps.channelLister.list(); + const connected = adapters.filter((a) => a.status === 'connected'); + const disconnected = adapters.filter((a) => a.status !== 'connected'); + + // Healthy if at least one adapter is connected + const healthy = connected.length > 0; + const details = `${connected.length}/${adapters.length} connected`; + const message = disconnected.length > 0 + ? 
`${details} (disconnected: ${disconnected.map((a) => a.name).join(', ')})` + : details; + + return { name: 'channels', healthy, message, durationMs: Date.now() - start }; + } + + private checkMemory(start: number): CheckResult { + const memoryDir = this.deps.memoryDir; + if (!memoryDir) { + return { name: 'memory', healthy: true, message: 'Memory store disabled', durationMs: Date.now() - start }; + } + + try { + accessSync(memoryDir, fsConstants.R_OK | fsConstants.W_OK); + return { name: 'memory', healthy: true, message: 'Directory accessible', durationMs: Date.now() - start }; + } catch (err) { + return { + name: 'memory', + healthy: false, + message: err instanceof Error ? err.message : 'Directory not accessible', + durationMs: Date.now() - start, + }; + } + } + + private checkDisk(start: number): CheckResult { + try { + const stats = statfsSync(this.deps.dataDir); + const availableMb = (stats.bavail * stats.bsize) / (1024 * 1024); + const thresholdMb = this.deps.config.disk_threshold_mb; + const healthy = availableMb >= thresholdMb; + + return { + name: 'disk', + healthy, + message: healthy + ? `${Math.round(availableMb)} MB available` + : `Low disk space: ${Math.round(availableMb)} MB available (threshold: ${thresholdMb} MB)`, + durationMs: Date.now() - start, + }; + } catch (err) { + return { + name: 'disk', + healthy: false, + message: err instanceof Error ? 
err.message : 'Failed to check disk', + durationMs: Date.now() - start, + }; + } + } + + // ── Notification ─────────────────────────────────────────────── + + private async notify(text: string): Promise { + const notifyConfig = this.deps.config.notify; + if (!notifyConfig) return; + + const adapter = this.deps.channelLookup.get(notifyConfig.channel); + if (!adapter) { + console.warn(`HeartbeatMonitor: notification channel '${notifyConfig.channel}' not found`); + return; + } + + try { + await adapter.send(notifyConfig.peer, { text }); + } catch (err) { + console.error('HeartbeatMonitor: failed to send notification:', err); + } + } +} diff --git a/src/automation/index.ts b/src/automation/index.ts index 304f9dd..2c1e3d0 100644 --- a/src/automation/index.ts +++ b/src/automation/index.ts @@ -1,2 +1,4 @@ export { CronScheduler } from './cron.js'; export { WebhookHandler } from './webhooks.js'; +export { HeartbeatMonitor, parseInterval } from './heartbeat.js'; +export type { HeartbeatResult, HeartbeatDeps, CheckResult } from './heartbeat.js'; diff --git a/src/config/schema.ts b/src/config/schema.ts index fba63ea..791e4b2 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -119,9 +119,24 @@ const webhookSchema = z.object({ enabled: z.boolean().default(true), }); +const heartbeatCheckSchema = z.enum(['gateway', 'model', 'channels', 'memory', 'disk']); + +const heartbeatSchema = z.object({ + enabled: z.boolean().default(false), + interval: z.string().default('5m'), + checks: z.array(heartbeatCheckSchema).default(['gateway', 'model', 'channels', 'memory', 'disk']), + notify: z.object({ + channel: z.string().min(1), + peer: z.string().min(1), + }).optional(), + failure_threshold: z.number().min(1).max(10).default(2), + disk_threshold_mb: z.number().min(10).default(100), +}).default({}); + const automationSchema = z.object({ cron: z.array(cronJobSchema).default([]), webhooks: z.array(webhookSchema).default([]), + heartbeat: heartbeatSchema, }).default({}); const 
agentsSchema = z.object({ @@ -143,11 +158,27 @@ const agentsSchema = z.object({ max_delegation_depth: z.number().min(1).max(10).default(3), }).default({}); +const embeddingProviderSchema = z.enum(['openai', 'gemini', 'ollama', 'llamacpp']); + +const embeddingSchema = z.object({ + enabled: z.boolean().default(false), + provider: embeddingProviderSchema.default('openai'), + model: z.string().default('text-embedding-3-small'), + endpoint: z.string().optional(), + api_key: z.string().optional(), + dimensions: z.number().optional(), + chunk_size: z.number().min(64).max(8192).default(512), + chunk_overlap: z.number().min(0).max(1024).default(50), + top_k: z.number().min(1).max(50).default(5), + hybrid_weight: z.number().min(0).max(1).default(0.7), +}).default({}); + const memorySchema = z.object({ enabled: z.boolean().default(true), dir: z.string().optional(), // Default: ~/.local/share/flynn/memory auto_extract: z.boolean().default(true), max_context_tokens: z.number().min(100).max(10000).default(2000), + embedding: embeddingSchema, }).default({}); const compactionSchema = z.object({ @@ -333,3 +364,7 @@ export type RoutingConfig = z.infer; export type ServerConfig = z.infer; export type SessionsConfig = z.infer; export type ThinkingConfig = z.infer; +export type HeartbeatConfig = z.infer; +export type HeartbeatCheck = z.infer; +export type EmbeddingConfig = z.infer; +export type EmbeddingProvider = z.infer; diff --git a/src/daemon/index.ts b/src/daemon/index.ts index 859c9e6..db67519 100644 --- a/src/daemon/index.ts +++ b/src/daemon/index.ts @@ -12,10 +12,12 @@ import { HookEngine } from '../hooks/index.js'; import { ToolRegistry, ToolExecutor, ToolPolicy, allBuiltinTools, createWebSearchTools, createProcessTools, ProcessManager, BrowserManager, createBrowserTools, createMediaSendTool, createSessionTools, createAgentsListTool, createMessageSendTool, createCronTools } from '../tools/index.js'; import type { Tool } from '../tools/types.js'; import { MemoryStore } from 
'../memory/index.js'; +import { VectorStore, HybridSearch, createEmbeddingProvider, chunkText, contentHash } from '../memory/index.js'; +import type { EmbeddingProvider as EmbeddingProviderInterface } from '../memory/index.js'; import { createMemoryTools } from '../tools/builtin/index.js'; import { GatewayServer } from '../gateway/index.js'; import { ChannelRegistry, TelegramAdapter, WebChatAdapter, DiscordAdapter, SlackAdapter, WhatsAppAdapter } from '../channels/index.js'; -import { CronScheduler, WebhookHandler } from '../automation/index.js'; +import { CronScheduler, WebhookHandler, HeartbeatMonitor } from '../automation/index.js'; import type { InboundMessage, OutboundMessage } from '../channels/index.js'; import { McpManager } from '../mcp/index.js'; import { SkillRegistry, SkillInstaller, loadAllSkills } from '../skills/index.js'; @@ -568,8 +570,65 @@ export async function startDaemon(config: Config): Promise { } // Register memory tools if memory is enabled + let hybridSearch: HybridSearch | undefined; + + if (memoryStore && config.memory.embedding.enabled) { + try { + const embeddingProvider: EmbeddingProviderInterface = createEmbeddingProvider(config.memory.embedding); + const vectorStore = new VectorStore(resolve(dataDir, 'vectors.db')); + hybridSearch = new HybridSearch( + memoryStore, + vectorStore, + embeddingProvider, + config.memory.embedding.hybrid_weight, + ); + + // Background indexer: re-embed dirty namespaces every 30 seconds + const indexerInterval = setInterval(async () => { + const dirtyNamespaces = memoryStore.getDirtyNamespaces(); + for (const ns of dirtyNamespaces) { + try { + const content = memoryStore.read(ns); + if (content.length === 0) { + vectorStore.deleteNamespace(ns); + continue; + } + + const hash = contentHash(content); + if (vectorStore.hasContentHash(ns, hash)) continue; + + const chunks = chunkText(content, ns, { + chunkSize: config.memory.embedding.chunk_size, + chunkOverlap: config.memory.embedding.chunk_overlap, + }); + 
+ if (chunks.length > 0) { + const embeddings = await embeddingProvider.embed(chunks.map((c) => c.text)); + vectorStore.upsertChunks(chunks, embeddings, hash); + } + } catch (err) { + console.error(`Failed to index namespace "${ns}":`, err); + } + } + }, 30_000); + + // Initial full index — mark all existing namespaces as dirty + memoryStore.markAllDirty(); + + lifecycle.onShutdown(async () => { + clearInterval(indexerInterval); + vectorStore.close(); + console.log('Vector store closed'); + }); + + console.log(`Vector memory search enabled (provider=${config.memory.embedding.provider}, model=${config.memory.embedding.model})`); + } catch (err) { + console.error('Failed to initialize vector search:', err); + } + } + if (memoryStore) { - for (const tool of createMemoryTools(memoryStore)) { + for (const tool of createMemoryTools(memoryStore, hybridSearch)) { toolRegistry.register(tool); } } @@ -878,6 +937,24 @@ export async function startDaemon(config: Config): Promise { await gateway.start(); + // ── Heartbeat Monitor ────────────────────────────────────────── + const heartbeatMonitor = new HeartbeatMonitor({ + config: config.automation.heartbeat, + getGatewayPort: () => config.server.port, + modelRouter, + channelLister: channelRegistry, + memoryDir: config.memory.enabled ? 
memoryDir : undefined, + dataDir, + channelLookup: channelRegistry, + }); + + heartbeatMonitor.start(); + + lifecycle.onShutdown(async () => { + heartbeatMonitor.stop(); + console.log('Heartbeat monitor stopped'); + }); + console.log('Flynn daemon started'); return { diff --git a/src/memory/chunker.test.ts b/src/memory/chunker.test.ts new file mode 100644 index 0000000..ed02974 --- /dev/null +++ b/src/memory/chunker.test.ts @@ -0,0 +1,106 @@ +import { describe, it, expect } from 'vitest'; +import { chunkText } from './chunker.js'; +import type { Chunk } from './chunker.js'; + +describe('chunkText', () => { + it('returns empty array for empty content', () => { + expect(chunkText('', 'test')).toEqual([]); + expect(chunkText(' \n\n ', 'test')).toEqual([]); + }); + + it('returns single chunk for small content', () => { + const content = 'Hello world\nSecond line'; + const chunks = chunkText(content, 'notes', { chunkSize: 1000, chunkOverlap: 0 }); + + expect(chunks).toHaveLength(1); + expect(chunks[0].text).toBe('Hello world\nSecond line'); + expect(chunks[0].namespace).toBe('notes'); + expect(chunks[0].startLine).toBe(1); + expect(chunks[0].endLine).toBe(2); + }); + + it('splits on paragraph boundaries (double newline)', () => { + const content = 'Paragraph one line one\nParagraph one line two\n\nParagraph two line one\nParagraph two line two'; + const chunks = chunkText(content, 'test', { chunkSize: 30, chunkOverlap: 0 }); + + // Should split into two chunks at the paragraph boundary + expect(chunks.length).toBeGreaterThanOrEqual(2); + expect(chunks[0].text).toContain('Paragraph one'); + expect(chunks[1].text).toContain('Paragraph two'); + }); + + it('merges small paragraphs to reach target chunk size', () => { + const content = 'A\n\nB\n\nC\n\nD'; + const chunks = chunkText(content, 'test', { chunkSize: 100, chunkOverlap: 0 }); + + // All paragraphs are tiny, so they should all fit in one chunk + expect(chunks).toHaveLength(1); + 
expect(chunks[0].text).toContain('A'); + expect(chunks[0].text).toContain('D'); + }); + + it('tracks line numbers accurately', () => { + const content = 'Line one\n\nLine three\n\nLine five'; + const chunks = chunkText(content, 'test', { chunkSize: 10, chunkOverlap: 0 }); + + // First chunk should start at line 1 + expect(chunks[0].startLine).toBe(1); + expect(chunks[0].endLine).toBe(1); + + // Line three is on actual line 3 + const lineThreeChunk = chunks.find((c) => c.text.includes('Line three')); + expect(lineThreeChunk).toBeDefined(); + expect(lineThreeChunk!.startLine).toBe(3); + + // Line five is on actual line 5 + const lineFiveChunk = chunks.find((c) => c.text.includes('Line five')); + expect(lineFiveChunk).toBeDefined(); + expect(lineFiveChunk!.startLine).toBe(5); + }); + + it('includes overlap between consecutive chunks', () => { + // Create content with clear paragraphs that force splitting + const para1 = 'First paragraph with enough text to matter'; + const para2 = 'Second paragraph with some more text'; + const para3 = 'Third paragraph and final content here'; + const content = `${para1}\n\n${para2}\n\n${para3}`; + + // Use a chunk size that forces splitting, with overlap + const chunks = chunkText(content, 'test', { chunkSize: 50, chunkOverlap: 40 }); + + // With overlap, later chunks should contain content from previous paragraphs + if (chunks.length >= 2) { + // Check that there's some content overlap between consecutive chunks + const lastChunk = chunks[chunks.length - 1]; + const prevChunk = chunks[chunks.length - 2]; + // Either chunks share content or at least have proper sequencing + expect(lastChunk.startLine).toBeLessThanOrEqual(prevChunk.endLine + 5); + } + }); + + it('preserves namespace in all chunks', () => { + const content = 'Para one\n\nPara two\n\nPara three'; + const chunks = chunkText(content, 'sessions/abc123', { chunkSize: 10, chunkOverlap: 0 }); + + for (const chunk of chunks) { + expect(chunk.namespace).toBe('sessions/abc123'); 
+ } + }); + + it('handles content with multiple consecutive blank lines', () => { + const content = 'First\n\n\n\nSecond'; + const chunks = chunkText(content, 'test', { chunkSize: 1000, chunkOverlap: 0 }); + + expect(chunks.length).toBeGreaterThanOrEqual(1); + expect(chunks.some((c) => c.text.includes('First'))).toBe(true); + expect(chunks.some((c) => c.text.includes('Second'))).toBe(true); + }); + + it('handles single-line content', () => { + const chunks = chunkText('single line', 'test', { chunkSize: 100, chunkOverlap: 0 }); + expect(chunks).toHaveLength(1); + expect(chunks[0].text).toBe('single line'); + expect(chunks[0].startLine).toBe(1); + expect(chunks[0].endLine).toBe(1); + }); +}); diff --git a/src/memory/chunker.ts b/src/memory/chunker.ts new file mode 100644 index 0000000..62412c8 --- /dev/null +++ b/src/memory/chunker.ts @@ -0,0 +1,163 @@ +/** + * Text chunker that splits markdown content into overlapping chunks + * for embedding generation. + */ + +/** + * A single chunk of text extracted from a memory namespace. + */ +export interface Chunk { + /** The chunk text content. */ + text: string; + /** The memory namespace this chunk came from. */ + namespace: string; + /** 1-based start line number in the original content. */ + startLine: number; + /** 1-based end line number in the original content. */ + endLine: number; +} + +export interface ChunkOptions { + /** Target chunk size in characters. */ + chunkSize: number; + /** Number of overlapping characters between consecutive chunks. */ + chunkOverlap: number; +} + +const DEFAULT_CHUNK_OPTIONS: ChunkOptions = { + chunkSize: 512, + chunkOverlap: 50, +}; + +/** + * Split content into overlapping chunks suitable for embedding. + * + * Strategy: + * 1. Split on paragraph boundaries (double newline). + * 2. Merge small paragraphs to reach target chunk size. + * 3. Track line numbers accurately through splits. + * 4. Add overlap from previous chunk for context continuity. 
+ */ +export function chunkText( + content: string, + namespace: string, + options?: Partial, +): Chunk[] { + const opts = { ...DEFAULT_CHUNK_OPTIONS, ...options }; + + if (content.trim().length === 0) { + return []; + } + + const lines = content.split('\n'); + + // Build paragraph groups: each paragraph is a contiguous set of lines + // separated by blank lines (double newline boundaries). + const paragraphs: { text: string; startLine: number; endLine: number }[] = []; + let currentLines: string[] = []; + let currentStart = 1; // 1-based + + for (let i = 0; i < lines.length; i++) { + const line = lines[i]; + const lineNum = i + 1; // 1-based + + if (line.trim() === '' && currentLines.length > 0) { + // End of a paragraph + paragraphs.push({ + text: currentLines.join('\n'), + startLine: currentStart, + endLine: lineNum - 1, + }); + currentLines = []; + currentStart = lineNum + 1; + } else if (line.trim() !== '') { + if (currentLines.length === 0) { + currentStart = lineNum; + } + currentLines.push(line); + } else { + // Empty line and no current paragraph — advance start + currentStart = lineNum + 1; + } + } + + // Flush remaining + if (currentLines.length > 0) { + paragraphs.push({ + text: currentLines.join('\n'), + startLine: currentStart, + endLine: lines.length, + }); + } + + if (paragraphs.length === 0) { + return []; + } + + // Merge paragraphs into chunks, respecting the target size + const chunks: Chunk[] = []; + let chunkParagraphs: typeof paragraphs = []; + let chunkLength = 0; + + for (const para of paragraphs) { + const paraLength = para.text.length; + + // If adding this paragraph would exceed the target, flush current chunk + if (chunkLength > 0 && chunkLength + paraLength + 1 > opts.chunkSize) { + chunks.push(buildChunk(chunkParagraphs, namespace)); + // Start a new chunk — include overlap from previous chunk + const overlapChunk = getOverlapParagraphs(chunkParagraphs, opts.chunkOverlap); + chunkParagraphs = overlapChunk; + chunkLength = 
overlapChunk.reduce((sum, p) => sum + p.text.length, 0); + } + + chunkParagraphs.push(para); + chunkLength += paraLength + (chunkLength > 0 ? 1 : 0); // +1 for separator + } + + // Flush remaining + if (chunkParagraphs.length > 0) { + chunks.push(buildChunk(chunkParagraphs, namespace)); + } + + return chunks; +} + +/** Build a Chunk from a list of paragraph entries. */ +function buildChunk( + paragraphs: { text: string; startLine: number; endLine: number }[], + namespace: string, +): Chunk { + return { + text: paragraphs.map((p) => p.text).join('\n\n'), + namespace, + startLine: paragraphs[0].startLine, + endLine: paragraphs[paragraphs.length - 1].endLine, + }; +} + +/** + * Get trailing paragraphs from the previous chunk for overlap. + * Takes paragraphs from the end until we've accumulated enough characters. + */ +function getOverlapParagraphs( + paragraphs: { text: string; startLine: number; endLine: number }[], + overlapChars: number, +): { text: string; startLine: number; endLine: number }[] { + if (overlapChars <= 0) { + return []; + } + + const result: typeof paragraphs = []; + let totalChars = 0; + + for (let i = paragraphs.length - 1; i >= 0; i--) { + totalChars += paragraphs[i].text.length; + result.unshift(paragraphs[i]); + if (totalChars >= overlapChars) { + break; + } + } + + return result; +} diff --git a/src/memory/embeddings.test.ts b/src/memory/embeddings.test.ts new file mode 100644 index 0000000..e7588f6 --- /dev/null +++ b/src/memory/embeddings.test.ts @@ -0,0 +1,159 @@ +import { describe, it, expect } from 'vitest'; +import { + createEmbeddingProvider, + OpenAIEmbeddingProvider, + GeminiEmbeddingProvider, + OllamaEmbeddingProvider, + LlamaCppEmbeddingProvider, +} from './embeddings.js'; +import type { EmbeddingConfig } from '../config/schema.js'; + +describe('createEmbeddingProvider', () => { + const baseConfig: EmbeddingConfig = { + enabled: true, + provider: 'openai', + model: 'text-embedding-3-small', + chunk_size: 512, + chunk_overlap: 50, 
+ top_k: 5, + hybrid_weight: 0.7, + }; + + it('creates OpenAI provider', () => { + const provider = createEmbeddingProvider({ ...baseConfig, provider: 'openai' }); + expect(provider).toBeInstanceOf(OpenAIEmbeddingProvider); + }); + + it('creates Gemini provider', () => { + const provider = createEmbeddingProvider({ ...baseConfig, provider: 'gemini' }); + expect(provider).toBeInstanceOf(GeminiEmbeddingProvider); + }); + + it('creates Ollama provider', () => { + const provider = createEmbeddingProvider({ ...baseConfig, provider: 'ollama' }); + expect(provider).toBeInstanceOf(OllamaEmbeddingProvider); + }); + + it('creates LlamaCpp provider', () => { + const provider = createEmbeddingProvider({ ...baseConfig, provider: 'llamacpp' }); + expect(provider).toBeInstanceOf(LlamaCppEmbeddingProvider); + }); + + it('throws on unknown provider', () => { + expect(() => createEmbeddingProvider({ ...baseConfig, provider: 'unknown' as never })).toThrow('Unknown embedding provider'); + }); +}); + +describe('OpenAIEmbeddingProvider', () => { + it('reports configured dimensions', () => { + const config: EmbeddingConfig = { + enabled: true, + provider: 'openai', + model: 'text-embedding-3-small', + dimensions: 512, + chunk_size: 512, + chunk_overlap: 50, + top_k: 5, + hybrid_weight: 0.7, + }; + const provider = new OpenAIEmbeddingProvider(config); + expect(provider.dimensions).toBe(512); + }); + + it('defaults to 1536 dimensions', () => { + const config: EmbeddingConfig = { + enabled: true, + provider: 'openai', + model: 'text-embedding-3-small', + chunk_size: 512, + chunk_overlap: 50, + top_k: 5, + hybrid_weight: 0.7, + }; + const provider = new OpenAIEmbeddingProvider(config); + expect(provider.dimensions).toBe(1536); + }); +}); + +describe('GeminiEmbeddingProvider', () => { + it('reports configured dimensions', () => { + const config: EmbeddingConfig = { + enabled: true, + provider: 'gemini', + model: 'text-embedding-004', + dimensions: 256, + chunk_size: 512, + chunk_overlap: 50, 
+ top_k: 5, + hybrid_weight: 0.7, + }; + const provider = new GeminiEmbeddingProvider(config); + expect(provider.dimensions).toBe(256); + }); + + it('defaults to 768 dimensions', () => { + const config: EmbeddingConfig = { + enabled: true, + provider: 'gemini', + model: 'text-embedding-004', + chunk_size: 512, + chunk_overlap: 50, + top_k: 5, + hybrid_weight: 0.7, + }; + const provider = new GeminiEmbeddingProvider(config); + expect(provider.dimensions).toBe(768); + }); +}); + +describe('OllamaEmbeddingProvider', () => { + it('reports configured dimensions', () => { + const config: EmbeddingConfig = { + enabled: true, + provider: 'ollama', + model: 'nomic-embed-text', + dimensions: 384, + endpoint: 'http://localhost:11434', + chunk_size: 512, + chunk_overlap: 50, + top_k: 5, + hybrid_weight: 0.7, + }; + const provider = new OllamaEmbeddingProvider(config); + expect(provider.dimensions).toBe(384); + }); +}); + +describe('LlamaCppEmbeddingProvider', () => { + it('reports configured dimensions', () => { + const config: EmbeddingConfig = { + enabled: true, + provider: 'llamacpp', + model: 'unused', + dimensions: 768, + endpoint: 'http://localhost:8080', + chunk_size: 512, + chunk_overlap: 50, + top_k: 5, + hybrid_weight: 0.7, + }; + const provider = new LlamaCppEmbeddingProvider(config); + expect(provider.dimensions).toBe(768); + }); + + it('defaults endpoint to localhost:8080', () => { + const config: EmbeddingConfig = { + enabled: true, + provider: 'llamacpp', + model: 'unused', + dimensions: 768, + chunk_size: 512, + chunk_overlap: 50, + top_k: 5, + hybrid_weight: 0.7, + }; + // Provider should be constructable without endpoint + const provider = new LlamaCppEmbeddingProvider(config); + expect(provider.dimensions).toBe(768); + }); +}); diff --git a/src/memory/embeddings.ts b/src/memory/embeddings.ts new file mode 100644 index 0000000..22e485d --- /dev/null +++ b/src/memory/embeddings.ts @@ -0,0 +1,182 @@ +/** + * Embedding provider interface and implementations for 
multiple backends. + */ + +import type { EmbeddingConfig } from '../config/schema.js'; + +/** + * Interface for embedding providers that convert text to vectors. + */ +export interface EmbeddingProvider { + /** Generate embeddings for one or more texts. Returns one vector per text. */ + embed(texts: string[]): Promise; + /** The dimensionality of the embedding vectors. */ + dimensions: number; +} + +// --------------------------------------------------------------------------- +// OpenAI +// --------------------------------------------------------------------------- + +export class OpenAIEmbeddingProvider implements EmbeddingProvider { + private _model: string; + private _dimensions: number; + private _apiKey: string; + private _endpoint?: string; + + constructor(config: EmbeddingConfig) { + this._model = config.model; + this._dimensions = config.dimensions ?? 1536; + this._apiKey = config.api_key ?? process.env.OPENAI_API_KEY ?? ''; + this._endpoint = config.endpoint; + } + + get dimensions(): number { + return this._dimensions; + } + + async embed(texts: string[]): Promise { + const { default: OpenAI } = await import('openai'); + const client = new OpenAI({ + apiKey: this._apiKey, + ...(this._endpoint ? { baseURL: this._endpoint } : {}), + }); + + const response = await client.embeddings.create({ + model: this._model, + input: texts, + ...(this._dimensions ? 
{ dimensions: this._dimensions } : {}), + }); + + // Sort by index to ensure order matches input + const sorted = response.data.sort((a, b) => a.index - b.index); + return sorted.map((item) => item.embedding); + } +} + +// --------------------------------------------------------------------------- +// Gemini +// --------------------------------------------------------------------------- + +export class GeminiEmbeddingProvider implements EmbeddingProvider { + private _model: string; + private _dimensions: number; + private _apiKey: string; + + constructor(config: EmbeddingConfig) { + this._model = config.model; + this._dimensions = config.dimensions ?? 768; + this._apiKey = config.api_key ?? process.env.GOOGLE_API_KEY ?? ''; + } + + get dimensions(): number { + return this._dimensions; + } + + async embed(texts: string[]): Promise { + const { GoogleGenerativeAI } = await import('@google/generative-ai'); + const genAI = new GoogleGenerativeAI(this._apiKey); + const model = genAI.getGenerativeModel({ model: this._model }); + + // Use batchEmbedContents for efficiency + const requests = texts.map((text) => ({ + content: { role: 'user' as const, parts: [{ text }] }, + })); + + const response = await model.batchEmbedContents({ requests }); + return response.embeddings.map((e) => e.values); + } +} + +// --------------------------------------------------------------------------- +// Ollama +// --------------------------------------------------------------------------- + +export class OllamaEmbeddingProvider implements EmbeddingProvider { + private _model: string; + private _dimensions: number; + private _host?: string; + + constructor(config: EmbeddingConfig) { + this._model = config.model; + this._dimensions = config.dimensions ?? 
768; + this._host = config.endpoint; + } + + get dimensions(): number { + return this._dimensions; + } + + async embed(texts: string[]): Promise { + const { Ollama } = await import('ollama'); + const client = new Ollama({ host: this._host }); + + const response = await client.embed({ + model: this._model, + input: texts, + }); + + return response.embeddings; + } +} + +// --------------------------------------------------------------------------- +// LlamaCpp +// --------------------------------------------------------------------------- + +export class LlamaCppEmbeddingProvider implements EmbeddingProvider { + private _dimensions: number; + private _endpoint: string; + + constructor(config: EmbeddingConfig) { + this._dimensions = config.dimensions ?? 768; + this._endpoint = config.endpoint ?? 'http://localhost:8080'; + } + + get dimensions(): number { + return this._dimensions; + } + + async embed(texts: string[]): Promise { + const results: number[][] = []; + + for (const text of texts) { + const response = await fetch(`${this._endpoint}/embedding`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ content: text }), + }); + + if (!response.ok) { + throw new Error(`LlamaCpp embedding request failed: ${response.status} ${response.statusText}`); + } + + const data = (await response.json()) as { embedding: number[] }; + results.push(data.embedding); + } + + return results; + } +} + +// --------------------------------------------------------------------------- +// Factory +// --------------------------------------------------------------------------- + +/** + * Create an embedding provider from config. 
+ */ +export function createEmbeddingProvider(config: EmbeddingConfig): EmbeddingProvider { + switch (config.provider) { + case 'openai': + return new OpenAIEmbeddingProvider(config); + case 'gemini': + return new GeminiEmbeddingProvider(config); + case 'ollama': + return new OllamaEmbeddingProvider(config); + case 'llamacpp': + return new LlamaCppEmbeddingProvider(config); + default: + throw new Error(`Unknown embedding provider: ${(config as Record).provider}`); + } +} diff --git a/src/memory/hybrid-search.test.ts b/src/memory/hybrid-search.test.ts new file mode 100644 index 0000000..4434747 --- /dev/null +++ b/src/memory/hybrid-search.test.ts @@ -0,0 +1,213 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { HybridSearch } from './hybrid-search.js'; +import type { HybridSearchResult } from './hybrid-search.js'; +import type { MemoryStore, SearchResult } from './store.js'; +import type { VectorStore, VectorSearchResult } from './vector-store.js'; +import type { EmbeddingProvider } from './embeddings.js'; + +/** + * Create a mock MemoryStore with keyword search results. + */ +function mockMemoryStore(results: SearchResult[]): MemoryStore { + return { + search: vi.fn(() => results), + read: vi.fn(() => ''), + write: vi.fn(), + listNamespaces: vi.fn(() => []), + getContextForPrompt: vi.fn(() => ''), + getDirtyNamespaces: vi.fn(() => []), + markAllDirty: vi.fn(), + } as unknown as MemoryStore; +} + +/** + * Create a mock VectorStore that returns given results. + */ +function mockVectorStore(results: VectorSearchResult[]): VectorStore { + return { + search: vi.fn(() => results), + upsertChunks: vi.fn(), + deleteNamespace: vi.fn(), + hasContentHash: vi.fn(() => false), + count: vi.fn(() => 0), + close: vi.fn(), + } as unknown as VectorStore; +} + +/** + * Create a mock embedding provider that returns fixed embeddings. 
+ */ +function mockEmbeddingProvider(dims: number = 4): EmbeddingProvider { + return { + dimensions: dims, + embed: vi.fn(async (texts: string[]) => + texts.map(() => new Array(dims).fill(0.1)), + ), + }; +} + +describe('HybridSearch', () => { + describe('search', () => { + it('returns keyword results when no vector results exist', async () => { + const keywordResults: SearchResult[] = [ + { namespace: 'notes', line: 5, content: 'fox jumped', context: 'the fox jumped over' }, + ]; + + const hybrid = new HybridSearch( + mockMemoryStore(keywordResults), + mockVectorStore([]), + mockEmbeddingProvider(), + 0.7, + ); + + const results = await hybrid.search('fox'); + expect(results.length).toBe(1); + expect(results[0].namespace).toBe('notes'); + expect(results[0].source).toBe('keyword'); + }); + + it('returns vector results when no keyword results exist', async () => { + const vectorResults: VectorSearchResult[] = [ + { namespace: 'journal', chunkText: 'semantic match', startLine: 10, endLine: 20, score: 0.9 }, + ]; + + const hybrid = new HybridSearch( + mockMemoryStore([]), + mockVectorStore(vectorResults), + mockEmbeddingProvider(), + 0.7, + ); + + const results = await hybrid.search('meaning'); + expect(results.length).toBe(1); + expect(results[0].namespace).toBe('journal'); + expect(results[0].source).toBe('vector'); + }); + + it('merges keyword and vector results', async () => { + const keywordResults: SearchResult[] = [ + { namespace: 'notes', line: 5, content: 'fox keyword', context: 'the fox keyword hit' }, + ]; + const vectorResults: VectorSearchResult[] = [ + { namespace: 'journal', chunkText: 'fox semantic', startLine: 10, endLine: 20, score: 0.85 }, + ]; + + const hybrid = new HybridSearch( + mockMemoryStore(keywordResults), + mockVectorStore(vectorResults), + mockEmbeddingProvider(), + 0.7, + ); + + const results = await hybrid.search('fox'); + expect(results.length).toBe(2); + + const namespaces = results.map((r) => r.namespace); + 
expect(namespaces).toContain('notes'); + expect(namespaces).toContain('journal'); + }); + + it('deduplicates results from same namespace and nearby lines', async () => { + // Both keyword and vector find something at the same location + const keywordResults: SearchResult[] = [ + { namespace: 'notes', line: 5, content: 'fox hit', context: 'context' }, + ]; + const vectorResults: VectorSearchResult[] = [ + { namespace: 'notes', chunkText: 'fox hit too', startLine: 4, endLine: 8, score: 0.9 }, + ]; + + const hybrid = new HybridSearch( + mockMemoryStore(keywordResults), + mockVectorStore(vectorResults), + mockEmbeddingProvider(), + 0.7, + ); + + const results = await hybrid.search('fox'); + // Should be deduplicated to a single "both" result + expect(results.length).toBe(1); + expect(results[0].source).toBe('both'); + }); + + it('applies hybrid weight to scoring', async () => { + const keywordResults: SearchResult[] = [ + { namespace: 'notes', line: 100, content: 'keyword only', context: 'ctx' }, + ]; + const vectorResults: VectorSearchResult[] = [ + { namespace: 'journal', chunkText: 'vector only', startLine: 200, endLine: 210, score: 0.95 }, + ]; + + // High vector weight (0.9) + const hybrid = new HybridSearch( + mockMemoryStore(keywordResults), + mockVectorStore(vectorResults), + mockEmbeddingProvider(), + 0.9, + ); + + const results = await hybrid.search('query'); + expect(results.length).toBe(2); + + // Vector result should rank higher with high vector weight + const vectorResult = results.find((r) => r.source === 'vector'); + const keywordResult = results.find((r) => r.source === 'keyword'); + expect(vectorResult).toBeDefined(); + expect(keywordResult).toBeDefined(); + expect(vectorResult!.score).toBeGreaterThan(keywordResult!.score); + }); + + it('falls back to keyword search when vector search fails', async () => { + const keywordResults: SearchResult[] = [ + { namespace: 'notes', line: 1, content: 'fallback', context: 'ctx' }, + ]; + + const failingProvider: 
EmbeddingProvider = { + dimensions: 4, + embed: vi.fn(async () => { throw new Error('API error'); }), + }; + + const hybrid = new HybridSearch( + mockMemoryStore(keywordResults), + mockVectorStore([]), + failingProvider, + 0.7, + ); + + // Should not throw — should fall back to keyword results + const results = await hybrid.search('test'); + expect(results.length).toBe(1); + expect(results[0].source).toBe('keyword'); + }); + + it('respects topK limit', async () => { + const keywordResults: SearchResult[] = Array.from({ length: 10 }, (_, i) => ({ + namespace: `ns${i}`, + line: i + 1, + content: `result ${i}`, + context: `ctx ${i}`, + })); + + const hybrid = new HybridSearch( + mockMemoryStore(keywordResults), + mockVectorStore([]), + mockEmbeddingProvider(), + 0.5, + ); + + const results = await hybrid.search('query', 3); + expect(results.length).toBe(3); + }); + + it('returns empty array when both searches find nothing', async () => { + const hybrid = new HybridSearch( + mockMemoryStore([]), + mockVectorStore([]), + mockEmbeddingProvider(), + 0.7, + ); + + const results = await hybrid.search('nonexistent'); + expect(results).toEqual([]); + }); + }); +}); diff --git a/src/memory/hybrid-search.ts b/src/memory/hybrid-search.ts new file mode 100644 index 0000000..04c19c5 --- /dev/null +++ b/src/memory/hybrid-search.ts @@ -0,0 +1,182 @@ +/** + * Hybrid search combining vector similarity with keyword matching. + */ + +import type { MemoryStore, SearchResult } from './store.js'; +import type { VectorStore } from './vector-store.js'; +import type { EmbeddingProvider } from './embeddings.js'; + +/** + * A result from hybrid search combining vector and keyword sources. + */ +export interface HybridSearchResult { + /** The memory namespace the result came from. */ + namespace: string; + /** The matched content text. */ + content: string; + /** Surrounding context lines. */ + context: string; + /** 1-based line number of the match. 
*/ + line: number; + /** Combined relevance score (0-1). */ + score: number; + /** Source of the match: keyword, vector, or both. */ + source: 'keyword' | 'vector' | 'both'; +} + +/** + * Combines keyword search from MemoryStore with vector similarity + * search from VectorStore, deduplicating and merging results with + * configurable weighting. + */ +export class HybridSearch { + private _memoryStore: MemoryStore; + private _vectorStore: VectorStore; + private _embeddingProvider: EmbeddingProvider; + private _hybridWeight: number; + + /** + * @param memoryStore - The keyword-based memory store. + * @param vectorStore - The vector embedding store. + * @param embeddingProvider - Provider for generating query embeddings. + * @param hybridWeight - Weight for vector results (0-1). Keyword weight = 1 - hybridWeight. + */ + constructor( + memoryStore: MemoryStore, + vectorStore: VectorStore, + embeddingProvider: EmbeddingProvider, + hybridWeight: number = 0.7, + ) { + this._memoryStore = memoryStore; + this._vectorStore = vectorStore; + this._embeddingProvider = embeddingProvider; + this._hybridWeight = hybridWeight; + } + + /** + * Run hybrid search combining keyword and vector results. + * + * @param query - The search query string. + * @param topK - Maximum number of results to return. + * @returns Merged and deduplicated results sorted by combined score. 
+ */ + async search(query: string, topK: number = 5): Promise { + // Run keyword and vector search in parallel + const [keywordResults, vectorResults] = await Promise.all([ + this._keywordSearch(query), + this._vectorSearch(query, topK * 2), // fetch more for better merging + ]); + + // Merge and deduplicate + return this._mergeResults(keywordResults, vectorResults, topK); + } + + // --------------------------------------------------------------------------- + // Private + // --------------------------------------------------------------------------- + + private _keywordSearch(query: string): Promise { + // MemoryStore.search is synchronous but we wrap in promise for parallel use + return Promise.resolve(this._memoryStore.search(query)); + } + + private async _vectorSearch( + query: string, + topK: number, + ): Promise { + try { + const [queryEmbedding] = await this._embeddingProvider.embed([query]); + const results = this._vectorStore.search(queryEmbedding, topK); + + return results.map((r) => ({ + namespace: r.namespace, + content: r.chunkText, + context: r.chunkText, + line: r.startLine, + score: r.score, + source: 'vector' as const, + })); + } catch (error) { + // Vector search failure should not break search entirely + console.error('Vector search failed, falling back to keyword only:', error); + return []; + } + } + + /** + * Merge keyword and vector results with deduplication. + * + * Deduplication: two results are considered duplicates if they share the + * same namespace and their line numbers are within 3 lines of each other. 
+ */ + private _mergeResults( + keywordResults: SearchResult[], + vectorResults: HybridSearchResult[], + topK: number, + ): HybridSearchResult[] { + // Normalise keyword scores: assign rank-based scores (best match = 1.0) + const maxKeyword = keywordResults.length; + const keywordScored: HybridSearchResult[] = keywordResults.map((r, idx) => ({ + namespace: r.namespace, + content: r.content, + context: r.context, + line: r.line, + score: maxKeyword > 0 ? 1 - idx / (maxKeyword + 1) : 0, + source: 'keyword' as const, + })); + + // Build a combined map keyed by namespace + approximate line + const resultMap = new Map(); + + // Key function: group results within LINE_PROXIMITY lines together + const LINE_PROXIMITY = 3; + const makeKey = (namespace: string, line: number): string => { + const bucket = Math.floor(line / LINE_PROXIMITY); + return `${namespace}:${bucket}`; + }; + + // Add keyword results first + for (const kr of keywordScored) { + const key = makeKey(kr.namespace, kr.line); + const existing = resultMap.get(key); + if (existing) { + // Combine scores + existing.score = (this._hybridWeight * (existing.source === 'vector' || existing.source === 'both' ? 
existing.score : 0)) + + ((1 - this._hybridWeight) * kr.score); + existing.source = 'both'; + // Prefer the more specific keyword content + existing.content = kr.content; + existing.context = kr.context; + existing.line = kr.line; + } else { + resultMap.set(key, { + ...kr, + score: (1 - this._hybridWeight) * kr.score, + }); + } + } + + // Add/merge vector results + for (const vr of vectorResults) { + const key = makeKey(vr.namespace, vr.line); + const existing = resultMap.get(key); + if (existing) { + if (existing.source === 'keyword') { + existing.score = (this._hybridWeight * vr.score) + existing.score; + existing.source = 'both'; + } + // If already 'both' or 'vector', keep the higher-scoring version + } else { + resultMap.set(key, { + ...vr, + score: this._hybridWeight * vr.score, + }); + } + } + + // Sort by score descending, return top K + const merged = Array.from(resultMap.values()); + merged.sort((a, b) => b.score - a.score); + return merged.slice(0, topK); + } +} diff --git a/src/memory/index.ts b/src/memory/index.ts index f30ef26..5c4625e 100644 --- a/src/memory/index.ts +++ b/src/memory/index.ts @@ -1,2 +1,10 @@ export { MemoryStore } from './store.js'; export type { MemoryStoreConfig, SearchResult } from './store.js'; +export { chunkText } from './chunker.js'; +export type { Chunk, ChunkOptions } from './chunker.js'; +export { createEmbeddingProvider, OpenAIEmbeddingProvider, GeminiEmbeddingProvider, OllamaEmbeddingProvider, LlamaCppEmbeddingProvider } from './embeddings.js'; +export type { EmbeddingProvider } from './embeddings.js'; +export { VectorStore, cosineSimilarity, contentHash } from './vector-store.js'; +export type { VectorSearchResult, EmbeddingRow } from './vector-store.js'; +export { HybridSearch } from './hybrid-search.js'; +export type { HybridSearchResult } from './hybrid-search.js'; diff --git a/src/memory/store.ts b/src/memory/store.ts index 2261c82..781dfedf 100644 --- a/src/memory/store.ts +++ b/src/memory/store.ts @@ -40,6 +40,7 
@@ export interface SearchResult { */ export class MemoryStore { private _config: MemoryStoreConfig; + private _dirtyNamespaces: Set = new Set(); constructor(config: MemoryStoreConfig) { this._config = config; @@ -88,6 +89,9 @@ export class MemoryStore { const separator = existing.length > 0 ? '\n' : ''; writeFileSync(filePath, existing + separator + content, 'utf-8'); } + + // Mark namespace as needing re-indexing for vector search + this._dirtyNamespaces.add(namespace); } /** @@ -147,6 +151,25 @@ export class MemoryStore { return this._scanDir(this._config.dir); } + /** + * Return namespaces that have been modified since last call, then clear + * the dirty set. Used by the background indexer to re-embed changed content. + */ + getDirtyNamespaces(): string[] { + const dirty = Array.from(this._dirtyNamespaces); + this._dirtyNamespaces.clear(); + return dirty; + } + + /** + * Mark all existing namespaces as dirty (e.g. for initial full indexing). + */ + markAllDirty(): void { + for (const ns of this.listNamespaces()) { + this._dirtyNamespaces.add(ns); + } + } + /** * Build memory context suitable for injection into a system prompt. 
* diff --git a/src/memory/vector-store.test.ts b/src/memory/vector-store.test.ts new file mode 100644 index 0000000..bc03d76 --- /dev/null +++ b/src/memory/vector-store.test.ts @@ -0,0 +1,209 @@ +import { describe, it, expect, beforeEach, afterEach } from 'vitest'; +import { VectorStore, cosineSimilarity, contentHash } from './vector-store.js'; +import type { Chunk } from './chunker.js'; +import { mkdtempSync, rmSync } from 'fs'; +import { join } from 'path'; +import { tmpdir } from 'os'; + +describe('cosineSimilarity', () => { + it('returns 1 for identical vectors', () => { + const v = [1, 2, 3]; + expect(cosineSimilarity(v, v)).toBeCloseTo(1.0, 5); + }); + + it('returns 0 for orthogonal vectors', () => { + expect(cosineSimilarity([1, 0], [0, 1])).toBeCloseTo(0.0, 5); + }); + + it('returns -1 for opposite vectors', () => { + expect(cosineSimilarity([1, 0], [-1, 0])).toBeCloseTo(-1.0, 5); + }); + + it('handles Float32Array inputs', () => { + const a = new Float32Array([0.5, 0.5]); + const b = new Float32Array([0.5, 0.5]); + expect(cosineSimilarity(a, b)).toBeCloseTo(1.0, 4); + }); + + it('throws on dimension mismatch', () => { + expect(() => cosineSimilarity([1, 2], [1, 2, 3])).toThrow('dimension mismatch'); + }); + + it('returns 0 for zero vectors', () => { + expect(cosineSimilarity([0, 0], [1, 1])).toBe(0); + }); +}); + +describe('contentHash', () => { + it('returns consistent hash for same input', () => { + const hash1 = contentHash('hello world'); + const hash2 = contentHash('hello world'); + expect(hash1).toBe(hash2); + }); + + it('returns different hash for different input', () => { + expect(contentHash('hello')).not.toBe(contentHash('world')); + }); + + it('returns an 8-character hex string', () => { + const hash = contentHash('test'); + expect(hash).toMatch(/^[0-9a-f]{8}$/); + }); +}); + +describe('VectorStore', () => { + let dir: string; + let store: VectorStore; + + beforeEach(() => { + dir = mkdtempSync(join(tmpdir(), 'flynn-vector-test-')); + store = 
new VectorStore(join(dir, 'vectors.db')); + }); + + afterEach(() => { + store.close(); + rmSync(dir, { recursive: true, force: true }); + }); + + const makeChunks = (namespace: string, texts: string[]): Chunk[] => + texts.map((text, i) => ({ + text, + namespace, + startLine: i * 10 + 1, + endLine: (i + 1) * 10, + })); + + const makeFakeEmbeddings = (count: number, dims: number = 4): number[][] => + Array.from({ length: count }, (_, i) => { + // Create somewhat distinct embeddings using simple patterns + const vec = new Array(dims).fill(0); + vec[i % dims] = 1.0; + return vec; + }); + + describe('upsertChunks', () => { + it('inserts chunks and embeddings', () => { + const chunks = makeChunks('notes', ['chunk one', 'chunk two']); + const embeddings = makeFakeEmbeddings(2); + + store.upsertChunks(chunks, embeddings, 'hash1'); + expect(store.count()).toBe(2); + }); + + it('replaces existing chunks for same namespace', () => { + const chunks1 = makeChunks('notes', ['old chunk']); + const embeddings1 = makeFakeEmbeddings(1); + store.upsertChunks(chunks1, embeddings1, 'hash1'); + expect(store.count()).toBe(1); + + const chunks2 = makeChunks('notes', ['new chunk 1', 'new chunk 2']); + const embeddings2 = makeFakeEmbeddings(2); + store.upsertChunks(chunks2, embeddings2, 'hash2'); + expect(store.count()).toBe(2); + }); + + it('keeps other namespaces intact when upserting', () => { + store.upsertChunks( + makeChunks('ns1', ['chunk 1']), + makeFakeEmbeddings(1), + 'h1', + ); + store.upsertChunks( + makeChunks('ns2', ['chunk 2']), + makeFakeEmbeddings(1), + 'h2', + ); + expect(store.count()).toBe(2); + + // Upsert ns1 again + store.upsertChunks( + makeChunks('ns1', ['new chunk 1']), + makeFakeEmbeddings(1), + 'h3', + ); + expect(store.count()).toBe(2); // ns2 chunk still there + }); + + it('throws on length mismatch', () => { + const chunks = makeChunks('test', ['a', 'b']); + const embeddings = makeFakeEmbeddings(1); + expect(() => store.upsertChunks(chunks, embeddings, 
'h')).toThrow('mismatch'); + }); + + it('does nothing for empty chunks', () => { + store.upsertChunks([], [], 'h'); + expect(store.count()).toBe(0); + }); + }); + + describe('deleteNamespace', () => { + it('removes all embeddings for a namespace', () => { + store.upsertChunks(makeChunks('ns1', ['a']), makeFakeEmbeddings(1), 'h1'); + store.upsertChunks(makeChunks('ns2', ['b']), makeFakeEmbeddings(1), 'h2'); + expect(store.count()).toBe(2); + + store.deleteNamespace('ns1'); + expect(store.count()).toBe(1); + }); + }); + + describe('hasContentHash', () => { + it('returns true when namespace+hash exists', () => { + store.upsertChunks(makeChunks('notes', ['a']), makeFakeEmbeddings(1), 'abc123'); + expect(store.hasContentHash('notes', 'abc123')).toBe(true); + }); + + it('returns false for non-matching hash', () => { + store.upsertChunks(makeChunks('notes', ['a']), makeFakeEmbeddings(1), 'abc123'); + expect(store.hasContentHash('notes', 'different')).toBe(false); + }); + + it('returns false for non-existent namespace', () => { + expect(store.hasContentHash('nonexistent', 'abc')).toBe(false); + }); + }); + + describe('search', () => { + it('returns results sorted by cosine similarity', () => { + // Create embeddings where chunk 1 is close to query and chunk 2 is far + const chunks = makeChunks('test', ['close match', 'far away']); + const embeddings = [ + [0.9, 0.1, 0.0, 0.0], // close to query + [0.0, 0.0, 0.9, 0.1], // far from query + ]; + store.upsertChunks(chunks, embeddings, 'h'); + + const query = [1.0, 0.0, 0.0, 0.0]; // similar to first chunk + const results = store.search(query, 5); + + expect(results.length).toBe(2); + expect(results[0].chunkText).toBe('close match'); + expect(results[0].score).toBeGreaterThan(results[1].score); + }); + + it('respects topK limit', () => { + const chunks = makeChunks('test', ['a', 'b', 'c']); + const embeddings = makeFakeEmbeddings(3); + store.upsertChunks(chunks, embeddings, 'h'); + + const results = store.search([1, 0, 0, 0], 
2); + expect(results.length).toBe(2); + }); + + it('returns empty array when no embeddings exist', () => { + const results = store.search([1, 0, 0, 0], 5); + expect(results).toEqual([]); + }); + + it('includes namespace and line info in results', () => { + const chunks = makeChunks('sessions/abc', ['test chunk']); + const embeddings = [[1, 0, 0, 0]]; + store.upsertChunks(chunks, embeddings, 'h'); + + const results = store.search([1, 0, 0, 0], 1); + expect(results[0].namespace).toBe('sessions/abc'); + expect(results[0].startLine).toBe(1); + expect(results[0].endLine).toBe(10); + }); + }); +}); diff --git a/src/memory/vector-store.ts b/src/memory/vector-store.ts new file mode 100644 index 0000000..d051945 --- /dev/null +++ b/src/memory/vector-store.ts @@ -0,0 +1,232 @@ +/** + * SQLite-backed vector storage for embedding chunks. + * Uses better-sqlite3 for synchronous operations and stores + * embeddings as Float32Array BLOBs. + */ + +import Database from 'better-sqlite3'; +import type { Chunk } from './chunker.js'; + +/** + * A single row from the embeddings table. + */ +export interface EmbeddingRow { + id: number; + namespace: string; + chunk_text: string; + start_line: number; + end_line: number; + embedding: Buffer; + created_at: string; + content_hash: string; +} + +/** + * A search result from the vector store. + */ +export interface VectorSearchResult { + namespace: string; + chunkText: string; + startLine: number; + endLine: number; + score: number; +} + +/** + * Compute cosine similarity between two vectors. + * Returns a value between -1 and 1 (1 = identical direction). 
/**
 * Compute the cosine similarity of two equal-length vectors.
 *
 * @param a - First vector.
 * @param b - Second vector; must have the same length as `a`.
 * @returns `dot(a, b) / (|a| * |b|)`, or 0 when either vector has zero magnitude.
 * @throws Error when the two vectors differ in length.
 */
export function cosineSimilarity(a: number[] | Float32Array, b: number[] | Float32Array): number {
  if (a.length !== b.length) {
    throw new Error(`Vector dimension mismatch: ${a.length} vs ${b.length}`);
  }

  let dotProduct = 0;
  let normA = 0;
  let normB = 0;

  for (let i = 0; i < a.length; i++) {
    dotProduct += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }

  const magnitude = Math.sqrt(normA) * Math.sqrt(normB);
  // A zero vector has no direction; report "no similarity" instead of NaN.
  if (magnitude === 0) return 0;

  return dotProduct / magnitude;
}

/**
 * Simple content hash for change detection.
 * Uses a fast FNV-1a style 32-bit hash over the string's UTF-16 code units
 * rather than a cryptographic hash.
 *
 * @param text - The text to hash.
 * @returns The hash as an 8-character, zero-padded lowercase hex string.
 */
export function contentHash(text: string): string {
  let hash = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193); // FNV prime, 32-bit wrapping multiply
  }
  // `>>> 0` reinterprets the signed 32-bit value as unsigned before formatting.
  return (hash >>> 0).toString(16).padStart(8, '0');
}

/**
 * Decode an embedding BLOB (raw float32 values) into a Float32Array.
 *
 * A Float32Array view can only be created at a byte offset that is a multiple
 * of 4. When the Buffer is not aligned that way, the bytes are copied into a
 * fresh, aligned allocation instead of being viewed in place.
 */
function decodeEmbedding(blob: Buffer): Float32Array {
  const bytesPerFloat = Float32Array.BYTES_PER_ELEMENT;
  const length = Math.floor(blob.byteLength / bytesPerFloat);

  if (blob.byteOffset % bytesPerFloat === 0) {
    return new Float32Array(blob.buffer, blob.byteOffset, length);
  }

  const aligned = new Uint8Array(length * bytesPerFloat);
  aligned.set(blob.subarray(0, length * bytesPerFloat));
  return new Float32Array(aligned.buffer);
}

/**
 * SQLite-backed vector store that persists embedding chunks.
 */
export class VectorStore {
  private _db: Database.Database;

  /**
   * Open the database at `dbPath`, switch it to WAL journaling and make sure
   * the `embeddings` table and its indexes exist.
   */
  constructor(dbPath: string) {
    this._db = new Database(dbPath);
    this._db.pragma('journal_mode = WAL');
    this._initSchema();
  }

  // ---------------------------------------------------------------------------
  // Schema
  // ---------------------------------------------------------------------------

  /** Create the `embeddings` table and its indexes if they do not exist yet. */
  private _initSchema(): void {
    this._db.exec(`
      CREATE TABLE IF NOT EXISTS embeddings (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        namespace TEXT NOT NULL,
        chunk_text TEXT NOT NULL,
        start_line INTEGER NOT NULL,
        end_line INTEGER NOT NULL,
        embedding BLOB NOT NULL,
        created_at TEXT NOT NULL DEFAULT (datetime('now')),
        content_hash TEXT NOT NULL
      );

      CREATE INDEX IF NOT EXISTS idx_embeddings_namespace
        ON embeddings (namespace);

      CREATE INDEX IF NOT EXISTS idx_embeddings_content_hash
        ON embeddings (namespace, content_hash);
    `);
  }

  // ---------------------------------------------------------------------------
  // Public API
  // ---------------------------------------------------------------------------

  /**
   * Upsert chunks for a namespace.
   * Deletes all existing embeddings of every namespace that appears in
   * `chunks`, then inserts the new rows, all inside one transaction.
   * An empty `chunks` array is a no-op.
   *
   * @param chunks - The text chunks with metadata.
   * @param embeddings - Corresponding embedding vectors (one per chunk).
   * @param hash - Content hash stored on every inserted row.
   * @throws Error when `chunks` and `embeddings` differ in length.
   */
  upsertChunks(
    chunks: Chunk[],
    embeddings: number[][],
    hash: string,
  ): void {
    if (chunks.length !== embeddings.length) {
      throw new Error(`Chunks/embeddings length mismatch: ${chunks.length} vs ${embeddings.length}`);
    }

    if (chunks.length === 0) return;

    // Rows are inserted under each chunk's own namespace, so every namespace
    // present must be cleared first -- not only the first chunk's.
    const namespaces = new Set(chunks.map((chunk) => chunk.namespace));

    const insertStmt = this._db.prepare(`
      INSERT INTO embeddings (namespace, chunk_text, start_line, end_line, embedding, content_hash)
      VALUES (?, ?, ?, ?, ?, ?)
    `);

    const deleteStmt = this._db.prepare(`
      DELETE FROM embeddings WHERE namespace = ?
    `);

    const transaction = this._db.transaction(() => {
      for (const namespace of namespaces) {
        deleteStmt.run(namespace);
      }
      chunks.forEach((chunk, i) => {
        // Vectors are persisted as raw float32 bytes.
        const embeddingBuffer = Buffer.from(new Float32Array(embeddings[i]).buffer);
        insertStmt.run(
          chunk.namespace,
          chunk.text,
          chunk.startLine,
          chunk.endLine,
          embeddingBuffer,
          hash,
        );
      });
    });

    transaction();
  }

  /**
   * Delete all embeddings for a namespace.
   */
  deleteNamespace(namespace: string): void {
    this._db.prepare('DELETE FROM embeddings WHERE namespace = ?').run(namespace);
  }

  /**
   * Check if a namespace already has embeddings with the given content hash.
   * Used to skip re-embedding unchanged content.
   */
  hasContentHash(namespace: string, hash: string): boolean {
    const row = this._db.prepare(
      'SELECT 1 FROM embeddings WHERE namespace = ? AND content_hash = ? LIMIT 1',
    ).get(namespace, hash) as { '1': number } | undefined;
    return row !== undefined;
  }

  /**
   * Search for the nearest vectors to the query embedding.
   * Uses brute-force cosine similarity over every stored row.
   *
   * @param queryEmbedding - The query vector.
   * @param topK - Maximum number of results to return; values <= 0 yield `[]`.
   * @returns Ranked results sorted by descending similarity score.
   * @throws Error (from `cosineSimilarity`) when a stored vector's dimension
   *   differs from the query's.
   */
  search(queryEmbedding: number[], topK: number): VectorSearchResult[] {
    if (topK <= 0) return [];

    const rows = this._db.prepare(
      'SELECT namespace, chunk_text, start_line, end_line, embedding FROM embeddings',
    ).all() as Pick<
      EmbeddingRow,
      'namespace' | 'chunk_text' | 'start_line' | 'end_line' | 'embedding'
    >[];

    const queryArray = new Float32Array(queryEmbedding);

    const scored: VectorSearchResult[] = rows.map((row) => ({
      namespace: row.namespace,
      chunkText: row.chunk_text,
      startLine: row.start_line,
      endLine: row.end_line,
      score: cosineSimilarity(queryArray, decodeEmbedding(row.embedding)),
    }));

    // Sort by descending score and return top K
    scored.sort((a, b) => b.score - a.score);
    return scored.slice(0, topK);
  }

  /**
   * Get total number of stored embeddings.
   */
  count(): number {
    const row = this._db.prepare('SELECT COUNT(*) as cnt FROM embeddings').get() as { cnt: number };
    return row.cnt;
  }

  /**
   * Close the database connection.
   */
  close(): void {
    this._db.close();
  }
}
*/ -export function createMemorySearchTool(store: MemoryStore): Tool { +export function createMemorySearchTool(store: MemoryStore, hybridSearch?: HybridSearch): Tool { return { name: 'memory.search', description: - 'Search across all memory files for a keyword or phrase. Returns matching lines with surrounding context from every namespace.', + 'Search across all memory files for a keyword or phrase. Returns matching lines with surrounding context from every namespace.' + + (hybridSearch ? ' Uses semantic vector search combined with keyword matching for better results.' : ''), inputSchema: { type: 'object', properties: { @@ -28,6 +31,33 @@ export function createMemorySearchTool(store: MemoryStore): Tool { const args = rawArgs as MemorySearchArgs; try { + // Try hybrid search first if available + if (hybridSearch) { + try { + const results = await hybridSearch.search(args.query); + + if (results.length === 0) { + return { success: true, output: `No matches found for "${args.query}".` }; + } + + const formatted = results.map((result) => { + const sourceLabel = result.source === 'both' ? 'keyword+vector' + : result.source === 'vector' ? 'vector' + : 'keyword'; + return `[${result.namespace}:${result.line}] (${sourceLabel}, score: ${result.score.toFixed(3)}) ${result.content}\n context: ${result.context}`; + }).join('\n\n'); + + return { + success: true, + output: `Found ${results.length} match${results.length === 1 ? '' : 'es'} for "${args.query}":\n\n${formatted}`, + }; + } catch (hybridError) { + // Fall back to keyword search on hybrid failure + console.error('Hybrid search failed, falling back to keyword search:', hybridError); + } + } + + // Keyword-only fallback const results = store.search(args.query); if (results.length === 0) {