claude-code/docs/plans/2026-01-01-claude-realtime-monitoring-implementation.md
OpenCode Test e43e052a32 Add design plans for dashboard integration
Add implementation plans for morning report, Claude ops dashboard, and realtime monitoring features.
2026-01-03 10:55:22 -08:00


Claude Real-Time Monitoring (SSE) Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: Add real-time-ish monitoring of Claude Code agent activity to the existing Go dashboard, with a backlog (last 200 history events) plus SSE updates, shown in a new “Live” UI feed with prettified rows and expandable raw JSON.

Architecture: Add an in-memory EventHub (pub/sub + ring buffer) that receives events from a HistoryTailer (tails ~/.claude/history.jsonl) and a debounced file watcher (for stats-cache.json and state/component-registry.json). Expose a REST backlog endpoint (/api/claude/live/backlog) returning normalized Event objects (newest→oldest) and an SSE stream endpoint (/api/claude/stream) that pushes new events to the browser. Frontend uses EventSource plus batching (render every 1-2s).

Tech Stack: Go 1.21+, chi, vanilla HTML/CSS/JS, optional fsnotify.


Decisions (locked)

  • Transport: SSE first (WebSockets later).
  • Acceptable latency: 2-5 seconds.
  • UI: prettified table with expandable raw JSON.
  • Backlog: enabled, default limit=200.
  • Backlog ordering: newest → oldest.
  • Parsing: generic-first, best-effort extraction of a few fields; always preserve raw JSON.
  • Backlog response format: normalized events (not just raw lines).

Data Contract

Event JSON

All events share:

{
  "id": 123,
  "ts": "2026-01-01T12:00:00Z",
  "type": "history.append",
  "data": {
    "summary": {
      "sessionId": "...",
      "project": "...",
      "display": "/model"
    },
    "rawLine": "{...}",
    "json": { "...": "..." },
    "parseError": "..."
  }
}

Event types:

  • history.append
  • file.changed
  • server.notice
  • server.error

Task 1: Add Event types

Files:

  • Create: ~/.claude/dashboard/internal/claude/events.go
  • Test: ~/.claude/dashboard/internal/claude/events_test.go

Step 1: Write the failing test

package claude

import "testing"

func TestEventTypesCompile(t *testing.T) {
	_ = Event{}
	_ = EventTypeHistoryAppend
	_ = EventTypeFileChanged
	_ = EventTypeServerNotice
	_ = EventTypeServerError
}

Step 2: Run test to verify it fails

Run: go test ./... Expected: FAIL with undefined: Event

Step 3: Write minimal implementation

package claude

import "time"

type EventType string

const (
	EventTypeHistoryAppend EventType = "history.append"
	EventTypeFileChanged   EventType = "file.changed"
	EventTypeServerNotice  EventType = "server.notice"
	EventTypeServerError   EventType = "server.error"
)

type Event struct {
	ID   int64     `json:"id"`
	TS   time.Time `json:"ts"`
	Type EventType `json:"type"`
	Data any       `json:"data"`
}

Step 4: Run test to verify it passes

Run: go test ./... Expected: PASS

Step 5: Commit

git add internal/claude/events.go internal/claude/events_test.go
git commit -m "feat: add real-time event types"

Task 2: Implement EventHub (pub/sub + ring buffer)

Files:

  • Create: ~/.claude/dashboard/internal/claude/eventhub.go
  • Test: ~/.claude/dashboard/internal/claude/eventhub_test.go

Step 1: Write failing tests

package claude

import (
	"testing"
	"time"
)

func TestEventHub_PublishSubscribe(t *testing.T) {
	hub := NewEventHub(10)
	ch, cancel := hub.Subscribe()
	defer cancel()

	hub.Publish(Event{TS: time.Unix(1, 0), Type: EventTypeServerNotice, Data: map[string]any{"msg": "hi"}})

	select {
	case ev := <-ch:
		if ev.Type != EventTypeServerNotice {
			t.Fatalf("type=%s", ev.Type)
		}
		if ev.ID == 0 {
			t.Fatalf("expected id to be assigned")
		}
	default:
		t.Fatalf("expected event")
	}
}

func TestEventHub_ReplaySince(t *testing.T) {
	hub := NewEventHub(3)
	hub.Publish(Event{TS: time.Unix(1, 0), Type: EventTypeServerNotice}) // id 1
	hub.Publish(Event{TS: time.Unix(2, 0), Type: EventTypeServerNotice}) // id 2
	hub.Publish(Event{TS: time.Unix(3, 0), Type: EventTypeServerNotice}) // id 3

	got := hub.ReplaySince(1)
	if len(got) != 2 {
		t.Fatalf("len=%d", len(got))
	}
	if got[0].ID != 2 || got[1].ID != 3 {
		t.Fatalf("ids=%d,%d", got[0].ID, got[1].ID)
	}
}

Step 2: Run tests to verify RED

Run: go test ./... Expected: FAIL with undefined: NewEventHub

Step 3: Write minimal implementation

Implement:

  • type EventHub struct { ... }
  • NewEventHub(bufferSize int) *EventHub
  • Publish(ev Event) Event:
    • assign ID if zero using an internal counter
    • set TS = time.Now() if zero
    • append to ring buffer
    • broadcast to subscriber channels (non-blocking send)
  • Subscribe() (chan Event, func()) returns a buffered channel and a cancel func
  • ReplaySince(lastID int64) []Event
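
The bullets above could be realized roughly as follows. This is a minimal sketch, not the plan's final code: the struct fields, the subscriber channel capacity of 64, and the use of a bounded slice in place of a true ring buffer are all assumptions. Event is redeclared here only so the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type EventType string

type Event struct {
	ID   int64     `json:"id"`
	TS   time.Time `json:"ts"`
	Type EventType `json:"type"`
	Data any       `json:"data"`
}

// EventHub fans events out to subscribers and keeps the most recent ones
// for replay. The field layout is an assumption of this sketch.
type EventHub struct {
	mu     sync.Mutex
	nextID int64
	size   int
	buf    []Event // oldest first, never longer than size
	subs   map[chan Event]struct{}
}

func NewEventHub(bufferSize int) *EventHub {
	if bufferSize < 1 {
		bufferSize = 1
	}
	return &EventHub{size: bufferSize, subs: make(map[chan Event]struct{})}
}

// Publish assigns ID and TS when they are zero, stores the event, and
// broadcasts it without blocking on slow subscribers.
func (h *EventHub) Publish(ev Event) Event {
	h.mu.Lock()
	defer h.mu.Unlock()
	if ev.ID == 0 {
		h.nextID++
		ev.ID = h.nextID
	}
	if ev.TS.IsZero() {
		ev.TS = time.Now()
	}
	if len(h.buf) == h.size {
		copy(h.buf, h.buf[1:]) // drop the oldest entry
		h.buf[h.size-1] = ev
	} else {
		h.buf = append(h.buf, ev)
	}
	for ch := range h.subs {
		select {
		case ch <- ev:
		default: // subscriber buffer is full; this event is dropped for it
		}
	}
	return ev
}

// Subscribe returns a buffered channel and an idempotent cancel func.
func (h *EventHub) Subscribe() (chan Event, func()) {
	ch := make(chan Event, 64)
	h.mu.Lock()
	h.subs[ch] = struct{}{}
	h.mu.Unlock()
	var once sync.Once
	return ch, func() {
		once.Do(func() {
			h.mu.Lock()
			delete(h.subs, ch)
			h.mu.Unlock()
			close(ch)
		})
	}
}

// ReplaySince returns buffered events with ID greater than lastID, oldest first.
func (h *EventHub) ReplaySince(lastID int64) []Event {
	h.mu.Lock()
	defer h.mu.Unlock()
	var out []Event
	for _, ev := range h.buf {
		if ev.ID > lastID {
			out = append(out, ev)
		}
	}
	return out
}

func main() {
	hub := NewEventHub(3)
	ch, cancel := hub.Subscribe()
	defer cancel()
	hub.Publish(Event{Type: "server.notice"})
	ev := <-ch
	fmt.Println(ev.ID, ev.Type)
}
```

Dropping events for a full subscriber keeps Publish from stalling the tailer; a client that falls behind could recover with ReplaySince.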

Step 4: Run tests to verify GREEN

Run: go test ./... Expected: PASS

Step 5: Commit

git add internal/claude/eventhub.go internal/claude/eventhub_test.go
git commit -m "feat: add event hub with replay buffer"

Task 3: Tail last N lines helper (newest → oldest)

Files:

  • Create: ~/.claude/dashboard/internal/claude/tail.go
  • Test: ~/.claude/dashboard/internal/claude/tail_test.go

Step 1: Write the failing test

package claude

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"testing"
)

func TestTailLastNLines_NewestFirst(t *testing.T) {
	dir := t.TempDir()
	p := filepath.Join(dir, "history.jsonl")

	// Write five lines: line1 .. line5.
	var b strings.Builder
	for i := 1; i <= 5; i++ {
		fmt.Fprintf(&b, "line%d\n", i)
	}
	if err := os.WriteFile(p, []byte(b.String()), 0o600); err != nil {
		t.Fatalf("write: %v", err)
	}

	lines, err := TailLastNLines(p, 2)
	if err != nil {
		t.Fatalf("TailLastNLines: %v", err)
	}
	if len(lines) != 2 {
		t.Fatalf("len=%d", len(lines))
	}
	if lines[0] != "line5" || lines[1] != "line4" {
		t.Fatalf("got=%v", lines)
	}
}

Step 2: Run test to verify it fails

Run: go test ./... Expected: FAIL with undefined: TailLastNLines

Step 3: Minimal implementation

Create ~/.claude/dashboard/internal/claude/tail.go:

  • TailLastNLines(path string, n int) ([]string, error)
  • First implementation can be simple (read whole file + split) with a TODO noting potential optimization.
  • Return text lines without trailing newline; newest→oldest ordering.

Step 4: Run tests to verify it passes

Run: go test ./... Expected: PASS

Step 5: Commit

git add internal/claude/tail.go internal/claude/tail_test.go
git commit -m "feat: add tail last N lines helper"

Task 4: Add backlog endpoint returning normalized events

Files:

  • Create: ~/.claude/dashboard/internal/api/claude_live_handlers.go
  • Modify: ~/.claude/dashboard/internal/api/claude_handlers.go (only to share helper if needed)
  • Modify: ~/.claude/dashboard/cmd/server/main.go
  • Test: ~/.claude/dashboard/internal/api/claude_live_handlers_test.go

Step 1: Write failing test

package api

import (
	"net/http"
	"net/http/httptest"
	"os"
	"path/filepath"
	"testing"

	"github.com/go-chi/chi/v5"
	"github.com/will/k8s-agent-dashboard/internal/claude"
)

type fakeClaudeDirLoader struct{ dir string }

func (f fakeClaudeDirLoader) ClaudeDir() string                                { return f.dir }
func (f fakeClaudeDirLoader) LoadStatsCache() (*claude.StatsCache, error)      { return &claude.StatsCache{}, nil }
func (f fakeClaudeDirLoader) ListDir(name string) ([]claude.DirEntry, error)   { return nil, nil }
func (f fakeClaudeDirLoader) FileMeta(relPath string) (claude.FileMeta, error) { return claude.FileMeta{}, nil }
func (f fakeClaudeDirLoader) PathExists(relPath string) bool                   { return true }

func TestClaudeLiveBacklog_DefaultLimit(t *testing.T) {
	dir := t.TempDir()
	p := filepath.Join(dir, "history.jsonl")
	if err := os.WriteFile(p, []byte("{\"display\":\"/model\"}\n"), 0o600); err != nil {
		t.Fatalf("write: %v", err)
	}

	loader := fakeClaudeDirLoader{dir: dir}

	r := chi.NewRouter()
	r.Get("/api/claude/live/backlog", GetClaudeLiveBacklog(loader))

	req := httptest.NewRequest(http.MethodGet, "/api/claude/live/backlog", nil)
	w := httptest.NewRecorder()
	r.ServeHTTP(w, req)

	if w.Code != http.StatusOK {
		t.Fatalf("status=%d body=%s", w.Code, w.Body.String())
	}
	// Assert response includes an "events" array with at least 1 event.
	if !jsonContainsKey(t, w.Body.Bytes(), "events") {
		t.Fatalf("expected events in response: %s", w.Body.String())
	}
}
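
The test above calls jsonContainsKey, which the plan does not define. One possible shape for that helper is sketched below; hasTopLevelKey is a hypothetical name, and in the dashboard the code would presumably live in a _test.go file in package api rather than in package main.

```go
package main

import (
	"encoding/json"
	"fmt"
	"testing"
)

// hasTopLevelKey reports whether body is a JSON object with the given key.
func hasTopLevelKey(body []byte, key string) (bool, error) {
	var obj map[string]json.RawMessage
	if err := json.Unmarshal(body, &obj); err != nil {
		return false, err
	}
	_, ok := obj[key]
	return ok, nil
}

// jsonContainsKey is the test-facing wrapper; it fails the test when the
// response body is not a JSON object. testing.TB accepts *testing.T.
func jsonContainsKey(t testing.TB, body []byte, key string) bool {
	t.Helper()
	ok, err := hasTopLevelKey(body, key)
	if err != nil {
		t.Fatalf("invalid JSON response: %v", err)
	}
	return ok
}

func main() {
	ok, err := hasTopLevelKey([]byte(`{"limit":200,"events":[]}`), "events")
	fmt.Println(ok, err)
}
```

This checks only that the key is present. Asserting "at least 1 event", as the test comment says, would additionally require decoding events into a slice and checking its length.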

Step 2: Run test to verify RED

Run: go test ./... Expected: FAIL with undefined: GetClaudeLiveBacklog

Step 3: Minimal implementation

Create ~/.claude/dashboard/internal/api/claude_live_handlers.go:

  • GetClaudeLiveBacklog(loader ClaudeLoader) http.HandlerFunc
  • Query param: limit (default 200; clamp 1..1000)
  • Use claude.TailLastNLines(filepath.Join(loader.ClaudeDir(), "history.jsonl"), limit)
  • For each line, create a claude.Event with:
    • Type: claude.EventTypeHistoryAppend
    • TS: time.Now() (or parse timestamp if present in JSON)
    • Data contains: rawLine, optionally json, optionally parseError, and summary (best effort)
  • JSON parsing should be schema-agnostic: unmarshal into map[string]any.
  • Summary extraction should look for keys: sessionId, project, display (strings).

Return payload:

{ "limit": 200, "events": [ ... ] }
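
The limit handling and the schema-agnostic parsing described above might look like this. It is a sketch under assumptions: clampLimit and parseHistoryLine are hypothetical names, and falling back to the default when limit is missing or non-numeric is a choice the plan does not specify.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// clampLimit parses the limit query value. Missing or non-numeric input
// falls back to 200 (an assumption); numeric input is clamped to 1..1000.
func clampLimit(raw string) int {
	n, err := strconv.Atoi(raw)
	if err != nil {
		return 200
	}
	if n < 1 {
		return 1
	}
	if n > 1000 {
		return 1000
	}
	return n
}

// parseHistoryLine builds the event Data payload for one history.jsonl line.
// rawLine is always kept; json and summary are set only when the line parses
// as a JSON object, otherwise parseError is set.
func parseHistoryLine(line string) map[string]any {
	data := map[string]any{"rawLine": line}
	var obj map[string]any
	if err := json.Unmarshal([]byte(line), &obj); err != nil {
		data["parseError"] = err.Error()
		return data
	}
	data["json"] = obj
	summary := map[string]string{}
	for _, key := range []string{"sessionId", "project", "display"} {
		// Best effort: copy the field only when it is present and a string.
		if s, ok := obj[key].(string); ok {
			summary[key] = s
		}
	}
	data["summary"] = summary
	return data
}

func main() {
	fmt.Println(clampLimit(""), clampLimit("5000"))
	out, _ := json.Marshal(parseHistoryLine(`{"display":"/model"}`))
	fmt.Println(string(out))
}
```

Keeping the parser in one function would let the backlog handler and the HistoryTailer share it.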

Step 4: Run tests to verify GREEN

Run: go test ./... Expected: PASS

Step 5: Wire route

Modify ~/.claude/dashboard/cmd/server/main.go to register:

  • GET /api/claude/live/backlog

Step 6: Run tests again

Run: go test ./... Expected: PASS

Step 7: Commit

git add cmd/server/main.go internal/api/claude_live_handlers.go internal/api/claude_live_handlers_test.go internal/claude/tail.go internal/claude/tail_test.go
git commit -m "feat: add claude live backlog endpoint"

Task 5: Add SSE stream endpoint

Files:

  • Create: ~/.claude/dashboard/internal/api/claude_stream_handlers.go
  • Modify: ~/.claude/dashboard/cmd/server/main.go
  • Test: ~/.claude/dashboard/internal/api/claude_stream_handlers_test.go

Step 1: Write failing test

package api

import (
	"context"
	"net/http"
	"net/http/httptest"
	"strings"
	"testing"
	"time"

	"github.com/go-chi/chi/v5"
	"github.com/will/k8s-agent-dashboard/internal/claude"
)

func TestClaudeStream_SendsEvent(t *testing.T) {
	hub := claude.NewEventHub(10)

	r := chi.NewRouter()
	r.Get("/api/claude/stream", GetClaudeStream(hub))

	// Bound the request lifetime: an SSE handler blocks until the client
	// disconnects, so without a deadline ServeHTTP would never return.
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	req := httptest.NewRequest(http.MethodGet, "/api/claude/stream", nil).WithContext(ctx)
	w := httptest.NewRecorder()

	// Publish after handler starts.
	go func() {
		time.Sleep(10 * time.Millisecond)
		hub.Publish(claude.Event{Type: claude.EventTypeServerNotice, Data: map[string]any{"msg": "hi"}})
	}()

	r.ServeHTTP(w, req)

	if ct := w.Header().Get("Content-Type"); !strings.Contains(ct, "text/event-stream") {
		t.Fatalf("content-type=%q", ct)
	}
	if !strings.Contains(w.Body.String(), "event:") || !strings.Contains(w.Body.String(), "data:") {
		t.Fatalf("body=%s", w.Body.String())
	}
}

Step 2: Run test to verify RED

Run: go test ./... Expected: FAIL with undefined: GetClaudeStream

Step 3: Implement minimal SSE handler

Create ~/.claude/dashboard/internal/api/claude_stream_handlers.go:

  • GetClaudeStream(hub *claude.EventHub) http.HandlerFunc
  • Set headers:
    • Content-Type: text/event-stream
    • Cache-Control: no-cache
  • Subscribe to hub; write events in SSE format:
event: <type>
id: <id>
data: <json>


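  • Flush after each event; return when the request context is done (client disconnect), so the handler does not block forever.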
  • Keep it minimal; add keepalive pings later.

Step 4: Run tests to verify GREEN

Run: go test ./... Expected: PASS

Step 5: Wire route

Modify ~/.claude/dashboard/cmd/server/main.go to register:

  • GET /api/claude/stream

Step 6: Run tests again

Run: go test ./... Expected: PASS

Step 7: Commit

git add cmd/server/main.go internal/api/claude_stream_handlers.go internal/api/claude_stream_handlers_test.go internal/claude/eventhub.go internal/claude/eventhub_test.go
git commit -m "feat: add claude sse stream endpoint"

Task 6: Implement HistoryTailer to publish hub events

Files:

  • Create: ~/.claude/dashboard/internal/claude/history_tailer.go
  • Test: ~/.claude/dashboard/internal/claude/history_tailer_test.go
  • Modify: ~/.claude/dashboard/cmd/server/main.go

Step 1: Write failing test

package claude

import (
	"os"
	"path/filepath"
	"testing"
	"time"
)

func TestHistoryTailer_EmitsOnAppend(t *testing.T) {
	dir := t.TempDir()
	p := filepath.Join(dir, "history.jsonl")
	if err := os.WriteFile(p, []byte(""), 0o600); err != nil {
		t.Fatalf("write: %v", err)
	}

	hub := NewEventHub(10)
	ch, cancel := hub.Subscribe()
	defer cancel()

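	stop := make(chan struct{})
	defer close(stop) // stop the tailer even if the test fails early
	go TailHistoryFile(stop, hub, p)

	// Append a line (O_APPEND, so this is a true append rather than an overwrite).
	f, err := os.OpenFile(p, os.O_APPEND|os.O_WRONLY, 0o600)
	if err != nil {
		t.Fatalf("open: %v", err)
	}
	if _, err := f.WriteString("{\"display\":\"/status\"}\n"); err != nil {
		t.Fatalf("append: %v", err)
	}
	if err := f.Close(); err != nil {
		t.Fatalf("close: %v", err)
	}

	select {
	case ev := <-ch:
		if ev.Type != EventTypeHistoryAppend {
			t.Fatalf("type=%s", ev.Type)
		}
	// Wait longer than the tailer's polling interval before giving up.
	case <-time.After(2 * time.Second):
		t.Fatalf("timed out waiting for event")
	}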
}

Step 2: Run tests to verify RED

Run: go test ./... Expected: FAIL with undefined: TailHistoryFile

Step 3: Minimal implementation

Create ~/.claude/dashboard/internal/claude/history_tailer.go implementing:

  • TailHistoryFile(stop <-chan struct{}, hub *EventHub, path string)
  • Simple polling loop (since target latency is 2-5s):
    • Every 500ms-1s, stat file size
    • If size grew, read new bytes from offset, split on \n, publish history.append events
    • If size shrank, reset offset to 0 and publish server.notice

Also implement an internal helper to parse a history line into event Data with summary extraction (same logic as backlog).

Step 4: Run tests to verify GREEN

Run: go test ./... Expected: PASS

Step 5: Wire tailer in server

Modify ~/.claude/dashboard/cmd/server/main.go:

  • Create hub at startup: hub := claude.NewEventHub(1000)
  • Start goroutine tailing filepath.Join(*claudeDir, "history.jsonl")

Step 6: Run tests again

Run: go test ./... Expected: PASS

Step 7: Commit

git add cmd/server/main.go internal/claude/history_tailer.go internal/claude/history_tailer_test.go
git commit -m "feat: stream history.jsonl appends via event hub"

Task 7: Frontend Live view (EventSource + batching)

Files:

  • Modify: ~/.claude/dashboard/cmd/server/web/index.html
  • Modify: ~/.claude/dashboard/cmd/server/web/static/js/app.js
  • Modify: ~/.claude/dashboard/cmd/server/web/static/css/style.css

Step 1: Add Live tab + markup

  • Add nav button: data-view="live"
  • Add section:
    • id="live-view"
    • table id="claude-live-table" and a connection indicator id="claude-live-conn"

Step 2: Add JS backlog fetch + EventSource

Modify ~/.claude/dashboard/cmd/server/web/static/js/app.js:

  • On DOMContentLoaded, create EventSource('/api/claude/stream')
  • Maintain:
    • let pendingLiveEvents = []
    • let liveEvents = [] (cap at 500)
  • Every 1000ms:
    • move pending → live
    • render table rows
  • Fetch backlog once:
    • GET /api/claude/live/backlog?limit=200
    • append the backlog to liveEvents: it is returned newest→oldest, so appending keeps the newest row at the top, while events that arrive later over SSE are prepended

Step 3: CSS

  • Add a small connection badge style (green/yellow/red)
  • Ensure table remains readable

Step 4: Manual verification

Run:

  • go test ./...
  • go run ./cmd/server --port 8080 --data /tmp/k8s --claude ~/.claude

Expected:

  • Live tab loads backlog rows
  • New history events appear on subsequent CLI activity

Step 5: Commit

git add cmd/server/web/index.html cmd/server/web/static/js/app.js cmd/server/web/static/css/style.css
git commit -m "feat: add live feed UI with SSE batching"

Task 8: End-to-end verification

Files:

  • None (unless a bug requires fixes)

Step 1: Run full test suite

Run: go test ./... Expected: PASS (0 failures)

Step 2: Manual smoke check

Run:

  • go run ./cmd/server --port 8080 --data /tmp/k8s --claude ~/.claude

Check:

  • curl -N http://localhost:8080/api/claude/stream prints SSE lines
  • curl 'http://localhost:8080/api/claude/live/backlog?limit=5' returns events array (quote the URL so the shell does not treat ? as a glob)
  • Browser Live tab updates

Execution handoff

Plan complete and saved to ~/.claude/docs/plans/2026-01-01-claude-realtime-monitoring-implementation.md.

Two execution options:

  1. Subagent-Driven (this session) — I dispatch fresh subagent per task, review between tasks, fast iteration
  2. Parallel Session (separate) — Open new session with superpowers:executing-plans, batch execution with checkpoints

Which approach?