package main import ( "encoding/json" "log" "net/http" "os" "agentmon/internal/event" "agentmon/internal/httpx" qnats "agentmon/internal/queue/nats" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/gorilla/websocket" ) func main() { addr := envDefault("AGENTMON_ADDR", ":8080") natsURL := envDefault("NATS_URL", "nats://nats:4222") natsTopic := envDefault("NATS_TOPIC", "agentmon.events.v1") pub, err := qnats.NewPublisher(natsURL, natsTopic) if err != nil { log.Fatalf("failed to connect to NATS: %v", err) } defer pub.Close() r := chi.NewRouter() r.Use(middleware.RequestID) r.Use(middleware.RealIP) r.Use(middleware.Logger) r.Use(middleware.Recoverer) r.Get("/healthz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) _, _ = w.Write([]byte("ok")) }) r.Post("/v1/events", func(w http.ResponseWriter, r *http.Request) { r.Body = http.MaxBytesReader(w, r.Body, 10<<20) // 10MB limit var events []json.RawMessage if err := json.NewDecoder(r.Body).Decode(&events); err != nil { httpx.WriteJSON(w, http.StatusBadRequest, map[string]any{"error": "invalid_json"}) return } accepted := 0 rejected := 0 var errors []map[string]any for _, raw := range events { if len(raw) == 0 { rejected++ continue } var m map[string]any if err := json.Unmarshal(raw, &m); err != nil { rejected++ errors = append(errors, map[string]any{"error": "invalid_json"}) continue } if err := event.Validate(m); err != nil { rejected++ if ve, ok := err.(event.ValidationError); ok { errors = append(errors, map[string]any{"error": "validation_failed", "field": ve.Field, "message": ve.Message}) } continue } if err := pub.Publish(r.Context(), raw); err != nil { rejected++ continue } accepted++ } resp := map[string]any{"accepted": accepted, "rejected": rejected} if len(errors) > 0 { resp["errors"] = errors } httpx.WriteJSON(w, http.StatusAccepted, resp) }) r.Get("/v1/ws", wsHandler(pub)) log.Printf("ingest-gateway listening on %s", addr) log.Fatal(http.ListenAndServe(addr, r)) } func wsHandler(pub *qnats.Publisher) http.HandlerFunc { upgrader := websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } return func(w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { return } defer conn.Close() for { _, msg, err := conn.ReadMessage() if err != nil { return } var m map[string]any if err := json.Unmarshal(msg, &m); err != nil { _ = conn.WriteJSON(map[string]any{"error": "invalid_json"}) continue } if err := event.Validate(m); err != nil { if ve, ok := err.(event.ValidationError); ok { _ = conn.WriteJSON(map[string]any{"error": "validation_failed", "field": ve.Field, "message": ve.Message}) } else { _ = conn.WriteJSON(map[string]any{"error": "validation_failed"}) } continue } if err := pub.Publish(r.Context(), msg); err != nil { _ = conn.WriteJSON(map[string]any{"error": "publish_failed"}) continue } _ = conn.WriteJSON(map[string]any{"ack": map[string]any{"up_to_seq": nil}}) } } } func envDefault(key, def string) string { if v := os.Getenv(key); v != "" { return v } return def }