109 lines
2.2 KiB
Go
109 lines
2.2 KiB
Go
package postgres
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"time"
|
|
)
|
|
|
|
type InsertEvent struct {
|
|
EventID string
|
|
TS time.Time
|
|
Type string
|
|
|
|
SessionID sql.NullString
|
|
RunID sql.NullString
|
|
TraceID sql.NullString
|
|
SpanID sql.NullString
|
|
ParentSpanID sql.NullString
|
|
|
|
SourceFramework sql.NullString
|
|
ClientID sql.NullString
|
|
Payload any
|
|
}
|
|
|
|
func (d *DB) InsertEvent(ctx context.Context, e InsertEvent) error {
|
|
payload, err := json.Marshal(e.Payload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = d.sql.ExecContext(ctx, `
|
|
insert into events (
|
|
event_id, ts, type, session_id, run_id, trace_id, span_id, parent_span_id,
|
|
source_framework, client_id, payload
|
|
) values (
|
|
$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11
|
|
)
|
|
on conflict (event_id) do nothing
|
|
`,
|
|
e.EventID,
|
|
e.TS,
|
|
e.Type,
|
|
e.SessionID,
|
|
e.RunID,
|
|
e.TraceID,
|
|
e.SpanID,
|
|
e.ParentSpanID,
|
|
e.SourceFramework,
|
|
e.ClientID,
|
|
payload,
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (d *DB) InsertEventBatch(ctx context.Context, events []InsertEvent) error {
|
|
if len(events) == 0 {
|
|
return nil
|
|
}
|
|
if len(events) == 1 {
|
|
return d.InsertEvent(ctx, events[0])
|
|
}
|
|
|
|
tx, err := d.sql.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback()
|
|
|
|
stmt, err := tx.PrepareContext(ctx, `
|
|
INSERT INTO events (
|
|
event_id, ts, type, session_id, run_id, trace_id, span_id, parent_span_id,
|
|
source_framework, client_id, payload
|
|
) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)
|
|
ON CONFLICT (event_id) DO NOTHING
|
|
`)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer stmt.Close()
|
|
|
|
for _, e := range events {
|
|
payload, err := json.Marshal(e.Payload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = stmt.ExecContext(ctx, e.EventID, e.TS, e.Type, e.SessionID, e.RunID,
|
|
e.TraceID, e.SpanID, e.ParentSpanID, e.SourceFramework, e.ClientID, payload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return tx.Commit()
|
|
}
|
|
|
|
// DeleteOlderThan removes events with ts older than the given cutoff.
|
|
// Returns the number of rows deleted.
|
|
func (d *DB) DeleteOlderThan(ctx context.Context, cutoff time.Time) (int64, error) {
|
|
result, err := d.sql.ExecContext(ctx, `DELETE FROM events WHERE ts < $1`, cutoff)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return result.RowsAffected()
|
|
}
|
|
|
|
var ErrMissingField = errors.New("missing required field")
|