package main import ( "database/sql" "log" "net/http" "os" "strconv" "sync" "time" "agentmon/internal/httpx" "agentmon/internal/store/postgres" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/gorilla/websocket" "github.com/nats-io/nats.go" ) var ( wsUpgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } wsClients = make(map[*websocket.Conn]bool) wsMu sync.RWMutex natsConn *nats.Conn ) func subscribeToNATS(nc *nats.Conn) { topic := envDefault("NATS_TOPIC", "agentmon.events.v1") sub, err := nc.Subscribe(topic, func(msg *nats.Msg) { wsMu.RLock() var stale []*websocket.Conn for conn := range wsClients { err := conn.WriteMessage(websocket.TextMessage, msg.Data) if err != nil { conn.Close() stale = append(stale, conn) } } wsMu.RUnlock() if len(stale) > 0 { wsMu.Lock() for _, conn := range stale { delete(wsClients, conn) } wsMu.Unlock() } }) if err != nil { log.Printf("failed to subscribe to NATS: %v", err) return } log.Printf("subscribed to NATS topic: %s", topic) _ = sub } func wsHandler(w http.ResponseWriter, r *http.Request) { conn, err := wsUpgrader.Upgrade(w, r, nil) if err != nil { return } defer conn.Close() wsMu.Lock() wsClients[conn] = true wsMu.Unlock() log.Printf("WebSocket client connected") for { _, _, err := conn.ReadMessage() if err != nil { break } } wsMu.Lock() delete(wsClients, conn) wsMu.Unlock() log.Printf("WebSocket client disconnected") } func main() { addr := envDefault("AGENTMON_QUERY_ADDR", ":8081") dsn := os.Getenv("DATABASE_URL") natsURL := envDefault("NATS_URL", "nats://localhost:4222") if dsn == "" { log.Fatalf("DATABASE_URL is required") } db, err := postgres.Open(dsn) if err != nil { log.Fatalf("failed to open DB: %v", err) } defer func() { _ = db.Close() }() nc, err := nats.Connect(natsURL) if err != nil { log.Printf("warning: failed to connect to NATS: %v", err) } else { natsConn = nc go subscribeToNATS(nc) } r := chi.NewRouter() r.Use(middleware.RequestID) r.Use(middleware.RealIP) r.Use(middleware.Logger) r.Use(middleware.Recoverer) r.Get("/healthz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("ok")) }) r.Get("/v1/ws", wsHandler) r.Get("/v1/events", func(w http.ResponseWriter, r *http.Request) { limit, _ := strconv.Atoi(r.URL.Query().Get("limit")) f := postgres.EventsFilter{ Limit: limit, EventType: r.URL.Query().Get("event_type"), Framework: r.URL.Query().Get("framework"), } events, err := db.ListRecentEvents(r.Context(), f) if err != nil { httpx.WriteJSON(w, http.StatusInternalServerError, map[string]any{"error": "db_error"}) return } httpx.WriteJSON(w, http.StatusOK, map[string]any{"events": events}) }) r.Get("/v1/sessions", func(w http.ResponseWriter, r *http.Request) { q := r.URL.Query() f := postgres.SessionsFilter{ Framework: q.Get("framework"), Host: q.Get("host"), } if v := q.Get("limit"); v != "" { f.Limit, _ = strconv.Atoi(v) } if v := q.Get("from"); v != "" { if t, err := time.Parse(time.RFC3339, v); err == nil { f.From = &t } else if t, err := time.Parse("2006-01-02", v); err == nil { f.From = &t } } if v := q.Get("to"); v != "" { if t, err := time.Parse(time.RFC3339, v); err == nil { f.To = &t } else if t, err := time.Parse("2006-01-02", v); err == nil { end := t.Add(24*time.Hour - time.Nanosecond) f.To = &end } } if v := q.Get("cursor"); v != "" { if t, err := time.Parse(time.RFC3339Nano, v); err == nil { f.Cursor = &t } } sessions, nextCursor, err := db.ListSessions(r.Context(), f) if err != nil { httpx.WriteJSON(w, http.StatusInternalServerError, map[string]any{"error": "db_error"}) return } resp := map[string]any{"sessions": sessions} if nextCursor != nil { resp["next_cursor"] = nextCursor.Format(time.RFC3339Nano) } httpx.WriteJSON(w, http.StatusOK, resp) }) r.Get("/v1/sessions/{sessionID}", func(w http.ResponseWriter, r *http.Request) { sessionID := chi.URLParam(r, "sessionID") session, runs, err := db.GetSessionWithRuns(r.Context(), sessionID) if err == sql.ErrNoRows { httpx.WriteJSON(w, http.StatusNotFound, map[string]any{"error": "not_found"}) return } if err != nil { httpx.WriteJSON(w, http.StatusInternalServerError, map[string]any{"error": "db_error"}) return } httpx.WriteJSON(w, http.StatusOK, map[string]any{"session": session, "runs": runs}) }) r.Get("/v1/runs/{runID}", func(w http.ResponseWriter, r *http.Request) { runID := chi.URLParam(r, "runID") run, spans, err := db.GetRunWithSpans(r.Context(), runID) if err == sql.ErrNoRows { httpx.WriteJSON(w, http.StatusNotFound, map[string]any{"error": "not_found"}) return } if err != nil { httpx.WriteJSON(w, http.StatusInternalServerError, map[string]any{"error": "db_error"}) return } httpx.WriteJSON(w, http.StatusOK, map[string]any{"run": run, "spans": spans}) }) r.Get("/v1/stats/summary", func(w http.ResponseWriter, r *http.Request) { summary, err := db.GetSummary(r.Context()) if err != nil { httpx.WriteJSON(w, http.StatusInternalServerError, map[string]any{"error": "db_error"}) return } httpx.WriteJSON(w, http.StatusOK, summary) }) r.Get("/v1/stats/timeseries", func(w http.ResponseWriter, r *http.Request) { window := r.URL.Query().Get("window") switch window { case "1h", "6h", "24h", "7d": case "": window = "1h" default: httpx.WriteJSON(w, http.StatusBadRequest, map[string]any{"error": "invalid_window"}) return } timeseries, err := db.GetTimeseries(r.Context(), window) if err != nil { httpx.WriteJSON(w, http.StatusInternalServerError, map[string]any{"error": "db_error"}) return } httpx.WriteJSON(w, http.StatusOK, timeseries) }) log.Printf("query-api listening on %s", addr) log.Fatal(http.ListenAndServe(addr, r)) } func envDefault(key, def string) string { if v := os.Getenv(key); v != "" { return v } return def }