256b841cbf
Adds Go microservices (ingest-gateway, event-processor, query-api, web-ui), NATS+Postgres wiring, initial schema/init job, ingress manifests for LAN+tailnet, and a multi-arch image build script.
42 lines
705 B
Go
42 lines
705 B
Go
package nats
|
|
|
|
import (
|
|
"context"
|
|
|
|
gnats "github.com/nats-io/nats.go"
|
|
)
|
|
|
|
type Subscriber struct {
|
|
conn *gnats.Conn
|
|
sub *gnats.Subscription
|
|
topic string
|
|
}
|
|
|
|
func NewSubscriber(url, topic string) (*Subscriber, error) {
|
|
conn, err := gnats.Connect(url)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &Subscriber{conn: conn, topic: topic}, nil
|
|
}
|
|
|
|
func (s *Subscriber) Close() {
|
|
if s.sub != nil {
|
|
_ = s.sub.Unsubscribe()
|
|
}
|
|
s.conn.Close()
|
|
}
|
|
|
|
func (s *Subscriber) Subscribe(ctx context.Context, handler func(msg []byte) error) error {
|
|
sub, err := s.conn.Subscribe(s.topic, func(m *gnats.Msg) {
|
|
_ = handler(m.Data)
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.sub = sub
|
|
|
|
<-ctx.Done()
|
|
return ctx.Err()
|
|
}
|