Files
agentmon/internal/monitor/swarm/collector.go
T
William Valentin f8ddea3698 feat: add agentmon services section to infrastructure page
Label all agentmon docker-compose services with agentmon.monitor=true
and agentmon.group=agentmon so the swarm-monitor picks them up.
Adds Group field to ServiceSnapshot, probes /healthz for api/web roles,
and renders a separate "Agentmon" section below Swarm Services on the
Infrastructure page with new api and worker card renderers.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-18 13:41:26 -07:00

235 lines
6.4 KiB
Go

package swarm
import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"net/url"
"strings"
"time"
)
// Config carries the settings used by a collection pass.
type Config struct {
	// LiteLLMBaseURL is the prefix to which the LiteLLM probe paths
	// ("/health/liveliness" and "/v2/model/info") are appended.
	LiteLLMBaseURL string

	// LiteLLMAPIKey is sent as a Bearer token on the model-info request.
	// When it is empty that request is not made.
	LiteLLMAPIKey string

	// HTTPTimeout is the timeout of the HTTP client used for service probes.
	HTTPTimeout time.Duration

	// DockerSocket is the path of the Docker unix socket; an empty value
	// means /var/run/docker.sock.
	DockerSocket string
}
// dockerContainer is one element of the array returned by
// GET /containers/json. Only the fields the collector reads are declared;
// everything else in the payload is dropped by the JSON decoder.
type dockerContainer struct {
	// ID is the container ID; it is used to build the inspect URL.
	ID string `json:"Id"`

	// Names holds the container names as reported by Docker.
	Names []string `json:"Names"`

	// State is the container state string (compared against "running"
	// when the overall status is derived).
	State string `json:"State"`

	// Labels holds the container labels; the collector reads the
	// agentmon.group, agentmon.role and agentmon.port keys.
	Labels map[string]string `json:"Labels"`
}
// dockerContainerDetail is the subset of the GET /containers/{id}/json
// response that the collector decodes.
type dockerContainerDetail struct {
	State struct {
		// Status is the state string from the inspect payload.
		Status string `json:"Status"`

		// Running gates the uptime calculation: uptime is only derived
		// for containers reported as running.
		Running bool `json:"Running"`

		// StartedAt is the start timestamp; the collector parses it as
		// RFC 3339 with nanoseconds.
		StartedAt string `json:"StartedAt"`

		// Health stays nil when the payload has no (or a null) Health
		// object.
		Health *struct {
			Status string `json:"Status"`
		} `json:"Health"`
	} `json:"State"`
}
// newDockerClient builds an HTTP client whose connections are always made to
// the unix socket at socketPath, regardless of the network and address in the
// request URL. An empty socketPath selects /var/run/docker.sock. The returned
// client has no timeout of its own.
func newDockerClient(socketPath string) *http.Client {
	path := socketPath
	if path == "" {
		path = "/var/run/docker.sock"
	}

	// The network/address derived from the request URL are deliberately
	// ignored: every connection goes to the socket.
	dialSocket := func(ctx context.Context, _, _ string) (net.Conn, error) {
		var d net.Dialer
		return d.DialContext(ctx, "unix", path)
	}

	transport := &http.Transport{DialContext: dialSocket}
	return &http.Client{Transport: transport}
}
// CollectAll lists every container labeled agentmon.monitor=true (the list
// request passes all=1) and collects a ServiceSnapshot for each of them, in
// the order Docker returned them.
//
// It returns an error when the Docker API cannot be reached, answers with a
// non-200 status, or sends a body that is not a container list. Failures
// while inspecting or probing an individual container are not reported as
// errors; they are reflected in that container's snapshot instead. The
// returned slice is nil when no container matches.
func CollectAll(ctx context.Context, cfg Config) ([]ServiceSnapshot, error) {
	dockerClient := newDockerClient(cfg.DockerSocket)
	// The Docker client owns a private transport created for this call.
	// Release its kept-alive socket connections on return so repeated
	// collection passes do not accumulate open connections.
	defer dockerClient.CloseIdleConnections()

	httpClient := &http.Client{Timeout: cfg.HTTPTimeout}

	filters := url.QueryEscape(`{"label":["agentmon.monitor=true"]}`)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet,
		"http://localhost/v1.41/containers/json?all=1&filters="+filters, nil)
	if err != nil {
		return nil, err
	}

	resp, err := dockerClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("docker API unavailable: %w", err)
	}
	defer resp.Body.Close()

	// An error reply is a JSON object, not an array; report it as such
	// instead of letting it surface as a misleading parse error below.
	if resp.StatusCode != http.StatusOK {
		var apiErr struct {
			Message string `json:"message"`
		}
		// Best effort: the status line alone is still a useful error.
		if json.NewDecoder(resp.Body).Decode(&apiErr) == nil && apiErr.Message != "" {
			return nil, fmt.Errorf("docker API returned %s: %s", resp.Status, apiErr.Message)
		}
		return nil, fmt.Errorf("docker API returned %s", resp.Status)
	}

	var containers []dockerContainer
	if err := json.NewDecoder(resp.Body).Decode(&containers); err != nil {
		return nil, fmt.Errorf("docker API parse error: %w", err)
	}
	if len(containers) == 0 {
		return nil, nil
	}

	snapshots := make([]ServiceSnapshot, 0, len(containers))
	for _, c := range containers {
		snapshots = append(snapshots, collectOne(ctx, c, dockerClient, httpClient, cfg))
	}
	return snapshots, nil
}
// collectOne builds the snapshot for a single container.
//
// It starts from the list entry (name, group/role labels, container state),
// enriches it with health state and uptime from the inspect endpoint, runs
// the probe selected by the agentmon.role label, and finally derives the
// overall status. Inspect and probe failures never abort the snapshot; the
// affected fields simply keep their defaults.
func collectOne(ctx context.Context, c dockerContainer, dockerClient, httpClient *http.Client, cfg Config) ServiceSnapshot {
	snap := ServiceSnapshot{
		Name:           containerName(c),
		Group:          c.Labels["agentmon.group"],
		Role:           c.Labels["agentmon.role"],
		ContainerState: c.State,
		HealthState:    "none",
		Status:         "down",
	}

	// Health state and start time are not part of the list response.
	inspectContainer(ctx, dockerClient, c.ID, &snap)

	port := c.Labels["agentmon.port"]
	switch snap.Role {
	case "llm-proxy":
		collectLLMProxy(ctx, &snap, httpClient, cfg)
	case "search":
		// Without a port label the URL would be "http://localhost:/",
		// i.e. whatever listens on the default HTTP port. Skip the probe
		// instead, as the api/web roles do.
		if port != "" {
			collectHTTPProbe(ctx, &snap, httpClient, "http://localhost:"+port+"/")
		}
	case "mcp":
		collectPortProbe(&snap, port)
	case "api", "web":
		if port != "" {
			collectHTTPProbe(ctx, &snap, httpClient, "http://localhost:"+port+"/healthz")
		}
	}

	snap.Status = deriveStatus(snap)
	return snap
}

