flynn/docs/plans/2026-02-07-tier2-implementation-plan.md
William Valentin b50c140d25 feat: add Docker support and inbound webhooks (Tier 2)
- Dockerfile: multi-stage build (node:22-alpine), better-sqlite3 native deps handled
- .dockerignore + docker-compose.yml for deployment
- FLYNN_DATA_DIR env var support in daemon, CLI, and TUI
- WebhookHandler: ChannelAdapter for HTTP POST /webhooks/:name
- Per-webhook HMAC auth, template rendering ({{body}}, {{json.field}})
- Config schema: automation.webhooks array with name/secret/message/output
- Gateway routes webhook requests before static files (bypasses gateway auth)
- 23 new tests for webhook functionality, 874 total tests passing
2026-02-07 14:36:05 -08:00


# Tier 2 Implementation Plan
**Date**: 2026-02-07

**Features**: Inbound Webhooks, Dockerfile, Heartbeat, Vector Memory Search

**Status**: Planning

---
## 1. Inbound Webhooks
**Complexity**: Medium
### Overview
An HTTP endpoint (`POST /webhooks/:name`) that triggers agent processing. It follows the same pattern as `CronScheduler`: it implements `ChannelAdapter`, produces an `InboundMessage`, and routes responses to a configured output channel.
### Config Schema Additions
Add to `src/config/schema.ts`:
```typescript
const webhookSchema = z.object({
  /** Used as the URL segment in POST /webhooks/:name, so limited to the characters the route matches. */
  name: z.string()
    .min(1, 'Webhook name is required')
    .regex(/^[a-zA-Z0-9_-]+$/, 'Webhook name may only contain letters, digits, "_" and "-"'),
  /** Optional per-webhook HMAC secret for payload signature verification. */
  secret: z.string().optional(),
  /** Message template: use {{body}} for the raw body, {{json.fieldName}} for JSON fields. */
  message: z.string().default('{{body}}'),
  output: z.object({
    channel: z.string().min(1),
    peer: z.string().min(1),
  }),
  enabled: z.boolean().default(true),
});
```
Extend `automationSchema`:
```typescript
const automationSchema = z.object({
  cron: z.array(cronJobSchema).default([]),
  webhooks: z.array(webhookSchema).default([]),
}).default({});
```
Add type export:
```typescript
export type WebhookConfig = z.infer<typeof webhookSchema>;
```
### Files to Create
| File | Purpose |
|------|---------|
| `src/automation/webhooks.ts` | `WebhookHandler` class implementing `ChannelAdapter` |
| `src/automation/webhooks.test.ts` | Unit tests |
### Files to Modify
| File | Change |
|------|--------|
| `src/config/schema.ts` | Add `webhookSchema`, extend `automationSchema`, add type export |
| `src/automation/index.ts` | Export `WebhookHandler` |
| `src/gateway/server.ts` | Route `POST /webhooks/:name` to the `WebhookHandler` before static/404 |
| `src/daemon/index.ts` | Instantiate `WebhookHandler`, register with `ChannelRegistry`, wire to gateway |
### Key Class: `WebhookHandler`
```typescript
// src/automation/webhooks.ts
import { createHmac, timingSafeEqual } from 'crypto'; // used by verifyHmac()
import type { IncomingMessage, ServerResponse } from 'http';
import type { ChannelAdapter, ChannelStatus, InboundMessage, OutboundMessage } from '../channels/types.js';
import type { WebhookConfig } from '../config/schema.js';

// Module-level helpers used below (bodies not shown here):
// readBody(req), verifyHmac(body, secret, signature), renderTemplate(template, body)

/** Minimal interface for the parts of ChannelRegistry we need. */
interface ChannelLookup {
  get(name: string): { send(peerId: string, message: OutboundMessage): Promise<void> } | undefined;
}

export class WebhookHandler implements ChannelAdapter {
  readonly name = 'webhook';
  private _status: ChannelStatus = 'disconnected';
  private messageHandler?: (msg: InboundMessage) => void;
  private webhooks: Map<string, WebhookConfig> = new Map();

  constructor(
    private readonly webhookConfigs: WebhookConfig[],
    private readonly channelLookup: ChannelLookup,
  ) {
    for (const wh of webhookConfigs) {
      this.webhooks.set(wh.name, wh);
    }
  }

  // ChannelAdapter interface methods
  get status(): ChannelStatus { return this._status; }
  async connect(): Promise<void> { this._status = 'connected'; }
  async disconnect(): Promise<void> { this._status = 'disconnected'; }
  onMessage(handler: (msg: InboundMessage) => void): void { this.messageHandler = handler; }

  /** Agent replies arrive with peerId = webhook name; forward them to the configured output channel. */
  async send(peerId: string, message: OutboundMessage): Promise<void> {
    // Same pattern as CronScheduler.send
    const webhook = this.webhooks.get(peerId);
    if (!webhook) return;
    const outputAdapter = this.channelLookup.get(webhook.output.channel);
    if (!outputAdapter) return;
    await outputAdapter.send(webhook.output.peer, message);
  }

  /**
   * Handle an incoming HTTP request. Called by the gateway HTTP handler.
   * Returns true if the request was handled (webhook found), false otherwise.
   */
  async handleRequest(webhookName: string, req: IncomingMessage, res: ServerResponse): Promise<boolean> {
    const webhook = this.webhooks.get(webhookName);
    if (!webhook || !webhook.enabled) {
      return false; // Not found: let the gateway return 404
    }

    const body = await readBody(req);

    // Verify the HMAC signature if a secret is configured
    if (webhook.secret) {
      const header = req.headers['x-webhook-signature'];
      const signature = Array.isArray(header) ? header[0] : header;
      if (!verifyHmac(body, webhook.secret, signature)) {
        res.writeHead(401, { 'Content-Type': 'application/json' });
        res.end(JSON.stringify({ error: 'Invalid signature' }));
        return true;
      }
    }

    // Build the message text from the template and produce an InboundMessage
    const now = Date.now();
    const msg: InboundMessage = {
      id: `webhook-${webhookName}-${now}`,
      channel: 'webhook',
      senderId: webhookName,
      senderName: `webhook:${webhookName}`,
      text: renderTemplate(webhook.message, body),
      timestamp: now,
      metadata: { webhookName, contentType: req.headers['content-type'] },
    };
    this.messageHandler?.(msg);

    res.writeHead(202, { 'Content-Type': 'application/json' });
    res.end(JSON.stringify({ accepted: true, id: msg.id }));
    return true;
  }

  getWebhookNames(): string[] {
    return Array.from(this.webhooks.keys());
  }
}
```
### Integration in Gateway HTTP Handler
In `GatewayServer.handleHttpRequest()`, add route matching **before** the static file serving:
```typescript
// Check for webhook routes: POST /webhooks/:name (optional trailing slash and query string)
if (req.method === 'POST' && req.url && this.webhookHandler) {
  const match = req.url.match(/^\/webhooks\/([a-zA-Z0-9_-]+)\/?(?:\?.*)?$/);
  if (match) {
    const handled = await this.webhookHandler.handleRequest(match[1], req, res);
    if (handled) return;
  }
}
```
The `webhookHandler` reference is passed into `GatewayServerConfig` as an optional field, similar to how `channelRegistry` is passed.
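Pulling the route match into a small pure function makes it unit-testable without starting an HTTP server. The sketch below is illustrative: `matchWebhookRoute` is a hypothetical name, and it assumes `req.url` is an origin-form path. It parses that path with the WHATWG `URL` class against a dummy base, so query strings and fragments are split off before the name is matched. Extra path segments such as `/webhooks/a/b` are rejected.

```typescript
/**
 * Hypothetical helper: return the webhook name for `POST /webhooks/:name`
 * (optional trailing slash, any query string), or null for anything else.
 */
function matchWebhookRoute(method: string | undefined, url: string | undefined): string | null {
  if (method !== 'POST' || !url) return null;
  let pathname: string;
  try {
    // req.url is a path; the dummy base lets URL split off ?query and #hash
    pathname = new URL(url, 'http://localhost').pathname;
  } catch {
    return null; // unparseable request target
  }
  const match = /^\/webhooks\/([a-zA-Z0-9_-]+)\/?$/.exec(pathname);
  return match ? match[1] : null;
}
```

Inside `handleHttpRequest()`, the inline regex would then reduce to `const name = matchWebhookRoute(req.method, req.url);` followed by the `handleRequest()` call when `name` is non-null.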
### Integration in Daemon Startup
In `src/daemon/index.ts`, after the cron scheduler block (~line 812-817):
```typescript
// Register webhook handler (if any webhooks configured)
let webhookHandler: WebhookHandler | undefined;
if (config.automation.webhooks.length > 0) {
  webhookHandler = new WebhookHandler(config.automation.webhooks, channelRegistry);
  channelRegistry.register(webhookHandler);
  console.log(`Registered ${config.automation.webhooks.length} webhook(s)`);
}
```
Then pass `webhookHandler` to the `GatewayServer` constructor.
### HMAC Verification
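Standard approach: an `X-Webhook-Signature: sha256=<hex>` header. The handler computes `crypto.createHmac('sha256', secret).update(body).digest('hex')` over the raw request body, before any JSON parsing, and checks it against the header with a timing-safe comparison. GitHub sends its signature in `X-Hub-Signature-256` (same `sha256=<hex>` format), so the `github-push` example below only verifies if that header name is accepted as well.
Also support gateway token auth as an alternative (already handled by the gateway auth middleware for HTTP).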
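A minimal sketch of the `verifyHmac` helper called from `handleRequest()` follows. It assumes the `sha256=<hex>` header format described above. It compares raw digest bytes rather than hex strings. `crypto.timingSafeEqual` throws when the buffers differ in length, so the length check must come first. Missing, malformed and mismatched signatures all return `false`.

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto';

const PREFIX = 'sha256=';

/** Verify `sha256=<hex>` against an HMAC-SHA256 of the raw body. Never throws. */
function verifyHmac(body: string | Buffer, secret: string, signature: string | undefined): boolean {
  if (!signature || !signature.startsWith(PREFIX)) return false;
  const expected = createHmac('sha256', secret).update(body).digest(); // 32 raw bytes
  // Invalid hex makes Buffer.from stop early, so the length check also rejects it
  const provided = Buffer.from(signature.slice(PREFIX.length), 'hex');
  return provided.length === expected.length && timingSafeEqual(provided, expected);
}
```

The body must be the exact bytes received. Re-serializing parsed JSON can change whitespace or key order, and the digest would no longer match.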
### Template Rendering
Simple template engine for the `message` field:
- `{{body}}` — raw request body string
- `{{json.fieldName}}` — extracted JSON field (dot notation for nested)
- Default: `{{body}}` if no template specified
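The rules above can be sketched as a single `replace` pass. This is a hypothetical implementation with three choices the plan leaves open. Missing fields and non-JSON bodies render as an empty string. Object values render as JSON. Unrecognized placeholders are left untouched. Numeric segments such as the `0` in `{{json.alerts.0.annotations.summary}}` work because array indices are ordinary property keys.

```typescript
/** Parse the body as JSON, or return undefined if it is not valid JSON. */
function safeParse(text: string): unknown {
  try {
    return JSON.parse(text);
  } catch {
    return undefined;
  }
}

/** Render {{body}} and {{json.dot.path}} placeholders (hypothetical helper). */
function renderTemplate(template: string, body: string): string {
  let cache: { value: unknown } | undefined; // parse lazily, at most once
  const json = (): unknown => {
    if (cache === undefined) cache = { value: safeParse(body) };
    return cache.value;
  };
  // Single pass: text inserted from the payload is never re-scanned for placeholders
  return template.replace(/\{\{\s*([\w.-]+)\s*\}\}/g, (placeholder: string, key: string) => {
    if (key === 'body') return body;
    if (!key.startsWith('json.')) return placeholder;
    let value: unknown = json();
    for (const segment of key.slice('json.'.length).split('.')) {
      if (value === null || typeof value !== 'object') return '';
      value = (value as Record<string, unknown>)[segment];
    }
    if (value === undefined || value === null) return '';
    return typeof value === 'object' ? JSON.stringify(value) : String(value);
  });
}
```

Doing a single pass matters for safety: a payload field containing `{{json.secret}}` is emitted literally rather than expanded.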
### Test Plan
`src/automation/webhooks.test.ts`:
- Implements ChannelAdapter interface (name, status)
- `handleRequest()` produces InboundMessage with correct fields
- HMAC verification rejects invalid signatures
- HMAC verification passes valid signatures
- Returns false for unknown webhook names
- Returns false for disabled webhooks
- `send()` routes to output channel (same as cron test pattern)
- Template rendering with `{{body}}` and `{{json.field}}`
- `getWebhookNames()` lists all webhook names
### Config Example
```yaml
automation:
  webhooks:
    - name: github-push
      secret: "whsec_..."
      message: "GitHub push to {{json.repository.full_name}}: {{json.head_commit.message}}"
      output:
        channel: telegram
        peer: "123456"
    - name: alertmanager
      message: "Alert: {{json.alerts.0.annotations.summary}}"
      output:
        channel: telegram
        peer: "123456"
```
---
## 2. Dockerfile
**Complexity**: Simple
### Overview
Multi-stage Docker build for the Flynn daemon. No TypeScript source code changes required — this is purely build/deployment infrastructure.
### Files to Create
| File | Purpose |
|------|---------|
| `Dockerfile` | Multi-stage build |
| `.dockerignore` | Exclude dev artifacts |
| `docker-compose.yml` | Example deployment with volumes |
### Dockerfile Design
```dockerfile
# ── Stage 1: Build ──────────────────────────────────────────────
FROM node:22-alpine AS builder
# Install pnpm
RUN corepack enable && corepack prepare pnpm@latest --activate
# better-sqlite3 may compile its native addon during `pnpm install`, so the build tools must exist first
RUN apk add --no-cache python3 make g++
WORKDIR /app
# Install dependencies first (layer caching)
COPY package.json pnpm-lock.yaml ./
RUN pnpm install --frozen-lockfile
# Copy source and build
COPY tsconfig.json ./
COPY src/ src/
RUN pnpm build
# ── Stage 2: Runtime ────────────────────────────────────────────
FROM node:22-alpine AS runtime
RUN corepack enable && corepack prepare pnpm@latest --activate
WORKDIR /app
# Copy package files and install production deps only
COPY package.json pnpm-lock.yaml ./
RUN apk add --no-cache python3 make g++ && \
pnpm install --frozen-lockfile --prod && \
apk del python3 make g++
# Copy compiled output from builder
COPY --from=builder /app/dist dist/
# Copy gateway UI assets (COPY fails if this path is missing in the builder, so drop the line when there is no UI)
COPY --from=builder /app/src/gateway/ui dist/gateway/ui/
# Copy prompt templates and bundled skills
COPY prompts/ prompts/
COPY skills/ skills/
# Create data directories
RUN mkdir -p /data/memory /data/sessions /config
# Runtime config
ENV NODE_ENV=production
ENV FLYNN_CONFIG=/config/config.yaml
ENV FLYNN_DATA_DIR=/data
EXPOSE 18800
# Health check
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
CMD wget -qO- http://localhost:18800/ || exit 1
ENTRYPOINT ["node", "dist/cli/index.js"]
CMD ["start"]
```
### Key Considerations
**better-sqlite3 native deps**: This is the trickiest part. `better-sqlite3` requires native compilation. Options:
1. Install build tools (`python3`, `make`, `g++`) in the runtime stage, compile, then remove — this is the safest approach and keeps the image functional.
2. Use `--build-from-source` flag and carry the built `.node` binary from the builder stage. This requires matching the Node.js version and Alpine version exactly between stages (which we do since both use `node:22-alpine`).
3. Alternatively, use `better-sqlite3`'s prebuilt binaries if they ship for Alpine/musl — check at build time.
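**Recommended approach**: Option 2. Because both stages use the identical base image, the `node_modules` compiled in the builder stage can be copied as-is. Prune devDependencies there first so they are not shipped in the runtime image:
```dockerfile
# In the builder stage, after `pnpm build`:
RUN pnpm prune --prod
# In the runtime stage, instead of a separate pnpm install:
COPY --from=builder /app/node_modules node_modules/
```
This eliminates the need for build tools in the runtime image entirely and produces a smaller image.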
### .dockerignore
```
node_modules/
dist/
.git/
.gitignore
*.log
.env
.env.*
.worktrees/
docs/
src/**/*.test.ts
vitest.config.*
eslint.config.*
*.md
!README.md
```
### docker-compose.yml
```yaml
version: '3.8'
services:
  flynn:
    build: .
    image: flynn:latest
    container_name: flynn
    restart: unless-stopped
    ports:
      - "18800:18800"
    volumes:
      - ./config.yaml:/config/config.yaml:ro
      - flynn-data:/data
      - flynn-memory:/data/memory
    environment:
      - FLYNN_CONFIG=/config/config.yaml
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      # Add other provider keys as needed
    healthcheck:
      test: ["CMD", "wget", "-qO-", "http://localhost:18800/"]
      interval: 30s
      timeout: 5s
      retries: 3
volumes:
  flynn-data:
  flynn-memory:
```
### Daemon Changes for Docker
The daemon currently hard-codes the data directory to `~/.local/share/flynn` (line 525 of `src/daemon/index.ts`: `const dataDir = resolve(homedir(), '.local/share/flynn');`). For Docker, it needs to respect an environment variable override.
**Modification needed** in `src/daemon/index.ts`:
```typescript
// Support FLYNN_DATA_DIR env var for Docker deployments (unset or empty falls back to the default)
const dataDir = process.env.FLYNN_DATA_DIR
  ? resolve(process.env.FLYNN_DATA_DIR)
  : resolve(homedir(), '.local/share/flynn');
```
Similarly, config loading in `src/config/loader.ts` should respect the `FLYNN_CONFIG` env var, if it does not already.
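`FLYNN_DATA_DIR` needs to be honored by the daemon and by any other entry point that reads the data directory. One option is a shared helper so every entry point resolves it identically. The sketch below uses a hypothetical `resolveDataDir` function. It takes the environment and home directory as parameters so it can be tested without touching `process.env`. Blank values fall back to the default, and relative values are resolved against the working directory.

```typescript
import { homedir } from 'node:os';
import { resolve } from 'node:path';

/** Hypothetical shared helper: FLYNN_DATA_DIR override, else ~/.local/share/flynn. */
function resolveDataDir(
  env: Record<string, string | undefined> = process.env,
  home: string = homedir(),
): string {
  const override = env.FLYNN_DATA_DIR?.trim();
  return override ? resolve(override) : resolve(home, '.local/share/flynn');
}
```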
### Files to Modify
| File | Change |
|------|--------|
| `src/daemon/index.ts` | Support `FLYNN_DATA_DIR` environment variable |
| `src/config/loader.ts` | Support `FLYNN_CONFIG` environment variable (verify) |
### Test Plan
No unit tests needed (Docker build is tested by building). Integration test:
- `docker build -t flynn:test .`
- `docker run --rm flynn:test --help` (verify CLI works)
- `docker compose up -d` with test config (verify startup)
---
## 3. Heartbeat
**Complexity**: Medium
### Overview
Periodic self-check system that validates daemon health and optionally notifies a configured channel on failure. Fits naturally as an extension to the `src/automation/` module, alongside cron.
### Config Schema Additions
Add to `src/config/schema.ts`:
```typescript
const heartbeatCheckSchema = z.enum([
  'gateway',  // Is the HTTP/WS server responsive?
  'model',    // Can we reach the default model provider?
  'channels', // Are all channel adapters connected?
  'memory',   // Is the memory store readable/writable?
  'disk',     // Is disk space above threshold?
]);
const heartbeatSchema = z.object({
  enabled: z.boolean().default(false),
  /** Check interval (e.g. '60s', '5m', '1h'). Default: '5m'. Validated so a typo fails at load time. */
  interval: z.string().regex(/^[1-9]\d*[smh]$/, "Interval must look like '60s', '5m' or '1h'").default('5m'),
  /** Which checks to run. Default: all. */
  checks: z.array(heartbeatCheckSchema).default(['gateway', 'model', 'channels', 'memory', 'disk']),
  /** Optional notification on failure. */
  notify: z.object({
    channel: z.string().min(1),
    peer: z.string().min(1),
  }).optional(),
  /** Number of consecutive failures before notifying. Default: 2. */
  failure_threshold: z.number().int().min(1).max(10).default(2),
  /** Disk space warning threshold in MB. Default: 100. */
  disk_threshold_mb: z.number().min(10).default(100),
}).default({});
```
Extend `automationSchema`:
```typescript
const automationSchema = z.object({
  cron: z.array(cronJobSchema).default([]),
  webhooks: z.array(webhookSchema).default([]),
  heartbeat: heartbeatSchema,
}).default({});
```
Add type export:
```typescript
export type HeartbeatConfig = z.infer<typeof heartbeatSchema>;
```
### Files to Create
| File | Purpose |
|------|---------|
| `src/automation/heartbeat.ts` | `HeartbeatMonitor` class |
| `src/automation/heartbeat.test.ts` | Unit tests |
### Files to Modify
| File | Change |
|------|--------|
| `src/config/schema.ts` | Add `heartbeatSchema`, extend `automationSchema`, add type export |
| `src/automation/index.ts` | Export `HeartbeatMonitor` |
| `src/daemon/index.ts` | Instantiate `HeartbeatMonitor`, register shutdown handler |
### Key Class: `HeartbeatMonitor`
```typescript
// src/automation/heartbeat.ts
import { statfsSync } from 'fs'; // used by checkDisk()
import type { HeartbeatConfig } from '../config/schema.js';
import type { ChannelStatus, OutboundMessage } from '../channels/types.js';
import type { ModelRouter } from '../models/router.js';

export interface HeartbeatCheckResult {
  name: string;
  healthy: boolean;
  message: string;
  durationMs: number;
}

export interface HeartbeatResult {
  timestamp: number;
  healthy: boolean;
  checks: HeartbeatCheckResult[];
}

interface HeartbeatDeps {
  config: HeartbeatConfig;
  /** Gateway HTTP port, for health probing. */
  getGatewayPort?: () => number;
  /** Model router for provider health check. */
  modelRouter?: ModelRouter;
  /** Channel registry snapshot for adapter status checks. */
  getChannels?: () => Array<{ name: string; status: ChannelStatus }>;
  /** Memory store dir for read/write check. */
  memoryDir?: string;
  /** Data dir for disk space check. */
  dataDir?: string;
  /** Send notification to a channel. */
  notify?: (channel: string, peer: string, message: OutboundMessage) => Promise<void>;
}

export class HeartbeatMonitor {
  private timer?: ReturnType<typeof setInterval>;
  private initialTimer?: ReturnType<typeof setTimeout>;
  private consecutiveFailures = 0;
  private lastResult?: HeartbeatResult;

  constructor(private readonly deps: HeartbeatDeps) {}

  start(): void {
    if (!this.deps.config.enabled) return;
    const intervalMs = parseInterval(this.deps.config.interval); // defined in this module
    // Timer callbacks must not leak rejected promises (e.g. a failing notify())
    const tick = (): void => {
      this.runChecks().catch(error => console.error('Heartbeat run failed:', error));
    };
    this.timer = setInterval(tick, intervalMs);
    // Run first check after a short delay (let services finish starting)
    this.initialTimer = setTimeout(tick, 5000);
  }

  stop(): void {
    // Clear both timers so stop() during startup also cancels the delayed first run
    clearTimeout(this.initialTimer);
    clearInterval(this.timer);
    this.initialTimer = undefined;
    this.timer = undefined;
  }

  /** Run all configured checks and optionally notify on failure. */
  async runChecks(): Promise<HeartbeatResult> {
    const checks: HeartbeatCheckResult[] = [];
    for (const checkName of this.deps.config.checks) {
      const start = Date.now();
      try {
        const result = await this.runSingleCheck(checkName);
        checks.push({ ...result, durationMs: Date.now() - start });
      } catch (error) {
        checks.push({
          name: checkName,
          healthy: false,
          message: error instanceof Error ? error.message : String(error),
          durationMs: Date.now() - start,
        });
      }
    }
    const healthy = checks.every(c => c.healthy);
    const result: HeartbeatResult = { timestamp: Date.now(), healthy, checks };
    this.lastResult = result;
    if (!healthy) {
      this.consecutiveFailures++;
      if (this.consecutiveFailures >= this.deps.config.failure_threshold) {
        await this.sendNotification(result);
      }
    } else {
      // Reset before notifying so a failed recovery message cannot repeat forever
      const previousFailures = this.consecutiveFailures;
      this.consecutiveFailures = 0;
      if (previousFailures >= this.deps.config.failure_threshold) {
        await this.sendRecoveryNotification(result, previousFailures);
      }
    }
    return result;
  }

  getLastResult(): HeartbeatResult | undefined {
    return this.lastResult;
  }

  private async runSingleCheck(name: string): Promise<Omit<HeartbeatCheckResult, 'durationMs'>> {
    switch (name) {
      case 'gateway': return this.checkGateway();
      case 'model': return this.checkModel();
      case 'channels': return this.checkChannels();
      case 'memory': return this.checkMemory();
      case 'disk': return this.checkDisk();
      default: return { name, healthy: false, message: `Unknown check: ${name}` };
    }
  }

  // Individual checks, sendNotification() and sendRecoveryNotification() are not shown here
}
```
### Individual Check Implementations
1. **Gateway check**: HTTP GET to `http://localhost:<port>/` — expects non-error response.
2. **Model check**: `ModelRouter` has no health check method today, so add a lightweight `modelRouter.healthCheck()` that only verifies the API key/connection. Implementation options, cheapest first: verify that the model client can be instantiated without errors, call `models.list()` for providers that support it, or attempt a minimal completion with `max_tokens=1`.
3. **Channels check**: Iterate registered channels, verify all have `status === 'connected'`.
4. **Memory check**: Attempt `store.read('global')` and verify no exceptions.
5. **Disk check**: Use `statfsSync()` on the data directory, calculate free space in MB, compare to threshold.
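The two purely local checks (3 and 5) can be sketched as plain functions. The signatures are illustrative, not the class's actual private methods. `ChannelStatus` is stubbed as `string`. The disk check counts `bavail` (blocks available to unprivileged users) times `bsize`. `fs.statfsSync` requires Node.js 18.15 or newer, which the `node:22-alpine` image satisfies.

```typescript
import { statfsSync } from 'node:fs';

type ChannelStatus = string; // stand-in for the real union in channels/types.ts

interface CheckResult {
  name: string;
  healthy: boolean;
  message: string;
}

/** Check 3: healthy only if every registered adapter reports 'connected'. */
function checkChannels(channels: Array<{ name: string; status: ChannelStatus }>): CheckResult {
  const down = channels.filter(c => c.status !== 'connected');
  if (down.length === 0) {
    return { name: 'channels', healthy: true, message: `${channels.length} channel(s) connected` };
  }
  const detail = down.map(c => `${c.name} (${c.status})`).join(', ');
  return { name: 'channels', healthy: false, message: `Not connected: ${detail}` };
}

/** Check 5: free space available to unprivileged users, in whole MB, vs. the threshold. */
function checkDisk(dataDir: string, thresholdMb: number): CheckResult {
  const stats = statfsSync(dataDir);
  const freeMb = Math.floor((stats.bavail * stats.bsize) / (1024 * 1024));
  return {
    name: 'disk',
    healthy: freeMb >= thresholdMb,
    message: `${freeMb} MB free on ${dataDir} (threshold ${thresholdMb} MB)`,
  };
}
```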
### Notification Format
```
⚠️ Flynn Heartbeat Failure
Failed checks:
- gateway: Connection refused on port 18800
- model: API key invalid (401)
Consecutive failures: 3/2 (threshold)
```
And on recovery:
```
✅ Flynn Heartbeat Recovery
All checks passing after 3 consecutive failures.
```
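The two message formats above can be produced by small pure formatters, which keeps `sendNotification()` trivial to test. Both helper names are hypothetical. `formatRecovery` takes the failure count captured before the counter is reset, because after the reset the monitor no longer holds it.

```typescript
interface CheckResult {
  name: string;
  healthy: boolean;
  message: string;
}

/** Failure text matching the format above (hypothetical helper). */
function formatFailure(checks: CheckResult[], consecutiveFailures: number, threshold: number): string {
  const failed = checks.filter(c => !c.healthy).map(c => `- ${c.name}: ${c.message}`);
  return [
    '⚠️ Flynn Heartbeat Failure',
    'Failed checks:',
    ...failed,
    `Consecutive failures: ${consecutiveFailures}/${threshold} (threshold)`,
  ].join('\n');
}

/** Recovery text; previousFailures is the count captured before reset. */
function formatRecovery(previousFailures: number): string {
  return [
    '✅ Flynn Heartbeat Recovery',
    `All checks passing after ${previousFailures} consecutive failures.`,
  ].join('\n');
}
```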
### Integration in Daemon Startup
In `src/daemon/index.ts`, after all services are started (after `gateway.start()`):
```typescript
// Initialize heartbeat monitor (if enabled)
let heartbeatMonitor: HeartbeatMonitor | undefined;
if (config.automation.heartbeat?.enabled) {
  const monitor = new HeartbeatMonitor({
    config: config.automation.heartbeat,
    getGatewayPort: () => config.server.port,
    modelRouter,
    getChannels: () => channelRegistry.list().map(a => ({ name: a.name, status: a.status })),
    memoryDir,
    dataDir,
    notify: config.automation.heartbeat.notify
      ? async (channel, peer, message) => {
          const adapter = channelRegistry.get(channel);
          if (adapter) await adapter.send(peer, message);
        }
      : undefined,
  });
  monitor.start();
  heartbeatMonitor = monitor;
  console.log(`Heartbeat monitor started (interval: ${config.automation.heartbeat.interval})`);
  lifecycle.onShutdown(async () => {
    monitor.stop();
    console.log('Heartbeat monitor stopped');
  });
}
```
### Interval Parsing
Reuse or create a simple parser (similar to `parseDuration` in `src/session/index.ts`):
```typescript
function parseInterval(str: string): number {
  // Positive integer plus unit; '0s' is rejected so setInterval never spins
  const match = str.match(/^([1-9]\d*)(s|m|h)$/);
  if (!match) return 300_000; // default 5 minutes
  const [, value, unit] = match;
  const multipliers: Record<string, number> = { s: 1000, m: 60_000, h: 3_600_000 };
  return parseInt(value, 10) * multipliers[unit];
}
```
### Exposing via Gateway (Optional Enhancement)
Add a `system.heartbeat` handler to the gateway WS handlers that returns `heartbeatMonitor.getLastResult()`. This lets the web UI or CLI display health status. This is a nice-to-have and can be done as a follow-up.
### Test Plan
`src/automation/heartbeat.test.ts`:
- `start()` does nothing when `enabled: false`
- `runChecks()` runs all configured checks
- `runChecks()` returns healthy=true when all checks pass
- `runChecks()` returns healthy=false when any check fails
- Notification sent after `failure_threshold` consecutive failures
- Recovery notification sent when checks pass after failures
- `stop()` clears the timer
- `getLastResult()` returns most recent result
- Individual check tests:
- `checkChannels()` detects disconnected adapters
- `checkDisk()` detects low disk space
- `checkMemory()` handles missing memory dir
- `parseInterval()` correctly parses '60s', '5m', '1h'
### Config Example
```yaml
automation:
  heartbeat:
    enabled: true
    interval: "5m"
    checks: [gateway, model, channels, memory, disk]
    notify:
      channel: telegram
      peer: "123456"
    failure_threshold: 2
    disk_threshold_mb: 100
```
---
## 4. Vector Memory Search
**Complexity**: Complex
### Overview
Add embedding-based semantic search to the existing `MemoryStore`, enabling hybrid search (vector similarity + keyword). Provider-agnostic embedding generation with graceful fallback to keyword-only search when embeddings are unavailable.
### Architecture Decision: Storage Backend
**Options considered**:
1. **SQLite with manual vector math** — Store embeddings as BLOBs in the existing `better-sqlite3` database, compute cosine similarity in JS. Simple, no new deps.
2. **SQLite with sqlite-vec extension** — Native vector similarity in SQL. Requires native extension compilation, complicates Docker build.
3. **Separate HNSW index (hnswlib-node)** — Purpose-built ANN index. Fast for large collections but adds a native dep.
4. **In-memory float arrays** — Store embeddings as JSON files alongside markdown, load into memory, compute similarity in JS.
**Recommendation**: Option 1 (SQLite with manual vector math). Rationale:
- `better-sqlite3` is already a dependency
- Memory collections are small enough (hundreds to low thousands of chunks) that brute-force cosine similarity in JS should be fast enough (estimated at <10ms for 1,000 vectors; worth confirming with a benchmark at the configured embedding dimensions)
- No new native dependencies
- Embeddings stored in existing data directory pattern
- Clean upgrade path to sqlite-vec if performance becomes an issue
### Config Schema Additions
Extend `memorySchema` in `src/config/schema.ts`:
```typescript
const embeddingProviderSchema = z.enum(['openai', 'gemini', 'ollama', 'llamacpp']);
const embeddingSchema = z.object({
  enabled: z.boolean().default(false),
  provider: embeddingProviderSchema.default('openai'),
  model: z.string().default('text-embedding-3-small'),
  /** Provider endpoint (required for ollama/llamacpp). */
  endpoint: z.string().optional(),
  /** API key (required for openai/gemini). */
  api_key: z.string().optional(),
  /** Embedding vector dimensions. Auto-detected from model if not set. */
  dimensions: z.number().optional(),
  /** Max tokens per chunk for embedding. Default: 512. */
  chunk_size: z.number().min(64).max(8192).default(512),
  /** Overlap between chunks in tokens. Default: 50. */
  chunk_overlap: z.number().min(0).max(1024).default(50),
  /** Number of top results to return from vector search. Default: 5. */
  top_k: z.number().min(1).max(50).default(5),
  /** Weight for vector vs keyword results in hybrid scoring (0.0 = keyword only, 1.0 = vector only). Default: 0.7. */
  hybrid_weight: z.number().min(0).max(1).default(0.7),
}).default({});
const memorySchema = z.object({
  enabled: z.boolean().default(true),
  dir: z.string().optional(),
  auto_extract: z.boolean().default(true),
  max_context_tokens: z.number().min(100).max(10000).default(2000),
  embedding: embeddingSchema,
}).default({});
```
Add type exports:
```typescript
export type EmbeddingConfig = z.infer<typeof embeddingSchema>;
```
### Files to Create
| File | Purpose |
|------|---------|
| `src/memory/embeddings.ts` | `EmbeddingProvider` interface + provider implementations |
| `src/memory/embeddings.test.ts` | Unit tests for embedding providers |
| `src/memory/vector-store.ts` | `VectorStore` class — SQLite-backed vector storage |
| `src/memory/vector-store.test.ts` | Unit tests for vector store |
| `src/memory/chunker.ts` | Text chunking utility |
| `src/memory/chunker.test.ts` | Unit tests for chunker |
| `src/memory/hybrid-search.ts` | `HybridSearch` class — combines vector + keyword |
| `src/memory/hybrid-search.test.ts` | Unit tests for hybrid search |
### Files to Modify
| File | Change |
|------|--------|
| `src/config/schema.ts` | Add `embeddingSchema`, extend `memorySchema`, add type export |
| `src/memory/store.ts` | Add `searchHybrid()` method, accept optional `VectorStore` in constructor |
| `src/memory/index.ts` | Export new modules |
| `src/tools/builtin/memory-search.ts` | Use `searchHybrid()` when available, fall back to `search()` |
| `src/daemon/index.ts` | Initialize `EmbeddingProvider`, `VectorStore`, wire into `MemoryStore` |
### Key Interfaces and Classes
#### EmbeddingProvider
```typescript
// src/memory/embeddings.ts
export interface EmbeddingProvider {
  /** Generate embeddings for one or more text chunks. */
  embed(texts: string[]): Promise<number[][]>;
  /** Embedding vector dimensions. */
  readonly dimensions: number;
}

export class OpenAIEmbeddingProvider implements EmbeddingProvider {
  // Uses openai SDK (already a dep) with embeddings.create()
  readonly dimensions: number;
  constructor(config: { apiKey?: string; model: string; dimensions?: number }) { ... }
  async embed(texts: string[]): Promise<number[][]> { ... }
}

export class GeminiEmbeddingProvider implements EmbeddingProvider {
  // Uses @google/generative-ai SDK (already a dep) with embedContent()
  readonly dimensions: number;
  constructor(config: { apiKey?: string; model: string }) { ... }
  async embed(texts: string[]): Promise<number[][]> { ... }
}

export class OllamaEmbeddingProvider implements EmbeddingProvider {
  // Uses ollama SDK (already a dep) with embeddings()
  readonly dimensions: number;
  constructor(config: { host?: string; model: string }) { ... }
  async embed(texts: string[]): Promise<number[][]> { ... }
}

export class LlamaCppEmbeddingProvider implements EmbeddingProvider {
  // HTTP POST to /embedding endpoint
  readonly dimensions: number;
  constructor(config: { endpoint: string; model?: string }) { ... }
  async embed(texts: string[]): Promise<number[][]> { ... }
}

export function createEmbeddingProvider(config: EmbeddingConfig): EmbeddingProvider { ... }
```
#### Text Chunker
```typescript
// src/memory/chunker.ts
export interface Chunk {
  /** The text content of this chunk. */
  text: string;
  /** Source namespace. */
  namespace: string;
  /** Starting line number in the source file (1-based). */
  startLine: number;
  /** Ending line number in the source file (1-based). */
  endLine: number;
}

/**
 * Split memory content into overlapping chunks suitable for embedding.
 *
 * Strategy: split on paragraph boundaries (double newline), then merge
 * small paragraphs to reach target chunk size, with configurable overlap.
 */
export function chunkText(
  content: string,
  namespace: string,
  options: { chunkSize: number; chunkOverlap: number },
): Chunk[] { ... }
```
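A minimal sketch of the paragraph-merging strategy described in the doc comment follows, under three stated assumptions. Token counts are estimated at roughly 4 characters per token instead of using a real tokenizer. A single paragraph longer than `chunkSize` is kept whole rather than split. Overlap is made of whole trailing paragraphs and never includes a chunk's first paragraph, so each new chunk starts strictly later and the loop always makes progress, even when `chunkOverlap >= chunkSize`.

```typescript
interface Chunk {
  text: string;
  namespace: string;
  startLine: number; // 1-based, inclusive
  endLine: number; // 1-based, inclusive
}

interface Paragraph {
  text: string;
  startLine: number;
  endLine: number;
  tokens: number;
}

/** Assumption: ~4 characters per token stands in for a real tokenizer. */
function estimateTokens(text: string): number {
  return Math.ceil(text.length / 4);
}

/** Group lines into blank-line-separated paragraphs, keeping 1-based line spans. */
function toParagraphs(content: string): Paragraph[] {
  const lines = content.split('\n');
  const paragraphs: Paragraph[] = [];
  let buffer: string[] = [];
  let startLine = 0;
  const flush = (endLine: number): void => {
    if (buffer.length === 0) return;
    const text = buffer.join('\n');
    paragraphs.push({ text, startLine, endLine, tokens: estimateTokens(text) });
    buffer = [];
  };
  lines.forEach((line, i) => {
    if (line.trim() === '') {
      flush(i); // previous line has 0-based index i - 1, i.e. 1-based line i
    } else {
      if (buffer.length === 0) startLine = i + 1;
      buffer.push(line);
    }
  });
  flush(lines.length);
  return paragraphs;
}

function chunkText(
  content: string,
  namespace: string,
  options: { chunkSize: number; chunkOverlap: number },
): Chunk[] {
  const chunks: Chunk[] = [];
  let current: Paragraph[] = [];
  let currentTokens = 0;
  const emit = (): void => {
    chunks.push({
      text: current.map(p => p.text).join('\n\n'),
      namespace,
      startLine: current[0].startLine,
      endLine: current[current.length - 1].endLine,
    });
  };
  for (const paragraph of toParagraphs(content)) {
    if (current.length > 0 && currentTokens + paragraph.tokens > options.chunkSize) {
      emit();
      // Carry trailing paragraphs worth at most chunkOverlap tokens (never index 0)
      const carried: Paragraph[] = [];
      let carriedTokens = 0;
      for (let i = current.length - 1; i >= 1; i--) {
        if (carriedTokens + current[i].tokens > options.chunkOverlap) break;
        carried.unshift(current[i]);
        carriedTokens += current[i].tokens;
      }
      current = carried;
      currentTokens = carriedTokens;
    }
    current.push(paragraph);
    currentTokens += paragraph.tokens;
  }
  if (current.length > 0) emit();
  return chunks;
}
```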
#### VectorStore
```typescript
// src/memory/vector-store.ts
import Database from 'better-sqlite3';

export interface VectorSearchResult {
  namespace: string;
  chunkText: string;
  startLine: number;
  endLine: number;
  similarity: number;
}

export class VectorStore {
  private db: Database.Database;

  constructor(dbPath: string) {
    this.db = new Database(dbPath);
    this.initSchema();
  }

  private initSchema(): void {
    this.db.exec(`
      CREATE TABLE IF NOT EXISTS embeddings (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        namespace TEXT NOT NULL,
        chunk_text TEXT NOT NULL,
        start_line INTEGER NOT NULL,
        end_line INTEGER NOT NULL,
        embedding BLOB NOT NULL,
        created_at INTEGER NOT NULL DEFAULT (unixepoch()),
        content_hash TEXT NOT NULL
      );
      CREATE INDEX IF NOT EXISTS idx_embeddings_namespace ON embeddings(namespace);
      CREATE INDEX IF NOT EXISTS idx_embeddings_hash ON embeddings(content_hash);
    `);
  }

  /** Store embedding vectors for a set of chunks. */
  upsertChunks(chunks: Array<{ namespace: string; text: string; startLine: number; endLine: number; embedding: number[]; contentHash: string }>): void { ... }

  /** Delete all embeddings for a namespace (for re-indexing). */
  deleteNamespace(namespace: string): void { ... }

  /** Check if a namespace+hash combination already exists (skip re-embedding). */
  hasContentHash(namespace: string, hash: string): boolean { ... }

  /**
   * Search for similar vectors using cosine similarity.
   * Computed in JS; fine for <10k vectors.
   */
  search(queryEmbedding: number[], topK: number): VectorSearchResult[] {
    const allRows = this.db.prepare('SELECT * FROM embeddings').all();
    // Compute cosine similarity for each, sort, take topK
    ...
  }

  close(): void {
    this.db.close();
  }
}
```
The embedding is stored as a BLOB (Float32Array buffer). Cosine similarity:
```typescript
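function cosineSimilarity(a: ArrayLike<number>, b: ArrayLike<number>): number {
  if (a.length !== b.length) throw new Error(`Dimension mismatch: ${a.length} vs ${b.length}`);
  let dotProduct = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dotProduct += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  if (normA === 0 || normB === 0) return 0; // avoid NaN for zero vectors
  return dotProduct / (Math.sqrt(normA) * Math.sqrt(normB));
}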
```
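The elided body of `VectorStore.search()` needs two pieces: decoding the BLOB column back into floats, and brute-force top-K ranking. The sketch below covers both. The helper names `toBlob`, `fromBlob` and `rankBySimilarity` are hypothetical. The row shape assumes the snake_case columns from the schema above. The BLOB uses the platform's byte order (little-endian on x86 and ARM), so the database file is not portable to big-endian hosts. Decoding copies the bytes first, because a `Float32Array` cannot view a buffer that starts at an offset that is not a multiple of 4.

```typescript
/** Encode a vector as a Float32 BLOB (4 bytes per dimension, platform byte order). */
function toBlob(vector: number[]): Buffer {
  return Buffer.from(new Float32Array(vector).buffer);
}

/** Decode a BLOB into floats, copying into a fresh (aligned) buffer first. */
function fromBlob(blob: Buffer): Float32Array {
  const aligned = new Uint8Array(blob.byteLength);
  aligned.set(blob);
  return new Float32Array(aligned.buffer);
}

function cosineSimilarity(a: ArrayLike<number>, b: ArrayLike<number>): number {
  if (a.length !== b.length) throw new Error(`Dimension mismatch: ${a.length} vs ${b.length}`);
  let dot = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return normA === 0 || normB === 0 ? 0 : dot / (Math.sqrt(normA) * Math.sqrt(normB));
}

/** Row shape assumed for `SELECT * FROM embeddings`. */
interface EmbeddingRow {
  namespace: string;
  chunk_text: string;
  start_line: number;
  end_line: number;
  embedding: Buffer;
}

/** Brute-force top-K: score every row, sort by similarity descending, keep topK. */
function rankBySimilarity(rows: EmbeddingRow[], query: number[], topK: number) {
  return rows
    .map(row => ({
      namespace: row.namespace,
      chunkText: row.chunk_text,
      startLine: row.start_line,
      endLine: row.end_line,
      similarity: cosineSimilarity(query, fromBlob(row.embedding)),
    }))
    .sort((x, y) => y.similarity - x.similarity)
    .slice(0, topK);
}
```

Sorting all rows costs O(n log n). That is negligible at the collection sizes assumed above, and a bounded heap only pays off well past them.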
#### HybridSearch
```typescript
// src/memory/hybrid-search.ts
import type { MemoryStore, SearchResult } from './store.js';
import type { VectorSearchResult, VectorStore } from './vector-store.js';
import type { EmbeddingProvider } from './embeddings.js';

export interface HybridSearchResult {
  namespace: string;
  content: string;
  context: string;
  line: number;
  score: number;
  source: 'keyword' | 'vector' | 'both';
}

export class HybridSearch {
  constructor(
    private readonly memoryStore: MemoryStore,
    private readonly vectorStore: VectorStore,
    private readonly embeddingProvider: EmbeddingProvider,
    private readonly hybridWeight: number = 0.7,
  ) {}

  /**
   * Search using both keyword and vector similarity, then merge and rank the results.
   *
   * Scoring: final_score = (hybrid_weight * vector_score) + ((1 - hybrid_weight) * keyword_score)
   * Keyword score: 1.0 for exact match (normalized by result count)
   * Vector score: cosine similarity clamped to 0.0-1.0 (raw cosine ranges from -1.0 to 1.0)
   */
  async search(query: string, topK: number = 5): Promise<HybridSearchResult[]> {
    // Run both searches in parallel; an embedding failure degrades to keyword-only
    const [keywordResults, vectorResults] = await Promise.all([
      Promise.resolve(this.memoryStore.search(query)),
      this.vectorSearch(query, topK * 2).catch((): VectorSearchResult[] => []), // fetch more for merge
    ]);
    return this.mergeResults(keywordResults, vectorResults, topK);
  }

  private async vectorSearch(query: string, topK: number): Promise<VectorSearchResult[]> {
    const [queryEmbedding] = await this.embeddingProvider.embed([query]);
    return this.vectorStore.search(queryEmbedding, topK);
  }

  private mergeResults(
    keyword: SearchResult[],
    vector: VectorSearchResult[],
    topK: number,
  ): HybridSearchResult[] {
    // Deduplicate by namespace+line, compute hybrid score, sort, take topK
    ...
  }
}
```
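A minimal sketch of `mergeResults()` follows. Several details are assumptions, not part of the plan:

- The keyword-hit shape `SearchResult` is guessed from the `HybridSearchResult` fields.
- "Normalized by result count" is read as a linear decay by rank, so the first keyword hit scores 1.0.
- "Deduplicate by namespace+line" is read as: a keyword hit whose line falls inside a vector chunk's `startLine..endLine` span in the same namespace is the same result.

Each keyword hit is boosted at most once, even when overlapping chunks cover it.

```typescript
interface SearchResult {
  namespace: string;
  content: string;
  context: string;
  line: number;
}

interface VectorSearchResult {
  namespace: string;
  chunkText: string;
  startLine: number;
  endLine: number;
  similarity: number;
}

interface HybridSearchResult extends SearchResult {
  score: number;
  source: 'keyword' | 'vector' | 'both';
}

function mergeResults(
  keyword: SearchResult[],
  vector: VectorSearchResult[],
  topK: number,
  hybridWeight = 0.7,
): HybridSearchResult[] {
  // Keyword score (assumption): 1.0 for the first hit, decaying linearly by rank
  const results = keyword.map((hit, rank): HybridSearchResult => ({
    ...hit,
    score: (1 - hybridWeight) * (1 - rank / keyword.length),
    source: 'keyword',
  }));
  for (const chunk of vector) {
    const vectorScore = Math.min(1, Math.max(0, chunk.similarity)); // clamp cosine to 0.0-1.0
    // Keyword hits inside this chunk's line span count as the same result
    const overlapping = results.filter(r =>
      r.source !== 'vector' && r.namespace === chunk.namespace &&
      r.line >= chunk.startLine && r.line <= chunk.endLine);
    if (overlapping.length === 0) {
      results.push({
        namespace: chunk.namespace,
        content: chunk.chunkText,
        context: chunk.chunkText,
        line: chunk.startLine,
        score: hybridWeight * vectorScore,
        source: 'vector',
      });
      continue;
    }
    for (const r of overlapping) {
      if (r.source === 'keyword') { // boost each hit once, even if chunks overlap
        r.score += hybridWeight * vectorScore;
        r.source = 'both';
      }
    }
  }
  return results.sort((a, b) => b.score - a.score).slice(0, topK);
}
```

With the default weight of 0.7, a hit found by both searches usually outranks one found by either search alone. That matches the intent of the hybrid formula.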
### Indexing Pipeline
When does indexing happen? Two approaches:
1. **On write**: When `MemoryStore.write()` is called, also chunk + embed + store. This keeps the index always up to date but adds latency to writes.
2. **Background indexer**: A separate timer that scans for unindexed content (via content hash comparison) and indexes it periodically.
**Recommendation**: Background indexer with a write-triggered hint. On `MemoryStore.write()`, mark the namespace as "dirty". The background indexer (running every 30 seconds) processes only dirty namespaces. This keeps writes fast while keeping the index roughly one interval behind the memory files at most.
```typescript
// Addition to MemoryStore:
private dirtyNamespaces: Set<string> = new Set();
write(namespace: string, content: string, mode: 'append' | 'replace'): void {
// ... existing write logic ...
this.dirtyNamespaces.add(namespace);
}
/** Returns the namespaces written since the last call and clears the set (drain semantics). */
getDirtyNamespaces(): string[] {
const ns = Array.from(this.dirtyNamespaces);
this.dirtyNamespaces.clear();
return ns;
}
```
Indexer runs in daemon:
```typescript
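// At the top of the daemon module:
import { createHash } from 'node:crypto';
import { resolve } from 'node:path';

// In daemon startup, after memory store init:
if (memoryStore && config.memory.embedding.enabled) {
  const embeddingProvider = createEmbeddingProvider(config.memory.embedding);
  const vectorStore = new VectorStore(resolve(dataDir, 'vectors.db'));
  // Handed to createMemoryTools() later in startup
  const hybridSearch = new HybridSearch(memoryStore, vectorStore, embeddingProvider, config.memory.embedding.hybrid_weight);

  // Background indexer. getDirtyNamespaces() drains the dirty set, so namespaces that
  // fail to index are kept in `retry` and picked up again on the next tick.
  const retry = new Set<string>();
  let indexing = false; // skip a tick while the previous run (embedding calls) is still in progress
  const indexInterval = setInterval(async () => {
    if (indexing) return;
    indexing = true;
    try {
      const dirty = new Set([...retry, ...memoryStore.getDirtyNamespaces()]);
      retry.clear();
      for (const ns of dirty) {
        try {
          const content = memoryStore.read(ns);
          const contentHash = createHash('md5').update(content).digest('hex');
          if (vectorStore.hasContentHash(ns, contentHash)) continue;
          const chunks = chunkText(content, ns, {
            chunkSize: config.memory.embedding.chunk_size,
            chunkOverlap: config.memory.embedding.chunk_overlap,
          });
          // Embed first so a provider failure leaves the previous index intact
          const embeddings = chunks.length > 0 ? await embeddingProvider.embed(chunks.map(c => c.text)) : [];
          vectorStore.deleteNamespace(ns); // drop chunks from the previous version of the content
          vectorStore.upsertChunks(chunks.map((c, i) => ({
            namespace: c.namespace,
            text: c.text,
            startLine: c.startLine,
            endLine: c.endLine,
            embedding: embeddings[i],
            contentHash,
          })));
        } catch (error) {
          // Provider unreachable or vector DB error: keyword search keeps working; retry later
          console.warn(`memory indexer: failed to index namespace "${ns}"`, error);
          retry.add(ns);
        }
      }
    } finally {
      indexing = false;
    }
  }, 30_000);

  lifecycle.onShutdown(async () => {
    clearInterval(indexInterval);
    vectorStore.close();
  });
}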
```
### Integration with memory.search Tool
Modify `src/tools/builtin/memory-search.ts` to accept an optional `HybridSearch` instance:
```typescript
// New import (path relative to src/tools/builtin/):
import type { HybridSearch } from '../../memory/hybrid-search.js';

export function createMemorySearchTool(store: MemoryStore, hybridSearch?: HybridSearch): Tool {
  return {
    name: 'memory.search',
    description: 'Search across all memory files for a keyword or phrase. Uses semantic search when available, with keyword fallback.',
    inputSchema: {
      type: 'object',
      properties: {
        query: {
          type: 'string',
          description: 'The search query (supports semantic meaning when embeddings are enabled)',
        },
      },
      required: ['query'],
    },
    execute: async (rawArgs: unknown): Promise<ToolResult> => {
      const args = rawArgs as { query: string };
      let fallbackNote = '';
      if (hybridSearch) {
        try {
          const results = await hybridSearch.search(args.query);
          // ...format hybrid results and return them...
        } catch (error) {
          // Only the vector path is guarded: on failure, fall through to keyword search
          const reason = error instanceof Error ? error.message : String(error);
          fallbackNote = `Semantic search unavailable (${reason}); showing keyword results.`;
        }
      }
      // Keyword-only path (unchanged); also the fallback when hybrid search fails
      const results = store.search(args.query);
      // ...existing formatting, prefixed with fallbackNote when it is set...
    },
  };
}
```
Also update `createMemoryTools()` in `src/tools/builtin/index.ts` to accept the optional `HybridSearch`:
```typescript
export function createMemoryTools(store: MemoryStore, hybridSearch?: HybridSearch): Tool[] {
  return [
    createMemoryReadTool(store),
    createMemoryWriteTool(store),
    createMemorySearchTool(store, hybridSearch),
  ];
}
```
### Graceful Fallback Chain
1. If `embedding.enabled: false` (the default): pure keyword search; no embedding dependencies are loaded.
2. If `embedding.enabled: true` but the provider is unreachable: log a warning and fall back to keyword search.
3. If embedding succeeds but the vector DB is corrupted: catch the error and fall back to keyword search.
4. If both embedding and the vector store succeed: hybrid search with the configured `hybrid_weight`.
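The fallback chain amounts to a small wrapper around the two search paths. The sketch below is an illustration, not the tool's actual code: `searchWithFallback` is a hypothetical helper, and it assumes the caller has already mapped both hybrid and keyword results to a common shape `T` before formatting.

```typescript
/** Result of a memory search, plus whether the keyword fallback was used. */
export interface FallbackOutcome<T> {
  results: T[];
  usedFallback: boolean;
  error?: string;
}

/**
 * Hypothetical helper: run the hybrid search when configured, otherwise keyword search.
 * Any error from the hybrid path (provider unreachable, vector DB failure) is logged
 * and answered with keyword results instead of failing the tool call.
 */
export async function searchWithFallback<T>(
  query: string,
  hybrid: ((query: string) => Promise<T[]>) | undefined,
  keyword: (query: string) => T[],
  warn: (message: string) => void = console.warn,
): Promise<FallbackOutcome<T>> {
  // Case 1: embeddings disabled, so there is no hybrid search to try.
  if (!hybrid) return { results: keyword(query), usedFallback: false };
  try {
    // Case 4: embedding provider and vector store both healthy.
    return { results: await hybrid(query), usedFallback: false };
  } catch (err) {
    // Cases 2 and 3: provider unreachable or vector store error.
    const error = err instanceof Error ? err.message : String(err);
    warn(`memory.search: vector search failed, falling back to keyword search (${error})`);
    return { results: keyword(query), usedFallback: true, error };
  }
}
```

Keeping keyword search outside the `try` means a keyword-path error still surfaces as a real failure instead of being retried.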
### Test Plan
**`src/memory/chunker.test.ts`**:
- Splits text into chunks of configured size
- Respects paragraph boundaries
- Generates correct overlap between chunks
- Handles empty content
- Preserves line number tracking
- Single paragraph smaller than chunk size returns one chunk
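For reference, one chunker that satisfies the cases above can be sketched as follows. It is a hypothetical implementation matching the `chunkText(content, namespace, { chunkSize, chunkOverlap })` call used by the indexer; measuring `chunk_size` and `chunk_overlap` in characters rather than tokens is an assumption, as is splitting an oversized paragraph line by line.

```typescript
export interface TextChunk {
  namespace: string;
  text: string;
  startLine: number; // 1-based, inclusive
  endLine: number; // 1-based, inclusive
}

export interface ChunkOptions {
  chunkSize: number; // assumed unit: characters
  chunkOverlap: number; // characters of trailing context repeated in the next chunk
}

interface Block {
  text: string;
  startLine: number;
  endLine: number;
}

// Split on blank lines, recording each paragraph's line range.
function paragraphs(content: string): Block[] {
  const lines = content.split('\n');
  const blocks: Block[] = [];
  let start = -1;
  for (let i = 0; i <= lines.length; i++) {
    const blank = i === lines.length || lines[i].trim() === '';
    if (!blank && start < 0) start = i;
    if (blank && start >= 0) {
      blocks.push({ text: lines.slice(start, i).join('\n'), startLine: start + 1, endLine: i });
      start = -1;
    }
  }
  return blocks;
}

// Join blocks with '\n' between adjacent lines and '\n\n' between paragraphs.
function render(blocks: Block[]): string {
  return blocks
    .map((b, i) => (i === 0 ? '' : b.startLine === blocks[i - 1].endLine + 1 ? '\n' : '\n\n') + b.text)
    .join('');
}

export function chunkText(content: string, namespace: string, opts: ChunkOptions): TextChunk[] {
  // Paragraphs larger than a chunk are split into single lines.
  const units = paragraphs(content).flatMap((p) =>
    p.text.length <= opts.chunkSize
      ? [p]
      : p.text.split('\n').map((text, i) => ({ text, startLine: p.startLine + i, endLine: p.startLine + i })),
  );
  const chunks: TextChunk[] = [];
  let current: Block[] = [];
  const flush = () => {
    if (current.length === 0) return;
    chunks.push({
      namespace,
      text: render(current),
      startLine: current[0].startLine,
      endLine: current[current.length - 1].endLine,
    });
  };

  for (const unit of units) {
    if (current.length > 0 && render([...current, unit]).length > opts.chunkSize) {
      flush();
      // Carry trailing blocks (up to chunkOverlap chars) into the next chunk.
      const carry: Block[] = [];
      for (let i = current.length - 1; i >= 0; i--) {
        if (render([current[i], ...carry]).length > opts.chunkOverlap) break;
        carry.unshift(current[i]);
      }
      // Drop the overlap if it would push the next chunk past chunkSize.
      current = render([...carry, unit]).length <= opts.chunkSize ? carry : [];
    }
    current.push(unit);
  }
  flush();
  return chunks;
}
```

Overlap is carried at paragraph (or line) granularity, so a paragraph longer than `chunkOverlap` is not repeated, and a single line longer than `chunkSize` still yields an oversized chunk.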
**`src/memory/embeddings.test.ts`**:
- `createEmbeddingProvider()` creates correct provider for each type
- OpenAI provider calls correct API endpoint (mocked)
- Ollama provider calls correct endpoint (mocked)
- Provider returns correct dimensionality
- Handles API errors gracefully
**`src/memory/vector-store.test.ts`**:
- Creates SQLite database with correct schema
- `upsertChunks()` stores and retrieves embeddings
- `search()` returns results sorted by similarity
- `deleteNamespace()` removes all embeddings for a namespace
- `hasContentHash()` detects existing content
- `close()` cleans up database connection
- Cosine similarity computation is correct
**`src/memory/hybrid-search.test.ts`**:
- Returns keyword results when vector search unavailable
- Merges keyword and vector results with correct weighting
- Deduplicates results from both sources
- Respects `topK` limit
- Handles empty results from either source
- `source` field correctly indicates 'keyword', 'vector', or 'both'
### Config Example
```yaml
memory:
  enabled: true
  auto_extract: true
  max_context_tokens: 2000
  embedding:
    enabled: true
    provider: openai
    model: text-embedding-3-small
    api_key: "${OPENAI_API_KEY}"
    chunk_size: 512
    chunk_overlap: 50
    top_k: 5
    hybrid_weight: 0.7
```
---
## Implementation Order
Recommended sequence, ordered by dependencies and risk:
| Order | Feature | Rationale |
|-------|---------|-----------|
| 1 | **Dockerfile** | No code changes, unblocks deployment testing for all other features |
| 2 | **Inbound Webhooks** | Clean pattern (mirrors cron), moderate scope, high utility |
| 3 | **Heartbeat** | Depends on running services for meaningful checks, benefits from Docker for testing |
| 4 | **Vector Memory Search** | Most complex, most files, most risk — do last when other features are stable |
### Estimated Effort
| Feature | New Files | Modified Files | Estimated Lines | Complexity |
|---------|-----------|----------------|-----------------|------------|
| Inbound Webhooks | 2 | 4 | ~300 | Medium |
| Dockerfile | 3 | 1-2 | ~100 | Simple |
| Heartbeat | 2 | 3 | ~350 | Medium |
| Vector Memory Search | 8 | 5 | ~800 | Complex |
| **Total** | **15** | **13-14** | **~1550** | |
---
## Cross-Cutting Concerns
### Config Schema Validation
All new schema additions follow the existing pattern:
- Use `.default({})` for optional sections so the config is valid without them
- Export `z.infer<>` type aliases
- Existing tests in `src/config/schema.test.ts` should still pass (no breaking changes)
### Daemon Startup Order
New services slot into the existing startup sequence:
1. Memory store (existing)
2. **Vector store + embedding provider** (new, after memory store)
3. Session store (existing)
4. Tool registry (existing)
5. **Memory tools with hybrid search** (modified)
6. Gateway server (existing)
7. Channel registry (existing)
8. **Webhook handler** (new, registered with channel registry)
9. Cron scheduler (existing)
10. Start all channels (existing)
11. Start gateway (existing)
12. **Heartbeat monitor** (new, after all services started)
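Because each service calls `lifecycle.onShutdown()` as it starts (as the indexer snippet above does), running the hooks last-in-first-out tears services down in the reverse of the startup sequence. The project's lifecycle module is not shown in this plan; the `Lifecycle` class below is a hypothetical minimal version, and continuing past a failing hook while collecting its error is an assumption about the desired behavior.

```typescript
type ShutdownHook = () => void | Promise<void>;

/** Hypothetical minimal lifecycle: shutdown hooks run in reverse registration order. */
export class Lifecycle {
  private readonly hooks: ShutdownHook[] = [];
  private shuttingDown = false;

  /** Register a hook; hooks registered later run earlier. */
  onShutdown(hook: ShutdownHook): void {
    this.hooks.push(hook);
  }

  /** Run hooks LIFO, one at a time; a failing hook does not block the rest. */
  async shutdown(): Promise<Error[]> {
    if (this.shuttingDown) return []; // ignore repeated signals
    this.shuttingDown = true;
    const errors: Error[] = [];
    for (const hook of [...this.hooks].reverse()) {
      try {
        await hook();
      } catch (err) {
        errors.push(err instanceof Error ? err : new Error(String(err)));
      }
    }
    return errors;
  }
}
```

Running hooks sequentially rather than with `Promise.all` matters here: the gateway must stop accepting requests before the channel adapters and stores behind it close.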
### Shutdown Order (LIFO)
Hooks run in reverse order of registration:
1. Heartbeat monitor stop
2. Gateway stop (existing)
3. Channel adapters stop (existing, includes webhook handler)
4. Process manager stop (existing)
5. Session store close (existing)
6. Vector store close (registered right after the memory store, so it runs after the session store closes)