feat: add validation to ingest gateway

HTTP and WebSocket handlers now validate events before publishing.
Returns detailed error info on validation failures.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
William Valentin
2026-01-17 01:56:04 -08:00
parent 0e2734be23
commit 2fd4fe0ae0
+34 -4
View File
@@ -6,6 +6,7 @@ import (
"net/http" "net/http"
"os" "os"
"agentmon/internal/event"
"agentmon/internal/httpx" "agentmon/internal/httpx"
qnats "agentmon/internal/queue/nats" qnats "agentmon/internal/queue/nats"
@@ -45,11 +46,28 @@ func main() {
accepted := 0 accepted := 0
rejected := 0 rejected := 0
var errors []map[string]any
for _, raw := range events { for _, raw := range events {
if len(raw) == 0 { if len(raw) == 0 {
rejected++ rejected++
continue continue
} }
var m map[string]any
if err := json.Unmarshal(raw, &m); err != nil {
rejected++
errors = append(errors, map[string]any{"error": "invalid_json"})
continue
}
if err := event.Validate(m); err != nil {
rejected++
if ve, ok := err.(event.ValidationError); ok {
errors = append(errors, map[string]any{"error": "validation_failed", "field": ve.Field, "message": ve.Message})
}
continue
}
if err := pub.Publish(r.Context(), raw); err != nil { if err := pub.Publish(r.Context(), raw); err != nil {
rejected++ rejected++
continue continue
@@ -57,7 +75,11 @@ func main() {
accepted++ accepted++
} }
httpx.WriteJSON(w, http.StatusAccepted, map[string]any{"accepted": accepted, "rejected": rejected}) resp := map[string]any{"accepted": accepted, "rejected": rejected}
if len(errors) > 0 {
resp["errors"] = errors
}
httpx.WriteJSON(w, http.StatusAccepted, resp)
}) })
r.Get("/v1/ws", wsHandler(pub)) r.Get("/v1/ws", wsHandler(pub))
@@ -84,18 +106,26 @@ func wsHandler(pub *qnats.Publisher) http.HandlerFunc {
return return
} }
var v map[string]any var m map[string]any
if err := json.Unmarshal(msg, &v); err != nil { if err := json.Unmarshal(msg, &m); err != nil {
_ = conn.WriteJSON(map[string]any{"error": "invalid_json"}) _ = conn.WriteJSON(map[string]any{"error": "invalid_json"})
continue continue
} }
if err := event.Validate(m); err != nil {
if ve, ok := err.(event.ValidationError); ok {
_ = conn.WriteJSON(map[string]any{"error": "validation_failed", "field": ve.Field, "message": ve.Message})
} else {
_ = conn.WriteJSON(map[string]any{"error": "validation_failed"})
}
continue
}
if err := pub.Publish(r.Context(), msg); err != nil { if err := pub.Publish(r.Context(), msg); err != nil {
_ = conn.WriteJSON(map[string]any{"error": "publish_failed"}) _ = conn.WriteJSON(map[string]any{"error": "publish_failed"})
continue continue
} }
_ = v
_ = conn.WriteJSON(map[string]any{"ack": map[string]any{"up_to_seq": nil}}) _ = conn.WriteJSON(map[string]any{"ack": map[string]any{"up_to_seq": nil}})
} }
} }