package swarm

import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"net/http"
	"net/url"
	"strings"
	"time"
)

// Config carries the settings the collector works from.
type Config struct {
	// LiteLLMBaseURL is the prefix the LiteLLM probe paths are appended to.
	LiteLLMBaseURL string
	// LiteLLMAPIKey is sent as a Bearer token when querying model info;
	// that query is skipped while the key is empty.
	LiteLLMAPIKey string
	// HTTPTimeout is the overall timeout of the HTTP client used for probes.
	HTTPTimeout  time.Duration
	DockerSocket string // defaults to /var/run/docker.sock
}

// dockerContainer mirrors the fields read from each entry of the
// GET /containers/json response.
type dockerContainer struct {
	ID     string            `json:"Id"`
	Names  []string          `json:"Names"`
	State  string            `json:"State"`
	Labels map[string]string `json:"Labels"`
}

// dockerContainerDetail mirrors the fields read from the
// GET /containers/{id}/json response.
type dockerContainerDetail struct {
	State struct {
		Status    string `json:"Status"`
		Running   bool   `json:"Running"`
		StartedAt string `json:"StartedAt"`
		// Health is a pointer so a missing "Health" object decodes to nil.
		Health *struct {
			Status string `json:"Status"`
		} `json:"Health"`
	} `json:"State"`
}

// newDockerClient returns an HTTP client whose transport ignores the network
// and address of the request URL and always dials the unix socket at
// socketPath. An empty socketPath selects /var/run/docker.sock.
func newDockerClient(socketPath string) *http.Client {
	const defaultSocket = "/var/run/docker.sock"

	path := socketPath
	if path == "" {
		path = defaultSocket
	}

	dialSocket := func(ctx context.Context, _, _ string) (net.Conn, error) {
		var d net.Dialer
		return d.DialContext(ctx, "unix", path)
	}

	return &http.Client{
		Transport: &http.Transport{DialContext: dialSocket},
	}
}

// CollectAll lists all containers labeled agentmon.monitor=true and collects
// a ServiceSnapshot for each.
func CollectAll(ctx context.Context, cfg Config) ([]ServiceSnapshot, error) { dockerClient := newDockerClient(cfg.DockerSocket) httpClient := &http.Client{Timeout: cfg.HTTPTimeout} filters := url.QueryEscape(`{"label":["agentmon.monitor=true"]}`) req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost/v1.41/containers/json?all=1&filters="+filters, nil) if err != nil { return nil, err } resp, err := dockerClient.Do(req) if err != nil { return nil, fmt.Errorf("docker API unavailable: %w", err) } defer resp.Body.Close() var containers []dockerContainer if err := json.NewDecoder(resp.Body).Decode(&containers); err != nil { return nil, fmt.Errorf("docker API parse error: %w", err) } var snapshots []ServiceSnapshot for _, c := range containers { snapshots = append(snapshots, collectOne(ctx, c, dockerClient, httpClient, cfg)) } return snapshots, nil } func collectOne(ctx context.Context, c dockerContainer, dockerClient, httpClient *http.Client, cfg Config) ServiceSnapshot { name := containerName(c) snap := ServiceSnapshot{ Name: name, Role: c.Labels["agentmon.role"], ContainerState: c.State, HealthState: "none", Status: "down", } // Inspect for health state and uptime (not in list response). 
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost/v1.41/containers/"+c.ID+"/json", nil) if err == nil { if resp, err := dockerClient.Do(req); err == nil { var detail dockerContainerDetail if json.NewDecoder(resp.Body).Decode(&detail) == nil { if detail.State.Health != nil { snap.HealthState = detail.State.Health.Status } if detail.State.Running && detail.State.StartedAt != "" { if t, err := time.Parse(time.RFC3339Nano, detail.State.StartedAt); err == nil { snap.UptimeSec = int64(time.Since(t).Seconds()) } } } resp.Body.Close() } } port := c.Labels["agentmon.port"] switch snap.Role { case "llm-proxy": collectLLMProxy(ctx, &snap, httpClient, cfg) case "search": collectHTTPProbe(ctx, &snap, httpClient, "http://localhost:"+port+"/") case "mcp": collectPortProbe(&snap, port) } snap.Status = deriveStatus(snap) return snap } func containerName(c dockerContainer) string { if len(c.Names) > 0 { return strings.TrimPrefix(c.Names[0], "/") } return c.ID[:12] } func collectLLMProxy(ctx context.Context, snap *ServiceSnapshot, client *http.Client, cfg Config) { if snap.Extra == nil { snap.Extra = make(map[string]any) } req, _ := http.NewRequestWithContext(ctx, http.MethodGet, cfg.LiteLLMBaseURL+"/health/liveliness", nil) if resp, err := client.Do(req); err == nil { code := resp.StatusCode snap.HTTPStatus = &code resp.Body.Close() } if cfg.LiteLLMAPIKey != "" { req, _ := http.NewRequestWithContext(ctx, http.MethodGet, cfg.LiteLLMBaseURL+"/v2/model/info", nil) req.Header.Set("Authorization", "Bearer "+cfg.LiteLLMAPIKey) if resp, err := client.Do(req); err == nil { defer resp.Body.Close() var result struct { Data []struct{} `json:"data"` } if json.NewDecoder(resp.Body).Decode(&result) == nil { snap.Extra["model_count"] = len(result.Data) } } } } func collectHTTPProbe(ctx context.Context, snap *ServiceSnapshot, client *http.Client, target string) { start := time.Now() req, _ := http.NewRequestWithContext(ctx, http.MethodGet, target, nil) if resp, err := 
client.Do(req); err == nil { code := resp.StatusCode snap.HTTPStatus = &code resp.Body.Close() if snap.Extra == nil { snap.Extra = make(map[string]any) } snap.Extra["response_ms"] = time.Since(start).Milliseconds() } } func collectPortProbe(snap *ServiceSnapshot, port string) { if port == "" { return } conn, err := net.DialTimeout("tcp", "localhost:"+port, 2*time.Second) reachable := err == nil if conn != nil { conn.Close() } if snap.Extra == nil { snap.Extra = make(map[string]any) } snap.Extra["port_reachable"] = reachable } // deriveStatus computes the overall status from container state + health + probes. func deriveStatus(snap ServiceSnapshot) string { if snap.ContainerState != "running" { return "down" } if snap.HealthState == "unhealthy" { return "degraded" } if snap.HTTPStatus != nil && (*snap.HTTPStatus < 200 || *snap.HTTPStatus >= 400) { return "degraded" } if reachable, ok := snap.Extra["port_reachable"].(bool); ok && !reachable { return "degraded" } return "healthy" } // DetectIssues scans a set of snapshots for notable problems. func DetectIssues(services []ServiceSnapshot) Issues { issues := Issues{} for _, s := range services { switch s.Status { case "down": issues.ServiceDown = append(issues.ServiceDown, s.Name) case "degraded": issues.ServiceDegraded = append(issues.ServiceDegraded, s.Name) } if s.Role == "llm-proxy" { if extra := s.Extra; extra != nil { if count, ok := extra["cooldown_count"].(int); ok && count > 0 { issues.LLMCooldowns = true } } } } return issues }