Files
agentmon/internal/monitor/swarm/collector.go
T
2026-03-18 10:10:34 -07:00

226 lines
5.9 KiB
Go

package swarm
import (
"context"
"encoding/json"
"fmt"
"net/http"
"os/exec"
"strings"
"time"
)
// Config holds collector configuration.
type Config struct {
// LiteLLMBaseURL is the base URL of the LiteLLM proxy; collectLLMProxy
// appends "/health/liveliness" and "/v2/model/info" to it.
LiteLLMBaseURL string
// LiteLLMAPIKey is sent as "Authorization: Bearer <key>" to /v2/model/info.
// When empty, the model-count query is skipped entirely.
LiteLLMAPIKey string
// HTTPTimeout is used as the timeout of the http.Client that CollectAll
// builds and shares across all HTTP probes.
HTTPTimeout time.Duration
}
// dockerPsEntry is one line of `docker ps --format '{{json .}}'` output,
// reduced to the fields declared here (encoding/json ignores the rest).
type dockerPsEntry struct {
// ID, Status and State are decoded but not read by CollectAll.
ID string `json:"ID"`
// Names is what CollectAll hands to collectOne as the container name, which
// is then passed to `docker inspect` and stored as ServiceSnapshot.Name.
// NOTE(review): if docker can report several names here (comma-separated),
// inspect would receive the joined string — confirm monitored containers
// only ever carry a single name.
Names string `json:"Names"`
Status string `json:"Status"`
State string `json:"State"`
}
// dockerInspectEntry is the minimal subset of `docker inspect --format
// '{{json .}}'` output that collectOne decodes; any other fields in the JSON
// are ignored by encoding/json.
type dockerInspectEntry struct {
// Name is decoded but not read by collectOne, which keeps the name it was
// called with.
Name string `json:"Name"`
State struct {
// Status is copied verbatim into ServiceSnapshot.ContainerState;
// deriveStatus treats only "running" as up.
Status string `json:"Status"`
// Running gates the uptime calculation.
Running bool `json:"Running"`
// StartedAt is parsed with time.RFC3339Nano to compute UptimeSec; an
// empty or unparseable value leaves UptimeSec unset.
StartedAt string `json:"StartedAt"`
// Health is nil when the JSON has no (or a null) Health object, in
// which case collectOne keeps HealthState at "none".
Health *struct {
Status string `json:"Status"`
} `json:"Health"`
} `json:"State"`
Config struct {
// Labels supplies "agentmon.role" (selects the probe) and
// "agentmon.port" (the probe's target port).
Labels map[string]string `json:"Labels"`
} `json:"Config"`
}
// CollectAll lists every container (running or stopped) labeled
// agentmon.monitor=true and collects a ServiceSnapshot for each one, in the
// order `docker ps` reports them.
//
// Output lines that are not valid JSON are skipped (best-effort). All probes
// share one http.Client whose timeout is cfg.HTTPTimeout, or a built-in
// default when that is not positive, so a zero-valued Config cannot leave a
// probe blocked indefinitely.
//
// It returns an error when `docker ps` fails (including docker's stderr when
// available) or when ctx is done before collection finishes. In the latter
// case no partial slice is returned: once ctx is done every docker/HTTP call
// fails immediately, and the affected containers would otherwise be
// misreported as "down".
func CollectAll(ctx context.Context, cfg Config) ([]ServiceSnapshot, error) {
	// Fallback for an unset cfg.HTTPTimeout; http.Client treats 0 as "never time out".
	const defaultHTTPTimeout = 10 * time.Second

	// List labeled containers (running + stopped).
	out, err := exec.CommandContext(ctx, "docker", "ps", "-a",
		"--filter", "label=agentmon.monitor=true",
		"--format", "{{json .}}",
	).Output()
	if err != nil {
		// Output captures stderr into *exec.ExitError; surface it, because a
		// bare "exit status 1" says nothing about why docker failed.
		if ee, ok := err.(*exec.ExitError); ok {
			if msg := strings.TrimSpace(string(ee.Stderr)); msg != "" {
				return nil, fmt.Errorf("docker ps failed: %w: %s", err, msg)
			}
		}
		return nil, fmt.Errorf("docker ps failed: %w", err)
	}

	var entries []dockerPsEntry
	for _, line := range strings.Split(strings.TrimSpace(string(out)), "\n") {
		if line == "" {
			continue
		}
		var e dockerPsEntry
		if err := json.Unmarshal([]byte(line), &e); err != nil {
			continue // best-effort: ignore lines docker did not emit as JSON
		}
		entries = append(entries, e)
	}

	timeout := cfg.HTTPTimeout
	if timeout <= 0 {
		timeout = defaultHTTPTimeout
	}
	client := &http.Client{Timeout: timeout}

	var snapshots []ServiceSnapshot
	for _, e := range entries {
		snap := collectOne(ctx, e.Names, client, cfg)
		// A snapshot taken while ctx was being cancelled reflects the
		// cancellation, not the container; refuse to report it.
		if err := ctx.Err(); err != nil {
			return nil, fmt.Errorf("collecting swarm snapshots: %w", err)
		}
		snapshots = append(snapshots, snap)
	}
	return snapshots, nil
}
// collectOne builds the ServiceSnapshot for the container called name.
//
// The snapshot starts as ContainerState "missing", HealthState "none",
// Status "down", and is returned unchanged if `docker inspect` fails or its
// output cannot be decoded. Otherwise it records the agentmon.role label,
// the container state, the healthcheck status (when present) and, for a
// running container, the uptime in seconds; it then runs the role-specific
// probe and derives Status from everything collected.
//
// The "search" and "mcp" probes target localhost on the agentmon.port label.
// When that label is missing the probe is skipped and Status rests on the
// Docker state alone.
func collectOne(ctx context.Context, name string, client *http.Client, cfg Config) ServiceSnapshot {
	snap := ServiceSnapshot{
		Name:           name,
		ContainerState: "missing",
		HealthState:    "none",
		Status:         "down",
	}

	// Inspect for detailed state. --type container keeps docker from
	// matching an image/volume/network of the same name (e.g. if the
	// container vanished since `docker ps`), whose JSON has no container State.
	out, err := exec.CommandContext(ctx, "docker", "inspect", "--type", "container",
		"--format", "{{json .}}", name).Output()
	if err != nil {
		return snap
	}
	var detail dockerInspectEntry
	if err := json.Unmarshal(out, &detail); err != nil {
		return snap
	}

	snap.Role = detail.Config.Labels["agentmon.role"]
	snap.ContainerState = detail.State.Status
	if detail.State.Health != nil {
		snap.HealthState = detail.State.Health.Status
	}

	// Calculate uptime if running.
	if detail.State.Running && detail.State.StartedAt != "" {
		if t, err := time.Parse(time.RFC3339Nano, detail.State.StartedAt); err == nil {
			snap.UptimeSec = int64(time.Since(t).Seconds())
		}
	}

	// Role-specific probes.
	port := detail.Config.Labels["agentmon.port"]
	switch snap.Role {
	case "llm-proxy":
		collectLLMProxy(ctx, &snap, client, cfg)
	case "search":
		// With no port the URL would be "http://localhost:/", which net/http
		// dials on the scheme default (80) — probing the wrong service.
		if port != "" {
			collectHTTPProbe(ctx, &snap, client, "http://localhost:"+port+"/")
		}
	case "mcp":
		collectPortProbe(ctx, &snap, port) // no-op when port is ""
	case "db", "voice", "automation":
		// Docker healthcheck state is sufficient; no HTTP probe.
	}

	snap.Status = deriveStatus(snap)
	return snap
}
// collectLLMProxy probes the LiteLLM proxy at cfg.LiteLLMBaseURL.
//
// It stores the status code of GET /health/liveliness in snap.HTTPStatus,
// leaving it nil when the request cannot be built or sent. When
// cfg.LiteLLMAPIKey is set, it also stores the number of entries returned by
// GET /v2/model/info in snap.Extra["model_count"]. That count is recorded only
// for a 2xx response whose body decodes, so an auth failure or server error is
// not misreported as "zero models". snap.Extra is allocated if nil.
func collectLLMProxy(ctx context.Context, snap *ServiceSnapshot, client *http.Client, cfg Config) {
	if snap.Extra == nil {
		snap.Extra = make(map[string]any)
	}

	// Health probe. A malformed base URL makes NewRequestWithContext fail and
	// return a nil request, which client.Do would panic on — so check it.
	if req, err := http.NewRequestWithContext(ctx, http.MethodGet, cfg.LiteLLMBaseURL+"/health/liveliness", nil); err == nil {
		if resp, err := client.Do(req); err == nil {
			code := resp.StatusCode
			snap.HTTPStatus = &code
			resp.Body.Close()
		}
	}

	// Model count (requires an API key).
	if cfg.LiteLLMAPIKey == "" {
		return
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, cfg.LiteLLMBaseURL+"/v2/model/info", nil)
	if err != nil {
		return
	}
	req.Header.Set("Authorization", "Bearer "+cfg.LiteLLMAPIKey)
	resp, err := client.Do(req)
	if err != nil {
		return
	}
	defer resp.Body.Close()
	// An error reply can still be JSON without a "data" field; decoding it
	// would yield a bogus count of 0.
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return
	}
	var result struct {
		Data []struct {
			ModelName string `json:"model_name"`
		} `json:"data"`
	}
	if json.NewDecoder(resp.Body).Decode(&result) == nil {
		snap.Extra["model_count"] = len(result.Data)
	}
}
// collectHTTPProbe issues GET url. When a response arrives it records the
// status code in snap.HTTPStatus and the time until the response headers
// arrived, in milliseconds, in snap.Extra["response_ms"] (allocating
// snap.Extra if nil). If url cannot be turned into a request or the request
// fails, snap is left unchanged.
func collectHTTPProbe(ctx context.Context, snap *ServiceSnapshot, client *http.Client, url string) {
	// url is built from a container label; a malformed one makes
	// NewRequestWithContext return a nil request, which client.Do would
	// panic on.
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return
	}
	start := time.Now()
	resp, err := client.Do(req)
	if err != nil {
		return
	}
	ms := time.Since(start).Milliseconds()
	code := resp.StatusCode
	resp.Body.Close()

	snap.HTTPStatus = &code
	if snap.Extra == nil {
		snap.Extra = make(map[string]any)
	}
	snap.Extra["response_ms"] = ms
}
// collectPortProbe checks whether localhost:port accepts TCP connections by
// running `nc -z -w1 localhost <port>`. It records the outcome as a bool in
// snap.Extra["port_reachable"], allocating snap.Extra if nil. An empty port
// is a no-op.
//
// The result is recorded only when nc actually ran to completion. If the nc
// binary could not be started (e.g. not installed) or ctx was done,
// reachability is unknown and port_reachable is left unset. deriveStatus
// then does not mark the service degraded because of a problem on the
// monitoring host.
func collectPortProbe(ctx context.Context, snap *ServiceSnapshot, port string) {
	if port == "" {
		return
	}
	// Use nc to check TCP reachability.
	err := exec.CommandContext(ctx, "nc", "-z", "-w1", "localhost", port).Run()
	// *exec.Error means the command never started (binary not found / not
	// resolvable on PATH), so nothing was learned about the port.
	if _, notStarted := err.(*exec.Error); notStarted {
		return
	}
	// A cancelled/expired ctx kills nc; its failure says nothing about the port.
	if ctx.Err() != nil {
		return
	}
	if snap.Extra == nil {
		snap.Extra = make(map[string]any)
	}
	snap.Extra["port_reachable"] = err == nil
}
// deriveStatus reduces a snapshot to "down", "degraded" or "healthy".
//
// A container whose state is anything other than "running" is "down". A
// running container is "degraded" if any of these hold:
//   - its Docker healthcheck reports "unhealthy";
//   - an HTTP probe recorded a status outside 200-399;
//   - a port probe recorded port_reachable=false.
//
// Otherwise it is "healthy". Probes that recorded nothing do not count against
// the service.
func deriveStatus(snap ServiceSnapshot) string {
	if snap.ContainerState != "running" {
		return "down"
	}

	httpBad := false
	if code := snap.HTTPStatus; code != nil {
		httpBad = *code < 200 || *code >= 400
	}

	portBad := false
	if reachable, recorded := snap.Extra["port_reachable"].(bool); recorded {
		portBad = !reachable
	}

	switch {
	case snap.HealthState == "unhealthy", httpBad, portBad:
		return "degraded"
	default:
		return "healthy"
	}
}
// DetectIssues scans a set of snapshots for notable problems. It returns:
//   - the names of services whose Status is "down" or "degraded", in input
//     order;
//   - whether any "llm-proxy" service reports a positive
//     Extra["cooldown_count"].
//
// cooldown_count is accepted as int, int64 or float64. The float64 case covers
// snapshots whose Extra map was decoded from JSON, since encoding/json decodes
// every number into an `any` as float64. Nothing in this file sets
// cooldown_count; presumably another producer does — verify.
func DetectIssues(services []ServiceSnapshot) Issues {
	issues := Issues{}
	for _, s := range services {
		switch s.Status {
		case "down":
			issues.ServiceDown = append(issues.ServiceDown, s.Name)
		case "degraded":
			issues.ServiceDegraded = append(issues.ServiceDegraded, s.Name)
		}

		if s.Role != "llm-proxy" {
			continue
		}
		// Indexing a nil Extra map is safe and yields nil, which matches no case.
		positive := false
		switch n := s.Extra["cooldown_count"].(type) {
		case int:
			positive = n > 0
		case int64:
			positive = n > 0
		case float64:
			positive = n > 0
		}
		if positive {
			issues.LLMCooldowns = true
		}
	}
	return issues
}