// inspectContainer copies the health status and uptime of container id from
// the Docker inspect endpoint into snap. It is best effort: on any request,
// status or decoding failure snap is left untouched.
func inspectContainer(ctx context.Context, dockerClient *http.Client, id string, snap *ServiceSnapshot) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet,
		"http://localhost/v1.41/containers/"+id+"/json", nil)
	if err != nil {
		return
	}
	resp, err := dockerClient.Do(req)
	if err != nil {
		return
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return
	}

	var detail dockerContainerDetail
	if err := json.NewDecoder(resp.Body).Decode(&detail); err != nil {
		return
	}
	if detail.State.Health != nil {
		snap.HealthState = detail.State.Health.Status
	}
	// Uptime is only meaningful while the container is running.
	if !detail.State.Running || detail.State.StartedAt == "" {
		return
	}
	if startedAt, err := time.Parse(time.RFC3339Nano, detail.State.StartedAt); err == nil {
		snap.UptimeSec = int64(time.Since(startedAt).Seconds())
	}
}
// containerName returns a display name for c: the first entry of c.Names
// with its leading "/" removed, or, when Docker reported no names, the first
// 12 bytes of the container ID. An ID shorter than 12 bytes is returned
// whole rather than sliced out of range.
func containerName(c dockerContainer) string {
	if len(c.Names) > 0 {
		return strings.TrimPrefix(c.Names[0], "/")
	}
	const shortIDLen = 12
	if len(c.ID) > shortIDLen {
		return c.ID[:shortIDLen]
	}
	return c.ID
}
// collectLLMProxy probes a LiteLLM proxy and records the results in snap.
//
// The liveliness endpoint's HTTP status code is stored in snap.HTTPStatus.
// When cfg.LiteLLMAPIKey is set, the model-info endpoint is queried as well
// and the number of entries in its "data" array is stored under
// snap.Extra["model_count"]; the count is only recorded for a 2xx reply so
// that an error payload is not mistaken for "zero models".
//
// snap.Extra is always non-nil on return. Every failure (malformed base URL,
// transport error, undecodable body) is tolerated and simply leaves the
// corresponding field unset.
func collectLLMProxy(ctx context.Context, snap *ServiceSnapshot, client *http.Client, cfg Config) {
	if snap.Extra == nil {
		snap.Extra = make(map[string]any)
	}

	// A malformed base URL makes request construction fail; skip the probe
	// rather than handing a nil request to the client.
	liveReq, err := http.NewRequestWithContext(ctx, http.MethodGet, cfg.LiteLLMBaseURL+"/health/liveliness", nil)
	if err == nil {
		if resp, err := client.Do(liveReq); err == nil {
			code := resp.StatusCode
			snap.HTTPStatus = &code
			resp.Body.Close()
		}
	}

	if cfg.LiteLLMAPIKey == "" {
		return
	}
	infoReq, err := http.NewRequestWithContext(ctx, http.MethodGet, cfg.LiteLLMBaseURL+"/v2/model/info", nil)
	if err != nil {
		return
	}
	infoReq.Header.Set("Authorization", "Bearer "+cfg.LiteLLMAPIKey)

	resp, err := client.Do(infoReq)
	if err != nil {
		return
	}
	defer resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return
	}

	var result struct {
		Data []struct{} `json:"data"`
	}
	if json.NewDecoder(resp.Body).Decode(&result) == nil {
		snap.Extra["model_count"] = len(result.Data)
	}
}
// collectHTTPProbe issues a GET to target and, when a response arrives,
// stores its status code in snap.HTTPStatus and the time until the response
// was received (in milliseconds) in snap.Extra["response_ms"].
//
// snap is left untouched when no request can be built from target (for
// example because a port label is not numeric) or when the request fails.
func collectHTTPProbe(ctx context.Context, snap *ServiceSnapshot, client *http.Client, target string) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil)
	if err != nil {
		// An invalid target yields a nil request; passing that to
		// client.Do would panic.
		return
	}

	start := time.Now()
	resp, err := client.Do(req)
	if err != nil {
		return
	}
	// Take the measurement before closing the body so the figure reflects
	// the service's response time only.
	elapsed := time.Since(start)
	resp.Body.Close()

	code := resp.StatusCode
	snap.HTTPStatus = &code
	if snap.Extra == nil {
		snap.Extra = make(map[string]any)
	}
	snap.Extra["response_ms"] = elapsed.Milliseconds()
}
// collectPortProbe attempts a TCP connection to localhost on the given port,
// waiting at most two seconds, and stores the outcome as a bool under
// snap.Extra["port_reachable"]. An empty port disables the probe and leaves
// snap unchanged.
func collectPortProbe(snap *ServiceSnapshot, port string) {
	if port == "" {
		return
	}
	if snap.Extra == nil {
		snap.Extra = make(map[string]any)
	}

	conn, dialErr := net.DialTimeout("tcp", net.JoinHostPort("localhost", port), 2*time.Second)
	if conn != nil {
		conn.Close()
	}
	snap.Extra["port_reachable"] = dialErr == nil
}
// deriveStatus folds container state, health state and probe results into a
// single status string. The checks are ordered and the first match wins:
//
//   - container not "running"                     -> "down"
//   - health state "unhealthy"                    -> "degraded"
//   - an HTTP status outside the range [200, 400) -> "degraded"
//   - a port probe that reported unreachable      -> "degraded"
//   - anything else                               -> "healthy"
//
// Probes that did not run (nil HTTPStatus, missing or non-bool
// "port_reachable" entry) do not affect the result.
func deriveStatus(snap ServiceSnapshot) string {
	portOpen, portProbed := snap.Extra["port_reachable"].(bool)

	switch {
	case snap.ContainerState != "running":
		return "down"
	case snap.HealthState == "unhealthy":
		return "degraded"
	case snap.HTTPStatus != nil && !(*snap.HTTPStatus >= 200 && *snap.HTTPStatus < 400):
		return "degraded"
	case portProbed && !portOpen:
		return "degraded"
	default:
		return "healthy"
	}
}
// DetectIssues scans a set of snapshots for notable problems.
//
// Services whose Status is "down" or "degraded" are listed by name in
// ServiceDown and ServiceDegraded respectively, in input order. LLMCooldowns
// is set when any service with role "llm-proxy" carries a positive
// "cooldown_count" entry in Extra.
func DetectIssues(services []ServiceSnapshot) Issues {
	issues := Issues{}
	for _, s := range services {
		switch s.Status {
		case "down":
			issues.ServiceDown = append(issues.ServiceDown, s.Name)
		case "degraded":
			issues.ServiceDegraded = append(issues.ServiceDegraded, s.Name)
		}
		if s.Role == "llm-proxy" && hasPositiveCount(s.Extra, "cooldown_count") {
			issues.LLMCooldowns = true
		}
	}
	return issues
}

// hasPositiveCount reports whether extra[key] holds a number greater than
// zero. Besides int it accepts int64 and float64, because a count that has
// been through encoding/json into a map[string]any arrives as float64 and
// would be silently missed by a plain int assertion. A nil map, a missing
// key, or a non-numeric value yields false.
func hasPositiveCount(extra map[string]any, key string) bool {
	switch n := extra[key].(type) {
	case int:
		return n > 0
	case int64:
		return n > 0
	case float64:
		return n > 0
	default:
		return false
	}
}