claude-code/docs/plans/2026-01-01-claude-realtime-monitoring-implementation.md
OpenCode Test e43e052a32 Add design plans for dashboard integration
Add implementation plans for morning report, Claude ops dashboard, and realtime monitoring features.
2026-01-03 10:55:22 -08:00


# Claude Real-Time Monitoring (SSE) Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** Add real-time-ish monitoring of Claude Code agent activity to the existing Go dashboard, with a backlog (last 200 history events) plus SSE updates, shown in a new “Live” UI feed with prettified rows and expandable raw JSON.
**Architecture:** Add an in-memory `EventHub` (pub/sub + ring buffer) that receives events from a `HistoryTailer` (tails `~/.claude/history.jsonl`) and a debounced file watcher (for `stats-cache.json` and `state/component-registry.json`). Expose a REST backlog endpoint (`/api/claude/live/backlog`) returning normalized `Event` objects (newest→oldest) and an SSE stream endpoint (`/api/claude/stream`) that pushes new events to the browser. Frontend uses `EventSource` plus batching (render every 1-2s).
**Tech Stack:** Go 1.21+, `chi`, vanilla HTML/CSS/JS, optional `fsnotify`.
---
## Decisions (locked)
- Transport: SSE first (WebSockets later).
- Acceptable latency: 2-5 seconds.
- UI: prettified table with expandable raw JSON.
- Backlog: enabled, default `limit=200`.
- Backlog ordering: **newest → oldest**.
- Parsing: **generic-first**, best-effort extraction of a few fields; always preserve raw JSON.
- Backlog response format: **normalized `events`** (not just raw lines).
---
## Data Contract
### Event JSON
All events share:
```json
{
  "id": 123,
  "ts": "2026-01-01T12:00:00Z",
  "type": "history.append",
  "data": {
    "summary": {
      "sessionId": "...",
      "project": "...",
      "display": "/model"
    },
    "rawLine": "{...}",
    "json": { "...": "..." },
    "parseError": "..."
  }
}
```
Event types:
- `history.append`
- `file.changed`
- `server.notice`
- `server.error`
---
## Task 1: Add `Event` types
**Files:**
- Create: `~/.claude/dashboard/internal/claude/events.go`
- Test: `~/.claude/dashboard/internal/claude/events_test.go`
**Step 1: Write the failing test**
```go
package claude

import "testing"

func TestEventTypesCompile(t *testing.T) {
	_ = Event{}
	_ = EventTypeHistoryAppend
	_ = EventTypeFileChanged
	_ = EventTypeServerNotice
	_ = EventTypeServerError
}
```
**Step 2: Run test to verify it fails**
Run: `go test ./...`
Expected: FAIL with `undefined: Event`
**Step 3: Write minimal implementation**
```go
package claude

import "time"

type EventType string

const (
	EventTypeHistoryAppend EventType = "history.append"
	EventTypeFileChanged   EventType = "file.changed"
	EventTypeServerNotice  EventType = "server.notice"
	EventTypeServerError   EventType = "server.error"
)

type Event struct {
	ID   int64     `json:"id"`
	TS   time.Time `json:"ts"`
	Type EventType `json:"type"`
	Data any       `json:"data"`
}
```
**Step 4: Run test to verify it passes**
Run: `go test ./...`
Expected: PASS
**Step 5: Commit**
```bash
git add internal/claude/events.go internal/claude/events_test.go
git commit -m "feat: add real-time event types"
```
---
## Task 2: Implement `EventHub` (pub/sub + ring buffer)
**Files:**
- Create: `~/.claude/dashboard/internal/claude/eventhub.go`
- Test: `~/.claude/dashboard/internal/claude/eventhub_test.go`
**Step 1: Write failing tests**
```go
package claude

import (
	"testing"
	"time"
)

func TestEventHub_PublishSubscribe(t *testing.T) {
	hub := NewEventHub(10)
	ch, cancel := hub.Subscribe()
	defer cancel()
	hub.Publish(Event{TS: time.Unix(1, 0), Type: EventTypeServerNotice, Data: map[string]any{"msg": "hi"}})
	select {
	case ev := <-ch:
		if ev.Type != EventTypeServerNotice {
			t.Fatalf("type=%s", ev.Type)
		}
		if ev.ID == 0 {
			t.Fatalf("expected id to be assigned")
		}
	default:
		t.Fatalf("expected event")
	}
}

func TestEventHub_ReplaySince(t *testing.T) {
	hub := NewEventHub(3)
	hub.Publish(Event{TS: time.Unix(1, 0), Type: EventTypeServerNotice}) // id 1
	hub.Publish(Event{TS: time.Unix(2, 0), Type: EventTypeServerNotice}) // id 2
	hub.Publish(Event{TS: time.Unix(3, 0), Type: EventTypeServerNotice}) // id 3
	got := hub.ReplaySince(1)
	if len(got) != 2 {
		t.Fatalf("len=%d", len(got))
	}
	if got[0].ID != 2 || got[1].ID != 3 {
		t.Fatalf("ids=%d,%d", got[0].ID, got[1].ID)
	}
}
```
**Step 2: Run tests to verify RED**
Run: `go test ./...`
Expected: FAIL with `undefined: NewEventHub`
**Step 3: Write minimal implementation**
Implement:
- `type EventHub struct { ... }`
- `NewEventHub(bufferSize int) *EventHub`
- `Publish(ev Event) Event`:
  - assign `ID` if zero using an internal counter
  - set `TS = time.Now()` if zero
  - append to ring buffer
  - broadcast to subscriber channels (non-blocking send)
- `Subscribe() (chan Event, func())` returns a buffered channel and a cancel func
- `ReplaySince(lastID int64) []Event`
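
The bullets above could be sketched roughly as follows. This is a minimal sketch under stated assumptions, not the final implementation: `Event` is a trimmed stand-in for the Task 1 struct, the subscriber channel capacity (64) is an arbitrary choice, and the replay buffer is a bounded sliding slice rather than an index-based ring.

```go
package main

import (
	"sync"
	"time"
)

// Event is a trimmed stand-in for the struct defined in Task 1.
type Event struct {
	ID   int64
	TS   time.Time
	Type string
	Data any
}

// EventHub fans events out to subscribers and keeps the most recent
// `size` events so late joiners can replay them.
type EventHub struct {
	mu     sync.Mutex
	nextID int64
	size   int
	buf    []Event // oldest first, at most size entries
	subs   map[chan Event]struct{}
}

func NewEventHub(bufferSize int) *EventHub {
	return &EventHub{size: bufferSize, subs: make(map[chan Event]struct{})}
}

// Publish fills in ID and TS when unset, stores the event, and broadcasts it.
func (h *EventHub) Publish(ev Event) Event {
	h.mu.Lock()
	defer h.mu.Unlock()
	if ev.ID == 0 {
		h.nextID++
		ev.ID = h.nextID
	}
	if ev.TS.IsZero() {
		ev.TS = time.Now()
	}
	h.buf = append(h.buf, ev)
	if len(h.buf) > h.size {
		h.buf = h.buf[len(h.buf)-h.size:] // drop the oldest entries
	}
	for ch := range h.subs {
		select {
		case ch <- ev:
		default: // slow subscriber: drop the event rather than block the publisher
		}
	}
	return ev
}

// Subscribe returns a buffered channel and an idempotent cancel func.
func (h *EventHub) Subscribe() (chan Event, func()) {
	ch := make(chan Event, 64) // capacity is an assumption
	h.mu.Lock()
	h.subs[ch] = struct{}{}
	h.mu.Unlock()
	var once sync.Once
	cancel := func() {
		once.Do(func() {
			h.mu.Lock()
			delete(h.subs, ch)
			h.mu.Unlock()
			close(ch)
		})
	}
	return ch, cancel
}

// ReplaySince returns buffered events with ID greater than lastID, oldest first.
func (h *EventHub) ReplaySince(lastID int64) []Event {
	h.mu.Lock()
	defer h.mu.Unlock()
	var out []Event
	for _, ev := range h.buf {
		if ev.ID > lastID {
			out = append(out, ev)
		}
	}
	return out
}
```

Dropping events for slow subscribers keeps `Publish` non-blocking; a subscriber that falls behind can catch up with `ReplaySince` using the last ID it saw.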
**Step 4: Run tests to verify GREEN**
Run: `go test ./...`
Expected: PASS
**Step 5: Commit**
```bash
git add internal/claude/eventhub.go internal/claude/eventhub_test.go
git commit -m "feat: add event hub with replay buffer"
```
---
## Task 3: Tail last N lines helper (newest → oldest)
**Files:**
- Create: `~/.claude/dashboard/internal/claude/tail.go`
- Test: `~/.claude/dashboard/internal/claude/tail_test.go`
**Step 1: Write the failing test**
```go
package claude

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"testing"
)

func TestTailLastNLines_NewestFirst(t *testing.T) {
	dir := t.TempDir()
	p := filepath.Join(dir, "history.jsonl")
	// Write line1..line5, one per line.
	var b strings.Builder
	for i := 1; i <= 5; i++ {
		fmt.Fprintf(&b, "line%d\n", i)
	}
	if err := os.WriteFile(p, []byte(b.String()), 0o600); err != nil {
		t.Fatalf("write: %v", err)
	}
	lines, err := TailLastNLines(p, 2)
	if err != nil {
		t.Fatalf("TailLastNLines: %v", err)
	}
	if len(lines) != 2 {
		t.Fatalf("len=%d", len(lines))
	}
	if lines[0] != "line5" || lines[1] != "line4" {
		t.Fatalf("got=%v", lines)
	}
}
```
**Step 2: Run test to verify it fails**
Run: `go test ./...`
Expected: FAIL with `undefined: TailLastNLines`
**Step 3: Minimal implementation**
Create `~/.claude/dashboard/internal/claude/tail.go`:
- `TailLastNLines(path string, n int) ([]string, error)`
- First implementation can be simple (read whole file + split) with a TODO noting potential optimization.
- Return text lines without trailing newline; newest→oldest ordering.
**Step 4: Run tests to verify it passes**
Run: `go test ./...`
Expected: PASS
**Step 5: Commit**
```bash
git add internal/claude/tail.go internal/claude/tail_test.go
git commit -m "feat: add tail last N lines helper"
```
---
## Task 4: Add backlog endpoint returning normalized events
**Files:**
- Create: `~/.claude/dashboard/internal/api/claude_live_handlers.go`
- Modify: `~/.claude/dashboard/internal/api/claude_handlers.go` (only to share helper if needed)
- Modify: `~/.claude/dashboard/cmd/server/main.go`
- Test: `~/.claude/dashboard/internal/api/claude_live_handlers_test.go`
**Step 1: Write failing test**
```go
package api

import (
	"encoding/json"
	"net/http"
	"net/http/httptest"
	"os"
	"path/filepath"
	"testing"

	"github.com/go-chi/chi/v5"
	"github.com/will/k8s-agent-dashboard/internal/claude"
)

// fakeClaudeDirLoader stubs the loader; only ClaudeDir is exercised here.
type fakeClaudeDirLoader struct{ dir string }

func (f fakeClaudeDirLoader) ClaudeDir() string { return f.dir }
func (f fakeClaudeDirLoader) LoadStatsCache() (*claude.StatsCache, error) {
	return &claude.StatsCache{}, nil
}
func (f fakeClaudeDirLoader) ListDir(name string) ([]claude.DirEntry, error) { return nil, nil }
func (f fakeClaudeDirLoader) FileMeta(relPath string) (claude.FileMeta, error) {
	return claude.FileMeta{}, nil
}
func (f fakeClaudeDirLoader) PathExists(relPath string) bool { return true }

func TestClaudeLiveBacklog_DefaultLimit(t *testing.T) {
	dir := t.TempDir()
	p := filepath.Join(dir, "history.jsonl")
	if err := os.WriteFile(p, []byte("{\"display\":\"/model\"}\n"), 0o600); err != nil {
		t.Fatalf("write: %v", err)
	}
	loader := fakeClaudeDirLoader{dir: dir}
	r := chi.NewRouter()
	r.Get("/api/claude/live/backlog", GetClaudeLiveBacklog(loader))
	req := httptest.NewRequest(http.MethodGet, "/api/claude/live/backlog", nil)
	w := httptest.NewRecorder()
	r.ServeHTTP(w, req)
	if w.Code != http.StatusOK {
		t.Fatalf("status=%d body=%s", w.Code, w.Body.String())
	}
	// Assert the response includes an "events" array with at least one event
	// and reports the default limit.
	var resp struct {
		Limit  int               `json:"limit"`
		Events []json.RawMessage `json:"events"`
	}
	if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
		t.Fatalf("decode: %v body=%s", err, w.Body.String())
	}
	if len(resp.Events) < 1 {
		t.Fatalf("expected events in response: %s", w.Body.String())
	}
	if resp.Limit != 200 {
		t.Fatalf("limit=%d", resp.Limit)
	}
}
```
**Step 2: Run test to verify RED**
Run: `go test ./...`
Expected: FAIL with `undefined: GetClaudeLiveBacklog`
**Step 3: Minimal implementation**
Create `~/.claude/dashboard/internal/api/claude_live_handlers.go`:
- `GetClaudeLiveBacklog(loader ClaudeLoader) http.HandlerFunc`
- Query param: `limit` (default 200; clamp 1..1000)
- Use `claude.TailLastNLines(filepath.Join(loader.ClaudeDir(), "history.jsonl"), limit)`
- For each line, create a `claude.Event` with:
  - `Type: claude.EventTypeHistoryAppend`
  - `TS: time.Now()` (or parse timestamp if present in JSON)
  - `Data` contains: `rawLine`, optionally `json`, optionally `parseError`, and `summary` (best effort)
- JSON parsing should be schema-agnostic: unmarshal into `map[string]any`.
- Summary extraction should look for keys: `sessionId`, `project`, `display` (strings).
Return payload:
```json
{ "limit": 200, "events": [ ... ] }
```
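
The limit clamping and the generic-first parsing could be sketched as two small helpers. The names `clampLimit` and `parseHistoryLine` are hypothetical (not part of the plan), and treating an unparsable `limit` as the default of 200 is an assumption.

```go
package main

import (
	"encoding/json"
	"strconv"
)

// clampLimit parses the `limit` query value: default 200, clamped to 1..1000.
// Hypothetical helper; a missing or non-numeric value falls back to the default.
func clampLimit(raw string) int {
	n, err := strconv.Atoi(raw)
	if err != nil {
		return 200
	}
	if n < 1 {
		return 1
	}
	if n > 1000 {
		return 1000
	}
	return n
}

// parseHistoryLine builds the event Data payload for one history.jsonl line.
// The raw line is always preserved; json and summary are best effort.
func parseHistoryLine(line string) map[string]any {
	data := map[string]any{"rawLine": line}
	var obj map[string]any
	if err := json.Unmarshal([]byte(line), &obj); err != nil {
		data["parseError"] = err.Error()
		return data
	}
	data["json"] = obj
	// Copy only the known keys, and only when they are strings.
	summary := map[string]string{}
	for _, key := range []string{"sessionId", "project", "display"} {
		if s, ok := obj[key].(string); ok {
			summary[key] = s
		}
	}
	data["summary"] = summary
	return data
}
```

Sharing `parseHistoryLine` between the backlog handler and the tailer in Task 6 would keep both paths producing the same `Data` shape.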
**Step 4: Run tests to verify GREEN**
Run: `go test ./...`
Expected: PASS
**Step 5: Wire route**
Modify `~/.claude/dashboard/cmd/server/main.go` to register:
- `GET /api/claude/live/backlog`
**Step 6: Run tests again**
Run: `go test ./...`
Expected: PASS
**Step 7: Commit**
```bash
git add cmd/server/main.go internal/api/claude_live_handlers.go internal/api/claude_live_handlers_test.go internal/claude/tail.go internal/claude/tail_test.go
git commit -m "feat: add claude live backlog endpoint"
```
---
## Task 5: Add SSE stream endpoint
**Files:**
- Create: `~/.claude/dashboard/internal/api/claude_stream_handlers.go`
- Modify: `~/.claude/dashboard/cmd/server/main.go`
- Test: `~/.claude/dashboard/internal/api/claude_stream_handlers_test.go`
**Step 1: Write failing test**
```go
package api

import (
	"context"
	"net/http"
	"net/http/httptest"
	"strings"
	"testing"
	"time"

	"github.com/go-chi/chi/v5"
	"github.com/will/k8s-agent-dashboard/internal/claude"
)

func TestClaudeStream_SendsEvent(t *testing.T) {
	hub := claude.NewEventHub(10)
	r := chi.NewRouter()
	r.Get("/api/claude/stream", GetClaudeStream(hub))

	// The stream handler blocks until the client disconnects, so bound the
	// request with a timeout; the handler must return when the context is done.
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	req := httptest.NewRequest(http.MethodGet, "/api/claude/stream", nil).WithContext(ctx)
	w := httptest.NewRecorder()

	// Publish after the handler has had time to subscribe.
	go func() {
		time.Sleep(10 * time.Millisecond)
		hub.Publish(claude.Event{Type: claude.EventTypeServerNotice, Data: map[string]any{"msg": "hi"}})
	}()
	r.ServeHTTP(w, req)

	if ct := w.Header().Get("Content-Type"); !strings.Contains(ct, "text/event-stream") {
		t.Fatalf("content-type=%q", ct)
	}
	if !strings.Contains(w.Body.String(), "event:") || !strings.Contains(w.Body.String(), "data:") {
		t.Fatalf("body=%s", w.Body.String())
	}
}
```
**Step 2: Run test to verify RED**
Run: `go test ./...`
Expected: FAIL with `undefined: GetClaudeStream`
**Step 3: Implement minimal SSE handler**
Create `~/.claude/dashboard/internal/api/claude_stream_handlers.go`:
- `GetClaudeStream(hub *claude.EventHub) http.HandlerFunc`
- Set headers:
  - `Content-Type: text/event-stream`
  - `Cache-Control: no-cache`
- Subscribe to hub; write events in SSE format, each terminated by a blank line:
```
event: <type>
id: <id>
data: <json>
```
- Flush after each event.
- Return (and cancel the subscription) when the request context is done, i.e. when the client disconnects.
- Keep it minimal; add keepalive pings later.
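
A rough sketch of the framing and the streaming loop follows. `formatSSE` and `streamEvents` are hypothetical names, `Event` is a trimmed stand-in for `claude.Event`, and serializing the whole event (rather than only `Data`) into the `data:` field is an assumption made so the browser receives the same shape as the backlog endpoint. The handler takes a plain channel instead of the hub to keep the example self-contained.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

// Event is a trimmed stand-in for claude.Event.
type Event struct {
	ID   int64  `json:"id"`
	Type string `json:"type"`
	Data any    `json:"data"`
}

// formatSSE renders one event in SSE wire format. json.Marshal emits compact
// JSON without raw newlines, so the payload fits on a single data: line.
func formatSSE(ev Event) (string, error) {
	payload, err := json.Marshal(ev)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("event: %s\nid: %d\ndata: %s\n\n", ev.Type, ev.ID, payload), nil
}

// streamEvents writes events from the channel until the client disconnects
// or the channel is closed.
func streamEvents(events <-chan Event) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		flusher.Flush() // send headers before the first event arrives
		for {
			select {
			case <-r.Context().Done():
				return // client went away
			case ev, open := <-events:
				if !open {
					return
				}
				frame, err := formatSSE(ev)
				if err != nil {
					continue // skip events that cannot be serialized
				}
				if _, err := io.WriteString(w, frame); err != nil {
					return
				}
				flusher.Flush()
			}
		}
	}
}
```

In the real handler the channel would come from `hub.Subscribe()`, with the returned cancel func deferred so the subscription is released on disconnect.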
**Step 4: Run tests to verify GREEN**
Run: `go test ./...`
Expected: PASS
**Step 5: Wire route**
Modify `~/.claude/dashboard/cmd/server/main.go` to register:
- `GET /api/claude/stream`
**Step 6: Run tests again**
Run: `go test ./...`
Expected: PASS
**Step 7: Commit**
```bash
git add cmd/server/main.go internal/api/claude_stream_handlers.go internal/api/claude_stream_handlers_test.go internal/claude/eventhub.go internal/claude/eventhub_test.go
git commit -m "feat: add claude sse stream endpoint"
```
---
## Task 6: Implement HistoryTailer to publish hub events
**Files:**
- Create: `~/.claude/dashboard/internal/claude/history_tailer.go`
- Test: `~/.claude/dashboard/internal/claude/history_tailer_test.go`
- Modify: `~/.claude/dashboard/cmd/server/main.go`
**Step 1: Write failing test**
```go
package claude

import (
	"os"
	"path/filepath"
	"testing"
	"time"
)

func TestHistoryTailer_EmitsOnAppend(t *testing.T) {
	dir := t.TempDir()
	p := filepath.Join(dir, "history.jsonl")
	if err := os.WriteFile(p, []byte(""), 0o600); err != nil {
		t.Fatalf("write: %v", err)
	}
	hub := NewEventHub(10)
	ch, cancel := hub.Subscribe()
	defer cancel()
	stop := make(chan struct{})
	defer close(stop) // stop the tailer even if the test fails early
	go TailHistoryFile(stop, hub, p)

	// Append a line (O_APPEND, so existing content is never truncated).
	f, err := os.OpenFile(p, os.O_APPEND|os.O_WRONLY, 0o600)
	if err != nil {
		t.Fatalf("open: %v", err)
	}
	if _, err := f.WriteString("{\"display\":\"/status\"}\n"); err != nil {
		t.Fatalf("append: %v", err)
	}
	if err := f.Close(); err != nil {
		t.Fatalf("close: %v", err)
	}

	select {
	case ev := <-ch:
		if ev.Type != EventTypeHistoryAppend {
			t.Fatalf("type=%s", ev.Type)
		}
	case <-time.After(2 * time.Second): // must exceed the tailer's poll interval
		t.Fatalf("timed out waiting for event")
	}
}
```
**Step 2: Run tests to verify RED**
Run: `go test ./...`
Expected: FAIL with `undefined: TailHistoryFile`
**Step 3: Minimal implementation**
Create `~/.claude/dashboard/internal/claude/history_tailer.go` implementing:
- `TailHistoryFile(stop <-chan struct{}, hub *EventHub, path string)`
- Simple polling loop (since target latency is 2-5s):
  - Every 500ms-1s, stat file size
  - If size grew, read new bytes from offset, split on `\n`, publish `history.append` events
  - If size shrank, reset offset to 0 and publish `server.notice`
Also implement an internal helper to parse a history line into event `Data` with `summary` extraction (same logic as backlog).
**Step 4: Run tests to verify GREEN**
Run: `go test ./...`
Expected: PASS
**Step 5: Wire tailer in server**
Modify `~/.claude/dashboard/cmd/server/main.go`:
- Create hub at startup: `hub := claude.NewEventHub(1000)`
- Start goroutine tailing `filepath.Join(*claudeDir, "history.jsonl")`
**Step 6: Run tests again**
Run: `go test ./...`
Expected: PASS
**Step 7: Commit**
```bash
git add cmd/server/main.go internal/claude/history_tailer.go internal/claude/history_tailer_test.go
git commit -m "feat: stream history.jsonl appends via event hub"
```
---
## Task 7: Frontend Live view (EventSource + batching)
**Files:**
- Modify: `~/.claude/dashboard/cmd/server/web/index.html`
- Modify: `~/.claude/dashboard/cmd/server/web/static/js/app.js`
- Modify: `~/.claude/dashboard/cmd/server/web/static/css/style.css`
**Step 1: Add Live tab + markup**
- Add nav button: `data-view="live"`
- Add section:
  - `id="live-view"`
  - table `id="claude-live-table"` and a connection indicator `id="claude-live-conn"`
**Step 2: Add JS backlog fetch + EventSource**
Modify `~/.claude/dashboard/cmd/server/web/static/js/app.js`:
- On DOMContentLoaded, create `EventSource('/api/claude/stream')`
- Maintain:
  - `let pendingLiveEvents = []`
  - `let liveEvents = []` (cap at 500)
- Every 1000ms:
  - move pending → live
  - render table rows
- Fetch backlog once:
  - `GET /api/claude/live/backlog?limit=200`
  - the backlog is returned newest→oldest and is older than anything received from the stream, so append it after any live events already in `liveEvents`; the UI renders newest first at the top
**Step 3: CSS**
- Add a small connection badge style (green/yellow/red)
- Ensure table remains readable
**Step 4: Manual verification**
Run:
- `go test ./...`
- `go run ./cmd/server --port 8080 --data /tmp/k8s --claude ~/.claude`
Expected:
- Live tab loads backlog rows
- New history events appear on subsequent CLI activity
**Step 5: Commit**
```bash
git add cmd/server/web/index.html cmd/server/web/static/js/app.js cmd/server/web/static/css/style.css
git commit -m "feat: add live feed UI with SSE batching"
```
---
## Task 8: End-to-end verification
**Files:**
- None (unless a bug requires fixes)
**Step 1: Run full test suite**
Run: `go test ./...`
Expected: PASS (0 failures)
**Step 2: Manual smoke check**
Run:
- `go run ./cmd/server --port 8080 --data /tmp/k8s --claude ~/.claude`
Check:
- `curl -N http://localhost:8080/api/claude/stream` prints SSE lines
- `curl 'http://localhost:8080/api/claude/live/backlog?limit=5'` returns an `events` array (quote the URL so the shell does not interpret `?`)
- Browser Live tab updates
---
## Execution handoff
Plan complete and saved to `~/.claude/docs/plans/2026-01-01-claude-realtime-monitoring-implementation.md`.
Two execution options:
1. Subagent-Driven (this session): I dispatch a fresh subagent per task and review between tasks, for fast iteration.
2. Parallel Session (separate): open a new session with `superpowers:executing-plans` for batch execution with checkpoints.
Which approach?