Files
agentmon/cmd/query-api/main.go
T
William Valentin f8ddea3698 feat: add agentmon services section to infrastructure page
Label all agentmon docker-compose services with agentmon.monitor=true
and agentmon.group=agentmon so the swarm-monitor picks them up.
Adds Group field to ServiceSnapshot, probes /healthz for api/web roles,
and renders a separate "Agentmon" section below Swarm Services on the
Infrastructure page with new api and worker card renderers.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-18 13:41:26 -07:00

262 lines
6.5 KiB
Go

package main
import (
	"database/sql"
	"errors"
	"log"
	"net/http"
	"os"
	"strconv"
	"sync"
	"time"

	"agentmon/internal/httpx"
	"agentmon/internal/store/postgres"
	"github.com/go-chi/chi/v5"
	"github.com/go-chi/chi/v5/middleware"
	"github.com/gorilla/websocket"
	"github.com/nats-io/nats.go"
)
// Package-level state shared between the NATS subscriber and the WebSocket
// handler.
var (
	// wsMu guards wsClients.
	wsMu sync.RWMutex

	// wsClients is the set of currently registered WebSocket connections;
	// the bool value is always true and only the keys matter.
	wsClients = map[*websocket.Conn]bool{}

	// wsUpgrader upgrades HTTP requests to WebSocket connections.
	// CheckOrigin accepts every request regardless of its Origin header.
	// NOTE(review): accepting all origins allows cross-site pages to open
	// sockets against this API; confirm that is intended.
	wsUpgrader = websocket.Upgrader{
		CheckOrigin: func(*http.Request) bool {
			return true
		},
	}

	// natsConn holds the NATS connection; main assigns it after a
	// successful nats.Connect, otherwise it stays nil.
	natsConn *nats.Conn
)
// subscribeToNATS subscribes to the NATS subject named by the NATS_TOPIC
// environment variable (default "agentmon.events.v1") and forwards the payload
// of every received message, as a WebSocket text frame, to each connection
// registered in wsClients.
//
// Every write is given a deadline so that a client that has stopped reading
// cannot block the broadcast, and the read lock held while broadcasting,
// indefinitely. Connections whose deadline cannot be set or whose write fails
// are closed and then removed from wsClients.
//
// If the subscription itself fails, the error is logged and the function
// returns without retrying.
func subscribeToNATS(nc *nats.Conn) {
	// writeTimeout bounds a single WebSocket write to one client.
	const writeTimeout = 5 * time.Second

	topic := envDefault("NATS_TOPIC", "agentmon.events.v1")
	_, err := nc.Subscribe(topic, func(msg *nats.Msg) {
		var stale []*websocket.Conn

		// Broadcast under the read lock; removal of failed connections is
		// deferred until the write lock can be taken below.
		wsMu.RLock()
		for conn := range wsClients {
			err := conn.SetWriteDeadline(time.Now().Add(writeTimeout))
			if err == nil {
				err = conn.WriteMessage(websocket.TextMessage, msg.Data)
			}
			if err != nil {
				conn.Close()
				stale = append(stale, conn)
			}
		}
		wsMu.RUnlock()

		if len(stale) == 0 {
			return
		}
		wsMu.Lock()
		for _, conn := range stale {
			delete(wsClients, conn)
		}
		wsMu.Unlock()
	})
	if err != nil {
		log.Printf("failed to subscribe to NATS: %v", err)
		return
	}
	log.Printf("subscribed to NATS topic: %s", topic)
}
// wsHandler upgrades the incoming request to a WebSocket connection,
// registers it in wsClients, and then reads and discards incoming frames
// until a read returns an error. At that point the connection is removed from
// wsClients and closed.
//
// If the upgrade fails the handler returns without writing anything itself.
func wsHandler(w http.ResponseWriter, r *http.Request) {
	client, upgradeErr := wsUpgrader.Upgrade(w, r, nil)
	if upgradeErr != nil {
		return
	}
	defer client.Close()

	wsMu.Lock()
	wsClients[client] = true
	wsMu.Unlock()
	log.Printf("WebSocket client connected")

	// Incoming frames carry no meaning here; reading only serves to notice
	// when the peer goes away.
	var readErr error
	for readErr == nil {
		_, _, readErr = client.ReadMessage()
	}

	wsMu.Lock()
	delete(wsClients, client)
	wsMu.Unlock()
	log.Printf("WebSocket client disconnected")
}
// main starts the query API.
//
// Configuration comes from the environment:
//   - AGENTMON_QUERY_ADDR: listen address (default ":8081")
//   - DATABASE_URL: Postgres DSN (required; the process exits if empty)
//   - NATS_URL: NATS server URL (default "nats://localhost:4222")
//
// A failed NATS connection is only logged: the HTTP API still starts, but no
// NATS subscription is made, so nothing is forwarded to WebSocket clients.
func main() {
	addr := envDefault("AGENTMON_QUERY_ADDR", ":8081")
	dsn := os.Getenv("DATABASE_URL")
	natsURL := envDefault("NATS_URL", "nats://localhost:4222")

	if dsn == "" {
		log.Fatalf("DATABASE_URL is required")
	}

	db, err := postgres.Open(dsn)
	if err != nil {
		log.Fatalf("failed to open DB: %v", err)
	}
	// Close errors are deliberately ignored; the process is exiting anyway.
	defer func() { _ = db.Close() }()

	nc, err := nats.Connect(natsURL)
	if err != nil {
		log.Printf("warning: failed to connect to NATS: %v", err)
	} else {
		natsConn = nc
		go subscribeToNATS(nc)
	}

	r := chi.NewRouter()
	r.Use(middleware.RequestID)
	r.Use(middleware.RealIP)
	r.Use(middleware.Logger)
	r.Use(middleware.Recoverer)

	// Liveness probe: always 200 "ok"; does not touch the database or NATS.
	r.Get("/healthz", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("ok"))
	})

	r.Get("/v1/ws", wsHandler)

	// Recent events, optionally filtered by event_type and framework.
	r.Get("/v1/events", func(w http.ResponseWriter, r *http.Request) {
		// A missing or non-numeric limit parses as 0 and is handed to the
		// store unchanged.
		limit, _ := strconv.Atoi(r.URL.Query().Get("limit"))
		f := postgres.EventsFilter{
			Limit:     limit,
			EventType: r.URL.Query().Get("event_type"),
			Framework: r.URL.Query().Get("framework"),
		}
		events, err := db.ListRecentEvents(r.Context(), f)
		if err != nil {
			httpx.WriteJSON(w, http.StatusInternalServerError, map[string]any{"error": "db_error"})
			return
		}
		httpx.WriteJSON(w, http.StatusOK, map[string]any{"events": events})
	})

	// Session listing with optional framework/host/limit/from/to/cursor
	// query parameters. Unparseable limit, from, to and cursor values are
	// ignored rather than rejected.
	r.Get("/v1/sessions", func(w http.ResponseWriter, r *http.Request) {
		q := r.URL.Query()
		f := postgres.SessionsFilter{
			Framework: q.Get("framework"),
			Host:      q.Get("host"),
		}
		if v := q.Get("limit"); v != "" {
			f.Limit, _ = strconv.Atoi(v)
		}
		// from/to accept either an RFC 3339 timestamp or a bare
		// YYYY-MM-DD date.
		if v := q.Get("from"); v != "" {
			if t, err := time.Parse(time.RFC3339, v); err == nil {
				f.From = &t
			} else if t, err := time.Parse("2006-01-02", v); err == nil {
				f.From = &t
			}
		}
		if v := q.Get("to"); v != "" {
			if t, err := time.Parse(time.RFC3339, v); err == nil {
				f.To = &t
			} else if t, err := time.Parse("2006-01-02", v); err == nil {
				// A bare date means "through the end of that day".
				end := t.Add(24*time.Hour - time.Nanosecond)
				f.To = &end
			}
		}
		if v := q.Get("cursor"); v != "" {
			if t, err := time.Parse(time.RFC3339Nano, v); err == nil {
				f.Cursor = &t
			}
		}
		sessions, nextCursor, err := db.ListSessions(r.Context(), f)
		if err != nil {
			httpx.WriteJSON(w, http.StatusInternalServerError, map[string]any{"error": "db_error"})
			return
		}
		resp := map[string]any{"sessions": sessions}
		// next_cursor is only present when the store returned one; it uses
		// the same RFC3339Nano layout that the cursor parameter is parsed with.
		if nextCursor != nil {
			resp["next_cursor"] = nextCursor.Format(time.RFC3339Nano)
		}
		httpx.WriteJSON(w, http.StatusOK, resp)
	})

	// Single session plus its runs; 404 when the store reports no rows.
	r.Get("/v1/sessions/{sessionID}", func(w http.ResponseWriter, r *http.Request) {
		sessionID := chi.URLParam(r, "sessionID")
		session, runs, err := db.GetSessionWithRuns(r.Context(), sessionID)
		// errors.Is also matches sql.ErrNoRows when the store wraps it.
		if errors.Is(err, sql.ErrNoRows) {
			httpx.WriteJSON(w, http.StatusNotFound, map[string]any{"error": "not_found"})
			return
		}
		if err != nil {
			httpx.WriteJSON(w, http.StatusInternalServerError, map[string]any{"error": "db_error"})
			return
		}
		httpx.WriteJSON(w, http.StatusOK, map[string]any{"session": session, "runs": runs})
	})

	// Single run plus its spans; 404 when the store reports no rows.
	r.Get("/v1/runs/{runID}", func(w http.ResponseWriter, r *http.Request) {
		runID := chi.URLParam(r, "runID")
		run, spans, err := db.GetRunWithSpans(r.Context(), runID)
		// errors.Is also matches sql.ErrNoRows when the store wraps it.
		if errors.Is(err, sql.ErrNoRows) {
			httpx.WriteJSON(w, http.StatusNotFound, map[string]any{"error": "not_found"})
			return
		}
		if err != nil {
			httpx.WriteJSON(w, http.StatusInternalServerError, map[string]any{"error": "db_error"})
			return
		}
		httpx.WriteJSON(w, http.StatusOK, map[string]any{"run": run, "spans": spans})
	})

	r.Get("/v1/stats/summary", func(w http.ResponseWriter, r *http.Request) {
		summary, err := db.GetSummary(r.Context())
		if err != nil {
			httpx.WriteJSON(w, http.StatusInternalServerError, map[string]any{"error": "db_error"})
			return
		}
		httpx.WriteJSON(w, http.StatusOK, summary)
	})

	r.Get("/v1/stats/top-tools", func(w http.ResponseWriter, r *http.Request) {
		// A missing or non-numeric limit parses as 0 and is handed to the
		// store unchanged.
		limit, _ := strconv.Atoi(r.URL.Query().Get("limit"))
		tools, err := db.GetTopTools(r.Context(), limit)
		if err != nil {
			httpx.WriteJSON(w, http.StatusInternalServerError, map[string]any{"error": "db_error"})
			return
		}
		// Substitute an empty slice for nil, presumably so the response
		// carries [] rather than null.
		if tools == nil {
			tools = []postgres.TopTool{}
		}
		httpx.WriteJSON(w, http.StatusOK, map[string]any{"tools": tools})
	})

	// Timeseries over a fixed set of windows; an empty window defaults to
	// "1h" and anything else is rejected with 400.
	r.Get("/v1/stats/timeseries", func(w http.ResponseWriter, r *http.Request) {
		window := r.URL.Query().Get("window")
		switch window {
		case "1h", "6h", "24h", "7d":
		case "":
			window = "1h"
		default:
			httpx.WriteJSON(w, http.StatusBadRequest, map[string]any{"error": "invalid_window"})
			return
		}
		timeseries, err := db.GetTimeseries(r.Context(), window)
		if err != nil {
			httpx.WriteJSON(w, http.StatusInternalServerError, map[string]any{"error": "db_error"})
			return
		}
		httpx.WriteJSON(w, http.StatusOK, timeseries)
	})

	// An explicit server is used instead of http.ListenAndServe so that a
	// header-read timeout can be set; without it a client may hold a
	// connection open indefinitely by never finishing its request headers.
	// Only the header phase is bounded, so long-lived WebSocket connections
	// are unaffected.
	srv := &http.Server{
		Addr:              addr,
		Handler:           r,
		ReadHeaderTimeout: 10 * time.Second,
	}

	log.Printf("query-api listening on %s", addr)
	// log.Fatal exits the process, so the deferred db.Close above does not
	// run on this path.
	log.Fatal(srv.ListenAndServe())
}
// envDefault returns the value of the environment variable key, or def when
// the variable is unset or set to the empty string.
func envDefault(key, def string) string {
	value := os.Getenv(key)
	if value == "" {
		return def
	}
	return value
}