Add Claude integration to dashboard
Add comprehensive Claude Code monitoring and realtime streaming to the K8s dashboard. Includes API endpoints for health, stats, summary, inventory, and live event streaming. Frontend provides overview, usage, inventory, debug, and live feed views.
This commit is contained in:
85
dashboard/internal/claude/eventhub.go
Normal file
85
dashboard/internal/claude/eventhub.go
Normal file
@@ -0,0 +1,85 @@
|
||||
package claude
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type EventHub struct {
|
||||
mu sync.RWMutex
|
||||
buffer []Event
|
||||
nextID int64
|
||||
subscribers []chan Event
|
||||
bufferSize int
|
||||
}
|
||||
|
||||
func NewEventHub(bufferSize int) *EventHub {
|
||||
return &EventHub{
|
||||
buffer: make([]Event, 0, bufferSize),
|
||||
subscribers: make([]chan Event, 0),
|
||||
bufferSize: bufferSize,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *EventHub) Publish(ev Event) Event {
|
||||
if ev.ID == 0 {
|
||||
ev.ID = atomic.AddInt64(&h.nextID, 1)
|
||||
}
|
||||
if ev.TS.IsZero() {
|
||||
ev.TS = time.Now()
|
||||
}
|
||||
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
if len(h.buffer) >= h.bufferSize {
|
||||
h.buffer = h.buffer[1:]
|
||||
}
|
||||
h.buffer = append(h.buffer, ev)
|
||||
|
||||
for _, ch := range h.subscribers {
|
||||
select {
|
||||
case ch <- ev:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
return ev
|
||||
}
|
||||
|
||||
func (h *EventHub) Subscribe() (chan Event, func()) {
|
||||
ch := make(chan Event, 10)
|
||||
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
h.subscribers = append(h.subscribers, ch)
|
||||
|
||||
cancel := func() {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
for i, c := range h.subscribers {
|
||||
if c == ch {
|
||||
h.subscribers = append(h.subscribers[:i], h.subscribers[i+1:]...)
|
||||
close(ch)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ch, cancel
|
||||
}
|
||||
|
||||
func (h *EventHub) ReplaySince(lastID int64) []Event {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
|
||||
var result []Event
|
||||
for _, ev := range h.buffer {
|
||||
if ev.ID > lastID {
|
||||
result = append(result, ev)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
41
dashboard/internal/claude/eventhub_test.go
Normal file
41
dashboard/internal/claude/eventhub_test.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package claude
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestEventHub_PublishSubscribe(t *testing.T) {
|
||||
hub := NewEventHub(10)
|
||||
ch, cancel := hub.Subscribe()
|
||||
defer cancel()
|
||||
|
||||
hub.Publish(Event{TS: time.Unix(1, 0), Type: EventTypeServerNotice, Data: map[string]any{"msg": "hi"}})
|
||||
|
||||
select {
|
||||
case ev := <-ch:
|
||||
if ev.Type != EventTypeServerNotice {
|
||||
t.Fatalf("type=%s", ev.Type)
|
||||
}
|
||||
if ev.ID == 0 {
|
||||
t.Fatalf("expected id to be assigned")
|
||||
}
|
||||
default:
|
||||
t.Fatalf("expected event")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventHub_ReplaySince(t *testing.T) {
|
||||
hub := NewEventHub(3)
|
||||
hub.Publish(Event{TS: time.Unix(1, 0), Type: EventTypeServerNotice}) // id 1
|
||||
hub.Publish(Event{TS: time.Unix(2, 0), Type: EventTypeServerNotice}) // id 2
|
||||
hub.Publish(Event{TS: time.Unix(3, 0), Type: EventTypeServerNotice}) // id 3
|
||||
|
||||
got := hub.ReplaySince(1)
|
||||
if len(got) != 2 {
|
||||
t.Fatalf("len=%d", len(got))
|
||||
}
|
||||
if got[0].ID != 2 || got[1].ID != 3 {
|
||||
t.Fatalf("ids=%d,%d", got[0].ID, got[1].ID)
|
||||
}
|
||||
}
|
||||
19
dashboard/internal/claude/events.go
Normal file
19
dashboard/internal/claude/events.go
Normal file
@@ -0,0 +1,19 @@
|
||||
package claude
|
||||
|
||||
import "time"
|
||||
|
||||
type EventType string
|
||||
|
||||
const (
|
||||
EventTypeHistoryAppend EventType = "history.append"
|
||||
EventTypeFileChanged EventType = "file.changed"
|
||||
EventTypeServerNotice EventType = "server.notice"
|
||||
EventTypeServerError EventType = "server.error"
|
||||
)
|
||||
|
||||
type Event struct {
|
||||
ID int64 `json:"id"`
|
||||
TS time.Time `json:"ts"`
|
||||
Type EventType `json:"type"`
|
||||
Data any `json:"data"`
|
||||
}
|
||||
11
dashboard/internal/claude/events_test.go
Normal file
11
dashboard/internal/claude/events_test.go
Normal file
@@ -0,0 +1,11 @@
|
||||
package claude
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestEventTypesCompile(t *testing.T) {
|
||||
_ = Event{}
|
||||
_ = EventTypeHistoryAppend
|
||||
_ = EventTypeFileChanged
|
||||
_ = EventTypeServerNotice
|
||||
_ = EventTypeServerError
|
||||
}
|
||||
105
dashboard/internal/claude/history_tailer.go
Normal file
105
dashboard/internal/claude/history_tailer.go
Normal file
@@ -0,0 +1,105 @@
|
||||
package claude
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TailHistoryFile(stop <-chan struct{}, hub *EventHub, path string) {
|
||||
var offset int64
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
stat, err := os.Stat(path)
|
||||
if err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
hub.Publish(Event{
|
||||
TS: time.Now(),
|
||||
Type: EventTypeServerError,
|
||||
Data: map[string]any{"error": err.Error()},
|
||||
})
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
size := stat.Size()
|
||||
if size > offset {
|
||||
if err := processNewBytes(path, offset, size, hub); err != nil {
|
||||
hub.Publish(Event{
|
||||
TS: time.Now(),
|
||||
Type: EventTypeServerError,
|
||||
Data: map[string]any{"error": err.Error()},
|
||||
})
|
||||
}
|
||||
offset = size
|
||||
} else if size < offset {
|
||||
offset = 0
|
||||
hub.Publish(Event{
|
||||
TS: time.Now(),
|
||||
Type: EventTypeServerNotice,
|
||||
Data: map[string]any{"msg": "file truncated, resetting offset"},
|
||||
})
|
||||
}
|
||||
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
func processNewBytes(path string, oldSize, newSize int64, hub *EventHub) error {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
if _, err := f.Seek(oldSize, 0); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
scanner := bufio.NewScanner(f)
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
data := map[string]any{
|
||||
"rawLine": line,
|
||||
}
|
||||
|
||||
var jsonData map[string]any
|
||||
if err := json.Unmarshal([]byte(line), &jsonData); err != nil {
|
||||
data["parseError"] = err.Error()
|
||||
} else {
|
||||
data["json"] = jsonData
|
||||
|
||||
summary := map[string]string{}
|
||||
if v, ok := jsonData["sessionId"].(string); ok {
|
||||
summary["sessionId"] = v
|
||||
}
|
||||
if v, ok := jsonData["project"].(string); ok {
|
||||
summary["project"] = v
|
||||
}
|
||||
if v, ok := jsonData["display"].(string); ok {
|
||||
summary["display"] = v
|
||||
}
|
||||
data["summary"] = summary
|
||||
}
|
||||
|
||||
hub.Publish(Event{
|
||||
TS: time.Now(),
|
||||
Type: EventTypeHistoryAppend,
|
||||
Data: data,
|
||||
})
|
||||
}
|
||||
|
||||
return scanner.Err()
|
||||
}
|
||||
40
dashboard/internal/claude/history_tailer_test.go
Normal file
40
dashboard/internal/claude/history_tailer_test.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package claude
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestHistoryTailer_EmitsOnAppend(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
p := filepath.Join(dir, "history.jsonl")
|
||||
if err := os.WriteFile(p, []byte(""), 0o600); err != nil {
|
||||
t.Fatalf("write: %v", err)
|
||||
}
|
||||
|
||||
hub := NewEventHub(10)
|
||||
ch, cancel := hub.Subscribe()
|
||||
defer cancel()
|
||||
|
||||
stop := make(chan struct{})
|
||||
go TailHistoryFile(stop, hub, p)
|
||||
|
||||
time.Sleep(600 * time.Millisecond)
|
||||
|
||||
if err := os.WriteFile(p, []byte("{\"display\":\"/status\"}\n"), 0o600); err != nil {
|
||||
t.Fatalf("append: %v", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case ev := <-ch:
|
||||
if ev.Type != EventTypeHistoryAppend {
|
||||
t.Fatalf("type=%s", ev.Type)
|
||||
}
|
||||
case <-time.After(700 * time.Millisecond):
|
||||
t.Fatalf("timed out waiting for event")
|
||||
}
|
||||
|
||||
close(stop)
|
||||
}
|
||||
100
dashboard/internal/claude/loader.go
Normal file
100
dashboard/internal/claude/loader.go
Normal file
@@ -0,0 +1,100 @@
|
||||
package claude
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Loader reads Claude Code state files from a local claude directory (typically ~/.claude).
|
||||
//
|
||||
// Keep this minimal for now; more helpers (e.g. ListDir / FileInfo) can be added later.
|
||||
type Loader struct {
|
||||
claudeDir string
|
||||
}
|
||||
|
||||
type DirEntry struct {
|
||||
Name string `json:"name"`
|
||||
IsDir bool `json:"isDir"`
|
||||
}
|
||||
|
||||
type FileMeta struct {
|
||||
Path string `json:"path"`
|
||||
Exists bool `json:"exists"`
|
||||
Size int64 `json:"size"`
|
||||
ModTime string `json:"modTime"`
|
||||
}
|
||||
|
||||
func NewLoader(claudeDir string) *Loader {
|
||||
return &Loader{claudeDir: claudeDir}
|
||||
}
|
||||
|
||||
func (l *Loader) ClaudeDir() string { return l.claudeDir }
|
||||
|
||||
func (l *Loader) LoadStatsCache() (*StatsCache, error) {
|
||||
if l.claudeDir == "" {
|
||||
return nil, fmt.Errorf("claude dir is empty")
|
||||
}
|
||||
|
||||
p := filepath.Join(l.claudeDir, "stats-cache.json")
|
||||
b, err := os.ReadFile(p)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read stats cache %q: %w", p, err)
|
||||
}
|
||||
|
||||
var stats StatsCache
|
||||
if err := json.Unmarshal(b, &stats); err != nil {
|
||||
return nil, fmt.Errorf("parse stats cache %q: %w", p, err)
|
||||
}
|
||||
|
||||
return &stats, nil
|
||||
}
|
||||
|
||||
func (l *Loader) ListDir(name string) ([]DirEntry, error) {
|
||||
if l.claudeDir == "" {
|
||||
return nil, fmt.Errorf("claude dir is empty")
|
||||
}
|
||||
|
||||
entries, err := os.ReadDir(filepath.Join(l.claudeDir, name))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read dir %q: %w", name, err)
|
||||
}
|
||||
|
||||
out := make([]DirEntry, 0, len(entries))
|
||||
for _, e := range entries {
|
||||
out = append(out, DirEntry{Name: e.Name(), IsDir: e.IsDir()})
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (l *Loader) PathExists(relPath string) bool {
|
||||
if l.claudeDir == "" {
|
||||
return false
|
||||
}
|
||||
_, err := os.Stat(filepath.Join(l.claudeDir, relPath))
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func (l *Loader) FileMeta(relPath string) (FileMeta, error) {
|
||||
if l.claudeDir == "" {
|
||||
return FileMeta{}, fmt.Errorf("claude dir is empty")
|
||||
}
|
||||
|
||||
p := filepath.Join(l.claudeDir, relPath)
|
||||
st, err := os.Stat(p)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return FileMeta{Path: relPath, Exists: false}, nil
|
||||
}
|
||||
return FileMeta{}, fmt.Errorf("stat %q: %w", p, err)
|
||||
}
|
||||
|
||||
return FileMeta{
|
||||
Path: relPath,
|
||||
Exists: true,
|
||||
Size: st.Size(),
|
||||
ModTime: st.ModTime().UTC().Format(time.RFC3339),
|
||||
}, nil
|
||||
}
|
||||
25
dashboard/internal/claude/loader_test.go
Normal file
25
dashboard/internal/claude/loader_test.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package claude
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestLoadStatsCache(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
p := filepath.Join(dir, "stats-cache.json")
|
||||
err := os.WriteFile(p, []byte(`{"version":1,"lastComputedDate":"2025-12-31","totalSessions":1,"totalMessages":2}`), 0644)
|
||||
if err != nil {
|
||||
t.Fatalf("WriteFile: %v", err)
|
||||
}
|
||||
|
||||
loader := NewLoader(dir)
|
||||
stats, err := loader.LoadStatsCache()
|
||||
if err != nil {
|
||||
t.Fatalf("LoadStatsCache: %v", err)
|
||||
}
|
||||
if stats.TotalSessions != 1 {
|
||||
t.Fatalf("TotalSessions=%d", stats.TotalSessions)
|
||||
}
|
||||
}
|
||||
33
dashboard/internal/claude/models.go
Normal file
33
dashboard/internal/claude/models.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package claude
|
||||
|
||||
type DailyActivity struct {
|
||||
Date string `json:"date"`
|
||||
MessageCount int `json:"messageCount"`
|
||||
SessionCount int `json:"sessionCount"`
|
||||
ToolCallCount int `json:"toolCallCount"`
|
||||
}
|
||||
|
||||
type DailyModelTokens struct {
|
||||
Date string `json:"date"`
|
||||
TokensByModel map[string]int `json:"tokensByModel"`
|
||||
}
|
||||
|
||||
type ModelUsage struct {
|
||||
InputTokens int `json:"inputTokens"`
|
||||
OutputTokens int `json:"outputTokens"`
|
||||
CacheReadInputTokens int `json:"cacheReadInputTokens"`
|
||||
CacheCreationInputTokens int `json:"cacheCreationInputTokens"`
|
||||
WebSearchRequests int `json:"webSearchRequests"`
|
||||
CostUSD float64 `json:"costUSD"`
|
||||
ContextWindow int `json:"contextWindow"`
|
||||
}
|
||||
|
||||
type StatsCache struct {
|
||||
Version int `json:"version"`
|
||||
LastComputedDate string `json:"lastComputedDate"`
|
||||
DailyActivity []DailyActivity `json:"dailyActivity"`
|
||||
DailyModelTokens []DailyModelTokens `json:"dailyModelTokens"`
|
||||
ModelUsage map[string]ModelUsage `json:"modelUsage"`
|
||||
TotalSessions int `json:"totalSessions"`
|
||||
TotalMessages int `json:"totalMessages"`
|
||||
}
|
||||
9
dashboard/internal/claude/models_test.go
Normal file
9
dashboard/internal/claude/models_test.go
Normal file
@@ -0,0 +1,9 @@
|
||||
package claude
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestModelTypesCompile(t *testing.T) {
|
||||
_ = StatsCache{}
|
||||
_ = DailyActivity{}
|
||||
_ = ModelUsage{}
|
||||
}
|
||||
24
dashboard/internal/claude/tail.go
Normal file
24
dashboard/internal/claude/tail.go
Normal file
@@ -0,0 +1,24 @@
|
||||
package claude
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func TailLastNLines(path string, n int) ([]string, error) {
|
||||
content, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
lines := strings.Split(string(content), "\n")
|
||||
|
||||
var result []string
|
||||
for i := len(lines) - 1; i >= 0 && len(result) < n; i-- {
|
||||
if lines[i] != "" {
|
||||
result = append(result, lines[i])
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
34
dashboard/internal/claude/tail_test.go
Normal file
34
dashboard/internal/claude/tail_test.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package claude
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestTailLastNLines_NewestFirst(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
p := filepath.Join(dir, "history.jsonl")
|
||||
|
||||
var b strings.Builder
|
||||
for i := 1; i <= 5; i++ {
|
||||
b.WriteString("line")
|
||||
b.WriteString([]string{"1", "2", "3", "4", "5"}[i-1])
|
||||
b.WriteString("\n")
|
||||
}
|
||||
if err := os.WriteFile(p, []byte(b.String()), 0o600); err != nil {
|
||||
t.Fatalf("write: %v", err)
|
||||
}
|
||||
|
||||
lines, err := TailLastNLines(p, 2)
|
||||
if err != nil {
|
||||
t.Fatalf("TailLastNLines: %v", err)
|
||||
}
|
||||
if len(lines) != 2 {
|
||||
t.Fatalf("len=%d", len(lines))
|
||||
}
|
||||
if lines[0] != "line5" || lines[1] != "line4" {
|
||||
t.Fatalf("got=%v", lines)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user