Files
agentmon/internal/store/postgres/runs.go
T
2026-03-26 11:22:34 -07:00

363 lines
8.7 KiB
Go

package postgres
import (
"context"
"encoding/json"
"time"
)
// RunRow summarizes one run of a session, as aggregated from the events
// table by GetSessionWithRuns.
type RunRow struct {
RunID string `json:"run_id"`
SessionID string `json:"session_id"`
// StartedAt is the earliest event ts seen for the run.
StartedAt time.Time `json:"started_at"`
// EndedAt is the ts of the run's (latest) run.end event; nil while none exists.
EndedAt *time.Time `json:"ended_at,omitempty"`
// Status is "error" if any event of the run has type 'error' or a payload
// status of 'error', otherwise "success".
Status string `json:"status"`
// SpanCount is the number of distinct span_ids among the run's events.
SpanCount int `json:"span_count"`
// ToolCount is the number of distinct span_ids whose span_kind attribute is "tool".
ToolCount int `json:"tool_count"`
// Model is taken from the run.end payload, falling back to the model reported
// in a metric.snapshot payload; empty when neither reports one.
Model string `json:"model,omitempty"`
// Spans holds the run's spans as attached by attachSpansToRuns.
Spans []SpanRow `json:"spans,omitempty"`
}
// SessionDetail summarizes one session, as aggregated from the events table
// by GetSessionWithRuns.
type SessionDetail struct {
SessionID string `json:"session_id"`
// StartedAt is the earliest event ts seen for the session.
StartedAt time.Time `json:"started_at"`
// EndedAt is the ts of the session's (latest) session.end event; nil while
// none exists.
EndedAt *time.Time `json:"ended_at,omitempty"`
// Framework is the MAX of the session events' source_framework column.
Framework string `json:"framework"`
// Host is the MAX of payload.event.source.host across the session's events;
// empty when no event reports one.
Host string `json:"host"`
}
// GetSessionWithRuns loads the summary of one session together with its runs.
//
// The session summary aggregates every event whose session_id equals
// sessionID:
//   - StartedAt is the earliest event ts.
//   - EndedAt is the ts of its session.end event (nil if none).
//   - Framework is the MAX of source_framework.
//   - Host is the MAX of payload.event.source.host.
//
// Framework and Host are left empty when their aggregate is NULL.
//
// Runs aggregate the subset of those events that carry a run_id. They are
// ordered by their earliest ts, and each RunRow then gets its spans via
// attachSpansToRuns.
//
// Any query or scan error is returned unchanged. NOTE(review): with
// database/sql an unknown session surfaces as sql.ErrNoRows from the first
// Scan; confirm d.sql is a *sql.DB.
func (d *DB) GetSessionWithRuns(ctx context.Context, sessionID string) (*SessionDetail, []RunRow, error) {
	// Session summary.
	sessionQuery := `
SELECT
session_id,
MIN(ts) as started_at,
MAX(CASE WHEN type = 'session.end' THEN ts END) as ended_at,
MAX(source_framework) as framework,
MAX(payload->'event'->'source'->>'host') as host
FROM events
WHERE session_id = $1
GROUP BY session_id
`
	var session SessionDetail
	// MAX() yields NULL when every input is NULL (or the JSON path is absent).
	// Scanning NULL into a plain string is an error, so both text aggregates go
	// through pointers and default to "".
	var framework, host *string
	err := d.sql.QueryRowContext(ctx, sessionQuery, sessionID).Scan(
		&session.SessionID, &session.StartedAt, &session.EndedAt, &framework, &host,
	)
	if err != nil {
		return nil, nil, err
	}
	if framework != nil {
		session.Framework = *framework
	}
	if host != nil {
		session.Host = *host
	}

	// Per-run summaries, oldest first.
	runsQuery := `
SELECT
run_id,
session_id,
MIN(ts) as started_at,
MAX(CASE WHEN type = 'run.end' THEN ts END) as ended_at,
CASE
WHEN bool_or(type = 'error' OR payload->'payload'->>'status' = 'error') THEN 'error'
ELSE 'success'
END as status,
COUNT(DISTINCT span_id) as span_count,
COUNT(DISTINCT CASE WHEN payload->'attributes'->>'span_kind' = 'tool' THEN span_id END) as tool_count,
COALESCE(
MAX(CASE WHEN type = 'run.end' THEN payload->'payload'->>'model' END),
MAX(CASE WHEN type = 'metric.snapshot' THEN payload->'payload'->'metrics'->>'model' END),
''
) as model
FROM events
WHERE session_id = $1 AND run_id IS NOT NULL
GROUP BY run_id, session_id
ORDER BY started_at ASC
`
	rows, err := d.sql.QueryContext(ctx, runsQuery, sessionID)
	if err != nil {
		return nil, nil, err
	}
	defer rows.Close()
	var runs []RunRow
	for rows.Next() {
		var r RunRow
		if err := rows.Scan(&r.RunID, &r.SessionID, &r.StartedAt, &r.EndedAt, &r.Status, &r.SpanCount, &r.ToolCount, &r.Model); err != nil {
			return nil, nil, err
		}
		runs = append(runs, r)
	}
	if err := rows.Err(); err != nil {
		return nil, nil, err
	}

	runs, err = d.attachSpansToRuns(ctx, sessionID, runs)
	if err != nil {
		return nil, nil, err
	}
	return &session, runs, nil
}
// SpanRow is one span, folded from all events that share its span_id by
// listSpansForRun / attachSpansToRuns (see mergeSpanEvent).
type SpanRow struct {
// RunID is the run the span is associated with: the event's run_id, or, in
// attachSpansToRuns for events without one, the run whose time window
// contains the span's ts.
RunID string `json:"run_id,omitempty"`
SpanID string `json:"span_id"`
// Name is attributes.name, falling back to payload.event.type and then to the
// event's type column.
Name string `json:"name"`
// Kind is attributes.span_kind, or "unknown" when absent.
Kind string `json:"kind"`
// StartedAt is the ts of the span's earliest event.
StartedAt time.Time `json:"started_at"`
// Duration is payload.duration_ms from the latest event that reported it;
// nil if none did.
Duration *int64 `json:"duration_ms,omitempty"`
// Status is "error" if any of the span's events errored, otherwise "success".
Status string `json:"status"`
// Payload is the deep merge of the span's raw event payloads, later events
// winning on conflicting keys.
Payload json.RawMessage `json:"payload"`
}
// RunDetail is the run summary returned by GetRunWithSpans.
type RunDetail struct {
RunID string `json:"run_id"`
SessionID string `json:"session_id"`
// StartedAt is the earliest event ts seen for the run.
StartedAt time.Time `json:"started_at"`
// EndedAt is the ts of the run's (latest) run.end event; nil while none exists.
EndedAt *time.Time `json:"ended_at,omitempty"`
// Status is "error" if any event of the run errored, otherwise "success".
Status string `json:"status"`
}
// GetRunWithSpans loads the summary of a single run and its spans.
//
// The summary aggregates every event whose run_id equals runID:
//   - StartedAt is the earliest event ts.
//   - EndedAt is the ts of its run.end event (nil if none).
//   - Status is "error" if any event has type 'error' or payload status
//     'error', otherwise "success".
//
// Spans come from listSpansForRun. Errors are returned unchanged.
// NOTE(review): with database/sql an unknown run surfaces as sql.ErrNoRows;
// confirm d.sql is a *sql.DB.
func (d *DB) GetRunWithSpans(ctx context.Context, runID string) (*RunDetail, []SpanRow, error) {
	const runQuery = `
SELECT
run_id,
session_id,
MIN(ts) as started_at,
MAX(CASE WHEN type = 'run.end' THEN ts END) as ended_at,
CASE
WHEN bool_or(type = 'error' OR payload->'payload'->>'status' = 'error') THEN 'error'
ELSE 'success'
END as status
FROM events
WHERE run_id = $1
GROUP BY run_id, session_id
`
	run := new(RunDetail)
	scanErr := d.sql.QueryRowContext(ctx, runQuery, runID).
		Scan(&run.RunID, &run.SessionID, &run.StartedAt, &run.EndedAt, &run.Status)
	if scanErr != nil {
		return nil, nil, scanErr
	}

	spans, spansErr := d.listSpansForRun(ctx, runID)
	if spansErr != nil {
		return nil, nil, spansErr
	}
	return run, spans, nil
}
// mergeSpanEvent folds one more event s of the same span into existing.
//
// The rules, per field:
//   - Name: the first non-empty value is kept.
//   - Kind: a placeholder ("" or "unknown") is replaced by s.Kind.
//   - Duration: a non-nil s.Duration overwrites the stored one.
//   - Status: an "error" status is sticky.
//   - Payload: deep-merged with mergeEnvelopeJSON.
//
// StartedAt, RunID and SpanID are left as they were.
func mergeSpanEvent(existing *SpanRow, s SpanRow) {
	if s.Name != "" && existing.Name == "" {
		existing.Name = s.Name
	}

	switch existing.Kind {
	case "", "unknown":
		existing.Kind = s.Kind
	}

	// The most recent event that reports a duration wins.
	if d := s.Duration; d != nil {
		existing.Duration = d
	}

	// Once any event of the span errored, the span stays in error.
	if s.Status == "error" {
		existing.Status = s.Status
	}

	existing.Payload = mergeEnvelopeJSON(existing.Payload, s.Payload)
}
// mergeEnvelopeJSON deep-merges the JSON object next into the JSON object
// existing and returns the re-encoded result. Keys from next win on conflict,
// except that when both sides hold an object under the same key those objects
// are merged recursively (see mergeJSONObjects).
//
// Inputs that are not JSON objects are not merged:
//   - an empty side yields the other side unchanged;
//   - if existing is not an object (including JSON null), next is returned;
//   - if next is not an object (including JSON null), existing is returned.
//
// If re-encoding the merged object fails, next is returned.
func mergeEnvelopeJSON(existing, next json.RawMessage) json.RawMessage {
	if len(existing) == 0 {
		return next
	}
	if len(next) == 0 {
		return existing
	}
	// Unmarshalling JSON null into a map succeeds but leaves the map nil.
	// Writing into that nil map would panic, so null is rejected along with
	// every other non-object value.
	var dst map[string]any
	if err := json.Unmarshal(existing, &dst); err != nil || dst == nil {
		return next
	}
	var src map[string]any
	if err := json.Unmarshal(next, &src); err != nil || src == nil {
		return existing
	}
	mergeJSONObjects(dst, src)
	merged, err := json.Marshal(dst)
	if err != nil {
		return next
	}
	return merged
}

