feat(ingest): batch event writes and harden transport

This commit is contained in:
William Valentin
2026-03-26 11:22:42 -07:00
parent 43877a5448
commit 6605780b58
4 changed files with 65 additions and 9 deletions
+57 -3
View File
@@ -7,6 +7,7 @@ import (
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
@@ -69,15 +70,60 @@ func main() {
log.Printf("subscribed to %s (%s)", natsTopic, natsURL)
const (
maxBatchSize = 100
flushInterval = 200 * time.Millisecond
)
eventCh := make(chan postgres.InsertEvent, 1000)
// Flush goroutine
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
batch := make([]postgres.InsertEvent, 0, maxBatchSize)
ticker := time.NewTicker(flushInterval)
defer ticker.Stop()
flush := func() {
if len(batch) == 0 {
return
}
if err := db.InsertEventBatch(ctx, batch); err != nil {
log.Printf("batch insert error (%d events): %v", len(batch), err)
}
batch = batch[:0]
}
for {
select {
case evt, ok := <-eventCh:
if !ok {
flush()
return
}
batch = append(batch, evt)
if len(batch) >= maxBatchSize {
flush()
}
case <-ticker.C:
flush()
}
}
}()
err = sub.Subscribe(ctx, func(msg []byte) error {
var env envelope
if err := json.Unmarshal(msg, &env); err != nil {
return err
log.Printf("unmarshal error: %v", err)
return nil // don't fail the subscriber
}
ts, err := parseTS(env.Event.TS)
if err != nil {
return err
log.Printf("timestamp parse error: %v", err)
return nil
}
e := postgres.InsertEvent{
@@ -97,9 +143,17 @@ func main() {
e.ParentSpanID = sqlNull(env.Correlation.ParentSpanID)
}
return db.InsertEvent(ctx, e)
select {
case eventCh <- e:
default:
log.Printf("event channel full, dropping event %s", env.Event.ID)
}
return nil
})
close(eventCh)
wg.Wait()
if err != nil && err != context.Canceled {
log.Printf("processor stopped: %v", err)
}
+1
View File
@@ -38,6 +38,7 @@ func main() {
})
r.Post("/v1/events", func(w http.ResponseWriter, r *http.Request) {
r.Body = http.MaxBytesReader(w, r.Body, 10<<20) // 10MB limit
var events []json.RawMessage
if err := json.NewDecoder(r.Body).Decode(&events); err != nil {
httpx.WriteJSON(w, http.StatusBadRequest, map[string]any{"error": "invalid_json"})
-4
View File
@@ -26,9 +26,5 @@ func (p *Publisher) Close() {
}
func (p *Publisher) Publish(ctx context.Context, data []byte) error {
ctx, cancel := context.WithTimeout(ctx, p.timeout)
defer cancel()
_ = ctx
return p.conn.Publish(p.topic, data)
}
+7 -2
View File
@@ -221,7 +221,7 @@ func (e *Emitter) Emit(ctx context.Context, event Event) error {
e.buffer = append(e.buffer, event)
if len(e.buffer) >= e.bufferSize {
return e.Flush(ctx)
return e.flushLocked(ctx)
}
return nil
}
@@ -235,6 +235,11 @@ func (e *Emitter) Flush(ctx context.Context) error {
return fmt.Errorf("emitter is closed")
}
return e.flushLocked(ctx)
}
// flushLocked sends buffered events. Caller must hold e.mu.
func (e *Emitter) flushLocked(ctx context.Context) error {
if len(e.buffer) == 0 {
return nil
}
@@ -281,7 +286,7 @@ func (e *Emitter) Close(ctx context.Context) error {
}
if len(e.buffer) > 0 {
_ = e.Flush(ctx)
_ = e.flushLocked(ctx)
}
return nil