Files
agentmon/internal/queue/nats/subscribe.go
T
William Valentin 256b841cbf feat: scaffold agentmon services and k8s deploy
Adds Go microservices (ingest-gateway, event-processor, query-api, web-ui), NATS+Postgres wiring, initial schema/init job, ingress manifests for LAN+tailnet, and a multi-arch image build script.
2026-01-17 01:06:57 -08:00

42 lines
705 B
Go

package nats
import (
"context"
gnats "github.com/nats-io/nats.go"
)
type Subscriber struct {
conn *gnats.Conn
sub *gnats.Subscription
topic string
}
func NewSubscriber(url, topic string) (*Subscriber, error) {
conn, err := gnats.Connect(url)
if err != nil {
return nil, err
}
return &Subscriber{conn: conn, topic: topic}, nil
}
func (s *Subscriber) Close() {
if s.sub != nil {
_ = s.sub.Unsubscribe()
}
s.conn.Close()
}
func (s *Subscriber) Subscribe(ctx context.Context, handler func(msg []byte) error) error {
sub, err := s.conn.Subscribe(s.topic, func(m *gnats.Msg) {
_ = handler(m.Data)
})
if err != nil {
return err
}
s.sub = sub
<-ctx.Done()
return ctx.Err()
}