// mergeJSONObjects merges src into dst in place.
//
// A non-object value in src overwrites dst's value under the same key. An
// object value in src is merged recursively into dst's value when that is
// also an object, and is stored as-is otherwise. dst must be non-nil.
func mergeJSONObjects(dst, src map[string]any) {
	for key, value := range src {
		srcMap, ok := value.(map[string]any)
		if !ok {
			dst[key] = value
			continue
		}
		dstMap, ok := dst[key].(map[string]any)
		if !ok {
			dst[key] = srcMap
			continue
		}
		mergeJSONObjects(dstMap, srcMap)
	}
}
// findRunIndexForSpan returns the index of a run in runs whose time window
// contains spanStartedAt, or -1 if there is none.
//
// A run's window starts at StartedAt (inclusive) and ends at *EndedAt
// (inclusive). A run with a nil EndedAt is open-ended. runs is scanned from
// the last element backwards and the first match is returned.
// GetSessionWithRuns supplies runs sorted by started_at ascending, so there
// the most recently started matching run wins.
func findRunIndexForSpan(runs []RunRow, spanStartedAt time.Time) int {
	for idx := len(runs) - 1; idx >= 0; idx-- {
		start, end := runs[idx].StartedAt, runs[idx].EndedAt
		notBeforeStart := !spanStartedAt.Before(start)
		notAfterEnd := end == nil || !spanStartedAt.After(*end)
		if notBeforeStart && notAfterEnd {
			return idx
		}
	}
	return -1
}
// listSpansForRun returns the spans recorded for runID, one SpanRow per
// distinct span_id, in the order each span was first seen.
//
// Every event with a matching run_id and a non-NULL span_id contributes.
// Events are read in ts order, and events sharing a span_id are folded into a
// single row by mergeSpanEvent. A row's StartedAt is therefore the ts of its
// earliest event. The result is non-nil (possibly empty) on success.
func (d *DB) listSpansForRun(ctx context.Context, runID string) ([]SpanRow, error) {
	// duration_ms is cast text -> numeric -> bigint. A direct text -> bigint
	// cast rejects fractional JSON numbers such as "12.5", which would abort
	// the whole query. numeric -> bigint rounds to the nearest integer instead.
	rows, err := d.sql.QueryContext(ctx, `
SELECT
run_id,
span_id,
COALESCE(payload->'attributes'->>'name', payload->'event'->>'type', type) as name,
COALESCE(payload->'attributes'->>'span_kind', 'unknown') as kind,
ts as started_at,
(payload->'payload'->>'duration_ms')::numeric::bigint as duration_ms,
CASE
WHEN type = 'error' OR payload->'payload'->>'status' = 'error' THEN 'error'
ELSE 'success'
END as status,
payload
FROM events
WHERE run_id = $1 AND span_id IS NOT NULL
ORDER BY ts ASC
`, runID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	spansByID := make(map[string]*SpanRow)
	var order []string
	for rows.Next() {
		// s is declared per iteration, so storing &s below is safe.
		var s SpanRow
		if err := rows.Scan(&s.RunID, &s.SpanID, &s.Name, &s.Kind, &s.StartedAt, &s.Duration, &s.Status, &s.Payload); err != nil {
			return nil, err
		}
		if existing, ok := spansByID[s.SpanID]; ok {
			mergeSpanEvent(existing, s)
			continue
		}
		spansByID[s.SpanID] = &s
		order = append(order, s.SpanID)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}

	spans := make([]SpanRow, 0, len(order))
	for _, spanID := range order {
		spans = append(spans, *spansByID[spanID])
	}
	return spans, nil
}
// attachSpansToRuns loads every span event of sessionID and attaches the
// resulting spans to the matching entries of runs, returning runs.
//
// Events are read in ts order and folded with mergeSpanEvent into one SpanRow
// per (run, span_id). Runs are assigned as follows:
//   - An event with a non-empty run_id belongs to that run.
//   - An event without one belongs to the run whose time window contains its
//     ts (see findRunIndexForSpan), and is dropped if no run matches.
//   - Spans whose run_id is not among runs are ignored.
//
// Within each run, spans keep the order in which they were first seen. runs is
// modified in place (Spans is set for every run that has at least one span;
// others are left untouched) and returned. An empty runs returns immediately
// without querying.
func (d *DB) attachSpansToRuns(ctx context.Context, sessionID string, runs []RunRow) ([]RunRow, error) {
	if len(runs) == 0 {
		return runs, nil
	}
	// duration_ms is cast text -> numeric -> bigint. A direct text -> bigint
	// cast rejects fractional JSON numbers such as "12.5", which would abort
	// the whole query. numeric -> bigint rounds to the nearest integer instead.
	rows, err := d.sql.QueryContext(ctx, `
SELECT
run_id,
span_id,
COALESCE(payload->'attributes'->>'name', payload->'event'->>'type', type) as name,
COALESCE(payload->'attributes'->>'span_kind', 'unknown') as kind,
ts as started_at,
(payload->'payload'->>'duration_ms')::numeric::bigint as duration_ms,
CASE WHEN type = 'error' OR payload->'payload'->>'status' = 'error' THEN 'error' ELSE 'success' END as status,
payload
FROM events
WHERE session_id = $1 AND span_id IS NOT NULL
ORDER BY ts ASC
`, sessionID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	// Per-run accumulator: spans keyed by span_id for merging, plus the order
	// in which span_ids were first seen.
	type runSpans struct {
		byID  map[string]*SpanRow
		order []string
	}
	spansByRun := make(map[string]*runSpans)
	for rows.Next() {
		// s is declared per iteration, so storing &s below is safe.
		var s SpanRow
		// run_id is nullable here, so it is scanned through a pointer.
		var runID *string
		if err := rows.Scan(&runID, &s.SpanID, &s.Name, &s.Kind, &s.StartedAt, &s.Duration, &s.Status, &s.Payload); err != nil {
			return nil, err
		}
		if runID != nil {
			s.RunID = *runID
		}
		// A NULL or empty run_id falls back to matching by time window.
		if s.RunID == "" {
			idx := findRunIndexForSpan(runs, s.StartedAt)
			if idx == -1 {
				continue
			}
			s.RunID = runs[idx].RunID
		}

		rs := spansByRun[s.RunID]
		if rs == nil {
			rs = &runSpans{byID: make(map[string]*SpanRow)}
			spansByRun[s.RunID] = rs
		}
		if existing, ok := rs.byID[s.SpanID]; ok {
			mergeSpanEvent(existing, s)
			continue
		}
		rs.byID[s.SpanID] = &s
		rs.order = append(rs.order, s.SpanID)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}

	for i := range runs {
		rs := spansByRun[runs[i].RunID]
		if rs == nil {
			continue
		}
		spans := make([]SpanRow, 0, len(rs.order))
		for _, spanID := range rs.order {
			spans = append(spans, *rs.byID[spanID])
		}
		runs[i].Spans = spans
	}
	return runs, nil
}