feat: scaffold agentmon services and k8s deploy
Adds Go microservices (ingest-gateway, event-processor, query-api, web-ui), NATS+Postgres wiring, initial schema/init job, ingress manifests for LAN+tailnet, and a multi-arch image build script.
This commit is contained in:
@@ -0,0 +1,41 @@
|
||||
package event
|
||||
|
||||
type Schema struct {
|
||||
Name string `json:"name"`
|
||||
Version int `json:"version"`
|
||||
}
|
||||
|
||||
type Source struct {
|
||||
Framework string `json:"framework"`
|
||||
ClientID string `json:"client_id"`
|
||||
Host string `json:"host"`
|
||||
User string `json:"user,omitempty"`
|
||||
Version string `json:"version,omitempty"`
|
||||
}
|
||||
|
||||
type EventMeta struct {
|
||||
ID string `json:"id"`
|
||||
Type string `json:"type"`
|
||||
TS any `json:"ts"`
|
||||
Seq *int64 `json:"seq,omitempty"`
|
||||
|
||||
Source Source `json:"source"`
|
||||
}
|
||||
|
||||
type Correlation struct {
|
||||
SessionID string `json:"session_id,omitempty"`
|
||||
RunID string `json:"run_id,omitempty"`
|
||||
TraceID string `json:"trace_id,omitempty"`
|
||||
SpanID string `json:"span_id,omitempty"`
|
||||
ParentSpanID string `json:"parent_span_id,omitempty"`
|
||||
}
|
||||
|
||||
type Envelope struct {
|
||||
Schema Schema `json:"schema"`
|
||||
Event EventMeta `json:"event"`
|
||||
Correlation *Correlation `json:"correlation,omitempty"`
|
||||
Attributes map[string]any `json:"attributes,omitempty"`
|
||||
Payload map[string]any `json:"payload,omitempty"`
|
||||
Raw map[string]any `json:"-"`
|
||||
Extra map[string]any `json:"-"`
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
package httpx
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func WriteJSON(w http.ResponseWriter, status int, v any) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(status)
|
||||
_ = json.NewEncoder(w).Encode(v)
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
package nats
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
gnats "github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
type Publisher struct {
|
||||
conn *gnats.Conn
|
||||
topic string
|
||||
timeout time.Duration
|
||||
}
|
||||
|
||||
func NewPublisher(url, topic string) (*Publisher, error) {
|
||||
conn, err := gnats.Connect(url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Publisher{conn: conn, topic: topic, timeout: 5 * time.Second}, nil
|
||||
}
|
||||
|
||||
func (p *Publisher) Close() {
|
||||
p.conn.Close()
|
||||
}
|
||||
|
||||
func (p *Publisher) Publish(ctx context.Context, data []byte) error {
|
||||
ctx, cancel := context.WithTimeout(ctx, p.timeout)
|
||||
defer cancel()
|
||||
|
||||
_ = ctx
|
||||
return p.conn.Publish(p.topic, data)
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
package nats
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
gnats "github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
type Subscriber struct {
|
||||
conn *gnats.Conn
|
||||
sub *gnats.Subscription
|
||||
topic string
|
||||
}
|
||||
|
||||
func NewSubscriber(url, topic string) (*Subscriber, error) {
|
||||
conn, err := gnats.Connect(url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Subscriber{conn: conn, topic: topic}, nil
|
||||
}
|
||||
|
||||
func (s *Subscriber) Close() {
|
||||
if s.sub != nil {
|
||||
_ = s.sub.Unsubscribe()
|
||||
}
|
||||
s.conn.Close()
|
||||
}
|
||||
|
||||
func (s *Subscriber) Subscribe(ctx context.Context, handler func(msg []byte) error) error {
|
||||
sub, err := s.conn.Subscribe(s.topic, func(m *gnats.Msg) {
|
||||
_ = handler(m.Data)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.sub = sub
|
||||
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"time"
|
||||
)
|
||||
|
||||
type InsertEvent struct {
|
||||
EventID string
|
||||
TS time.Time
|
||||
Type string
|
||||
|
||||
SessionID sql.NullString
|
||||
RunID sql.NullString
|
||||
TraceID sql.NullString
|
||||
SpanID sql.NullString
|
||||
ParentSpanID sql.NullString
|
||||
|
||||
SourceFramework sql.NullString
|
||||
ClientID sql.NullString
|
||||
Payload any
|
||||
}
|
||||
|
||||
func (d *DB) InsertEvent(ctx context.Context, e InsertEvent) error {
|
||||
payload, err := json.Marshal(e.Payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = d.sql.ExecContext(ctx, `
|
||||
insert into events (
|
||||
event_id, ts, type, session_id, run_id, trace_id, span_id, parent_span_id,
|
||||
source_framework, client_id, payload
|
||||
) values (
|
||||
$1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11
|
||||
)
|
||||
on conflict (event_id) do nothing
|
||||
`,
|
||||
e.EventID,
|
||||
e.TS,
|
||||
e.Type,
|
||||
e.SessionID,
|
||||
e.RunID,
|
||||
e.TraceID,
|
||||
e.SpanID,
|
||||
e.ParentSpanID,
|
||||
e.SourceFramework,
|
||||
e.ClientID,
|
||||
payload,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
var ErrMissingField = errors.New("missing required field")
|
||||
@@ -0,0 +1,27 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
_ "github.com/jackc/pgx/v5/stdlib"
|
||||
)
|
||||
|
||||
type DB struct {
|
||||
sql *sql.DB
|
||||
}
|
||||
|
||||
func Open(url string) (*DB, error) {
|
||||
db, err := sql.Open("pgx", url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &DB{sql: db}, nil
|
||||
}
|
||||
|
||||
func (d *DB) Close() error {
|
||||
return d.sql.Close()
|
||||
}
|
||||
|
||||
func (d *DB) Ping(ctx context.Context) error {
|
||||
return d.sql.PingContext(ctx)
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"time"
|
||||
)
|
||||
|
||||
type EventRow struct {
|
||||
EventID string `json:"event_id"`
|
||||
TS time.Time `json:"ts"`
|
||||
Type string `json:"type"`
|
||||
Payload json.RawMessage `json:"payload"`
|
||||
}
|
||||
|
||||
func (d *DB) ListRecentEvents(ctx context.Context, limit int) ([]EventRow, error) {
|
||||
if limit <= 0 {
|
||||
limit = 100
|
||||
}
|
||||
if limit > 1000 {
|
||||
limit = 1000
|
||||
}
|
||||
|
||||
rows, err := d.sql.QueryContext(ctx, `
|
||||
select event_id, ts, type, payload
|
||||
from events
|
||||
order by ts desc
|
||||
limit $1
|
||||
`, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var out []EventRow
|
||||
for rows.Next() {
|
||||
var r EventRow
|
||||
if err := rows.Scan(&r.EventID, &r.TS, &r.Type, &r.Payload); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, r)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
Reference in New Issue
Block a user