package claude

import (
	"sync"
	"sync/atomic"
	"time"
)

// EventHub broadcasts published events to all current subscribers and keeps
// a bounded history of recent events that can be replayed on demand.
//
// Every method acquires mu before touching shared state, so an EventHub may
// be used from multiple goroutines. It must not be copied after first use
// because it contains a sync.RWMutex.
type EventHub struct {
	// mu guards buffer and subscribers, and serializes ID assignment.
	mu sync.RWMutex
	// buffer holds the most recent events, oldest first, capped at bufferSize.
	buffer []Event
	// nextID is the most recently auto-assigned event ID.
	nextID int64
	// subscribers is the set of channels that receive every published event.
	subscribers []chan Event
	// bufferSize is the maximum number of events retained for replay;
	// zero disables the history.
	bufferSize int
}

// NewEventHub returns an EventHub that retains up to bufferSize events for
// replay. A bufferSize of zero or less disables the replay history; events
// are still delivered to live subscribers.
func NewEventHub(bufferSize int) *EventHub {
	// A negative capacity would make make() panic; treat it as "no history".
	if bufferSize < 0 {
		bufferSize = 0
	}
	return &EventHub{
		buffer:     make([]Event, 0, bufferSize),
		bufferSize: bufferSize,
	}
}

// Publish records ev in the replay history and offers it to every subscriber.
//
// If ev.ID is zero the hub assigns the next sequential ID; if ev.TS is the
// zero time it is set to the current time. The event as stored and delivered
// (with those fields filled in) is returned.
//
// Delivery never blocks: a subscriber whose channel is full does not receive
// the event and can recover it later through ReplaySince.
func (h *EventHub) Publish(ev Event) Event {
	if ev.TS.IsZero() {
		ev.TS = time.Now()
	}

	h.mu.Lock()
	defer h.mu.Unlock()

	// Assign the ID while holding the lock so that the order of IDs matches
	// the order in which events are stored and delivered. Assigning it before
	// the lock lets a later ID overtake an earlier one, which makes
	// ReplaySince skip the overtaken event for consumers that already saw the
	// higher ID.
	if ev.ID == 0 {
		ev.ID = atomic.AddInt64(&h.nextID, 1)
	}

	if h.bufferSize > 0 {
		if len(h.buffer) >= h.bufferSize {
			// Clear the evicted slot so the backing array does not keep the
			// old event reachable, then drop it from the front.
			var zero Event
			h.buffer[0] = zero
			h.buffer = h.buffer[1:]
		}
		h.buffer = append(h.buffer, ev)
	}

	for _, ch := range h.subscribers {
		select {
		case ch <- ev:
		default:
			// Subscriber is not keeping up; drop rather than stall the hub.
		}
	}
	return ev
}

// Subscribe registers a new subscriber and returns its event channel together
// with a cancel function.
//
// The channel is buffered; events published while it is full are not
// delivered to this subscriber. Calling cancel removes the subscriber and
// closes the channel. cancel is safe to call more than once: after the first
// call the channel is no longer registered and subsequent calls do nothing.
func (h *EventHub) Subscribe() (chan Event, func()) {
	// Per-subscriber queue depth before events start being dropped.
	const queueDepth = 10
	ch := make(chan Event, queueDepth)

	h.mu.Lock()
	h.subscribers = append(h.subscribers, ch)
	h.mu.Unlock()

	cancel := func() {
		h.mu.Lock()
		defer h.mu.Unlock()
		for i, c := range h.subscribers {
			if c != ch {
				continue
			}
			// Remove entry i and nil the vacated tail slot so the slice's
			// backing array does not retain the closed channel.
			last := len(h.subscribers) - 1
			copy(h.subscribers[i:], h.subscribers[i+1:])
			h.subscribers[last] = nil
			h.subscribers = h.subscribers[:last]
			// Closing under the same lock Publish sends under guarantees
			// Publish never sends on a closed channel.
			close(ch)
			return
		}
	}
	return ch, cancel
}

// ReplaySince returns the retained events whose ID is greater than lastID, in
// the order they were published. The result is nil when no retained event
// qualifies. Events already evicted from the history are not returned.
func (h *EventHub) ReplaySince(lastID int64) []Event {
	h.mu.RLock()
	defer h.mu.RUnlock()

	var result []Event
	for _, ev := range h.buffer {
		if ev.ID > lastID {
			result = append(result, ev)
		}
	}
	return result
}