package engine

import (
	"context"
	"sync"
	"time"

	"tower/internal/collectors"
	"tower/internal/model"
)

// IssueStore is the Engine's dependency on the issue store.
//
// The concrete implementation lives in internal/store. We depend on an interface
// here to keep the Engine testable.
//
// NOTE: The store is responsible for dedupe + lifecycle (resolve-after, ack, etc.).
// The Engine simply merges outputs from collectors and passes them into Upsert.
//
// Engine calls Snapshot() to publish UI snapshots.
//
// This interface must be satisfied by internal/store.IssueStore.
// (Do not add persistence here.)
type IssueStore interface {
	Upsert(now time.Time, issues []model.Issue)
	Snapshot(now time.Time) []model.Issue
}

// CollectorConfig wires a collector into the Engine.
// Timeout applies per Collect() invocation.
// Interval comes from the collector itself.
//
// If Timeout <= 0, no per-collector timeout is applied.
type CollectorConfig struct {
	Collector collectors.Collector
	Timeout   time.Duration
}

// CollectorHealth tracks the current health of a collector.
//
// Status is the last status returned by the collector.
// LastError is the last error returned by the collector (if any).
type CollectorHealth struct {
	Status     collectors.Status
	LastError  error
	LastRun    time.Time
	LastOK     time.Time
	LastRunDur time.Duration
}

// Snapshot is the Engine's UI-facing view.
//
// Issues are sorted using the default sort order (Priority desc, then recency desc).
// Collectors is keyed by collector name.
type Snapshot struct {
	At         time.Time
	Issues     []model.Issue
	Collectors map[string]CollectorHealth
}

type collectResult struct {
	name     string
	at       time.Time
	duration time.Duration
	issues   []model.Issue
	status   collectors.Status
	err      error
}

type collectorRunner struct {
	cfg       CollectorConfig
	refreshCh chan struct{}
}

// Engine runs collectors on their own schedules, merges issues, and updates the store.
// It publishes snapshots for the UI.
//
// Lifecycle:
//
//	e := New(...)
//	e.Start(ctx)
//	defer e.Stop()
//
// Snapshots are emitted:
//   - after any store update (collector completion)
//   - periodically at refreshInterval (if > 0)
//
// RefreshNow() forces all collectors to run immediately.
type Engine struct {
	store           IssueStore
	refreshInterval time.Duration

	snapshots chan Snapshot
	results   chan collectResult

	mu                      sync.Mutex
	latestIssuesByCollector map[string][]model.Issue
	health                  map[string]CollectorHealth

	collectors []collectorRunner

	cancel    context.CancelFunc
	wg        sync.WaitGroup
	startOnce sync.Once
	stopOnce  sync.Once
}

// New constructs an Engine.
//
// refreshInterval governs periodic snapshot emission. If refreshInterval <= 0,
// snapshots are only emitted when collectors finish.
func New(st IssueStore, cs []CollectorConfig, refreshInterval time.Duration) *Engine {
	runners := make([]collectorRunner, 0, len(cs))
	for _, c := range cs {
		runners = append(runners, collectorRunner{
			cfg:       c,
			refreshCh: make(chan struct{}, 1),
		})
	}
	return &Engine{
		store:                   st,
		refreshInterval:         refreshInterval,
		snapshots:               make(chan Snapshot, 32),
		results:                 make(chan collectResult, 64),
		latestIssuesByCollector: map[string][]model.Issue{},
		health:                  map[string]CollectorHealth{},
		collectors:              runners,
	}
}
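
// Example wiring (illustrative sketch only; diskCollector and smartCollector are
// hypothetical collectors.Collector implementations, and store.New is assumed to
// return a type that satisfies IssueStore):
//
//	st := store.New()
//	eng := New(st, []CollectorConfig{
//		{Collector: diskCollector, Timeout: 5 * time.Second},
//		{Collector: smartCollector, Timeout: 30 * time.Second},
//	}, 2*time.Second)
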
// Start begins background collection. Start is safe to call more than once,
// but only the first call has any effect.
func (e *Engine) Start(parent context.Context) {
	e.startOnce.Do(func() {
		ctx, cancel := context.WithCancel(parent)
		e.cancel = cancel

		e.wg.Add(1)
		go func() {
			defer e.wg.Done()
			e.runAggregator(ctx)
		}()

		for i := range e.collectors {
			r := &e.collectors[i]
			e.wg.Add(1)
			go func(r *collectorRunner) {
				defer e.wg.Done()
				e.runCollector(ctx, r)
			}(r)
		}
	})
}

// Stop stops the Engine and closes the snapshots channel.
func (e *Engine) Stop() {
	e.stopOnce.Do(func() {
		if e.cancel != nil {
			e.cancel()
		}
		e.wg.Wait()
		close(e.snapshots)
	})
}

// Snapshots returns a receive-only channel of snapshots.
func (e *Engine) Snapshots() <-chan Snapshot {
	return e.snapshots
}

// RefreshNow forces all collectors to run immediately.
//
// It is non-blocking; if a collector already has a refresh queued, no additional
// refresh signal is queued for it.
func (e *Engine) RefreshNow() {
	for i := range e.collectors {
		ch := e.collectors[i].refreshCh
		select {
		case ch <- struct{}{}:
		default:
		}
	}
}

// runCollector runs a single collector on its own interval and forwards each
// result to the aggregator. It also services RefreshNow signals.
func (e *Engine) runCollector(ctx context.Context, r *collectorRunner) {
	name := r.cfg.Collector.Name()
	interval := r.cfg.Collector.Interval()
	if interval <= 0 {
		interval = time.Second
	}

	doCollect := func() {
		start := time.Now()

		collectCtx := ctx
		cancel := func() {}
		if r.cfg.Timeout > 0 {
			collectCtx, cancel = context.WithTimeout(ctx, r.cfg.Timeout)
		}
		defer cancel()

		issues, st, err := r.cfg.Collector.Collect(collectCtx)
		finish := time.Now()
		dur := finish.Sub(start)

		// Copy issues slice to avoid data races when collectors reuse underlying storage.
		copied := make([]model.Issue, len(issues))
		copy(copied, issues)

		res := collectResult{
			name:     name,
			at:       finish,
			duration: dur,
			issues:   copied,
			status:   st,
			err:      err,
		}

		select {
		case e.results <- res:
		case <-ctx.Done():
			return
		}
	}

	// Collect immediately on start so the UI isn't empty for the first interval.
	doCollect()

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			doCollect()
		case <-r.refreshCh:
			doCollect()
		}
	}
}

// runAggregator merges collector results into the store, tracks collector
// health, and publishes snapshots for the UI.
func (e *Engine) runAggregator(ctx context.Context) {
	var ticker *time.Ticker
	var tick <-chan time.Time
	if e.refreshInterval > 0 {
		ticker = time.NewTicker(e.refreshInterval)
		defer ticker.Stop()
		tick = ticker.C
	}

	emitSnapshot := func(at time.Time) {
		issues := e.store.Snapshot(at)
		// Ensure deterministic default sort for the UI.
		model.SortIssuesDefault(issues)

		// Copy collector health map.
		e.mu.Lock()
		h := make(map[string]CollectorHealth, len(e.health))
		for k, v := range e.health {
			h[k] = v
		}
		e.mu.Unlock()

		snap := Snapshot{At: at, Issues: issues, Collectors: h}

		// Non-blocking publish; drop if UI is behind.
		select {
		case e.snapshots <- snap:
		default:
		}
	}

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick:
			emitSnapshot(time.Now())
		case res := <-e.results:
			e.mu.Lock()

			// On collector errors, keep the last known issues for that collector.
			// This prevents transient errors/timeouts from making issues disappear.
			if res.err == nil {
				e.latestIssuesByCollector[res.name] = res.issues
			}

			ch := e.health[res.name]
			ch.Status = res.status
			ch.LastRun = res.at
			ch.LastRunDur = res.duration
			ch.LastError = res.err
			if res.err == nil {
				ch.LastOK = res.at
			}
			e.health[res.name] = ch

			merged := make([]model.Issue, 0, 64)
			for _, issues := range e.latestIssuesByCollector {
				merged = append(merged, issues...)
			}
			e.mu.Unlock()

			e.store.Upsert(res.at, merged)
			emitSnapshot(res.at)
		}
	}
}
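
// Example drive loop (illustrative sketch only; renderUI and userPressedRefresh
// are hypothetical stand-ins for whatever the UI layer provides):
//
//	eng.Start(ctx)
//	defer eng.Stop()
//
//	go func() {
//		for range userPressedRefresh {
//			eng.RefreshNow()
//		}
//	}()
//
//	// The range exits when Stop closes the snapshots channel.
//	for snap := range eng.Snapshots() {
//		renderUI(snap) // snap.Issues is pre-sorted; snap.Collectors carries health
//	}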