diff --git a/Dockerfile b/Dockerfile index 71d6329..92554b9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,7 +20,6 @@ RUN apt-get update && apt-get install -y \ ca-certificates \ libvirt-clients \ openssh-client \ - netcat-openbsd \ && rm -rf /var/lib/apt/lists/* WORKDIR /app diff --git a/internal/monitor/swarm/collector.go b/internal/monitor/swarm/collector.go index 167cd41..41b435d 100644 --- a/internal/monitor/swarm/collector.go +++ b/internal/monitor/swarm/collector.go @@ -4,8 +4,9 @@ import ( "context" "encoding/json" "fmt" + "net" "net/http" - "os/exec" + "net/url" "strings" "time" ) @@ -15,19 +16,19 @@ type Config struct { LiteLLMBaseURL string LiteLLMAPIKey string HTTPTimeout time.Duration + DockerSocket string // defaults to /var/run/docker.sock } -// dockerPsEntry is the JSON shape from `docker ps --format '{{json .}}'`. -type dockerPsEntry struct { - ID string `json:"ID"` - Names string `json:"Names"` - Status string `json:"Status"` - State string `json:"State"` +// dockerContainer is the shape returned by GET /containers/json. +type dockerContainer struct { + ID string `json:"Id"` + Names []string `json:"Names"` + State string `json:"State"` + Labels map[string]string `json:"Labels"` } -// dockerInspectEntry is the minimal shape we need from `docker inspect`. -type dockerInspectEntry struct { - Name string `json:"Name"` +// dockerContainerDetail is the shape returned by GET /containers/{id}/json. +type dockerContainerDetail struct { State struct { Status string `json:"Status"` Running bool `json:"Running"` @@ -36,119 +37,122 @@ type dockerInspectEntry struct { Status string `json:"Status"` } `json:"Health"` } `json:"State"` - Config struct { - Labels map[string]string `json:"Labels"` - } `json:"Config"` +} + +func newDockerClient(socketPath string) *http.Client { + if socketPath == "" { + socketPath = "/var/run/docker.sock" + } + return &http.Client{ + Transport: &http.Transport{ + DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) { + return (&net.Dialer{}).DialContext(ctx, "unix", socketPath) + }, + }, + } } // CollectAll lists all containers labeled agentmon.monitor=true and collects // a ServiceSnapshot for each. func CollectAll(ctx context.Context, cfg Config) ([]ServiceSnapshot, error) { - // List labeled containers (running + stopped). - out, err := exec.CommandContext(ctx, "docker", "ps", "-a", - "--filter", "label=agentmon.monitor=true", - "--format", "{{json .}}", - ).Output() + dockerClient := newDockerClient(cfg.DockerSocket) + httpClient := &http.Client{Timeout: cfg.HTTPTimeout} + + filters := url.QueryEscape(`{"label":["agentmon.monitor=true"]}`) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, + "http://localhost/v1.41/containers/json?all=1&filters="+filters, nil) if err != nil { - return nil, fmt.Errorf("docker ps failed: %w", err) + return nil, err } - var entries []dockerPsEntry - for _, line := range strings.Split(strings.TrimSpace(string(out)), "\n") { - if line == "" { - continue - } - var e dockerPsEntry - if err := json.Unmarshal([]byte(line), &e); err != nil { - continue - } - entries = append(entries, e) + resp, err := dockerClient.Do(req) + if err != nil { + return nil, fmt.Errorf("docker API unavailable: %w", err) + } + defer resp.Body.Close() + + var containers []dockerContainer + if err := json.NewDecoder(resp.Body).Decode(&containers); err != nil { + return nil, fmt.Errorf("docker API parse error: %w", err) } - client := &http.Client{Timeout: cfg.HTTPTimeout} var snapshots []ServiceSnapshot - for _, e := range entries { - snap := collectOne(ctx, e.Names, client, cfg) - snapshots = append(snapshots, snap) + for _, c := range containers { + snapshots = append(snapshots, collectOne(ctx, c, dockerClient, httpClient, cfg)) } - return snapshots, nil } -func collectOne(ctx context.Context, name string, client *http.Client, cfg Config) ServiceSnapshot { +func collectOne(ctx context.Context, c dockerContainer, dockerClient, httpClient *http.Client, cfg Config) ServiceSnapshot { + name := containerName(c) snap := ServiceSnapshot{ Name: name, - ContainerState: "missing", + Role: c.Labels["agentmon.role"], + ContainerState: c.State, HealthState: "none", Status: "down", } - // Inspect for detailed state. - out, err := exec.CommandContext(ctx, "docker", "inspect", "--format", "{{json .}}", name).Output() - if err != nil { - return snap - } - - var detail dockerInspectEntry - if err := json.Unmarshal(out, &detail); err != nil { - return snap - } - - snap.Role = detail.Config.Labels["agentmon.role"] - snap.ContainerState = detail.State.Status - - if detail.State.Health != nil { - snap.HealthState = detail.State.Health.Status - } - - // Calculate uptime if running. - if detail.State.Running && detail.State.StartedAt != "" { - if t, err := time.Parse(time.RFC3339Nano, detail.State.StartedAt); err == nil { - snap.UptimeSec = int64(time.Since(t).Seconds()) + // Inspect for health state and uptime (not in list response). + req, err := http.NewRequestWithContext(ctx, http.MethodGet, + "http://localhost/v1.41/containers/"+c.ID+"/json", nil) + if err == nil { + if resp, err := dockerClient.Do(req); err == nil { + var detail dockerContainerDetail + if json.NewDecoder(resp.Body).Decode(&detail) == nil { + if detail.State.Health != nil { + snap.HealthState = detail.State.Health.Status + } + if detail.State.Running && detail.State.StartedAt != "" { + if t, err := time.Parse(time.RFC3339Nano, detail.State.StartedAt); err == nil { + snap.UptimeSec = int64(time.Since(t).Seconds()) + } + } + } + resp.Body.Close() } } - // Role-specific probes. + port := c.Labels["agentmon.port"] switch snap.Role { case "llm-proxy": - collectLLMProxy(ctx, &snap, client, cfg) + collectLLMProxy(ctx, &snap, httpClient, cfg) case "search": - collectHTTPProbe(ctx, &snap, client, "http://localhost:"+detail.Config.Labels["agentmon.port"]+"/") + collectHTTPProbe(ctx, &snap, httpClient, "http://localhost:"+port+"/") case "mcp": - collectPortProbe(ctx, &snap, detail.Config.Labels["agentmon.port"]) - case "db", "voice", "automation": - // Docker healthcheck state is sufficient; no HTTP probe. + collectPortProbe(&snap, port) } snap.Status = deriveStatus(snap) return snap } +func containerName(c dockerContainer) string { + if len(c.Names) > 0 { + return strings.TrimPrefix(c.Names[0], "/") + } + return c.ID[:12] +} + func collectLLMProxy(ctx context.Context, snap *ServiceSnapshot, client *http.Client, cfg Config) { if snap.Extra == nil { snap.Extra = make(map[string]any) } - // Health probe. req, _ := http.NewRequestWithContext(ctx, http.MethodGet, cfg.LiteLLMBaseURL+"/health/liveliness", nil) - resp, err := client.Do(req) - if err == nil { + if resp, err := client.Do(req); err == nil { code := resp.StatusCode snap.HTTPStatus = &code resp.Body.Close() } - // Model count. if cfg.LiteLLMAPIKey != "" { req, _ := http.NewRequestWithContext(ctx, http.MethodGet, cfg.LiteLLMBaseURL+"/v2/model/info", nil) req.Header.Set("Authorization", "Bearer "+cfg.LiteLLMAPIKey) - resp, err := client.Do(req) - if err == nil { + if resp, err := client.Do(req); err == nil { defer resp.Body.Close() var result struct { - Data []struct { - ModelName string `json:"model_name"` - } `json:"data"` + Data []struct{} `json:"data"` } if json.NewDecoder(resp.Body).Decode(&result) == nil { snap.Extra["model_count"] = len(result.Data) @@ -157,29 +161,29 @@ func collectLLMProxy(ctx context.Context, snap *ServiceSnapshot, client *http.Cl } } -func collectHTTPProbe(ctx context.Context, snap *ServiceSnapshot, client *http.Client, url string) { +func collectHTTPProbe(ctx context.Context, snap *ServiceSnapshot, client *http.Client, target string) { start := time.Now() - req, _ := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) - resp, err := client.Do(req) - if err == nil { + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, target, nil) + if resp, err := client.Do(req); err == nil { code := resp.StatusCode snap.HTTPStatus = &code resp.Body.Close() - ms := time.Since(start).Milliseconds() if snap.Extra == nil { snap.Extra = make(map[string]any) } - snap.Extra["response_ms"] = ms + snap.Extra["response_ms"] = time.Since(start).Milliseconds() } } -func collectPortProbe(ctx context.Context, snap *ServiceSnapshot, port string) { +func collectPortProbe(snap *ServiceSnapshot, port string) { if port == "" { return } - // Use nc to check TCP reachability. - err := exec.CommandContext(ctx, "nc", "-z", "-w1", "localhost", port).Run() + conn, err := net.DialTimeout("tcp", "localhost:"+port, 2*time.Second) reachable := err == nil + if conn != nil { + conn.Close() + } if snap.Extra == nil { snap.Extra = make(map[string]any) }