// Command event-processor subscribes to a NATS topic carrying JSON event
// envelopes and writes them to the database in batches via the postgres
// store package.
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"math"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	qnats "agentmon/internal/queue/nats"
	"agentmon/internal/store/postgres"
)

// envelope is the subset of an incoming JSON message that the processor
// decodes in order to fill the indexed columns of postgres.InsertEvent. The
// complete raw message is stored separately as the event payload.
type envelope struct {
	Schema struct {
		Name    string `json:"name"`
		Version int    `json:"version"`
	} `json:"schema"`
	Event struct {
		ID   string `json:"id"`
		Type string `json:"type"`
		// TS is either an RFC3339 string or a unix-millisecond number;
		// parseTS converts it to a time.Time.
		TS     any `json:"ts"`
		Source struct {
			Framework string `json:"framework"`
			ClientID  string `json:"client_id"`
		} `json:"source"`
	} `json:"event"`
	// Correlation is optional and stays nil when the message omits it.
	Correlation *struct {
		SessionID    string `json:"session_id,omitempty"`
		RunID        string `json:"run_id,omitempty"`
		TraceID      string `json:"trace_id,omitempty"`
		SpanID       string `json:"span_id,omitempty"`
		ParentSpanID string `json:"parent_span_id,omitempty"`
	} `json:"correlation,omitempty"`
}

// main runs the processor and exits with a non-zero status when startup
// fails. All work lives in run so that its deferred cleanup executes before
// log.Fatalf terminates the process.
func main() {
	if err := run(); err != nil {
		log.Fatalf("%v", err)
	}
}

