package postgres

import (
	"context"
	"fmt"
	"time"
)

// FrameworkStats holds the counters reported for a single source framework
// inside Summary.ByFramework.
type FrameworkStats struct {
	ActiveSessions int `json:"active_sessions"`
	Runs           int `json:"runs"`
	Tools          int `json:"tools"`
	Errors         int `json:"errors"`
}

// Summary is the result of GetSummary: the current active-session count plus
// totals for events recorded since local midnight, overall and per framework.
type Summary struct {
	ActiveSessions int                       `json:"active_sessions"`
	RunsToday      int                       `json:"runs_today"`
	ToolCallsToday int                       `json:"tool_calls_today"`
	ErrorsToday    int                       `json:"errors_today"`
	TokensToday    int64                     `json:"tokens_today"`
	CostToday      float64                   `json:"cost_today"`
	AvgDurationMS  float64                   `json:"avg_duration_ms"`
	ByFramework    map[string]FrameworkStats `json:"by_framework"`
}

// TimeseriesBucket is one time bucket of a TimeseriesResult. TS is the bucket
// start as computed by date_bin in GetTimeseries.
type TimeseriesBucket struct {
	TS            time.Time `json:"ts"`
	Runs          int       `json:"runs"`
	Tools         int       `json:"tools"`
	Errors        int       `json:"errors"`
	Tokens        int64     `json:"tokens"`
	InputTokens   int64     `json:"input_tokens"`
	OutputTokens  int64     `json:"output_tokens"`
	Cost          float64   `json:"cost"`
	AvgDurationMS float64   `json:"avg_duration_ms"`
}

// TimeseriesResult is the result of GetTimeseries. Window echoes the window
// argument as passed in; Bucket is the short bucket label for that window.
type TimeseriesResult struct {
	Window string             `json:"window"`
	Bucket string             `json:"bucket"`
	Series []TimeseriesBucket `json:"series"`
}

// startOfLocalDay returns 00:00:00 on t's calendar date, in t's own location.
func startOfLocalDay(t time.Time) time.Time {
	return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
}

// GetSummary returns the active-session count together with run, tool-call,
// error, token, cost and average-duration figures for events recorded since
// midnight (in the location of time.Now()), both in total and per framework.
//
// It runs three queries against the events table; any query, scan or
// iteration failure is returned wrapped with a message naming the failing step.
func (d *DB) GetSummary(ctx context.Context) (*Summary, error) {
	now := time.Now()
	midnight := startOfLocalDay(now)

	// Active sessions are open sessions with recent activity. Some hook sources
	// can miss session.end, so "started but never ended" alone overstates live
	// work; a session additionally needs an event in the last 15 minutes.
	// A session's framework is the first non-NULL source_framework, preferring
	// the one on its session.start event.
	activeQ := `
		WITH session_groups AS (
			SELECT
				session_id,
				COALESCE((ARRAY_AGG(source_framework ORDER BY CASE WHEN type = 'session.start' THEN 0 ELSE 1 END, ts)
					FILTER (WHERE source_framework IS NOT NULL))[1], 'unknown') AS framework,
				MAX(ts) AS last_event_at,
				BOOL_OR(type = 'session.start') AS has_start,
				BOOL_OR(type = 'session.end') AS has_end
			FROM events
			WHERE session_id IS NOT NULL
			GROUP BY session_id
		)
		SELECT framework, COUNT(*)
		FROM session_groups
		WHERE has_start AND NOT has_end AND last_event_at >= $1
		GROUP BY framework
	`
	activeRows, err := d.sql.QueryContext(ctx, activeQ, now.Add(-15*time.Minute))
	if err != nil {
		return nil, fmt.Errorf("summary: querying active sessions: %w", err)
	}
	defer activeRows.Close()

	byFramework := make(map[string]FrameworkStats)
	var activeSessions int
	for activeRows.Next() {
		var fw string
		var count int
		if err := activeRows.Scan(&fw, &count); err != nil {
			return nil, fmt.Errorf("summary: scanning active sessions: %w", err)
		}
		fs := byFramework[fw]
		fs.ActiveSessions = count
		byFramework[fw] = fs
		activeSessions += count
	}
	if err := activeRows.Err(); err != nil {
		return nil, fmt.Errorf("summary: reading active sessions: %w", err)
	}

	// Per-framework aggregates for today. Grouping is on the selected COALESCE
	// expression (GROUP BY 1) rather than the raw column, so events with a NULL
	// framework and events whose framework is literally 'unknown' come back as
	// one row instead of two rows that would overwrite each other in the map.
	fwQ := `
		SELECT
			COALESCE(source_framework, 'unknown'),
			COUNT(*) FILTER (WHERE type = 'run.start') AS runs,
			COUNT(*) FILTER (WHERE type = 'span.end' AND payload->'attributes'->>'span_kind' = 'tool') AS tools,
			COUNT(*) FILTER (WHERE type = 'error') AS errors
		FROM events
		WHERE ts >= $1 AND type IN ('run.start', 'span.end', 'error')
		GROUP BY 1
	`
	rows, err := d.sql.QueryContext(ctx, fwQ, midnight)
	if err != nil {
		return nil, fmt.Errorf("summary: querying framework stats: %w", err)
	}
	defer rows.Close()

	var totalRuns, totalTools, totalErrors int
	for rows.Next() {
		var fw string
		var fs FrameworkStats
		if err := rows.Scan(&fw, &fs.Runs, &fs.Tools, &fs.Errors); err != nil {
			return nil, fmt.Errorf("summary: scanning framework stats: %w", err)
		}
		// Keep the active-session count recorded by the first query.
		if existing, ok := byFramework[fw]; ok {
			fs.ActiveSessions = existing.ActiveSessions
		}
		byFramework[fw] = fs
		totalRuns += fs.Runs
		totalTools += fs.Tools
		totalErrors += fs.Errors
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("summary: reading framework stats: %w", err)
	}

	// Usage stats for today (tokens, cost, avg latency), taken from run.end
	// events. The COALESCEs make the single result row all zeros when there
	// are no such events.
	usageQ := `
		SELECT
			COALESCE(SUM((payload->'payload'->'usage'->>'total_tokens')::bigint), 0),
			COALESCE(SUM((payload->'payload'->'usage'->>'total_cost')::float8), 0),
			COALESCE(AVG((payload->'payload'->>'duration_ms')::float8), 0)
		FROM events
		WHERE type = 'run.end' AND ts >= $1
	`
	var tokensToday int64
	var costToday, avgDurationMS float64
	if err := d.sql.QueryRowContext(ctx, usageQ, midnight).Scan(&tokensToday, &costToday, &avgDurationMS); err != nil {
		return nil, fmt.Errorf("summary: querying usage: %w", err)
	}

	return &Summary{
		ActiveSessions: activeSessions,
		RunsToday:      totalRuns,
		ToolCallsToday: totalTools,
		ErrorsToday:    totalErrors,
		TokensToday:    tokensToday,
		CostToday:      costToday,
		AvgDurationMS:  avgDurationMS,
		ByFramework:    byFramework,
	}, nil
}

