From 9c2f048b92c93363393fade41821e61c20a733ee Mon Sep 17 00:00:00 2001 From: William Valentin Date: Wed, 18 Mar 2026 10:10:34 -0700 Subject: [PATCH] feat: add swarm collector with docker inspect + HTTP probes --- internal/monitor/swarm/collector.go | 225 ++++++++++++++++++++++++++++ 1 file changed, 225 insertions(+) create mode 100644 internal/monitor/swarm/collector.go diff --git a/internal/monitor/swarm/collector.go b/internal/monitor/swarm/collector.go new file mode 100644 index 0000000..167cd41 --- /dev/null +++ b/internal/monitor/swarm/collector.go @@ -0,0 +1,225 @@ +package swarm + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "os/exec" + "strings" + "time" +) + +// Config holds collector configuration. +type Config struct { + LiteLLMBaseURL string + LiteLLMAPIKey string + HTTPTimeout time.Duration +} + +// dockerPsEntry is the JSON shape from `docker ps --format '{{json .}}'`. +type dockerPsEntry struct { + ID string `json:"ID"` + Names string `json:"Names"` + Status string `json:"Status"` + State string `json:"State"` +} + +// dockerInspectEntry is the minimal shape we need from `docker inspect`. +type dockerInspectEntry struct { + Name string `json:"Name"` + State struct { + Status string `json:"Status"` + Running bool `json:"Running"` + StartedAt string `json:"StartedAt"` + Health *struct { + Status string `json:"Status"` + } `json:"Health"` + } `json:"State"` + Config struct { + Labels map[string]string `json:"Labels"` + } `json:"Config"` +} + +// CollectAll lists all containers labeled agentmon.monitor=true and collects +// a ServiceSnapshot for each. +func CollectAll(ctx context.Context, cfg Config) ([]ServiceSnapshot, error) { + // List labeled containers (running + stopped). + out, err := exec.CommandContext(ctx, "docker", "ps", "-a", + "--filter", "label=agentmon.monitor=true", + "--format", "{{json .}}", + ).Output() + if err != nil { + return nil, fmt.Errorf("docker ps failed: %w", err) + } + + var entries []dockerPsEntry + for _, line := range strings.Split(strings.TrimSpace(string(out)), "\n") { + if line == "" { + continue + } + var e dockerPsEntry + if err := json.Unmarshal([]byte(line), &e); err != nil { + continue + } + entries = append(entries, e) + } + + client := &http.Client{Timeout: cfg.HTTPTimeout} + var snapshots []ServiceSnapshot + for _, e := range entries { + snap := collectOne(ctx, e.Names, client, cfg) + snapshots = append(snapshots, snap) + } + + return snapshots, nil +} + +func collectOne(ctx context.Context, name string, client *http.Client, cfg Config) ServiceSnapshot { + snap := ServiceSnapshot{ + Name: name, + ContainerState: "missing", + HealthState: "none", + Status: "down", + } + + // Inspect for detailed state. + out, err := exec.CommandContext(ctx, "docker", "inspect", "--format", "{{json .}}", name).Output() + if err != nil { + return snap + } + + var detail dockerInspectEntry + if err := json.Unmarshal(out, &detail); err != nil { + return snap + } + + snap.Role = detail.Config.Labels["agentmon.role"] + snap.ContainerState = detail.State.Status + + if detail.State.Health != nil { + snap.HealthState = detail.State.Health.Status + } + + // Calculate uptime if running. + if detail.State.Running && detail.State.StartedAt != "" { + if t, err := time.Parse(time.RFC3339Nano, detail.State.StartedAt); err == nil { + snap.UptimeSec = int64(time.Since(t).Seconds()) + } + } + + // Role-specific probes. + switch snap.Role { + case "llm-proxy": + collectLLMProxy(ctx, &snap, client, cfg) + case "search": + collectHTTPProbe(ctx, &snap, client, "http://localhost:"+detail.Config.Labels["agentmon.port"]+"/") + case "mcp": + collectPortProbe(ctx, &snap, detail.Config.Labels["agentmon.port"]) + case "db", "voice", "automation": + // Docker healthcheck state is sufficient; no HTTP probe. + } + + snap.Status = deriveStatus(snap) + return snap +} + +func collectLLMProxy(ctx context.Context, snap *ServiceSnapshot, client *http.Client, cfg Config) { + if snap.Extra == nil { + snap.Extra = make(map[string]any) + } + + // Health probe. + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, cfg.LiteLLMBaseURL+"/health/liveliness", nil) + resp, err := client.Do(req) + if err == nil { + code := resp.StatusCode + snap.HTTPStatus = &code + resp.Body.Close() + } + + // Model count. + if cfg.LiteLLMAPIKey != "" { + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, cfg.LiteLLMBaseURL+"/v2/model/info", nil) + req.Header.Set("Authorization", "Bearer "+cfg.LiteLLMAPIKey) + resp, err := client.Do(req) + if err == nil { + defer resp.Body.Close() + var result struct { + Data []struct { + ModelName string `json:"model_name"` + } `json:"data"` + } + if json.NewDecoder(resp.Body).Decode(&result) == nil { + snap.Extra["model_count"] = len(result.Data) + } + } + } +} + +func collectHTTPProbe(ctx context.Context, snap *ServiceSnapshot, client *http.Client, url string) { + start := time.Now() + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + resp, err := client.Do(req) + if err == nil { + code := resp.StatusCode + snap.HTTPStatus = &code + resp.Body.Close() + ms := time.Since(start).Milliseconds() + if snap.Extra == nil { + snap.Extra = make(map[string]any) + } + snap.Extra["response_ms"] = ms + } +} + +func collectPortProbe(ctx context.Context, snap *ServiceSnapshot, port string) { + if port == "" { + return + } + // Use nc to check TCP reachability. + err := exec.CommandContext(ctx, "nc", "-z", "-w1", "localhost", port).Run() + reachable := err == nil + if snap.Extra == nil { + snap.Extra = make(map[string]any) + } + snap.Extra["port_reachable"] = reachable +} + +// deriveStatus computes the overall status from container state + health + probes. +func deriveStatus(snap ServiceSnapshot) string { + if snap.ContainerState != "running" { + return "down" + } + if snap.HealthState == "unhealthy" { + return "degraded" + } + if snap.HTTPStatus != nil && (*snap.HTTPStatus < 200 || *snap.HTTPStatus >= 400) { + return "degraded" + } + if reachable, ok := snap.Extra["port_reachable"].(bool); ok && !reachable { + return "degraded" + } + return "healthy" +} + +// DetectIssues scans a set of snapshots for notable problems. +func DetectIssues(services []ServiceSnapshot) Issues { + issues := Issues{} + for _, s := range services { + switch s.Status { + case "down": + issues.ServiceDown = append(issues.ServiceDown, s.Name) + case "degraded": + issues.ServiceDegraded = append(issues.ServiceDegraded, s.Name) + } + if s.Role == "llm-proxy" { + if extra := s.Extra; extra != nil { + if count, ok := extra["cooldown_count"].(int); ok && count > 0 { + issues.LLMCooldowns = true + } + } + } + } + return issues +}