package engine import ( "context" "errors" "sync" "sync/atomic" "testing" "time" "tower/internal/collectors" "tower/internal/model" ) type fakeStore struct { mu sync.Mutex upsertCalls int lastNow time.Time lastIssues []model.Issue } func (s *fakeStore) Upsert(now time.Time, issues []model.Issue) { s.mu.Lock() defer s.mu.Unlock() s.upsertCalls++ s.lastNow = now // Deep-ish copy: slice copy is enough for our tests. s.lastIssues = append([]model.Issue(nil), issues...) } func (s *fakeStore) Snapshot(now time.Time) []model.Issue { s.mu.Lock() defer s.mu.Unlock() return append([]model.Issue(nil), s.lastIssues...) } func (s *fakeStore) UpsertCount() int { s.mu.Lock() defer s.mu.Unlock() return s.upsertCalls } type fakeCollector struct { name string interval time.Duration // delay simulates work. If ctx is canceled/timeout hits, Collect returns ctx.Err(). delay time.Duration issuesFn func(call int64) []model.Issue calls atomic.Int64 callCh chan time.Time } func (c *fakeCollector) Name() string { return c.name } func (c *fakeCollector) Interval() time.Duration { return c.interval } func (c *fakeCollector) Collect(ctx context.Context) ([]model.Issue, collectors.Status, error) { call := c.calls.Add(1) if c.callCh != nil { select { case c.callCh <- time.Now(): default: } } if c.delay > 0 { t := time.NewTimer(c.delay) defer t.Stop() select { case <-ctx.Done(): var st collectors.Status return nil, st, ctx.Err() case <-t.C: } } var st collectors.Status if c.issuesFn != nil { return c.issuesFn(call), st, nil } return nil, st, nil } func recvSnapshot(t *testing.T, ch <-chan Snapshot, within time.Duration) Snapshot { t.Helper() select { case s := <-ch: return s case <-time.After(within): t.Fatalf("timed out waiting for snapshot") return Snapshot{} } } func TestEngine_UpsertAndSnapshotsEmitted(t *testing.T) { st := &fakeStore{} c := &fakeCollector{ name: "c1", interval: 100 * time.Millisecond, issuesFn: func(call int64) []model.Issue { return []model.Issue{{ ID: "id-1", Priority: model.PriorityP1, Title: "hello", LastSeen: time.Now(), }} }, } e := New(st, []CollectorConfig{{Collector: c, Timeout: 200 * time.Millisecond}}, 0) ctx, cancel := context.WithCancel(context.Background()) defer cancel() defer e.Stop() e.Start(ctx) snap := recvSnapshot(t, e.Snapshots(), 300*time.Millisecond) if st.UpsertCount() < 1 { t.Fatalf("expected store.Upsert to be called") } if len(snap.Issues) != 1 || snap.Issues[0].ID != "id-1" { t.Fatalf("expected snapshot to contain issue id-1; got %+v", snap.Issues) } if _, ok := snap.Collectors["c1"]; !ok { t.Fatalf("expected collector health entry for c1") } } func TestEngine_CollectorTimeoutCancelsLongCollect(t *testing.T) { st := &fakeStore{} c := &fakeCollector{ name: "slow", interval: time.Hour, delay: 200 * time.Millisecond, } e := New(st, []CollectorConfig{{Collector: c, Timeout: 20 * time.Millisecond}}, 0) ctx, cancel := context.WithCancel(context.Background()) defer cancel() defer e.Stop() e.Start(ctx) snap := recvSnapshot(t, e.Snapshots(), 400*time.Millisecond) ch, ok := snap.Collectors["slow"] if !ok { t.Fatalf("expected collector health entry for slow") } if ch.LastError == nil { t.Fatalf("expected LastError to be set") } if !errors.Is(ch.LastError, context.DeadlineExceeded) { t.Fatalf("expected context deadline exceeded; got %v", ch.LastError) } if st.UpsertCount() < 1 { t.Fatalf("expected store.Upsert to be called") } } func TestEngine_RefreshNowTriggersImmediateCollect(t *testing.T) { st := &fakeStore{} callCh := make(chan time.Time, 10) c := &fakeCollector{ name: "r", interval: 200 * time.Millisecond, callCh: callCh, } e := New(st, []CollectorConfig{{Collector: c, Timeout: time.Second}}, 0) ctx, cancel := context.WithCancel(context.Background()) defer cancel() defer e.Stop() e.Start(ctx) // First collect happens immediately. select { case <-callCh: case <-time.After(200 * time.Millisecond): t.Fatalf("timed out waiting for initial collect") } // Trigger refresh; should happen well before the 200ms interval. time.Sleep(10 * time.Millisecond) e.RefreshNow() select { case <-callCh: // ok case <-time.After(120 * time.Millisecond): t.Fatalf("expected RefreshNow to trigger a collect quickly") } } func TestEngine_MultipleCollectorsRunOnIntervals(t *testing.T) { st := &fakeStore{} fast := &fakeCollector{name: "fast", interval: 30 * time.Millisecond} slow := &fakeCollector{name: "slow", interval: 80 * time.Millisecond} e := New(st, []CollectorConfig{{Collector: fast, Timeout: time.Second}, {Collector: slow, Timeout: time.Second}}, 0) ctx, cancel := context.WithCancel(context.Background()) defer cancel() e.Start(ctx) // Let it run a bit. time.Sleep(220 * time.Millisecond) e.Stop() fastCalls := fast.calls.Load() slowCalls := slow.calls.Load() // Includes initial collect. if fastCalls < 4 { t.Fatalf("expected fast collector to be called multiple times; got %d", fastCalls) } if slowCalls < 2 { t.Fatalf("expected slow collector to be called multiple times; got %d", slowCalls) } }