// TopTool is one row of the GetTopTools ranking: a tool name and how many
// tool spans ended with that name.
type TopTool struct {
	Name  string `json:"name"`
	Count int    `json:"count"`
}

// TopModel is one row of the GetTopModels ranking: a model name and how many
// run.end events reported it.
type TopModel struct {
	Name  string `json:"name"`
	Count int    `json:"count"`
}

// GetTopTools returns up to limit tool names ranked by the number of tool
// span.end events recorded since local midnight, most frequent first; ties
// are ordered by name so the ranking is stable between calls.
//
// A limit <= 0 falls back to 10. When nothing matches, the result is an empty
// non-nil slice, so it JSON-encodes as [] rather than null.
func (d *DB) GetTopTools(ctx context.Context, limit int) ([]TopTool, error) {
	if limit <= 0 {
		limit = 10
	}
	midnight := startOfLocalDay(time.Now())

	q := `
		SELECT payload->'attributes'->>'name' AS tool_name, COUNT(*) AS cnt
		FROM events
		WHERE type = 'span.end'
			AND payload->'attributes'->>'span_kind' = 'tool'
			AND payload->'attributes'->>'name' IS NOT NULL
			AND ts >= $1
		GROUP BY tool_name
		ORDER BY cnt DESC, tool_name ASC
		LIMIT $2
	`
	rows, err := d.sql.QueryContext(ctx, q, midnight, limit)
	if err != nil {
		return nil, fmt.Errorf("top tools: querying: %w", err)
	}
	defer rows.Close()

	out := []TopTool{}
	for rows.Next() {
		var t TopTool
		if err := rows.Scan(&t.Name, &t.Count); err != nil {
			return nil, fmt.Errorf("top tools: scanning: %w", err)
		}
		out = append(out, t)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("top tools: reading rows: %w", err)
	}
	return out, nil
}

