package swarm

import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"net/http"
	"os/exec"
	"strings"
	"time"
)

// Config holds collector configuration.
type Config struct {
	// LiteLLMBaseURL is the prefix to which the LiteLLM probe paths
	// ("/health/liveliness" and "/v2/model/info") are appended.
	LiteLLMBaseURL string
	// LiteLLMAPIKey is sent as a Bearer token on the model-info request.
	// When it is empty the model count is not collected.
	LiteLLMAPIKey string
	// HTTPTimeout becomes the Timeout of the HTTP client shared by all probes.
	HTTPTimeout time.Duration
}

// dockerPsEntry is the JSON shape from `docker ps --format '{{json .}}'`.
type dockerPsEntry struct {
	ID     string `json:"ID"`
	Names  string `json:"Names"`
	Status string `json:"Status"`
	State  string `json:"State"`
}

// dockerInspectEntry is the minimal shape we need from `docker inspect`.
type dockerInspectEntry struct {
	Name  string `json:"Name"`
	State struct {
		Status    string `json:"Status"`
		Running   bool   `json:"Running"`
		StartedAt string `json:"StartedAt"`
		// Health is a pointer so that a missing healthcheck section can be
		// told apart from a present one.
		Health *struct {
			Status string `json:"Status"`
		} `json:"Health"`
	} `json:"State"`
	Config struct {
		Labels map[string]string `json:"Labels"`
	} `json:"Config"`
}

// CollectAll lists all containers labeled agentmon.monitor=true and collects
// a ServiceSnapshot for each.
//
// An error is returned only when the `docker ps` invocation itself fails.
// Output lines that do not parse as JSON are skipped, and per-container
// collection failures are reflected in the returned snapshot rather than as an
// error. The result is nil when no container matches.
func CollectAll(ctx context.Context, cfg Config) ([]ServiceSnapshot, error) {
	// List labeled containers (running + stopped).
	out, err := exec.CommandContext(ctx, "docker", "ps", "-a",
		"--filter", "label=agentmon.monitor=true",
		"--format", "{{json .}}",
	).Output()
	if err != nil {
		return nil, fmt.Errorf("docker ps failed: %w", err)
	}

	// docker emits one JSON object per line.
	var entries []dockerPsEntry
	for _, line := range strings.Split(strings.TrimSpace(string(out)), "\n") {
		if line == "" {
			continue
		}
		var e dockerPsEntry
		if err := json.Unmarshal([]byte(line), &e); err != nil {
			// Best effort: one malformed line must not hide the other containers.
			continue
		}
		entries = append(entries, e)
	}

	// A single client is shared by every probe in this collection pass.
	client := &http.Client{Timeout: cfg.HTTPTimeout}

	var snapshots []ServiceSnapshot
	for _, e := range entries {
		snapshots = append(snapshots, collectOne(ctx, e.Names, client, cfg))
	}
	return snapshots, nil
}

// collectOne builds the snapshot for the container called name.
//
// The snapshot starts out as missing/none/down and is returned in that form
// when `docker inspect` fails or its output cannot be decoded. Otherwise the
// container state, health state, role label and uptime are filled in, the
// role-specific probe runs, and Status is derived from the combined result.
func collectOne(ctx context.Context, name string, client *http.Client, cfg Config) ServiceSnapshot {
	snap := ServiceSnapshot{
		Name:           name,
		ContainerState: "missing",
		HealthState:    "none",
		Status:         "down",
	}

	// Inspect for detailed state.
	out, err := exec.CommandContext(ctx, "docker", "inspect", "--format", "{{json .}}", name).Output()
	if err != nil {
		return snap
	}
	var detail dockerInspectEntry
	if err := json.Unmarshal(out, &detail); err != nil {
		return snap
	}

	labels := detail.Config.Labels // may be nil; reading a nil map is safe
	snap.Role = labels["agentmon.role"]
	snap.ContainerState = detail.State.Status
	if detail.State.Health != nil {
		snap.HealthState = detail.State.Health.Status
	}

	// Calculate uptime if running. An unparsable StartedAt leaves UptimeSec untouched.
	if detail.State.Running && detail.State.StartedAt != "" {
		if t, err := time.Parse(time.RFC3339Nano, detail.State.StartedAt); err == nil {
			snap.UptimeSec = int64(time.Since(t).Seconds())
		}
	}

	// Role-specific probes.
	switch snap.Role {
	case "llm-proxy":
		collectLLMProxy(ctx, &snap, client, cfg)
	case "search":
		collectHTTPProbe(ctx, &snap, client, "http://localhost:"+labels["agentmon.port"]+"/")
	case "mcp":
		collectPortProbe(ctx, &snap, labels["agentmon.port"])
	case "db", "voice", "automation":
		// Docker healthcheck state is sufficient; no HTTP probe.
	}

	snap.Status = deriveStatus(snap)
	return snap
}

