package main import ( "context" "database/sql" "encoding/json" "log" "os" "os/signal" "syscall" "time" qnats "agentmon/internal/queue/nats" "agentmon/internal/store/postgres" ) type envelope struct { Schema struct { Name string `json:"name"` Version int `json:"version"` } `json:"schema"` Event struct { ID string `json:"id"` Type string `json:"type"` TS any `json:"ts"` Source struct { Framework string `json:"framework"` ClientID string `json:"client_id"` } `json:"source"` } `json:"event"` Correlation *struct { SessionID string `json:"session_id,omitempty"` RunID string `json:"run_id,omitempty"` TraceID string `json:"trace_id,omitempty"` SpanID string `json:"span_id,omitempty"` ParentSpanID string `json:"parent_span_id,omitempty"` } `json:"correlation,omitempty"` } func main() { log.Printf("event-processor starting") dsn := os.Getenv("DATABASE_URL") if dsn == "" { log.Fatalf("DATABASE_URL is required") } natsURL := envDefault("NATS_URL", "nats://nats:4222") natsTopic := envDefault("NATS_TOPIC", "agentmon.events.v1") db, err := postgres.Open(dsn) if err != nil { log.Fatalf("failed to open DB: %v", err) } defer func() { _ = db.Close() }() ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel() if err := db.Ping(ctx); err != nil { log.Fatalf("failed to ping DB: %v", err) } sub, err := qnats.NewSubscriber(natsURL, natsTopic) if err != nil { log.Fatalf("failed to connect to NATS: %v", err) } defer sub.Close() log.Printf("subscribed to %s (%s)", natsTopic, natsURL) err = sub.Subscribe(ctx, func(msg []byte) error { var env envelope if err := json.Unmarshal(msg, &env); err != nil { return err } ts, err := parseTS(env.Event.TS) if err != nil { return err } e := postgres.InsertEvent{ EventID: env.Event.ID, TS: ts, Type: env.Event.Type, Payload: json.RawMessage(msg), SourceFramework: sqlNull(env.Event.Source.Framework), ClientID: sqlNull(env.Event.Source.ClientID), } if env.Correlation != nil { e.SessionID = sqlNull(env.Correlation.SessionID) e.RunID = sqlNull(env.Correlation.RunID) e.TraceID = sqlNull(env.Correlation.TraceID) e.SpanID = sqlNull(env.Correlation.SpanID) e.ParentSpanID = sqlNull(env.Correlation.ParentSpanID) } return db.InsertEvent(ctx, e) }) if err != nil && err != context.Canceled { log.Printf("processor stopped: %v", err) } } func envDefault(key, def string) string { if v := os.Getenv(key); v != "" { return v } return def } func sqlNull(s string) sql.NullString { if s == "" { return sql.NullString{} } return sql.NullString{String: s, Valid: true} } func parseTS(v any) (time.Time, error) { // Accept RFC3339 string or unix-ms number. switch t := v.(type) { case string: return time.Parse(time.RFC3339Nano, t) case float64: ms := int64(t) return time.Unix(0, ms*int64(time.Millisecond)), nil default: return time.Time{}, postgres.ErrMissingField } }