// GetTopModels returns up to limit model names ranked by the number of
// run.end events recorded since local midnight that carry a non-empty model,
// most frequent first; ties are ordered by name so the ranking is stable
// between calls.
//
// A limit <= 0 falls back to 10. When nothing matches, the result is an empty
// non-nil slice, so it JSON-encodes as [] rather than null.
func (d *DB) GetTopModels(ctx context.Context, limit int) ([]TopModel, error) {
	if limit <= 0 {
		limit = 10
	}
	midnight := startOfLocalDay(time.Now())

	q := `
		SELECT payload->'payload'->>'model' AS model_name, COUNT(*) AS cnt
		FROM events
		WHERE type = 'run.end'
			AND payload->'payload'->>'model' IS NOT NULL
			AND payload->'payload'->>'model' <> ''
			AND ts >= $1
		GROUP BY model_name
		ORDER BY cnt DESC, model_name ASC
		LIMIT $2
	`
	rows, err := d.sql.QueryContext(ctx, q, midnight, limit)
	if err != nil {
		return nil, fmt.Errorf("top models: querying: %w", err)
	}
	defer rows.Close()

	out := []TopModel{}
	for rows.Next() {
		var m TopModel
		if err := rows.Scan(&m.Name, &m.Count); err != nil {
			return nil, fmt.Errorf("top models: scanning: %w", err)
		}
		out = append(out, m)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("top models: reading rows: %w", err)
	}
	return out, nil
}

// bucketForWindow returns the bucket width, as a PostgreSQL interval literal,
// used for the given window. Unrecognized windows get the "24h" width.
func bucketForWindow(window string) string {
	switch window {
	case "1h":
		return "1 minute"
	case "6h":
		return "5 minutes"
	case "7d":
		return "1 hour"
	default: // "24h"
		return "15 minutes"
	}
}

// durationForWindow returns how far back from now the given window reaches.
// Unrecognized windows are treated as "24h".
func durationForWindow(window string) time.Duration {
	switch window {
	case "1h":
		return 1 * time.Hour
	case "6h":
		return 6 * time.Hour
	case "7d":
		return 7 * 24 * time.Hour
	default: // "24h"
		return 24 * time.Hour
	}
}

// bucketLabelForWindow returns the short label for the bucket width of the
// given window, matching bucketForWindow. Unrecognized windows get the "24h"
// label.
func bucketLabelForWindow(window string) string {
	switch window {
	case "1h":
		return "1m"
	case "6h":
		return "5m"
	case "7d":
		return "1h"
	default: // "24h"
		return "15m"
	}
}

// GetTimeseries returns per-bucket run, tool-call, error, token, cost and
// average-duration figures for events newer than now minus the window.
//
// window is one of "1h", "6h", "7d" or "24h"; any other value is queried as
// "24h" but is still echoed unchanged in the result's Window field. Buckets
// are aligned to 2000-01-01 via date_bin and returned oldest first. Only
// buckets containing at least one matching event appear; when there are none,
// Series is an empty non-nil slice, so it JSON-encodes as [] rather than null.
func (d *DB) GetTimeseries(ctx context.Context, window string) (*TimeseriesResult, error) {
	bucket := bucketForWindow(window)
	dur := durationForWindow(window)
	since := time.Now().Add(-dur)

	q := `
		SELECT
			date_bin($1::interval, ts, '2000-01-01'::timestamptz) AS bucket_ts,
			COUNT(*) FILTER (WHERE type = 'run.start') AS runs,
			COUNT(*) FILTER (WHERE type = 'span.end' AND payload->'attributes'->>'span_kind' = 'tool') AS tools,
			COUNT(*) FILTER (WHERE type = 'error') AS errors,
			COALESCE(SUM((payload->'payload'->'usage'->>'total_tokens')::bigint) FILTER (WHERE type = 'run.end'), 0) AS tokens,
			COALESCE(SUM((payload->'payload'->'usage'->>'input_tokens')::bigint) FILTER (WHERE type = 'run.end'), 0) AS input_tokens,
			COALESCE(SUM((payload->'payload'->'usage'->>'output_tokens')::bigint) FILTER (WHERE type = 'run.end'), 0) AS output_tokens,
			COALESCE(SUM((payload->'payload'->'usage'->>'total_cost')::float8) FILTER (WHERE type = 'run.end'), 0) AS cost,
			COALESCE(AVG((payload->'payload'->>'duration_ms')::float8) FILTER (WHERE type = 'run.end'), 0) AS avg_duration_ms
		FROM events
		WHERE ts >= $2 AND type IN ('run.start', 'run.end', 'span.end', 'error')
		GROUP BY bucket_ts
		ORDER BY bucket_ts ASC
	`
	rows, err := d.sql.QueryContext(ctx, q, bucket, since)
	if err != nil {
		return nil, fmt.Errorf("timeseries %q: querying: %w", window, err)
	}
	defer rows.Close()

	series := []TimeseriesBucket{}
	for rows.Next() {
		var b TimeseriesBucket
		if err := rows.Scan(&b.TS, &b.Runs, &b.Tools, &b.Errors, &b.Tokens, &b.InputTokens, &b.OutputTokens, &b.Cost, &b.AvgDurationMS); err != nil {
			return nil, fmt.Errorf("timeseries %q: scanning: %w", window, err)
		}
		series = append(series, b)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("timeseries %q: reading rows: %w", window, err)
	}

	return &TimeseriesResult{
		Window: window,
		Bucket: bucketLabelForWindow(window),
		Series: series,
	}, nil
}