// collectLLMProxy probes the LiteLLM proxy configured in cfg.
//
// It records the HTTP status of the liveliness endpoint in snap.HTTPStatus
// and, when an API key is configured, stores the number of entries returned by
// the model-info endpoint in snap.Extra["model_count"]. Every failure is
// silent: the corresponding field is simply left unset.
func collectLLMProxy(ctx context.Context, snap *ServiceSnapshot, client *http.Client, cfg Config) {
	if snap.Extra == nil {
		snap.Extra = make(map[string]any)
	}

	// Health probe. The request error must be checked: a malformed base URL
	// yields a nil request, and client.Do panics on a nil request.
	if req, err := http.NewRequestWithContext(ctx, http.MethodGet, cfg.LiteLLMBaseURL+"/health/liveliness", nil); err == nil {
		if resp, err := client.Do(req); err == nil {
			code := resp.StatusCode
			snap.HTTPStatus = &code
			resp.Body.Close()
		}
	}

	// Model count.
	if cfg.LiteLLMAPIKey == "" {
		return
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, cfg.LiteLLMBaseURL+"/v2/model/info", nil)
	if err != nil {
		return
	}
	req.Header.Set("Authorization", "Bearer "+cfg.LiteLLMAPIKey)

	resp, err := client.Do(req)
	if err != nil {
		return
	}
	defer resp.Body.Close()

	// Only a successful response describes the model list. A JSON error body
	// (for example on 401) would otherwise decode cleanly and be reported as
	// zero models.
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return
	}

	var result struct {
		Data []struct {
			ModelName string `json:"model_name"`
		} `json:"data"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err == nil {
		snap.Extra["model_count"] = len(result.Data)
	}
}

// collectHTTPProbe issues a GET to url and, when a response arrives, records
// its status code in snap.HTTPStatus and the elapsed time in milliseconds in
// snap.Extra["response_ms"]. On any failure snap is left unchanged.
func collectHTTPProbe(ctx context.Context, snap *ServiceSnapshot, client *http.Client, url string) {
	start := time.Now()

	// url is built from a container label by the caller; a value that does not
	// parse (e.g. a non-numeric port) must not reach client.Do as a nil request.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return
	}
	resp, err := client.Do(req)
	if err != nil {
		return
	}
	code := resp.StatusCode
	snap.HTTPStatus = &code
	resp.Body.Close()

	ms := time.Since(start).Milliseconds()
	if snap.Extra == nil {
		snap.Extra = make(map[string]any)
	}
	snap.Extra["response_ms"] = ms
}

// collectPortProbe checks whether a TCP connection to localhost:port can be
// established within one second and stores the outcome as a bool in
// snap.Extra["port_reachable"]. An empty port skips the probe entirely.
func collectPortProbe(ctx context.Context, snap *ServiceSnapshot, port string) {
	if port == "" {
		return
	}

	// Dial directly instead of shelling out to nc, so that a missing nc binary
	// cannot be mistaken for an unreachable port.
	dialer := net.Dialer{Timeout: time.Second}
	conn, err := dialer.DialContext(ctx, "tcp", net.JoinHostPort("localhost", port))
	reachable := err == nil
	if reachable {
		conn.Close()
	}

	if snap.Extra == nil {
		snap.Extra = make(map[string]any)
	}
	snap.Extra["port_reachable"] = reachable
}

// deriveStatus computes the overall status from container state + health + probes.
//
// Checks are applied in order: a container that is not "running" is "down";
// an "unhealthy" healthcheck, an HTTP status outside [200, 400), or a
// port_reachable value of false each yield "degraded"; otherwise the service
// is "healthy". Probes that recorded nothing do not affect the result.
func deriveStatus(snap ServiceSnapshot) string {
	if snap.ContainerState != "running" {
		return "down"
	}
	if snap.HealthState == "unhealthy" {
		return "degraded"
	}
	if snap.HTTPStatus != nil && (*snap.HTTPStatus < 200 || *snap.HTTPStatus >= 400) {
		return "degraded"
	}
	if reachable, ok := snap.Extra["port_reachable"].(bool); ok && !reachable {
		return "degraded"
	}
	return "healthy"
}

// DetectIssues scans a set of snapshots for notable problems.
func DetectIssues(services []ServiceSnapshot) Issues { issues := Issues{} for _, s := range services { switch s.Status { case "down": issues.ServiceDown = append(issues.ServiceDown, s.Name) case "degraded": issues.ServiceDegraded = append(issues.ServiceDegraded, s.Name) } if s.Role == "llm-proxy" { if extra := s.Extra; extra != nil { if count, ok := extra["cooldown_count"].(int); ok && count > 0 { issues.LLMCooldowns = true } } } } return issues }