// run wires the database and the NATS subscription together and blocks until
// the subscription ends (SIGINT/SIGTERM cancel its context).
//
// Configuration is read from the environment: DATABASE_URL (required),
// NATS_URL and NATS_TOPIC (optional, with defaults).
//
// It returns an error only for startup failures. A subscription that stops
// with an error other than context cancellation is logged, not returned.
func run() error {
	log.Printf("event-processor starting")

	dsn := os.Getenv("DATABASE_URL")
	if dsn == "" {
		return errors.New("DATABASE_URL is required")
	}
	natsURL := envDefault("NATS_URL", "nats://nats:4222")
	natsTopic := envDefault("NATS_TOPIC", "agentmon.events.v1")

	db, err := postgres.Open(dsn)
	if err != nil {
		return fmt.Errorf("failed to open DB: %w", err)
	}
	// Best-effort close on the way out; there is nothing useful to do with
	// a close error at this point.
	defer func() { _ = db.Close() }()

	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	if err := db.Ping(ctx); err != nil {
		return fmt.Errorf("failed to ping DB: %w", err)
	}

	sub, err := qnats.NewSubscriber(natsURL, natsTopic)
	if err != nil {
		return fmt.Errorf("failed to connect to NATS: %w", err)
	}
	defer sub.Close()
	log.Printf("subscribed to %s (%s)", natsTopic, natsURL)

	const (
		maxBatchSize    = 100
		flushInterval   = 200 * time.Millisecond
		flushTimeout    = 10 * time.Second
		eventBufferSize = 1000
	)

	eventCh := make(chan postgres.InsertEvent, eventBufferSize)

	// Flush goroutine: collects events from eventCh and writes them when the
	// batch reaches maxBatchSize, on every flushInterval tick, and one last
	// time when eventCh is closed.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()

		batch := make([]postgres.InsertEvent, 0, maxBatchSize)
		ticker := time.NewTicker(flushInterval)
		defer ticker.Stop()

		flush := func() {
			if len(batch) == 0 {
				return
			}
			// Deliberately not derived from ctx: ctx is already cancelled
			// during shutdown, which would make the final drain of buffered
			// events fail. A bounded timeout keeps a stuck insert from
			// blocking forever.
			flushCtx, cancelFlush := context.WithTimeout(context.Background(), flushTimeout)
			defer cancelFlush()
			if err := db.InsertEventBatch(flushCtx, batch); err != nil {
				log.Printf("batch insert error (%d events): %v", len(batch), err)
			}
			// Reuse the backing array for the next batch.
			batch = batch[:0]
		}

		for {
			select {
			case evt, ok := <-eventCh:
				if !ok {
					flush()
					return
				}
				batch = append(batch, evt)
				if len(batch) >= maxBatchSize {
					flush()
				}
			case <-ticker.C:
				flush()
			}
		}
	}()

	// NOTE(review): this assumes Subscribe blocks until ctx is cancelled or it
	// fails, and that the handler is never invoked after it returns;
	// otherwise the send below could hit the closed channel. Confirm against
	// the qnats package.
	err = sub.Subscribe(ctx, func(msg []byte) error {
		var env envelope
		if err := json.Unmarshal(msg, &env); err != nil {
			log.Printf("unmarshal error: %v", err)
			return nil // don't fail the subscriber
		}

		ts, err := parseTS(env.Event.TS)
		if err != nil {
			log.Printf("timestamp parse error: %v", err)
			return nil
		}

		e := postgres.InsertEvent{
			EventID:         env.Event.ID,
			TS:              ts,
			Type:            env.Event.Type,
			Payload:         json.RawMessage(msg),
			SourceFramework: sqlNull(env.Event.Source.Framework),
			ClientID:        sqlNull(env.Event.Source.ClientID),
		}
		if env.Correlation != nil {
			e.SessionID = sqlNull(env.Correlation.SessionID)
			e.RunID = sqlNull(env.Correlation.RunID)
			e.TraceID = sqlNull(env.Correlation.TraceID)
			e.SpanID = sqlNull(env.Correlation.SpanID)
			e.ParentSpanID = sqlNull(env.Correlation.ParentSpanID)
		}

		// Non-blocking send: when the buffer is full the event is dropped
		// rather than stalling the subscriber callback.
		select {
		case eventCh <- e:
		default:
			log.Printf("event channel full, dropping event %s", env.Event.ID)
		}
		return nil
	})

	// No more sends will happen; let the flusher drain what is buffered and
	// wait for it before the deferred db.Close runs.
	close(eventCh)
	wg.Wait()

	if err != nil && !errors.Is(err, context.Canceled) {
		log.Printf("processor stopped: %v", err)
	}
	return nil
}

// envDefault returns the value of the environment variable key, or def when
// the variable is unset or empty.
func envDefault(key, def string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return def
}

// sqlNull converts s to a sql.NullString, mapping the empty string to the
// invalid (NULL) value.
func sqlNull(s string) sql.NullString {
	if s == "" {
		return sql.NullString{}
	}
	return sql.NullString{String: s, Valid: true}
}

// parseTS converts a decoded JSON timestamp into a time.Time.
//
// It accepts an RFC3339 string (fractional seconds allowed) or a number of
// milliseconds since the Unix epoch; a fractional part of the number is
// truncated. Any other type, including a missing value (nil), yields
// postgres.ErrMissingField. A number that does not fit in an int64 is
// rejected with an error.
func parseTS(v any) (time.Time, error) {
	switch t := v.(type) {
	case string:
		return time.Parse(time.RFC3339Nano, t)
	case float64:
		// 2^63 is exactly representable as a float64; values outside
		// [-2^63, 2^63) cannot be converted to int64 with a defined result.
		const int64Limit = 1 << 63
		if math.IsNaN(t) || t < -int64Limit || t >= int64Limit {
			return time.Time{}, fmt.Errorf("timestamp %v out of range", t)
		}
		// UnixMilli splits into seconds and nanoseconds internally, avoiding
		// the int64 overflow of multiplying milliseconds into nanoseconds.
		return time.UnixMilli(int64(t)), nil
	default:
		return time.Time{}, postgres.ErrMissingField
	}
}