diff --git a/cmd/ingest-gateway/main.go b/cmd/ingest-gateway/main.go index c7d8701..397b701 100644 --- a/cmd/ingest-gateway/main.go +++ b/cmd/ingest-gateway/main.go @@ -6,6 +6,7 @@ import ( "net/http" "os" + "agentmon/internal/event" "agentmon/internal/httpx" qnats "agentmon/internal/queue/nats" @@ -45,11 +46,28 @@ func main() { accepted := 0 rejected := 0 + var errors []map[string]any for _, raw := range events { if len(raw) == 0 { rejected++ continue } + + var m map[string]any + if err := json.Unmarshal(raw, &m); err != nil { + rejected++ + errors = append(errors, map[string]any{"error": "invalid_json"}) + continue + } + + if err := event.Validate(m); err != nil { + rejected++ + if ve, ok := err.(event.ValidationError); ok { + errors = append(errors, map[string]any{"error": "validation_failed", "field": ve.Field, "message": ve.Message}) + } + continue + } + if err := pub.Publish(r.Context(), raw); err != nil { rejected++ continue @@ -57,7 +75,11 @@ func main() { accepted++ } - httpx.WriteJSON(w, http.StatusAccepted, map[string]any{"accepted": accepted, "rejected": rejected}) + resp := map[string]any{"accepted": accepted, "rejected": rejected} + if len(errors) > 0 { + resp["errors"] = errors + } + httpx.WriteJSON(w, http.StatusAccepted, resp) }) r.Get("/v1/ws", wsHandler(pub)) @@ -84,18 +106,26 @@ func wsHandler(pub *qnats.Publisher) http.HandlerFunc { return } - var v map[string]any - if err := json.Unmarshal(msg, &v); err != nil { + var m map[string]any + if err := json.Unmarshal(msg, &m); err != nil { _ = conn.WriteJSON(map[string]any{"error": "invalid_json"}) continue } + if err := event.Validate(m); err != nil { + if ve, ok := err.(event.ValidationError); ok { + _ = conn.WriteJSON(map[string]any{"error": "validation_failed", "field": ve.Field, "message": ve.Message}) + } else { + _ = conn.WriteJSON(map[string]any{"error": "validation_failed"}) + } + continue + } + if err := pub.Publish(r.Context(), msg); err != nil { _ = conn.WriteJSON(map[string]any{"error": "publish_failed"}) continue } - _ = v _ = conn.WriteJSON(map[string]any{"ack": map[string]any{"up_to_seq": nil}}) } }