diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..85c00c0 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,35 @@ +node_modules +dist +.git +.gitignore +.worktrees + +# Logs +*.log + +# Environment files +.env +.env.* +!.env.example + +# Documentation (keep README) +docs/ +*.md +!README.md +!SOUL.md + +# Test files +src/**/*.test.ts +vitest.config.* + +# Lint config +eslint.config.* + +# Editor / IDE +.vscode +.idea +*.swp +*.swo + +# Claude config +.claude diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..2a444cf --- /dev/null +++ b/Dockerfile @@ -0,0 +1,72 @@ +# ── Builder stage ────────────────────────────────────────────── +FROM node:22-alpine AS builder + +# Enable corepack for pnpm +RUN corepack enable && corepack prepare pnpm@latest --activate + +# Install native build tools (needed for better-sqlite3) +RUN apk add --no-cache python3 make g++ + +WORKDIR /app + +# Copy dependency manifests first (layer caching) +COPY package.json pnpm-lock.yaml pnpm-workspace.yaml ./ + +# Install dependencies (frozen lockfile for reproducibility) +RUN pnpm install --frozen-lockfile + +# Copy source and config +COPY tsconfig.json ./ +COPY src/ src/ +COPY config/ config/ + +# Build TypeScript +RUN pnpm build + + +# ── Runtime stage ───────────────────────────────────────────── +FROM node:22-alpine + +# Label +LABEL org.opencontainers.image.title="Flynn" \ + org.opencontainers.image.description="Self-hosted personal AI agent" \ + org.opencontainers.image.source="https://github.com/will666/flynn" + +WORKDIR /app + +# Copy node_modules from builder (includes compiled native deps like better-sqlite3) +COPY --from=builder /app/node_modules/ node_modules/ + +# Copy compiled output +COPY --from=builder /app/dist/ dist/ + +# Copy gateway UI static files into dist/gateway/ui so import.meta.dirname +# resolution from dist/daemon/index.js (../gateway/ui) resolves correctly +COPY --from=builder /app/src/gateway/ui/ dist/gateway/ui/ + +# Copy 
default config +COPY --from=builder /app/config/ config/ + +# Copy package.json (needed for bin resolution / metadata) +COPY --from=builder /app/package.json ./ + +# Copy SOUL.md from the build context (prompt template loaded at runtime; the builder stage never copies it) +COPY SOUL.md ./ + +# Create data directories +RUN mkdir -p /data/memory /data/sessions /config + +# Environment +ENV NODE_ENV=production \ + FLYNN_CONFIG=/config/config.yaml \ + FLYNN_DATA_DIR=/data + +# Gateway port +EXPOSE 18800 + +# Health check — verify the gateway is responding +HEALTHCHECK --interval=30s --timeout=5s --start-period=15s --retries=3 \ + CMD wget -qO- http://localhost:18800/ || exit 1 + +ENTRYPOINT ["node", "dist/cli/index.js"] +CMD ["start"] diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..d98b958 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,34 @@ +services: + flynn: + build: . + container_name: flynn + restart: unless-stopped + ports: + - "18800:18800" + volumes: + # Persistent data (sessions DB, memory store) + - flynn-data:/data + # Mount your config file + - ./config/default.yaml:/config/config.yaml:ro + environment: + # Required: at least one model provider API key + - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY:-} + # Optional: additional provider keys + - OPENAI_API_KEY=${OPENAI_API_KEY:-} + - OPENROUTER_API_KEY=${OPENROUTER_API_KEY:-} + - GOOGLE_API_KEY=${GOOGLE_API_KEY:-} + # Optional: Telegram integration + - FLYNN_TELEGRAM_TOKEN=${FLYNN_TELEGRAM_TOKEN:-} + # Optional: Discord integration + - DISCORD_BOT_TOKEN=${DISCORD_BOT_TOKEN:-} + # Optional: Gateway auth token + - FLYNN_SERVER_TOKEN=${FLYNN_SERVER_TOKEN:-} + healthcheck: + test: ["CMD", "wget", "-qO-", "http://localhost:18800/"] + interval: 30s + timeout: 5s + start_period: 15s + retries: 3 + +volumes: + flynn-data: diff --git a/docs/plans/2026-02-07-tier2-implementation-plan.md b/docs/plans/2026-02-07-tier2-implementation-plan.md new file mode 100644 index 0000000..f059326 --- /dev/null +++ 
b/docs/plans/2026-02-07-tier2-implementation-plan.md @@ -0,0 +1,1260 @@ +# Tier 2 Implementation Plan + +**Date**: 2026-02-07 +**Features**: Inbound Webhooks, Dockerfile, Heartbeat, Vector Memory Search +**Status**: Planning + +--- + +## 1. Inbound Webhooks + +**Complexity**: Medium + +### Overview + +HTTP endpoint (`POST /webhooks/:name`) that triggers agent processing, following the same pattern as `CronScheduler`: implements `ChannelAdapter`, produces `InboundMessage`, routes responses to a configured output channel. + +### Config Schema Additions + +Add to `src/config/schema.ts`: + +```typescript +const webhookSchema = z.object({ + name: z.string().min(1, 'Webhook name is required'), + /** Optional per-webhook HMAC secret for payload signature verification. */ + secret: z.string().optional(), + /** Message template — use {{body}} for raw body, {{json.fieldName}} for JSON fields. */ + message: z.string().default('{{body}}'), + output: z.object({ + channel: z.string().min(1), + peer: z.string().min(1), + }), + enabled: z.boolean().default(true), +}); +``` + +Extend `automationSchema`: + +```typescript +const automationSchema = z.object({ + cron: z.array(cronJobSchema).default([]), + webhooks: z.array(webhookSchema).default([]), +}).default({}); +``` + +Add type export: + +```typescript +export type WebhookConfig = z.infer<typeof webhookSchema>; +``` + +### Files to Create + +| File | Purpose | +|------|---------| +| `src/automation/webhooks.ts` | `WebhookHandler` class implementing `ChannelAdapter` | +| `src/automation/webhooks.test.ts` | Unit tests | + +### Files to Modify + +| File | Change | +|------|--------| +| `src/config/schema.ts` | Add `webhookSchema`, extend `automationSchema`, add type export | +| `src/automation/index.ts` | Export `WebhookHandler` | +| `src/gateway/server.ts` | Route `POST /webhooks/:name` to the `WebhookHandler` before static/404 | +| `src/daemon/index.ts` | Instantiate `WebhookHandler`, register with `ChannelRegistry`, wire to gateway | + +### Key Class: 
`WebhookHandler` + +```typescript +// src/automation/webhooks.ts + +import { createHmac, timingSafeEqual } from 'crypto'; +import type { IncomingMessage, ServerResponse } from 'http'; +import type { ChannelAdapter, ChannelStatus, InboundMessage, OutboundMessage } from '../channels/types.js'; +import type { WebhookConfig } from '../config/schema.js'; + +/** Minimal interface for the parts of ChannelRegistry we need. */ +interface ChannelLookup { + get(name: string): { send(peerId: string, message: OutboundMessage): Promise<void> } | undefined; +} + +export class WebhookHandler implements ChannelAdapter { + readonly name = 'webhook'; + private _status: ChannelStatus = 'disconnected'; + private messageHandler?: (msg: InboundMessage) => void; + private webhooks: Map<string, WebhookConfig> = new Map(); + + constructor( + private readonly webhookConfigs: WebhookConfig[], + private readonly channelLookup: ChannelLookup, + ) { + for (const wh of webhookConfigs) { + this.webhooks.set(wh.name, wh); + } + } + + // ChannelAdapter interface methods... + get status(): ChannelStatus { return this._status; } + async connect(): Promise<void> { this._status = 'connected'; } + async disconnect(): Promise<void> { this._status = 'disconnected'; } + onMessage(handler: (msg: InboundMessage) => void): void { this.messageHandler = handler; } + + async send(peerId: string, message: OutboundMessage): Promise<void> { + // Route to output channel (same pattern as CronScheduler.send) + const webhook = this.webhooks.get(peerId); + if (!webhook) return; + const outputAdapter = this.channelLookup.get(webhook.output.channel); + if (!outputAdapter) return; + await outputAdapter.send(webhook.output.peer, message); + } + + /** + * Handle incoming HTTP request. Called by the gateway HTTP handler. + * Returns true if the request was handled (webhook found), false otherwise. 
+ */ + async handleRequest(webhookName: string, req: IncomingMessage, res: ServerResponse): Promise<boolean> { + const webhook = this.webhooks.get(webhookName); + if (!webhook || !webhook.enabled) { + return false; // Not found, let gateway return 404 + } + + // Read body + const body = await readBody(req); + + // Verify HMAC signature if secret configured + if (webhook.secret) { + const signature = req.headers['x-webhook-signature'] as string | undefined; + if (!verifyHmac(body, webhook.secret, signature)) { + res.writeHead(401, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'Invalid signature' })); + return true; + } + } + + // Build message text from template + const messageText = renderTemplate(webhook.message, body); + + // Produce InboundMessage + const msg: InboundMessage = { + id: `webhook-${webhookName}-${Date.now()}`, + channel: 'webhook', + senderId: webhookName, + senderName: `webhook:${webhookName}`, + text: messageText, + timestamp: Date.now(), + metadata: { webhookName, contentType: req.headers['content-type'] }, + }; + + this.messageHandler?.(msg); + + res.writeHead(202, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ accepted: true, id: msg.id })); + return true; + } + + getWebhookNames(): string[] { + return Array.from(this.webhooks.keys()); + } +} +``` + +### Integration in Gateway HTTP Handler + +In `GatewayServer.handleHttpRequest()`, add route matching **before** the static file serving: + +```typescript +// Check for webhook routes: POST /webhooks/:name +if (req.method === 'POST' && req.url) { + const match = req.url.match(/^\/webhooks\/([a-zA-Z0-9_-]+)/); + if (match && this.webhookHandler) { + const handled = await this.webhookHandler.handleRequest(match[1], req, res); + if (handled) return; + } +} +``` + +The `webhookHandler` reference is passed into `GatewayServerConfig` as an optional field, similar to how `channelRegistry` is passed. 
+ +### Integration in Daemon Startup + +In `src/daemon/index.ts`, after the cron scheduler block (~line 812-817): + +```typescript +// Register webhook handler (if any webhooks configured) +let webhookHandler: WebhookHandler | undefined; +if (config.automation.webhooks.length > 0) { + webhookHandler = new WebhookHandler(config.automation.webhooks, channelRegistry); + channelRegistry.register(webhookHandler); + console.log(`Registered ${config.automation.webhooks.length} webhook(s)`); +} +``` + +Then pass `webhookHandler` to the `GatewayServer` constructor. + +### HMAC Verification + +Standard approach: `X-Webhook-Signature: sha256=` header, verified via `crypto.createHmac('sha256', secret).update(body).digest('hex')` with timing-safe comparison. + +Also support gateway token auth as an alternative (already handled by the gateway auth middleware for HTTP). + +### Template Rendering + +Simple template engine for the `message` field: +- `{{body}}` — raw request body string +- `{{json.fieldName}}` — extracted JSON field (dot notation for nested) +- Default: `{{body}}` if no template specified + +### Test Plan + +`src/automation/webhooks.test.ts`: +- Implements ChannelAdapter interface (name, status) +- `handleRequest()` produces InboundMessage with correct fields +- HMAC verification rejects invalid signatures +- HMAC verification passes valid signatures +- Returns false for unknown webhook names +- Returns false for disabled webhooks +- `send()` routes to output channel (same as cron test pattern) +- Template rendering with `{{body}}` and `{{json.field}}` +- `getWebhookNames()` lists all webhook names + +### Config Example + +```yaml +automation: + webhooks: + - name: github-push + secret: "whsec_..." 
+ message: "GitHub push to {{json.repository.full_name}}: {{json.head_commit.message}}" + output: + channel: telegram + peer: "123456" + - name: alertmanager + message: "Alert: {{json.alerts.0.annotations.summary}}" + output: + channel: telegram + peer: "123456" +``` + +--- + +## 2. Dockerfile + +**Complexity**: Simple + +### Overview + +Multi-stage Docker build for the Flynn daemon. No TypeScript source code changes required — this is purely build/deployment infrastructure. + +### Files to Create + +| File | Purpose | +|------|---------| +| `Dockerfile` | Multi-stage build | +| `.dockerignore` | Exclude dev artifacts | +| `docker-compose.yml` | Example deployment with volumes | + +### Dockerfile Design + +```dockerfile +# ── Stage 1: Build ────────────────────────────────────────────── +FROM node:22-alpine AS builder + +# Install pnpm +RUN corepack enable && corepack prepare pnpm@latest --activate + +WORKDIR /app + +# Install dependencies first (layer caching) +COPY package.json pnpm-lock.yaml ./ +RUN pnpm install --frozen-lockfile + +# Copy source and build +COPY tsconfig.json ./ +COPY src/ src/ + +# better-sqlite3 needs build tools for native compilation +RUN apk add --no-cache python3 make g++ && \ + pnpm build + +# ── Stage 2: Runtime ──────────────────────────────────────────── +FROM node:22-alpine AS runtime + +RUN corepack enable && corepack prepare pnpm@latest --activate + +WORKDIR /app + +# Copy package files and install production deps only +COPY package.json pnpm-lock.yaml ./ +RUN apk add --no-cache python3 make g++ && \ + pnpm install --frozen-lockfile --prod && \ + apk del python3 make g++ + +# Copy compiled output from builder +COPY --from=builder /app/dist dist/ + +# Copy gateway UI assets (if they exist) +COPY --from=builder /app/src/gateway/ui dist/gateway/ui/ + +# Copy prompt templates and bundled skills +COPY prompts/ prompts/ +COPY skills/ skills/ + +# Create data directories +RUN mkdir -p /data/memory /data/sessions /config + +# Runtime config 
+ENV NODE_ENV=production +ENV FLYNN_CONFIG=/config/config.yaml +ENV FLYNN_DATA_DIR=/data + +EXPOSE 18800 + +# Health check +HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \ + CMD wget -qO- http://localhost:18800/ || exit 1 + +ENTRYPOINT ["node", "dist/cli/index.js"] +CMD ["start"] +``` + +### Key Considerations + +**better-sqlite3 native deps**: This is the trickiest part. `better-sqlite3` requires native compilation. Options: +1. Install build tools (`python3`, `make`, `g++`) in the runtime stage, compile, then remove — this is the safest approach and keeps the image functional. +2. Use `--build-from-source` flag and carry the built `.node` binary from the builder stage. This requires matching the Node.js version and Alpine version exactly between stages (which we do since both use `node:22-alpine`). +3. Alternatively, use `better-sqlite3`'s prebuilt binaries if they ship for Alpine/musl — check at build time. + +**Recommended approach**: Option 2 — copy `node_modules` from builder stage since both stages use identical base image: + +```dockerfile +# In runtime stage, instead of separate pnpm install: +COPY --from=builder /app/node_modules node_modules/ +``` + +This eliminates the need for build tools in the runtime image entirely and produces a smaller image. + +### .dockerignore + +``` +node_modules/ +dist/ +.git/ +.gitignore +*.log +.env +.env.* +.worktrees/ +docs/ +src/**/*.test.ts +vitest.config.* +eslint.config.* +tsconfig.json +*.md +!README.md +``` + +### docker-compose.yml + +```yaml +version: '3.8' +services: + flynn: + build: . 
+ image: flynn:latest + container_name: flynn + restart: unless-stopped + ports: + - "18800:18800" + volumes: + - ./config.yaml:/config/config.yaml:ro + - flynn-data:/data + - flynn-memory:/data/memory + environment: + - FLYNN_CONFIG=/config/config.yaml + - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} + # Add other provider keys as needed + healthcheck: + test: ["CMD", "wget", "-qO-", "http://localhost:18800/"] + interval: 30s + timeout: 5s + retries: 3 + +volumes: + flynn-data: + flynn-memory: +``` + +### Daemon Changes for Docker + +The daemon currently defaults data dir to `~/.local/share/flynn`. For Docker, we need to respect an environment variable override. Check if `src/daemon/index.ts` already handles this. + +Looking at line 525: `const dataDir = resolve(homedir(), '.local/share/flynn');` + +**Modification needed** in `src/daemon/index.ts`: + +```typescript +// Support FLYNN_DATA_DIR env var for Docker deployments +const dataDir = process.env.FLYNN_DATA_DIR ?? resolve(homedir(), '.local/share/flynn'); +``` + +Similarly, config loading in `src/config/loader.ts` should respect `FLYNN_CONFIG` env var if not already. + +### Files to Modify + +| File | Change | +|------|--------| +| `src/daemon/index.ts` | Support `FLYNN_DATA_DIR` environment variable | +| `src/config/loader.ts` | Support `FLYNN_CONFIG` environment variable (verify) | + +### Test Plan + +No unit tests needed (Docker build is tested by building). Integration test: +- `docker build -t flynn:test .` +- `docker run --rm flynn:test --help` (verify CLI works) +- `docker compose up -d` with test config (verify startup) + +--- + +## 3. Heartbeat + +**Complexity**: Medium + +### Overview + +Periodic self-check system that validates daemon health and optionally notifies a configured channel on failure. Fits naturally as an extension to the `src/automation/` module, alongside cron. 
+ +### Config Schema Additions + +Add to `src/config/schema.ts`: + +```typescript +const heartbeatCheckSchema = z.enum([ + 'gateway', // Is the HTTP/WS server responsive? + 'model', // Can we reach the default model provider? + 'channels', // Are all channel adapters connected? + 'memory', // Is the memory store readable/writable? + 'disk', // Is disk space above threshold? +]); + +const heartbeatSchema = z.object({ + enabled: z.boolean().default(false), + /** Check interval (e.g. '60s', '5m', '1h'). Default: '5m'. */ + interval: z.string().default('5m'), + /** Which checks to run. Default: all. */ + checks: z.array(heartbeatCheckSchema).default(['gateway', 'model', 'channels', 'memory', 'disk']), + /** Optional notification on failure. */ + notify: z.object({ + channel: z.string().min(1), + peer: z.string().min(1), + }).optional(), + /** Number of consecutive failures before notifying. Default: 2. */ + failure_threshold: z.number().min(1).max(10).default(2), + /** Disk space warning threshold in MB. Default: 100. 
*/ + disk_threshold_mb: z.number().min(10).default(100), +}).default({}); +``` + +Extend `automationSchema`: + +```typescript +const automationSchema = z.object({ + cron: z.array(cronJobSchema).default([]), + webhooks: z.array(webhookSchema).default([]), + heartbeat: heartbeatSchema, +}).default({}); +``` + +Add type export: + +```typescript +export type HeartbeatConfig = z.infer<typeof heartbeatSchema>; +``` + +### Files to Create + +| File | Purpose | +|------|---------| +| `src/automation/heartbeat.ts` | `HeartbeatMonitor` class | +| `src/automation/heartbeat.test.ts` | Unit tests | + +### Files to Modify + +| File | Change | +|------|--------| +| `src/config/schema.ts` | Add `heartbeatSchema`, extend `automationSchema`, add type export | +| `src/automation/index.ts` | Export `HeartbeatMonitor` | +| `src/daemon/index.ts` | Instantiate `HeartbeatMonitor`, register shutdown handler | + +### Key Class: `HeartbeatMonitor` + +```typescript +// src/automation/heartbeat.ts + +import { statfsSync } from 'fs'; +import type { HeartbeatConfig } from '../config/schema.js'; +import type { ChannelAdapter, ChannelStatus, OutboundMessage } from '../channels/types.js'; +import type { ModelRouter } from '../models/router.js'; + +export interface HeartbeatCheckResult { + name: string; + healthy: boolean; + message: string; + durationMs: number; +} + +export interface HeartbeatResult { + timestamp: number; + healthy: boolean; + checks: HeartbeatCheckResult[]; +} + +interface HeartbeatDeps { + config: HeartbeatConfig; + /** Get the gateway HTTP server for health probing. */ + getGatewayPort?: () => number; + /** Model router for provider health check. */ + modelRouter?: ModelRouter; + /** Channel registry for adapter status checks. */ + getChannels?: () => Array<{ name: string; status: ChannelStatus }>; + /** Memory store dir for read/write check. */ + memoryDir?: string; + /** Data dir for disk space check. */ + dataDir?: string; + /** Send notification to a channel. 
*/ + notify?: (channel: string, peer: string, message: OutboundMessage) => Promise<void>; +} + +export class HeartbeatMonitor { + private timer?: ReturnType<typeof setInterval>; + private consecutiveFailures = 0; + private lastResult?: HeartbeatResult; + + constructor(private readonly deps: HeartbeatDeps) {} + + start(): void { + if (!this.deps.config.enabled) return; + const intervalMs = parseInterval(this.deps.config.interval); + this.timer = setInterval(() => this.runChecks(), intervalMs); + // Run first check after a short delay (let services finish starting) + setTimeout(() => this.runChecks(), 5000); + } + + stop(): void { + if (this.timer) { + clearInterval(this.timer); + this.timer = undefined; + } + } + + /** Run all configured checks and optionally notify on failure. */ + async runChecks(): Promise<HeartbeatResult> { + const checks: HeartbeatCheckResult[] = []; + const enabledChecks = this.deps.config.checks; + + for (const checkName of enabledChecks) { + const start = Date.now(); + try { + const result = await this.runSingleCheck(checkName); + checks.push({ ...result, durationMs: Date.now() - start }); + } catch (error) { + checks.push({ + name: checkName, + healthy: false, + message: error instanceof Error ? 
error.message : String(error), + durationMs: Date.now() - start, + }); + } + } + + const healthy = checks.every(c => c.healthy); + const result: HeartbeatResult = { timestamp: Date.now(), healthy, checks }; + this.lastResult = result; + + if (!healthy) { + this.consecutiveFailures++; + if (this.consecutiveFailures >= this.deps.config.failure_threshold) { + await this.sendNotification(result); + } + } else { + // Reset on recovery + if (this.consecutiveFailures >= this.deps.config.failure_threshold) { + await this.sendRecoveryNotification(result); + } + this.consecutiveFailures = 0; + } + + return result; + } + + getLastResult(): HeartbeatResult | undefined { + return this.lastResult; + } + + private async runSingleCheck(name: string): Promise<Omit<HeartbeatCheckResult, 'durationMs'>> { + switch (name) { + case 'gateway': return this.checkGateway(); + case 'model': return this.checkModel(); + case 'channels': return this.checkChannels(); + case 'memory': return this.checkMemory(); + case 'disk': return this.checkDisk(); + default: return { name, healthy: false, message: `Unknown check: ${name}` }; + } + } + + // Individual check implementations... +} +``` + +### Individual Check Implementations + +1. **Gateway check**: HTTP GET to `http://localhost:<port>/` — expects non-error response. +2. **Model check**: Call `modelRouter.healthCheck()` or attempt a minimal completion with max_tokens=1. Note: `ModelRouter` doesn't currently have a health check method, so we'd add a lightweight one that just verifies the API key/connection. Alternatively, use a simpler approach: verify the model client can be instantiated without errors, or do a `models.list()` API call for providers that support it. +3. **Channels check**: Iterate registered channels, verify all have `status === 'connected'`. +4. **Memory check**: Attempt `store.read('global')` and verify no exceptions. +5. **Disk check**: Use `statfsSync()` on the data directory, calculate free space in MB, compare to threshold. 
+ +### Notification Format + +``` +⚠️ Flynn Heartbeat Failure + +Failed checks: +- gateway: Connection refused on port 18800 +- model: API key invalid (401) + +Consecutive failures: 3/2 (threshold) +``` + +And on recovery: + +``` +✅ Flynn Heartbeat Recovery + +All checks passing after 3 consecutive failures. +``` + +### Integration in Daemon Startup + +In `src/daemon/index.ts`, after all services are started (after `gateway.start()`): + +```typescript +// Initialize heartbeat monitor (if enabled) +let heartbeatMonitor: HeartbeatMonitor | undefined; +if (config.automation.heartbeat?.enabled) { + heartbeatMonitor = new HeartbeatMonitor({ + config: config.automation.heartbeat, + getGatewayPort: () => config.server.port, + modelRouter, + getChannels: () => channelRegistry.list().map(a => ({ name: a.name, status: a.status })), + memoryDir: memoryDir, + dataDir, + notify: config.automation.heartbeat.notify + ? async (channel, peer, message) => { + const adapter = channelRegistry.get(channel); + if (adapter) await adapter.send(peer, message); + } + : undefined, + }); + heartbeatMonitor.start(); + console.log(`Heartbeat monitor started (interval: ${config.automation.heartbeat.interval})`); + + lifecycle.onShutdown(async () => { + heartbeatMonitor!.stop(); + console.log('Heartbeat monitor stopped'); + }); +} +``` + +### Interval Parsing + +Reuse or create a simple parser (similar to `parseDuration` in `src/session/index.ts`): + +```typescript +function parseInterval(str: string): number { + const match = str.match(/^(\d+)(s|m|h)$/); + if (!match) return 300_000; // default 5 minutes + const [, value, unit] = match; + const multipliers: Record<string, number> = { s: 1000, m: 60_000, h: 3_600_000 }; + return parseInt(value) * multipliers[unit]; +} +``` + +### Exposing via Gateway (Optional Enhancement) + +Add a `system.heartbeat` handler to the gateway WS handlers that returns `heartbeatMonitor.getLastResult()`. This lets the web UI or CLI display health status. 
This is a nice-to-have and can be done as a follow-up. + +### Test Plan + +`src/automation/heartbeat.test.ts`: +- `start()` does nothing when `enabled: false` +- `runChecks()` runs all configured checks +- `runChecks()` returns healthy=true when all checks pass +- `runChecks()` returns healthy=false when any check fails +- Notification sent after `failure_threshold` consecutive failures +- Recovery notification sent when checks pass after failures +- `stop()` clears the timer +- `getLastResult()` returns most recent result +- Individual check tests: + - `checkChannels()` detects disconnected adapters + - `checkDisk()` detects low disk space + - `checkMemory()` handles missing memory dir +- `parseInterval()` correctly parses '60s', '5m', '1h' + +### Config Example + +```yaml +automation: + heartbeat: + enabled: true + interval: "5m" + checks: [gateway, model, channels, memory, disk] + notify: + channel: telegram + peer: "123456" + failure_threshold: 2 + disk_threshold_mb: 100 +``` + +--- + +## 4. Vector Memory Search + +**Complexity**: Complex + +### Overview + +Add embedding-based semantic search to the existing `MemoryStore`, enabling hybrid search (vector similarity + keyword). Provider-agnostic embedding generation with graceful fallback to keyword-only search when embeddings are unavailable. + +### Architecture Decision: Storage Backend + +**Options considered**: +1. **SQLite with manual vector math** — Store embeddings as BLOBs in the existing `better-sqlite3` database, compute cosine similarity in JS. Simple, no new deps. +2. **SQLite with sqlite-vec extension** — Native vector similarity in SQL. Requires native extension compilation, complicates Docker build. +3. **Separate HNSW index (hnswlib-node)** — Purpose-built ANN index. Fast for large collections but adds a native dep. +4. **In-memory float arrays** — Store embeddings as JSON files alongside markdown, load into memory, compute similarity in JS. 
+ +**Recommendation**: Option 1 (SQLite with manual vector math). Rationale: +- `better-sqlite3` is already a dependency +- Memory collections are small enough (hundreds to low thousands of chunks) that brute-force cosine similarity in JS is fast enough (<10ms for 1000 vectors) +- No new native dependencies +- Embeddings stored in existing data directory pattern +- Clean upgrade path to sqlite-vec if performance becomes an issue + +### Config Schema Additions + +Extend `memorySchema` in `src/config/schema.ts`: + +```typescript +const embeddingProviderSchema = z.enum(['openai', 'gemini', 'ollama', 'llamacpp']); + +const embeddingSchema = z.object({ + enabled: z.boolean().default(false), + provider: embeddingProviderSchema.default('openai'), + model: z.string().default('text-embedding-3-small'), + /** Provider endpoint (required for ollama/llamacpp). */ + endpoint: z.string().optional(), + /** API key (required for openai/gemini). */ + api_key: z.string().optional(), + /** Embedding vector dimensions. Auto-detected from model if not set. */ + dimensions: z.number().optional(), + /** Max tokens per chunk for embedding. Default: 512. */ + chunk_size: z.number().min(64).max(8192).default(512), + /** Overlap between chunks in tokens. Default: 50. */ + chunk_overlap: z.number().min(0).max(1024).default(50), + /** Number of top results to return from vector search. Default: 5. */ + top_k: z.number().min(1).max(50).default(5), + /** Weight for vector results vs keyword results in hybrid scoring (0.0 = keyword only, 1.0 = vector only). Default: 0.7. 
*/ + hybrid_weight: z.number().min(0).max(1).default(0.7), +}).default({}); + +const memorySchema = z.object({ + enabled: z.boolean().default(true), + dir: z.string().optional(), + auto_extract: z.boolean().default(true), + max_context_tokens: z.number().min(100).max(10000).default(2000), + embedding: embeddingSchema, +}).default({}); +``` + +Add type exports: + +```typescript +export type EmbeddingConfig = z.infer<typeof embeddingSchema>; +``` + +### Files to Create + +| File | Purpose | +|------|---------| +| `src/memory/embeddings.ts` | `EmbeddingProvider` interface + provider implementations | +| `src/memory/embeddings.test.ts` | Unit tests for embedding providers | +| `src/memory/vector-store.ts` | `VectorStore` class — SQLite-backed vector storage | +| `src/memory/vector-store.test.ts` | Unit tests for vector store | +| `src/memory/chunker.ts` | Text chunking utility | +| `src/memory/chunker.test.ts` | Unit tests for chunker | +| `src/memory/hybrid-search.ts` | `HybridSearch` class — combines vector + keyword | +| `src/memory/hybrid-search.test.ts` | Unit tests for hybrid search | + +### Files to Modify + +| File | Change | +|------|--------| +| `src/config/schema.ts` | Add `embeddingSchema`, extend `memorySchema`, add type export | +| `src/memory/store.ts` | Add `searchHybrid()` method, accept optional `VectorStore` in constructor | +| `src/memory/index.ts` | Export new modules | +| `src/tools/builtin/memory-search.ts` | Use `searchHybrid()` when available, fall back to `search()` | +| `src/daemon/index.ts` | Initialize `EmbeddingProvider`, `VectorStore`, wire into `MemoryStore` | + +### Key Interfaces and Classes + +#### EmbeddingProvider + +```typescript +// src/memory/embeddings.ts + +export interface EmbeddingProvider { + /** Generate embeddings for one or more text chunks. */ + embed(texts: string[]): Promise<number[][]>; + /** Embedding vector dimensions. 
*/ + readonly dimensions: number; +} + +export class OpenAIEmbeddingProvider implements EmbeddingProvider { + // Uses openai SDK (already a dep) with embeddings.create() + readonly dimensions: number; + constructor(config: { apiKey?: string; model: string; dimensions?: number }) { ... } + async embed(texts: string[]): Promise<number[][]> { ... } +} + +export class GeminiEmbeddingProvider implements EmbeddingProvider { + // Uses @google/generative-ai SDK (already a dep) with embedContent() + readonly dimensions: number; + constructor(config: { apiKey?: string; model: string }) { ... } + async embed(texts: string[]): Promise<number[][]> { ... } +} + +export class OllamaEmbeddingProvider implements EmbeddingProvider { + // Uses ollama SDK (already a dep) with embeddings() + readonly dimensions: number; + constructor(config: { host?: string; model: string }) { ... } + async embed(texts: string[]): Promise<number[][]> { ... } +} + +export class LlamaCppEmbeddingProvider implements EmbeddingProvider { + // HTTP POST to /embedding endpoint + readonly dimensions: number; + constructor(config: { endpoint: string; model?: string }) { ... } + async embed(texts: string[]): Promise<number[][]> { ... } +} + +export function createEmbeddingProvider(config: EmbeddingConfig): EmbeddingProvider { ... } +``` + +#### Text Chunker + +```typescript +// src/memory/chunker.ts + +export interface Chunk { + /** The text content of this chunk. */ + text: string; + /** Source namespace. */ + namespace: string; + /** Starting line number in the source file (1-based). */ + startLine: number; + /** Ending line number in the source file (1-based). */ + endLine: number; +} + +/** + * Split memory content into overlapping chunks suitable for embedding. + * + * Strategy: split on paragraph boundaries (double newline), then merge + * small paragraphs to reach target chunk size, with configurable overlap. 
+ */ +export function chunkText( + content: string, + namespace: string, + options: { chunkSize: number; chunkOverlap: number }, +): Chunk[] { ... } +``` + +#### VectorStore + +```typescript +// src/memory/vector-store.ts + +import Database from 'better-sqlite3'; + +export interface VectorSearchResult { + namespace: string; + chunkText: string; + startLine: number; + endLine: number; + similarity: number; +} + +export class VectorStore { + private db: Database.Database; + + constructor(dbPath: string) { + this.db = new Database(dbPath); + this.initSchema(); + } + + private initSchema(): void { + this.db.exec(` + CREATE TABLE IF NOT EXISTS embeddings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + namespace TEXT NOT NULL, + chunk_text TEXT NOT NULL, + start_line INTEGER NOT NULL, + end_line INTEGER NOT NULL, + embedding BLOB NOT NULL, + created_at INTEGER NOT NULL DEFAULT (unixepoch()), + content_hash TEXT NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_embeddings_namespace ON embeddings(namespace); + CREATE INDEX IF NOT EXISTS idx_embeddings_hash ON embeddings(content_hash); + `); + } + + /** Store embedding vectors for a set of chunks. */ + upsertChunks(chunks: Array<{ namespace: string; text: string; startLine: number; endLine: number; embedding: number[]; contentHash: string }>): void { ... } + + /** Delete all embeddings for a namespace (for re-indexing). */ + deleteNamespace(namespace: string): void { ... } + + /** Check if a namespace+hash combination already exists (skip re-embedding). */ + hasContentHash(namespace: string, hash: string): boolean { ... } + + /** + * Search for similar vectors using cosine similarity. + * Computed in JS — fine for <10k vectors. + */ + search(queryEmbedding: number[], topK: number): VectorSearchResult[] { + const allRows = this.db.prepare('SELECT * FROM embeddings').all(); + // Compute cosine similarity for each, sort, take topK + ... 
+ } + + close(): void { + this.db.close(); + } +} +``` + +The embedding is stored as a BLOB (Float32Array buffer). Cosine similarity: + +```typescript +function cosineSimilarity(a: number[], b: number[]): number { + let dotProduct = 0; + let normA = 0; + let normB = 0; + for (let i = 0; i < a.length; i++) { + dotProduct += a[i] * b[i]; + normA += a[i] * a[i]; + normB += b[i] * b[i]; + } + return dotProduct / (Math.sqrt(normA) * Math.sqrt(normB)); +} +``` + +#### HybridSearch + +```typescript +// src/memory/hybrid-search.ts + +import type { SearchResult } from './store.js'; +import type { VectorSearchResult, VectorStore } from './vector-store.js'; +import type { EmbeddingProvider } from './embeddings.js'; +import type { MemoryStore } from './store.js'; + +export interface HybridSearchResult { + namespace: string; + content: string; + context: string; + line: number; + score: number; + source: 'keyword' | 'vector' | 'both'; +} + +export class HybridSearch { + constructor( + private readonly memoryStore: MemoryStore, + private readonly vectorStore: VectorStore, + private readonly embeddingProvider: EmbeddingProvider, + private readonly hybridWeight: number = 0.7, + ) {} + + /** + * Search using both keyword and vector similarity, merge and rank results. 
+ * + * Scoring: final_score = (hybrid_weight * vector_score) + ((1 - hybrid_weight) * keyword_score) + * Keyword score: 1.0 for exact match (normalized by result count) + * Vector score: cosine similarity (0.0 - 1.0) + */ + async search(query: string, topK: number = 5): Promise<HybridSearchResult[]> { + // Run both searches in parallel + const [keywordResults, vectorResults] = await Promise.all([ + Promise.resolve(this.memoryStore.search(query)), + this.vectorSearch(query, topK * 2), // fetch more for merge + ]); + + return this.mergeResults(keywordResults, vectorResults, topK); + } + + private async vectorSearch(query: string, topK: number): Promise<VectorSearchResult[]> { + const [queryEmbedding] = await this.embeddingProvider.embed([query]); + return this.vectorStore.search(queryEmbedding, topK); + } + + private mergeResults( + keyword: SearchResult[], + vector: VectorSearchResult[], + topK: number, + ): HybridSearchResult[] { + // Deduplicate by namespace+line, compute hybrid score, sort, take topK + ... + } +} +``` + +### Indexing Pipeline + +When does indexing happen? Two approaches: + +1. **On write**: When `MemoryStore.write()` is called, also chunk + embed + store. This keeps the index always up to date but adds latency to writes. +2. **Background indexer**: A separate timer that scans for unindexed content (via content hash comparison) and indexes it periodically. + +**Recommendation**: Background indexer with write-triggered hint. On `MemoryStore.write()`, mark the namespace as "dirty". The background indexer (runs every 30s or so) processes only dirty namespaces. This keeps writes fast while ensuring the index stays reasonably fresh. + +```typescript +// Addition to MemoryStore: +private dirtyNamespaces: Set<string> = new Set(); + +write(namespace: string, content: string, mode: 'append' | 'replace'): void { + // ... existing write logic ...
+ this.dirtyNamespaces.add(namespace); +} + +getDirtyNamespaces(): string[] { + const ns = Array.from(this.dirtyNamespaces); + this.dirtyNamespaces.clear(); + return ns; +} +``` + +Indexer runs in daemon: + +```typescript +// In daemon startup, after memory store init: +if (memoryStore && config.memory.embedding.enabled) { + const embeddingProvider = createEmbeddingProvider(config.memory.embedding); + const vectorStore = new VectorStore(resolve(dataDir, 'vectors.db')); + const hybridSearch = new HybridSearch(memoryStore, vectorStore, embeddingProvider, config.memory.embedding.hybrid_weight); + + // Background indexer + const indexInterval = setInterval(async () => { + const dirty = memoryStore.getDirtyNamespaces(); + for (const ns of dirty) { + const content = memoryStore.read(ns); + const contentHash = createHash('md5').update(content).digest('hex'); + if (vectorStore.hasContentHash(ns, contentHash)) continue; + + const chunks = chunkText(content, ns, { + chunkSize: config.memory.embedding.chunk_size, + chunkOverlap: config.memory.embedding.chunk_overlap, + }); + const embeddings = await embeddingProvider.embed(chunks.map(c => c.text)); + vectorStore.upsertChunks(chunks.map((c, i) => ({ + namespace: c.namespace, + text: c.text, + startLine: c.startLine, + endLine: c.endLine, + embedding: embeddings[i], + contentHash, + }))); + } + }, 30_000); + + lifecycle.onShutdown(async () => { + clearInterval(indexInterval); + vectorStore.close(); + }); +} +``` + +### Integration with memory.search Tool + +Modify `src/tools/builtin/memory-search.ts` to accept an optional `HybridSearch` instance: + +```typescript +export function createMemorySearchTool(store: MemoryStore, hybridSearch?: HybridSearch): Tool { + return { + name: 'memory.search', + description: 'Search across all memory files for a keyword or phrase. 
Uses semantic search when available, with keyword fallback.', + inputSchema: { + type: 'object', + properties: { + query: { + type: 'string', + description: 'The search query (supports semantic meaning when embeddings are enabled)', + }, + }, + required: ['query'], + }, + execute: async (rawArgs: unknown): Promise => { + const args = rawArgs as { query: string }; + + try { + if (hybridSearch) { + const results = await hybridSearch.search(args.query); + // Format hybrid results... + } else { + // Existing keyword-only path (unchanged) + const results = store.search(args.query); + // ...existing formatting... + } + } catch (error) { + // On vector search failure, fall back to keyword + const results = store.search(args.query); + // ...format with note about fallback... + } + }, + }; +} +``` + +Also update `createMemoryTools()` in `src/tools/builtin/index.ts` to accept the optional `HybridSearch`: + +```typescript +export function createMemoryTools(store: MemoryStore, hybridSearch?: HybridSearch): Tool[] { + return [ + createMemoryReadTool(store), + createMemoryWriteTool(store), + createMemorySearchTool(store, hybridSearch), + ]; +} +``` + +### Graceful Fallback Chain + +1. If `embedding.enabled: false` (default) — pure keyword search, no embedding deps loaded +2. If `embedding.enabled: true` but provider unreachable — log warning, fall back to keyword search +3. If embedding succeeds but vector DB corrupted — catch error, fall back to keyword search +4. 
If embedding succeeds — hybrid search with configurable weight + +### Test Plan + +**`src/memory/chunker.test.ts`**: +- Splits text into chunks of configured size +- Respects paragraph boundaries +- Generates correct overlap between chunks +- Handles empty content +- Preserves line number tracking +- Single paragraph smaller than chunk size returns one chunk + +**`src/memory/embeddings.test.ts`**: +- `createEmbeddingProvider()` creates correct provider for each type +- OpenAI provider calls correct API endpoint (mocked) +- Ollama provider calls correct endpoint (mocked) +- Provider returns correct dimensionality +- Handles API errors gracefully + +**`src/memory/vector-store.test.ts`**: +- Creates SQLite database with correct schema +- `upsertChunks()` stores and retrieves embeddings +- `search()` returns results sorted by similarity +- `deleteNamespace()` removes all embeddings for a namespace +- `hasContentHash()` detects existing content +- `close()` cleans up database connection +- Cosine similarity computation is correct + +**`src/memory/hybrid-search.test.ts`**: +- Returns keyword results when vector search unavailable +- Merges keyword and vector results with correct weighting +- Deduplicates results from both sources +- Respects `topK` limit +- Handles empty results from either source +- `source` field correctly indicates 'keyword', 'vector', or 'both' + +### Config Example + +```yaml +memory: + enabled: true + auto_extract: true + max_context_tokens: 2000 + embedding: + enabled: true + provider: openai + model: text-embedding-3-small + api_key: "${OPENAI_API_KEY}" + chunk_size: 512 + chunk_overlap: 50 + top_k: 5 + hybrid_weight: 0.7 +``` + +--- + +## Implementation Order + +Recommended sequence (dependencies and risk): + +| Order | Feature | Rationale | +|-------|---------|-----------| +| 1 | **Dockerfile** | No code changes, unblocks deployment testing for all other features | +| 2 | **Inbound Webhooks** | Clean pattern (mirrors cron), moderate scope, high 
utility | +| 3 | **Heartbeat** | Depends on running services for meaningful checks, benefits from Docker for testing | +| 4 | **Vector Memory Search** | Most complex, most files, most risk — do last when other features are stable | + +### Estimated Effort + +| Feature | New Files | Modified Files | Estimated Lines | Complexity | +|---------|-----------|----------------|-----------------|------------| +| Inbound Webhooks | 2 | 4 | ~300 | Medium | +| Dockerfile | 3 | 1-2 | ~100 | Simple | +| Heartbeat | 2 | 3 | ~350 | Medium | +| Vector Memory Search | 8 | 5 | ~800 | Complex | +| **Total** | **15** | **12-13** | **~1550** | | + +--- + +## Cross-Cutting Concerns + +### Config Schema Validation + +All new schema additions follow the existing pattern: +- Use `.default({})` for optional sections so the config is valid without them +- Export `z.infer<>` type aliases +- Existing tests in `src/config/schema.test.ts` should still pass (no breaking changes) + +### Daemon Startup Order + +New services slot into the existing startup sequence: + +1. Memory store (existing) +2. **Vector store + embedding provider** (new, after memory store) +3. Session store (existing) +4. Tool registry (existing) +5. **Memory tools with hybrid search** (modified) +6. Gateway server (existing) +7. Channel registry (existing) +8. **Webhook handler** (new, registered with channel registry) +9. Cron scheduler (existing) +10. Start all channels (existing) +11. Start gateway (existing) +12. **Heartbeat monitor** (new, after all services started) + +### Shutdown Order (LIFO) + +Added to lifecycle in reverse: +1. Heartbeat monitor stop +2. Gateway stop (existing) +3. Channel adapters stop (existing, includes webhook handler) +4. Vector store close +5. Process manager stop (existing) +6. 
Session store close (existing) diff --git a/src/automation/index.ts b/src/automation/index.ts index b751da3..304f9dd 100644 --- a/src/automation/index.ts +++ b/src/automation/index.ts @@ -1 +1,2 @@ export { CronScheduler } from './cron.js'; +export { WebhookHandler } from './webhooks.js'; diff --git a/src/automation/webhooks.test.ts b/src/automation/webhooks.test.ts new file mode 100644 index 0000000..3fba3e6 --- /dev/null +++ b/src/automation/webhooks.test.ts @@ -0,0 +1,307 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { WebhookHandler, _verifyHmac, _renderTemplate } from './webhooks.js'; +import type { WebhookConfig } from '../config/schema.js'; +import type { InboundMessage } from '../channels/types.js'; +import type { IncomingMessage, ServerResponse } from 'http'; +import { createHmac } from 'crypto'; +import { EventEmitter } from 'events'; + +function makeWebhook(overrides?: Partial): WebhookConfig { + return { + name: 'test-hook', + message: '{{body}}', + output: { channel: 'telegram', peer: '123' }, + enabled: true, + ...overrides, + }; +} + +/** Create a mock IncomingMessage that emits the given body. */ +function mockRequest(body: string, headers: Record = {}): IncomingMessage { + const emitter = new EventEmitter(); + (emitter as any).headers = headers; + // Simulate data arriving next tick + process.nextTick(() => { + emitter.emit('data', Buffer.from(body)); + emitter.emit('end'); + }); + return emitter as unknown as IncomingMessage; +} + +/** Create a mock ServerResponse that captures writeHead and end calls. */ +function mockResponse(): ServerResponse & { statusCode_: number; body_: string; headers_: Record } { + const res: any = { + statusCode_: 0, + body_: '', + headers_: {}, + writeHead(code: number, headers?: Record) { + res.statusCode_ = code; + if (headers) res.headers_ = headers; + return res; + }, + end(body?: string) { + res.body_ = body ?? 
''; + return res; + }, + }; + return res; +} + +describe('WebhookHandler', () => { + let handler: WebhookHandler; + let mockChannelRegistry: { get: ReturnType }; + + beforeEach(() => { + mockChannelRegistry = { + get: vi.fn(), + }; + }); + + afterEach(async () => { + if (handler) { + await handler.disconnect(); + } + }); + + it('implements ChannelAdapter interface', () => { + handler = new WebhookHandler([], mockChannelRegistry as any); + expect(handler.name).toBe('webhook'); + expect(handler.status).toBe('disconnected'); + }); + + it('status changes to connected after connect()', async () => { + handler = new WebhookHandler([], mockChannelRegistry as any); + await handler.connect(); + expect(handler.status).toBe('connected'); + }); + + it('status changes to disconnected after disconnect()', async () => { + handler = new WebhookHandler([], mockChannelRegistry as any); + await handler.connect(); + await handler.disconnect(); + expect(handler.status).toBe('disconnected'); + }); + + it('lists registered webhook names', () => { + const webhooks = [ + makeWebhook({ name: 'hook-a' }), + makeWebhook({ name: 'hook-b', enabled: false }), + ]; + handler = new WebhookHandler(webhooks, mockChannelRegistry as any); + + const names = handler.getWebhookNames(); + expect(names).toEqual(['hook-a', 'hook-b']); + }); + + it('handleRequest produces correct InboundMessage', async () => { + const webhooks = [makeWebhook()]; + handler = new WebhookHandler(webhooks, mockChannelRegistry as any); + + const messages: InboundMessage[] = []; + handler.onMessage((msg: InboundMessage) => messages.push(msg)); + await handler.connect(); + + const req = mockRequest('hello world'); + const res = mockResponse(); + + const result = await handler.handleRequest('test-hook', req, res); + + expect(result).toBe(true); + expect(res.statusCode_).toBe(202); + expect(messages).toHaveLength(1); + expect(messages[0].channel).toBe('webhook'); + expect(messages[0].senderId).toBe('test-hook'); + 
expect(messages[0].text).toBe('hello world'); + }); + + it('returns false for unknown webhook', async () => { + handler = new WebhookHandler([], mockChannelRegistry as any); + await handler.connect(); + + const req = mockRequest('test'); + const res = mockResponse(); + + const result = await handler.handleRequest('nonexistent', req, res); + + expect(result).toBe(false); + expect(res.statusCode_).toBe(404); + }); + + it('returns false for disabled webhook', async () => { + const webhooks = [makeWebhook({ enabled: false })]; + handler = new WebhookHandler(webhooks, mockChannelRegistry as any); + await handler.connect(); + + const req = mockRequest('test'); + const res = mockResponse(); + + const result = await handler.handleRequest('test-hook', req, res); + + expect(result).toBe(false); + expect(res.statusCode_).toBe(404); + }); + + it('verifies valid HMAC signature', async () => { + const secret = 'my-secret-key'; + const webhooks = [makeWebhook({ secret })]; + handler = new WebhookHandler(webhooks, mockChannelRegistry as any); + + const messages: InboundMessage[] = []; + handler.onMessage((msg: InboundMessage) => messages.push(msg)); + await handler.connect(); + + const body = '{"event":"push"}'; + const signature = 'sha256=' + createHmac('sha256', secret).update(body).digest('hex'); + + const req = mockRequest(body, { 'x-webhook-signature': signature }); + const res = mockResponse(); + + const result = await handler.handleRequest('test-hook', req, res); + + expect(result).toBe(true); + expect(res.statusCode_).toBe(202); + expect(messages).toHaveLength(1); + }); + + it('rejects invalid HMAC signature', async () => { + const secret = 'my-secret-key'; + const webhooks = [makeWebhook({ secret })]; + handler = new WebhookHandler(webhooks, mockChannelRegistry as any); + + const messages: InboundMessage[] = []; + handler.onMessage((msg: InboundMessage) => messages.push(msg)); + await handler.connect(); + + const req = mockRequest('{"event":"push"}', { 
'x-webhook-signature': 'sha256=invalid' }); + const res = mockResponse(); + + const result = await handler.handleRequest('test-hook', req, res); + + expect(result).toBe(false); + expect(res.statusCode_).toBe(401); + expect(messages).toHaveLength(0); + }); + + it('rejects missing HMAC signature when secret is configured', async () => { + const secret = 'my-secret-key'; + const webhooks = [makeWebhook({ secret })]; + handler = new WebhookHandler(webhooks, mockChannelRegistry as any); + + const messages: InboundMessage[] = []; + handler.onMessage((msg: InboundMessage) => messages.push(msg)); + await handler.connect(); + + const req = mockRequest('{"event":"push"}'); + const res = mockResponse(); + + const result = await handler.handleRequest('test-hook', req, res); + + expect(result).toBe(false); + expect(res.statusCode_).toBe(401); + expect(messages).toHaveLength(0); + }); + + it('forwards response to output channel on send()', async () => { + const mockOutputAdapter = { + send: vi.fn().mockResolvedValue(undefined), + }; + mockChannelRegistry.get.mockReturnValue(mockOutputAdapter); + + const webhooks = [makeWebhook()]; + handler = new WebhookHandler(webhooks, mockChannelRegistry as any); + await handler.connect(); + + await handler.send('test-hook', { text: 'Agent response' }); + + expect(mockChannelRegistry.get).toHaveBeenCalledWith('telegram'); + expect(mockOutputAdapter.send).toHaveBeenCalledWith('123', { text: 'Agent response' }); + }); + + it('logs warning when output channel not found', async () => { + mockChannelRegistry.get.mockReturnValue(undefined); + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + + const webhooks = [makeWebhook()]; + handler = new WebhookHandler(webhooks, mockChannelRegistry as any); + await handler.connect(); + + await handler.send('test-hook', { text: 'Agent response' }); + + expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining('Output channel')); + warnSpy.mockRestore(); + }); + + it('logs warning when 
webhook name not found in send()', async () => { + const warnSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}); + + const webhooks = [makeWebhook()]; + handler = new WebhookHandler(webhooks, mockChannelRegistry as any); + await handler.connect(); + + await handler.send('nonexistent-hook', { text: 'response' }); + + expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining('No webhook')); + warnSpy.mockRestore(); + }); +}); + +describe('renderTemplate', () => { + it('replaces {{body}} with raw body', () => { + const result = _renderTemplate('Received: {{body}}', 'hello'); + expect(result).toBe('Received: hello'); + }); + + it('replaces {{json.field}} with JSON field value', () => { + const result = _renderTemplate('Event: {{json.action}}', '{"action":"push","repo":"test"}'); + expect(result).toBe('Event: push'); + }); + + it('replaces multiple {{json.field}} placeholders', () => { + const result = _renderTemplate( + '{{json.action}} on {{json.repo}}', + '{"action":"push","repo":"my-repo"}', + ); + expect(result).toBe('push on my-repo'); + }); + + it('returns empty string for missing JSON fields', () => { + const result = _renderTemplate('Value: {{json.missing}}', '{"action":"push"}'); + expect(result).toBe('Value: '); + }); + + it('returns empty string for invalid JSON body with json placeholder', () => { + const result = _renderTemplate('Value: {{json.field}}', 'not-json'); + expect(result).toBe('Value: '); + }); + + it('stringifies non-string JSON values', () => { + const result = _renderTemplate('Count: {{json.count}}', '{"count":42}'); + expect(result).toBe('Count: 42'); + }); + + it('handles template with both {{body}} and {{json.field}}', () => { + const body = '{"action":"deploy"}'; + const result = _renderTemplate('Action: {{json.action}}, Raw: {{body}}', body); + expect(result).toBe('Action: deploy, Raw: {"action":"deploy"}'); + }); +}); + +describe('verifyHmac', () => { + it('returns true for valid signature with sha256= prefix', () => { + 
const secret = 'test-secret'; + const body = 'test-body'; + const sig = 'sha256=' + createHmac('sha256', secret).update(body).digest('hex'); + expect(_verifyHmac(body, secret, sig)).toBe(true); + }); + + it('returns true for valid signature without prefix', () => { + const secret = 'test-secret'; + const body = 'test-body'; + const sig = createHmac('sha256', secret).update(body).digest('hex'); + expect(_verifyHmac(body, secret, sig)).toBe(true); + }); + + it('returns false for invalid signature', () => { + expect(_verifyHmac('body', 'secret', 'sha256=deadbeef')).toBe(false); + }); +}); diff --git a/src/automation/webhooks.ts b/src/automation/webhooks.ts new file mode 100644 index 0000000..eed77e5 --- /dev/null +++ b/src/automation/webhooks.ts @@ -0,0 +1,171 @@ +import { createHmac, timingSafeEqual } from 'crypto'; +import type { IncomingMessage, ServerResponse } from 'http'; +import type { WebhookConfig } from '../config/schema.js'; +import type { ChannelAdapter, ChannelStatus, InboundMessage, OutboundMessage } from '../channels/types.js'; + +/** Minimal interface for the parts of ChannelRegistry we need. */ +interface ChannelLookup { + get(name: string): { send(peerId: string, message: OutboundMessage): Promise } | undefined; +} + +/** Read the full request body as a string. */ +function readBody(req: IncomingMessage): Promise { + return new Promise((resolve, reject) => { + const chunks: Buffer[] = []; + req.on('data', (chunk: Buffer) => chunks.push(chunk)); + req.on('end', () => resolve(Buffer.concat(chunks).toString('utf-8'))); + req.on('error', reject); + }); +} + +/** Verify HMAC-SHA256 signature from the X-Webhook-Signature header. */ +function verifyHmac(body: string, secret: string, signature: string): boolean { + const expected = createHmac('sha256', secret).update(body).digest('hex'); + const sig = signature.startsWith('sha256=') ? 
signature.slice(7) : signature; + + if (expected.length !== sig.length) return false; + + try { + return timingSafeEqual(Buffer.from(expected, 'hex'), Buffer.from(sig, 'hex')); + } catch { + return false; + } +} + +/** + * Render a message template with {{body}} and {{json.field}} placeholders. + * - {{body}} is replaced with the raw request body. + * - {{json.field}} accesses a top-level field from the parsed JSON body. + */ +function renderTemplate(template: string, body: string): string { + let result = template.replace(/\{\{body\}\}/g, body); + + // Replace {{json.field}} placeholders + let parsed: Record | undefined; + result = result.replace(/\{\{json\.([^}]+)\}\}/g, (_match, field: string) => { + if (!parsed) { + try { + parsed = JSON.parse(body) as Record; + } catch { + return ''; + } + } + const value = parsed[field]; + if (value === undefined || value === null) return ''; + return typeof value === 'string' ? value : JSON.stringify(value); + }); + + return result; +} + +export class WebhookHandler implements ChannelAdapter { + readonly name = 'webhook'; + private _status: ChannelStatus = 'disconnected'; + private messageHandler?: (msg: InboundMessage) => void; + private webhooks: Map = new Map(); + + constructor( + private readonly webhookConfigs: WebhookConfig[], + private readonly channelLookup: ChannelLookup, + ) { + for (const webhook of webhookConfigs) { + this.webhooks.set(webhook.name, webhook); + } + } + + get status(): ChannelStatus { + return this._status; + } + + async connect(): Promise { + this._status = 'connected'; + + const enabledCount = this.webhookConfigs.filter(w => w.enabled).length; + if (enabledCount > 0) { + console.log(`WebhookHandler: ${enabledCount} webhook(s) registered`); + } + } + + async disconnect(): Promise { + this._status = 'disconnected'; + } + + async send(peerId: string, message: OutboundMessage): Promise { + // peerId is the webhook name — look up its output config + const webhook = this.webhooks.get(peerId); + if 
(!webhook) { + console.warn(`No webhook found for '${peerId}'`); + return; + } + + const outputAdapter = this.channelLookup.get(webhook.output.channel); + if (!outputAdapter) { + console.warn(`Output channel '${webhook.output.channel}' not found for webhook '${peerId}'`); + return; + } + + await outputAdapter.send(webhook.output.peer, message); + } + + onMessage(handler: (msg: InboundMessage) => void): void { + this.messageHandler = handler; + } + + /** + * Handle an incoming HTTP webhook request. + * Returns true if the webhook was found and processed, false otherwise. + */ + async handleRequest(webhookName: string, req: IncomingMessage, res: ServerResponse): Promise { + const webhook = this.webhooks.get(webhookName); + if (!webhook) { + res.writeHead(404, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'Unknown webhook' })); + return false; + } + + if (!webhook.enabled) { + res.writeHead(404, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'Webhook disabled' })); + return false; + } + + const body = await readBody(req); + + // Verify HMAC if secret is configured + if (webhook.secret) { + const signature = req.headers['x-webhook-signature'] as string | undefined; + if (!signature || !verifyHmac(body, webhook.secret, signature)) { + res.writeHead(401, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'Invalid signature' })); + return false; + } + } + + // Render message template + const text = renderTemplate(webhook.message, body); + + const msg: InboundMessage = { + id: `webhook-${webhookName}-${Date.now()}`, + channel: 'webhook', + senderId: webhookName, + senderName: `webhook:${webhookName}`, + text, + timestamp: Date.now(), + metadata: { webhookName, body }, + }; + + this.messageHandler?.(msg); + + res.writeHead(202, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ accepted: true })); + return true; + } + + /** Get list of all webhook names (enabled and disabled). 
*/ + getWebhookNames(): string[] { + return Array.from(this.webhooks.keys()); + } +} + +// Export helpers for testing +export { readBody as _readBody, verifyHmac as _verifyHmac, renderTemplate as _renderTemplate }; diff --git a/src/cli/shared.ts b/src/cli/shared.ts index a1b51c4..860514b 100644 --- a/src/cli/shared.ts +++ b/src/cli/shared.ts @@ -8,9 +8,9 @@ export function getConfigPath(): string { return process.env.FLYNN_CONFIG ?? resolve(homedir(), '.config/flynn/config.yaml'); } -/** Get the data directory path. */ +/** Get the data directory path (FLYNN_DATA_DIR overrides default for Docker/custom deployments). */ export function getDataDir(): string { - return resolve(homedir(), '.local/share/flynn'); + return process.env.FLYNN_DATA_DIR ?? resolve(homedir(), '.local/share/flynn'); } /** Load config without throwing. Returns { config } or { error }. */ diff --git a/src/cli/tui.ts b/src/cli/tui.ts index d764072..3b0a5c9 100644 --- a/src/cli/tui.ts +++ b/src/cli/tui.ts @@ -49,7 +49,7 @@ export function registerTuiCommand(program: Command): void { const { HookEngine } = await import('../hooks/index.js'); const { createModelRouter } = await import('../daemon/index.js'); - const dataDir = resolve(homedir(), '.local/share/flynn'); + const dataDir = process.env.FLYNN_DATA_DIR ?? 
resolve(homedir(), '.local/share/flynn'); mkdirSync(dataDir, { recursive: true }); const sessionStore = new SessionStore(resolve(dataDir, 'sessions.db')); diff --git a/src/config/schema.ts b/src/config/schema.ts index 44a2e1d..fba63ea 100644 --- a/src/config/schema.ts +++ b/src/config/schema.ts @@ -108,8 +108,20 @@ const cronJobSchema = z.object({ timezone: z.string().optional(), }); +const webhookSchema = z.object({ + name: z.string().min(1, 'Webhook name is required'), + secret: z.string().optional(), + message: z.string().default('{{body}}'), + output: z.object({ + channel: z.string().min(1), + peer: z.string().min(1), + }), + enabled: z.boolean().default(true), +}); + const automationSchema = z.object({ cron: z.array(cronJobSchema).default([]), + webhooks: z.array(webhookSchema).default([]), }).default({}); const agentsSchema = z.object({ @@ -299,6 +311,7 @@ export type Config = z.infer; export type TelegramConfig = z.infer; export type ModelConfig = z.infer; export type CronJobConfig = z.infer; +export type WebhookConfig = z.infer; export type AgentsConfig = z.infer; export type CompactionConfig = z.infer; export type MemoryConfig = z.infer; diff --git a/src/daemon/index.ts b/src/daemon/index.ts index 06c37ce..859c9e6 100644 --- a/src/daemon/index.ts +++ b/src/daemon/index.ts @@ -15,7 +15,7 @@ import { MemoryStore } from '../memory/index.js'; import { createMemoryTools } from '../tools/builtin/index.js'; import { GatewayServer } from '../gateway/index.js'; import { ChannelRegistry, TelegramAdapter, WebChatAdapter, DiscordAdapter, SlackAdapter, WhatsAppAdapter } from '../channels/index.js'; -import { CronScheduler } from '../automation/index.js'; +import { CronScheduler, WebhookHandler } from '../automation/index.js'; import type { InboundMessage, OutboundMessage } from '../channels/index.js'; import { McpManager } from '../mcp/index.js'; import { SkillRegistry, SkillInstaller, loadAllSkills } from '../skills/index.js'; @@ -521,8 +521,8 @@ function 
createMessageRouter(deps: { export async function startDaemon(config: Config): Promise { const lifecycle = new Lifecycle(); - // Ensure data directory exists - const dataDir = resolve(homedir(), '.local/share/flynn'); + // Ensure data directory exists (FLYNN_DATA_DIR overrides default for Docker/custom deployments) + const dataDir = process.env.FLYNN_DATA_DIR ?? resolve(homedir(), '.local/share/flynn'); mkdirSync(dataDir, { recursive: true }); // Initialize memory store @@ -816,6 +816,15 @@ export async function startDaemon(config: Config): Promise { console.log(`Registered ${config.automation.cron.length} cron job(s)`); } + // Register webhook handler adapter (if any webhooks configured) + let webhookHandler: WebhookHandler | undefined; + if (config.automation.webhooks.length > 0) { + webhookHandler = new WebhookHandler(config.automation.webhooks, channelRegistry); + channelRegistry.register(webhookHandler); + gateway.setWebhookHandler(webhookHandler); + console.log(`Registered ${config.automation.webhooks.length} webhook(s)`); + } + // ── Register Tier 1 agent tools ───────────────────────────── // Session management tools (list, history, create, delete) diff --git a/src/gateway/server.ts b/src/gateway/server.ts index b0deba1..494d612 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -24,6 +24,7 @@ import type { SessionManager } from '../session/manager.js'; import type { Config } from '../config/index.js'; import type { ToolRegistry } from '../tools/registry.js'; import type { ToolExecutor } from '../tools/executor.js'; +import type { WebhookHandler } from '../automation/webhooks.js'; export interface GatewayServerConfig { port: number; @@ -42,6 +43,8 @@ export interface GatewayServerConfig { /** Optional callback for system.restart. Should trigger graceful shutdown + process restart. 
*/ restart?: () => Promise; channelRegistry?: { list(): Array<{ readonly name: string; readonly status: string }> }; + /** Optional webhook handler for inbound webhook HTTP routes. */ + webhookHandler?: WebhookHandler; } export class GatewayServer { @@ -207,9 +210,19 @@ export class GatewayServer { /** * Handle incoming HTTP requests. * Optionally applies auth (when authHttp is enabled and a token is configured). - * Delegates to serveStatic for UI files; returns 404 if no UI dir or file not found. + * Routes webhook requests before auth; delegates to serveStatic for UI files. */ private async handleHttpRequest(req: IncomingMessage, res: ServerResponse): Promise { + // Webhook routes bypass gateway auth (they have their own HMAC auth) + if (this.config.webhookHandler && req.method === 'POST' && req.url) { + const match = req.url.match(/^\/webhooks\/([^/?]+)/); + if (match) { + const webhookName = decodeURIComponent(match[1]); + await this.config.webhookHandler.handleRequest(webhookName, req, res); + return; + } + } + // Apply auth to HTTP requests when configured const authConfig = this.config.auth ?? {}; if (this.config.authHttp !== false && authConfig.token) { @@ -281,4 +294,9 @@ export class GatewayServer { getMethods(): string[] { return this.router.listMethods(); } + + /** Set the webhook handler for inbound webhook HTTP routes (late binding). */ + setWebhookHandler(handler: WebhookHandler): void { + this.config.webhookHandler = handler; + } }