diff --git a/cmd/event-processor/main.go b/cmd/event-processor/main.go index dd15f93..22688ec 100644 --- a/cmd/event-processor/main.go +++ b/cmd/event-processor/main.go @@ -7,6 +7,7 @@ import ( "log" "os" "os/signal" + "sync" "syscall" "time" @@ -69,15 +70,60 @@ func main() { log.Printf("subscribed to %s (%s)", natsTopic, natsURL) + const ( + maxBatchSize = 100 + flushInterval = 200 * time.Millisecond + ) + + eventCh := make(chan postgres.InsertEvent, 1000) + + // Flush goroutine + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + batch := make([]postgres.InsertEvent, 0, maxBatchSize) + ticker := time.NewTicker(flushInterval) + defer ticker.Stop() + + flush := func() { + if len(batch) == 0 { + return + } + if err := db.InsertEventBatch(ctx, batch); err != nil { + log.Printf("batch insert error (%d events): %v", len(batch), err) + } + batch = batch[:0] + } + + for { + select { + case evt, ok := <-eventCh: + if !ok { + flush() + return + } + batch = append(batch, evt) + if len(batch) >= maxBatchSize { + flush() + } + case <-ticker.C: + flush() + } + } + }() + err = sub.Subscribe(ctx, func(msg []byte) error { var env envelope if err := json.Unmarshal(msg, &env); err != nil { - return err + log.Printf("unmarshal error: %v", err) + return nil // don't fail the subscriber } ts, err := parseTS(env.Event.TS) if err != nil { - return err + log.Printf("timestamp parse error: %v", err) + return nil } e := postgres.InsertEvent{ @@ -97,9 +143,17 @@ func main() { e.ParentSpanID = sqlNull(env.Correlation.ParentSpanID) } - return db.InsertEvent(ctx, e) + select { + case eventCh <- e: + default: + log.Printf("event channel full, dropping event %s", env.Event.ID) + } + return nil }) + close(eventCh) + wg.Wait() + if err != nil && err != context.Canceled { log.Printf("processor stopped: %v", err) } diff --git a/cmd/ingest-gateway/main.go b/cmd/ingest-gateway/main.go index 397b701..5c863ef 100644 --- a/cmd/ingest-gateway/main.go +++ b/cmd/ingest-gateway/main.go @@ -38,6 +38,7 @@ func main() { }) r.Post("/v1/events", func(w http.ResponseWriter, r *http.Request) { + r.Body = http.MaxBytesReader(w, r.Body, 10<<20) // 10MB limit var events []json.RawMessage if err := json.NewDecoder(r.Body).Decode(&events); err != nil { httpx.WriteJSON(w, http.StatusBadRequest, map[string]any{"error": "invalid_json"}) diff --git a/internal/queue/nats/nats.go b/internal/queue/nats/nats.go index 74d9bb2..adc9333 100644 --- a/internal/queue/nats/nats.go +++ b/internal/queue/nats/nats.go @@ -26,9 +26,5 @@ func (p *Publisher) Close() { } func (p *Publisher) Publish(ctx context.Context, data []byte) error { - ctx, cancel := context.WithTimeout(ctx, p.timeout) - defer cancel() - - _ = ctx return p.conn.Publish(p.topic, data) } diff --git a/internal/sdk/emitter.go b/internal/sdk/emitter.go index d95b255..387139c 100644 --- a/internal/sdk/emitter.go +++ b/internal/sdk/emitter.go @@ -221,7 +221,7 @@ func (e *Emitter) Emit(ctx context.Context, event Event) error { e.buffer = append(e.buffer, event) if len(e.buffer) >= e.bufferSize { - return e.Flush(ctx) + return e.flushLocked(ctx) } return nil } @@ -235,6 +235,11 @@ func (e *Emitter) Flush(ctx context.Context) error { return fmt.Errorf("emitter is closed") } + return e.flushLocked(ctx) +} + +// flushLocked sends buffered events. Caller must hold e.mu. +func (e *Emitter) flushLocked(ctx context.Context) error { if len(e.buffer) == 0 { return nil } @@ -281,7 +286,7 @@ func (e *Emitter) Close(ctx context.Context) error { } if len(e.buffer) > 0 { - _ = e.Flush(ctx) + _ = e.flushLocked(ctx) } return nil