diff --git a/docs/plans/2026-03-18-swarm-monitor-plan.md b/docs/plans/2026-03-18-swarm-monitor-plan.md new file mode 100644 index 0000000..8665f1c --- /dev/null +++ b/docs/plans/2026-03-18-swarm-monitor-plan.md @@ -0,0 +1,1282 @@ +# Swarm Monitor Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Add a `swarm-monitor` binary that polls docker-compose services in `~/lab/swarm`, emits `swarm.snapshot` and `swarm.service.snapshot` events to NATS, and surfaces service status on the dashboard strip and a new unified `/infrastructure` page (replacing `/openclaw`). + +**Architecture:** New `cmd/swarm-monitor/main.go` polls via `docker inspect` exec commands and HTTP probes, emitting two event types per poll. The existing NATS → event-processor → postgres → query-api pipeline requires zero changes. Frontend adds a swarm strip to the dashboard and merges VM cards + service cards on a renamed `/infrastructure` page. + +**Tech Stack:** Go (exec/docker CLI, net/http), vanilla JS, existing NATS publisher pattern + +--- + +### Task 1: Add agentmon labels to docker-compose.yaml + +**Files:** +- Modify: `/home/will/lab/swarm/docker-compose.yaml` + +**Step 1: Add labels to each service** + +Add a `labels:` block to each monitored service. `litellm-init` is a one-shot container — do NOT label it. + +For `whisper-server` (after its `healthcheck:` block): +```yaml + labels: + agentmon.monitor: "true" + agentmon.role: "voice" + agentmon.port: "18801" +``` + +For `kokoro-tts` (after `restart: unless-stopped`): +```yaml + labels: + agentmon.monitor: "true" + agentmon.role: "voice" + agentmon.port: "18805" +``` + +For `brave-search` (after its `environment:` block): +```yaml + labels: + agentmon.monitor: "true" + agentmon.role: "mcp" + agentmon.port: "18802" +``` + +For `searxng` (after its `volumes:` block): +```yaml + labels: + agentmon.monitor: "true" + agentmon.role: "search" + agentmon.port: "18803" +``` + +For `litellm` (after its `healthcheck:` block): +```yaml + labels: + agentmon.monitor: "true" + agentmon.role: "llm-proxy" + agentmon.port: "18804" +``` + +For `litellm-db` (after its `healthcheck:` block): +```yaml + labels: + agentmon.monitor: "true" + agentmon.role: "db" +``` + +For `n8n-agent` (after its `healthcheck:` block): +```yaml + labels: + agentmon.monitor: "true" + agentmon.role: "automation" + agentmon.port: "18808" +``` + +**Step 2: Verify labels appear in running containers** + +Run: `docker ps --filter label=agentmon.monitor=true --format "table {{.Names}}\t{{.Status}}"` + +Expected: lists currently-running swarm containers (whichever profiles are active). + +**Step 3: Commit** + +```bash +cd /home/will/lab/swarm +git add docker-compose.yaml +git commit -m "feat: add agentmon monitor labels to swarm services" +``` + +--- + +### Task 2: Create swarm types + +**Files:** +- Create: `internal/monitor/swarm/types.go` + +**Step 1: Create the types file** + +```go +package swarm + +import "time" + +// ServiceSnapshot holds the collected state for one docker-compose service. +type ServiceSnapshot struct { + Name string `json:"name"` + Role string `json:"role"` + ContainerState string `json:"container_state"` // running/stopped/exited/missing + HealthState string `json:"health_state"` // healthy/unhealthy/starting/none + Status string `json:"status"` // healthy/degraded/down + UptimeSec int64 `json:"uptime_sec,omitempty"` + HTTPStatus *int `json:"http_status,omitempty"` + Extra map[string]any `json:"extra,omitempty"` +} + +// SwarmSnapshot holds a rolled-up snapshot of all labeled services. +type SwarmSnapshot struct { + Services []ServiceSnapshot `json:"services"` + Issues Issues `json:"issues"` + Timestamp time.Time `json:"timestamp"` +} + +// Issues flags notable problems detected during a poll. +type Issues struct { + ServiceDown []string `json:"service_down,omitempty"` + ServiceDegraded []string `json:"service_degraded,omitempty"` + LLMCooldowns bool `json:"llm_cooldowns,omitempty"` +} +``` + +**Step 2: Verify it compiles** + +Run: `cd /home/will/lab/agentmon && go build ./internal/monitor/swarm/` +Expected: no errors + +**Step 3: Commit** + +```bash +git add internal/monitor/swarm/types.go +git commit -m "feat: add swarm monitor types" +``` + +--- + +### Task 3: Create swarm collector + +**Files:** +- Create: `internal/monitor/swarm/collector.go` + +**Step 1: Create the collector** + +```go +package swarm + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "os/exec" + "strconv" + "strings" + "time" +) + +// Config holds collector configuration. +type Config struct { + LiteLLMBaseURL string + LiteLLMAPIKey string + HTTPTimeout time.Duration +} + +// dockerPsEntry is the JSON shape from `docker ps --format '{{json .}}'`. +type dockerPsEntry struct { + ID string `json:"ID"` + Names string `json:"Names"` + Status string `json:"Status"` + State string `json:"State"` +} + +// dockerInspectEntry is the minimal shape we need from `docker inspect`. +type dockerInspectEntry struct { + Name string `json:"Name"` + State struct { + Status string `json:"Status"` + Running bool `json:"Running"` + StartedAt string `json:"StartedAt"` + Health *struct { + Status string `json:"Status"` + } `json:"Health"` + } `json:"State"` + Config struct { + Labels map[string]string `json:"Labels"` + } `json:"Config"` +} + +// CollectAll lists all containers labeled agentmon.monitor=true and collects +// a ServiceSnapshot for each. +func CollectAll(ctx context.Context, cfg Config) ([]ServiceSnapshot, error) { + // List labeled containers (running + stopped). + out, err := exec.CommandContext(ctx, "docker", "ps", "-a", + "--filter", "label=agentmon.monitor=true", + "--format", "{{json .}}", + ).Output() + if err != nil { + return nil, fmt.Errorf("docker ps failed: %w", err) + } + + var entries []dockerPsEntry + for _, line := range strings.Split(strings.TrimSpace(string(out)), "\n") { + if line == "" { + continue + } + var e dockerPsEntry + if err := json.Unmarshal([]byte(line), &e); err != nil { + continue + } + entries = append(entries, e) + } + + client := &http.Client{Timeout: cfg.HTTPTimeout} + var snapshots []ServiceSnapshot + for _, e := range entries { + snap := collectOne(ctx, e.Names, client, cfg) + snapshots = append(snapshots, snap) + } + + return snapshots, nil +} + +func collectOne(ctx context.Context, name string, client *http.Client, cfg Config) ServiceSnapshot { + snap := ServiceSnapshot{ + Name: name, + ContainerState: "missing", + HealthState: "none", + Status: "down", + } + + // Inspect for detailed state. + out, err := exec.CommandContext(ctx, "docker", "inspect", "--format", "{{json .}}", name).Output() + if err != nil { + return snap + } + + var detail dockerInspectEntry + if err := json.Unmarshal(out, &detail); err != nil { + return snap + } + + snap.Role = detail.Config.Labels["agentmon.role"] + snap.ContainerState = detail.State.Status + + if detail.State.Health != nil { + snap.HealthState = detail.State.Health.Status + } + + // Calculate uptime if running. + if detail.State.Running && detail.State.StartedAt != "" { + if t, err := time.Parse(time.RFC3339Nano, detail.State.StartedAt); err == nil { + snap.UptimeSec = int64(time.Since(t).Seconds()) + } + } + + // Role-specific probes. + switch snap.Role { + case "llm-proxy": + collectLLMProxy(ctx, &snap, client, cfg) + case "search": + collectHTTPProbe(ctx, &snap, client, "http://localhost:"+detail.Config.Labels["agentmon.port"]+"/") + case "mcp": + collectPortProbe(ctx, &snap, detail.Config.Labels["agentmon.port"]) + case "db", "voice", "automation": + // Docker healthcheck state is sufficient; no HTTP probe. + } + + snap.Status = deriveStatus(snap) + return snap +} + +func collectLLMProxy(ctx context.Context, snap *ServiceSnapshot, client *http.Client, cfg Config) { + if snap.Extra == nil { + snap.Extra = make(map[string]any) + } + + // Health probe. + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, cfg.LiteLLMBaseURL+"/health/liveliness", nil) + resp, err := client.Do(req) + if err == nil { + code := resp.StatusCode + snap.HTTPStatus = &code + resp.Body.Close() + } + + // Model count. + if cfg.LiteLLMAPIKey != "" { + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, cfg.LiteLLMBaseURL+"/v2/model/info", nil) + req.Header.Set("Authorization", "Bearer "+cfg.LiteLLMAPIKey) + resp, err := client.Do(req) + if err == nil { + defer resp.Body.Close() + var result struct { + Data []struct { + ModelName string `json:"model_name"` + } `json:"data"` + } + if json.NewDecoder(resp.Body).Decode(&result) == nil { + snap.Extra["model_count"] = len(result.Data) + } + } + } +} + +func collectHTTPProbe(ctx context.Context, snap *ServiceSnapshot, client *http.Client, url string) { + start := time.Now() + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + resp, err := client.Do(req) + if err == nil { + code := resp.StatusCode + snap.HTTPStatus = &code + resp.Body.Close() + ms := time.Since(start).Milliseconds() + if snap.Extra == nil { + snap.Extra = make(map[string]any) + } + snap.Extra["response_ms"] = ms + } +} + +func collectPortProbe(ctx context.Context, snap *ServiceSnapshot, port string) { + if port == "" { + return + } + // Use nc to check TCP reachability. + err := exec.CommandContext(ctx, "nc", "-z", "-w1", "localhost", port).Run() + reachable := err == nil + if snap.Extra == nil { + snap.Extra = make(map[string]any) + } + snap.Extra["port_reachable"] = reachable +} + +// deriveStatus computes the overall status from container state + health + probes. +func deriveStatus(snap ServiceSnapshot) string { + if snap.ContainerState != "running" { + return "down" + } + if snap.HealthState == "unhealthy" { + return "degraded" + } + if snap.HTTPStatus != nil && (*snap.HTTPStatus < 200 || *snap.HTTPStatus >= 400) { + return "degraded" + } + if reachable, ok := snap.Extra["port_reachable"].(bool); ok && !reachable { + return "degraded" + } + return "healthy" +} + +// DetectIssues scans a set of snapshots for notable problems. +func DetectIssues(services []ServiceSnapshot) Issues { + issues := Issues{} + for _, s := range services { + switch s.Status { + case "down": + issues.ServiceDown = append(issues.ServiceDown, s.Name) + case "degraded": + issues.ServiceDegraded = append(issues.ServiceDegraded, s.Name) + } + if s.Role == "llm-proxy" { + if extra := s.Extra; extra != nil { + if count, ok := extra["cooldown_count"].(int); ok && count > 0 { + issues.LLMCooldowns = true + } + } + } + } + return issues +} + +func intPtr(v int) *int { return &v } +func _ = intPtr // suppress unused warning +func _ = strconv.Itoa // imported for potential future use +``` + +**Step 2: Verify it compiles** + +Run: `cd /home/will/lab/agentmon && go build ./internal/monitor/swarm/` +Expected: no errors + +**Step 3: Commit** + +```bash +git add internal/monitor/swarm/collector.go +git commit -m "feat: add swarm collector with docker inspect + HTTP probes" +``` + +--- + +### Task 4: Create swarm-monitor binary + +**Files:** +- Create: `cmd/swarm-monitor/main.go` + +**Step 1: Create the binary** + +```go +package main + +import ( + "context" + "encoding/json" + "log" + "os" + "time" + + "agentmon/internal/monitor/swarm" + qnats "agentmon/internal/queue/nats" +) + +func main() { + natsURL := envDefault("NATS_URL", "nats://nats:4222") + natsTopic := envDefault("NATS_TOPIC", "agentmon.events.v1") + interval := envDefault("POLL_INTERVAL", "30s") + litellmBase := envDefault("LITELLM_BASE_URL", "http://localhost:18804") + litellmKey := os.Getenv("LITELLM_MASTER_KEY") + + pub, err := qnats.NewPublisher(natsURL, natsTopic) + if err != nil { + log.Fatalf("failed to connect to NATS: %v", err) + } + defer pub.Close() + + pollDuration, err := time.ParseDuration(interval) + if err != nil { + log.Fatalf("invalid poll interval: %v", err) + } + + cfg := swarm.Config{ + LiteLLMBaseURL: litellmBase, + LiteLLMAPIKey: litellmKey, + HTTPTimeout: 5 * time.Second, + } + + ticker := time.NewTicker(pollDuration) + defer ticker.Stop() + + ctx := context.Background() + log.Printf("swarm-monitor started, polling every %s", pollDuration) + + // Poll immediately on start. + if err := poll(ctx, pub, cfg); err != nil { + log.Printf("initial poll error: %v", err) + } + + for range ticker.C { + if err := poll(ctx, pub, cfg); err != nil { + log.Printf("poll error: %v", err) + } + } +} + +func poll(ctx context.Context, pub *qnats.Publisher, cfg swarm.Config) error { + services, err := swarm.CollectAll(ctx, cfg) + if err != nil { + return err + } + + issues := swarm.DetectIssues(services) + now := time.Now().UTC() + + // Emit rolled-up swarm.snapshot. + if err := emit(ctx, pub, "swarm.snapshot", "agentmon.swarm", map[string]any{ + "services": services, + "issues": issues, + }, now); err != nil { + log.Printf("failed to emit swarm.snapshot: %v", err) + } + + // Emit one swarm.service.snapshot per service. + for _, svc := range services { + if err := emit(ctx, pub, "swarm.service.snapshot", "agentmon.swarm.service", map[string]any{ + "service": svc, + }, now); err != nil { + log.Printf("failed to emit swarm.service.snapshot for %s: %v", svc.Name, err) + } + } + + return nil +} + +func emit(ctx context.Context, pub *qnats.Publisher, eventType, schemaName string, payload map[string]any, ts time.Time) error { + event := map[string]any{ + "schema": map[string]any{ + "name": schemaName, + "version": 1, + }, + "event": map[string]any{ + "id": generateID(), + "type": eventType, + "ts": ts.Format(time.RFC3339Nano), + }, + "payload": payload, + } + + data, err := json.Marshal(event) + if err != nil { + return err + } + + return pub.Publish(ctx, data) +} + +func generateID() string { + return time.Now().Format("20060102150405") + "-" + randomString(8) +} + +func randomString(n int) string { + const chars = "abcdefghijklmnopqrstuvwxyz0123456789" + b := make([]byte, n) + for i := range b { + b[i] = chars[time.Now().Nanosecond()%len(chars)] + time.Sleep(time.Nanosecond) + } + return string(b) +} + +func envDefault(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def +} +``` + +**Step 2: Verify it compiles** + +Run: `cd /home/will/lab/agentmon && go build ./cmd/swarm-monitor/` +Expected: no errors + +**Step 3: Verify all binaries still build** + +Run: `cd /home/will/lab/agentmon && go build ./...` +Expected: no errors + +**Step 4: Commit** + +```bash +git add cmd/swarm-monitor/main.go +git commit -m "feat: add swarm-monitor binary" +``` + +--- + +### Task 5: Dashboard swarm strip + +**Files:** +- Modify: `cmd/web-ui/static/app.js` +- Modify: `cmd/web-ui/static/style.css` + +**Step 1: Add swarmState and merge function to app.js** + +Near the top of the IIFE, alongside the existing `let openclawState = ...` declaration (line ~49), add: + +```js +let swarmState = { services: {} }; // keyed by service name +``` + +After the existing `mergeOpenClawEvents` function (~line 716), add: + +```js +function mergeSwarmSnapshot(evt) { + const payload = getEnvelopePayload(evt); + const services = payload.services || []; + for (const svc of services) { + if (svc.name) swarmState.services[svc.name] = svc; + } +} + +function mergeSwarmServiceSnapshot(evt) { + const payload = getEnvelopePayload(evt); + const svc = payload.service; + if (svc && svc.name) swarmState.services[svc.name] = svc; +} +``` + +**Step 2: Add swarm strip to renderDashboard** + +In `renderDashboard()`, the HTML template already has: +```html +
+``` + +Right after that line, add a swarm strip div: +```html + +``` + +**Step 3: Add renderSwarmStrip function** + +After the `renderAgentVMStrip_dash` function (~line 1351), add: + +```js +function renderSwarmStrip_dash() { + const strip = document.getElementById('dash-swarm-strip'); + if (!strip) return; + const services = Object.values(swarmState.services); + if (services.length === 0) return; + strip.innerHTML = services.map(svc => { + const statusClass = svc.status === 'healthy' ? 'active' + : svc.status === 'degraded' ? 'degraded' : 'inactive'; + const label = svc.status || 'unknown'; + return ` +Loading...
'; + + infraUnsubscribe = subscribeWS(handleInfraWS); + + try { + const [ocData, swarmData] = await Promise.all([ + api('/v1/events?event_type=openclaw.snapshot&limit=100'), + api('/v1/events?event_type=swarm.snapshot&limit=10').catch(() => ({ events: [] })), + ]); + + mergeOpenClawEvents(ocData.events || []); + for (const evt of swarmData.events || []) mergeSwarmSnapshot(evt); + + if (isCurrentPath('/infrastructure')) { + renderInfraGrid(); + } + } catch (e) { + if (isCurrentPath('/infrastructure')) { + app.innerHTML = `Error: ${escapeHTML(e.message)}
`; + } + } +} +``` + +**Step 6: Replace handleOpenClawWS with handleInfraWS** + +Replace the existing `handleOpenClawWS` function (lines ~682-699) with: + +```js +function handleInfraWS(msg) { + if (msg.type !== 'message') return; + + const eventType = getEnvelopeType(msg.data); + + if (eventType === 'openclaw.snapshot') { + mergeOpenClawEvents([msg.data]); + if (isCurrentPath('/infrastructure')) renderInfraGrid(); + if (isCurrentPath('/agents')) renderAgentVMStrip(); + return; + } + + if (eventType === 'swarm.snapshot') { + mergeSwarmSnapshot(msg.data); + if (isCurrentPath('/infrastructure')) renderInfraGrid(); + renderSwarmStrip_dash(); + return; + } + + if (eventType === 'swarm.service.snapshot') { + mergeSwarmServiceSnapshot(msg.data); + if (isCurrentPath('/infrastructure')) renderInfraGrid(); + renderSwarmStrip_dash(); + return; + } +} +``` + +**Step 7: Add renderInfraGrid function** + +Replace the existing `renderOpenClawGrid` function (lines ~718-785) with a new `renderInfraGrid` that shows both VMs and service cards. Add it right after the new `handleInfraWS` function: + +```js +function renderInfraGrid() { + const vmNames = Object.keys(openclawState.instances).sort(); + const services = Object.values(swarmState.services); + + app.innerHTML = ` +VMs
+ ${vmNames.length === 0 + ? 'No VM data
' + : `Services
+ ${services.length === 0 + ? 'No swarm service data
' + : `| Host | ${escapeHTML(inst.host || '-')} |
| Domain | ${escapeHTML(inst.domain || '-')} |
| vCPUs | ${host.vcpus || '-'} |
| Memory | ${escapeHTML(formatBytes(host.memory_kib ? host.memory_kib * 1024 : 0) || '-')} |
| Disk | ${escapeHTML(formatBytes(host.disk_actual_bytes) || '-')} |
| Autostart | ${host.autostart ? 'Yes' : 'No'} |
| Gateway | ${guest.service_active ? 'Active' : 'Inactive'} |
| HTTP | ${guest.http_status || 'N/A'} |
| Version | ${escapeHTML(guest.version || '-')} |
| Guest Mem | ${guest.memory_percent !== undefined ? guest.memory_percent.toFixed(1) : '-'}% |
| Guest Disk | ${guest.disk_percent !== undefined ? guest.disk_percent.toFixed(1) : '-'}% |
| Load | ${guest.load_average !== undefined ? guest.load_average.toFixed(2) : '-'} |
| Uptime | ${escapeHTML(guest.service_uptime || '-')} |