feat: add session and run detail queries
GetSessionWithRuns returns session metadata and all runs. GetRunWithSpans returns run metadata and all spans with payload. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,161 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
)
|
||||
|
||||
type RunRow struct {
|
||||
RunID string `json:"run_id"`
|
||||
SessionID string `json:"session_id"`
|
||||
StartedAt time.Time `json:"started_at"`
|
||||
EndedAt *time.Time `json:"ended_at,omitempty"`
|
||||
Status string `json:"status"`
|
||||
SpanCount int `json:"span_count"`
|
||||
}
|
||||
|
||||
type SessionDetail struct {
|
||||
SessionID string `json:"session_id"`
|
||||
StartedAt time.Time `json:"started_at"`
|
||||
EndedAt *time.Time `json:"ended_at,omitempty"`
|
||||
Framework string `json:"framework"`
|
||||
Host string `json:"host"`
|
||||
}
|
||||
|
||||
func (d *DB) GetSessionWithRuns(ctx context.Context, sessionID string) (*SessionDetail, []RunRow, error) {
|
||||
// Get session info
|
||||
sessionQuery := `
|
||||
SELECT
|
||||
session_id,
|
||||
MIN(ts) as started_at,
|
||||
MAX(ts) as ended_at,
|
||||
MAX(source_framework) as framework,
|
||||
MAX(payload->'event'->'source'->>'host') as host
|
||||
FROM events
|
||||
WHERE session_id = $1
|
||||
GROUP BY session_id
|
||||
`
|
||||
var session SessionDetail
|
||||
var host *string
|
||||
err := d.sql.QueryRowContext(ctx, sessionQuery, sessionID).Scan(
|
||||
&session.SessionID, &session.StartedAt, &session.EndedAt, &session.Framework, &host,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if host != nil {
|
||||
session.Host = *host
|
||||
}
|
||||
|
||||
// Get runs
|
||||
runsQuery := `
|
||||
SELECT
|
||||
run_id,
|
||||
session_id,
|
||||
MIN(ts) as started_at,
|
||||
MAX(ts) as ended_at,
|
||||
CASE
|
||||
WHEN bool_or(type = 'error' OR payload->'payload'->>'status' = 'error') THEN 'error'
|
||||
ELSE 'success'
|
||||
END as status,
|
||||
COUNT(DISTINCT span_id) as span_count
|
||||
FROM events
|
||||
WHERE session_id = $1 AND run_id IS NOT NULL
|
||||
GROUP BY run_id, session_id
|
||||
ORDER BY started_at ASC
|
||||
`
|
||||
rows, err := d.sql.QueryContext(ctx, runsQuery, sessionID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var runs []RunRow
|
||||
for rows.Next() {
|
||||
var r RunRow
|
||||
if err := rows.Scan(&r.RunID, &r.SessionID, &r.StartedAt, &r.EndedAt, &r.Status, &r.SpanCount); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
runs = append(runs, r)
|
||||
}
|
||||
|
||||
return &session, runs, rows.Err()
|
||||
}
|
||||
|
||||
type SpanRow struct {
|
||||
SpanID string `json:"span_id"`
|
||||
Name string `json:"name"`
|
||||
Kind string `json:"kind"`
|
||||
StartedAt time.Time `json:"started_at"`
|
||||
Duration *int64 `json:"duration_ms,omitempty"`
|
||||
Status string `json:"status"`
|
||||
Payload json.RawMessage `json:"payload"`
|
||||
}
|
||||
|
||||
type RunDetail struct {
|
||||
RunID string `json:"run_id"`
|
||||
SessionID string `json:"session_id"`
|
||||
StartedAt time.Time `json:"started_at"`
|
||||
EndedAt *time.Time `json:"ended_at,omitempty"`
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
func (d *DB) GetRunWithSpans(ctx context.Context, runID string) (*RunDetail, []SpanRow, error) {
|
||||
// Get run info
|
||||
runQuery := `
|
||||
SELECT
|
||||
run_id,
|
||||
session_id,
|
||||
MIN(ts) as started_at,
|
||||
MAX(ts) as ended_at,
|
||||
CASE
|
||||
WHEN bool_or(type = 'error' OR payload->'payload'->>'status' = 'error') THEN 'error'
|
||||
ELSE 'success'
|
||||
END as status
|
||||
FROM events
|
||||
WHERE run_id = $1
|
||||
GROUP BY run_id, session_id
|
||||
`
|
||||
var run RunDetail
|
||||
err := d.sql.QueryRowContext(ctx, runQuery, runID).Scan(
|
||||
&run.RunID, &run.SessionID, &run.StartedAt, &run.EndedAt, &run.Status,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Get spans
|
||||
spansQuery := `
|
||||
SELECT
|
||||
span_id,
|
||||
COALESCE(payload->'attributes'->>'name', payload->'event'->>'type', type) as name,
|
||||
COALESCE(payload->'attributes'->>'span_kind', 'unknown') as kind,
|
||||
ts as started_at,
|
||||
(payload->'payload'->>'duration_ms')::bigint as duration_ms,
|
||||
CASE
|
||||
WHEN type = 'error' OR payload->'payload'->>'status' = 'error' THEN 'error'
|
||||
ELSE 'success'
|
||||
END as status,
|
||||
payload
|
||||
FROM events
|
||||
WHERE run_id = $1 AND span_id IS NOT NULL
|
||||
ORDER BY ts ASC
|
||||
`
|
||||
rows, err := d.sql.QueryContext(ctx, spansQuery, runID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var spans []SpanRow
|
||||
for rows.Next() {
|
||||
var s SpanRow
|
||||
if err := rows.Scan(&s.SpanID, &s.Name, &s.Kind, &s.StartedAt, &s.Duration, &s.Status, &s.Payload); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
spans = append(spans, s)
|
||||
}
|
||||
|
||||
return &run, spans, rows.Err()
|
||||
}
|
||||
Reference in New Issue
Block a user