diff --git a/internal/store/postgres/sessions.go b/internal/store/postgres/sessions.go new file mode 100644 index 0000000..053e07f --- /dev/null +++ b/internal/store/postgres/sessions.go @@ -0,0 +1,118 @@ +package postgres + +import ( + "context" + "fmt" + "strings" + "time" +) + +type SessionRow struct { + SessionID string `json:"session_id"` + StartedAt time.Time `json:"started_at"` + EndedAt *time.Time `json:"ended_at,omitempty"` + Framework string `json:"framework"` + Host string `json:"host"` + RunCount int `json:"run_count"` +} + +type SessionsFilter struct { + From *time.Time + To *time.Time + Framework string + Host string + Limit int + Cursor *time.Time // cursor is the last started_at seen +} + +func (d *DB) ListSessions(ctx context.Context, f SessionsFilter) ([]SessionRow, *time.Time, error) { + if f.Limit <= 0 { + f.Limit = 50 + } + if f.Limit > 200 { + f.Limit = 200 + } + + // Build query dynamically + var conditions []string + var args []any + argN := 1 + + // Default time range: last 24h + if f.From == nil { + t := time.Now().Add(-24 * time.Hour) + f.From = &t + } + conditions = append(conditions, fmt.Sprintf("ts >= $%d", argN)) + args = append(args, *f.From) + argN++ + + if f.To != nil { + conditions = append(conditions, fmt.Sprintf("ts <= $%d", argN)) + args = append(args, *f.To) + argN++ + } + + if f.Framework != "" { + conditions = append(conditions, fmt.Sprintf("source_framework = $%d", argN)) + args = append(args, f.Framework) + argN++ + } + + if f.Host != "" { + conditions = append(conditions, fmt.Sprintf("payload->'event'->'source'->>'host' = $%d", argN)) + args = append(args, f.Host) + argN++ + } + + if f.Cursor != nil { + conditions = append(conditions, fmt.Sprintf("ts < $%d", argN)) + args = append(args, *f.Cursor) + argN++ + } + + where := strings.Join(conditions, " AND ") + + query := fmt.Sprintf(` + SELECT + session_id, + MIN(ts) as started_at, + MAX(ts) as ended_at, + MAX(source_framework) as framework, + MAX(payload->'event'->'source'->>'host') as host, + COUNT(DISTINCT run_id) as run_count + FROM events + WHERE session_id IS NOT NULL AND %s + GROUP BY session_id + ORDER BY started_at DESC + LIMIT $%d + `, where, argN) + args = append(args, f.Limit+1) // fetch one extra to detect next page + + rows, err := d.sql.QueryContext(ctx, query, args...) + if err != nil { + return nil, nil, err + } + defer rows.Close() + + var out []SessionRow + for rows.Next() { + var r SessionRow + var host *string + if err := rows.Scan(&r.SessionID, &r.StartedAt, &r.EndedAt, &r.Framework, &host, &r.RunCount); err != nil { + return nil, nil, err + } + if host != nil { + r.Host = *host + } + out = append(out, r) + } + + var nextCursor *time.Time + if len(out) > f.Limit { + out = out[:f.Limit] + nextCursor = &out[len(out)-1].StartedAt + } + + return out, nextCursor, rows.Err() +}