357 lines
7.4 KiB
Go
357 lines
7.4 KiB
Go
// Package sdk provides the agentmon emitter SDK for sending telemetry events.
|
|
package sdk
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/rand"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
const (
|
|
schemaName = "agentmon.event"
|
|
schemaVersion = 1
|
|
)
|
|
|
|
// Emitter is the main client for sending agentmon events.
|
|
type Emitter struct {
|
|
config Config
|
|
httpClient *http.Client
|
|
wsClient *WSClient
|
|
buffer []Event
|
|
bufferSize int
|
|
mu sync.Mutex
|
|
closed bool
|
|
}
|
|
|
|
// Config holds emitter configuration.
|
|
type Config struct {
|
|
// ServerURL is the base URL of the ingest gateway (e.g., "http://localhost:8080")
|
|
ServerURL string
|
|
// APIKey is optional authentication key
|
|
APIKey string
|
|
// Framework is the name of the agent framework (e.g., "opencode", "claude-code")
|
|
Framework string
|
|
// ClientID is a stable identifier for this emitter instance
|
|
ClientID string
|
|
// Host is the hostname where events originate
|
|
Host string
|
|
// BufferSize is the max number of events to buffer before flushing
|
|
BufferSize int
|
|
// UseWebSocket enables WebSocket streaming mode
|
|
UseWebSocket bool
|
|
// EnableLogging enables debug logging
|
|
EnableLogging bool
|
|
}
|
|
|
|
// Event represents a complete agentmon event.
|
|
type Event map[string]any
|
|
|
|
// WSClient handles WebSocket communication with the ingest gateway.
|
|
type WSClient struct {
|
|
conn *websocket.Conn
|
|
sendChan chan []byte
|
|
ackChan chan int
|
|
mu sync.Mutex
|
|
closed bool
|
|
}
|
|
|
|
// NewWSClient creates a new WebSocket client.
|
|
func NewWSClient(url string) (*WSClient, error) {
|
|
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &WSClient{
|
|
conn: conn,
|
|
sendChan: make(chan []byte, 100),
|
|
ackChan: make(chan int, 1),
|
|
}, nil
|
|
}
|
|
|
|
// Run starts the WebSocket client's main loop.
|
|
func (w *WSClient) Run(ctx context.Context) {
|
|
defer w.Close()
|
|
|
|
go w.readMessages()
|
|
w.writeMessages()
|
|
}
|
|
|
|
// Send queues an event to be sent via WebSocket.
|
|
func (w *WSClient) Send(data []byte) error {
|
|
w.mu.Lock()
|
|
if w.closed {
|
|
w.mu.Unlock()
|
|
return fmt.Errorf("WebSocket client is closed")
|
|
}
|
|
w.mu.Unlock()
|
|
|
|
select {
|
|
case w.sendChan <- data:
|
|
return nil
|
|
default:
|
|
return fmt.Errorf("send buffer full")
|
|
}
|
|
}
|
|
|
|
// Close closes the WebSocket connection.
|
|
func (w *WSClient) Close() error {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
if w.closed {
|
|
return nil
|
|
}
|
|
w.closed = true
|
|
|
|
if w.conn != nil {
|
|
_ = w.conn.Close()
|
|
}
|
|
close(w.sendChan)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (w *WSClient) readMessages() {
|
|
for {
|
|
_, message, err := w.conn.ReadMessage()
|
|
if err != nil {
|
|
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
|
|
log.Printf("WebSocket read error: %v", err)
|
|
}
|
|
return
|
|
}
|
|
|
|
var ack map[string]any
|
|
if err := json.Unmarshal(message, &ack); err != nil {
|
|
log.Printf("Failed to unmarshal ack: %v", err)
|
|
continue
|
|
}
|
|
|
|
if seq, ok := ack["ack"].(map[string]any)["up_to_seq"].(float64); ok {
|
|
select {
|
|
case w.ackChan <- int(seq):
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (w *WSClient) writeMessages() {
|
|
for data := range w.sendChan {
|
|
w.mu.Lock()
|
|
if w.closed {
|
|
w.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
err := w.conn.WriteMessage(websocket.TextMessage, data)
|
|
w.mu.Unlock()
|
|
|
|
if err != nil {
|
|
log.Printf("WebSocket write error: %v", err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// NewEmitter creates a new emitter with the given configuration.
|
|
func NewEmitter(cfg Config) (*Emitter, error) {
|
|
if cfg.ServerURL == "" {
|
|
return nil, fmt.Errorf("ServerURL is required")
|
|
}
|
|
if cfg.Framework == "" {
|
|
return nil, fmt.Errorf("Framework is required")
|
|
}
|
|
if cfg.ClientID == "" {
|
|
return nil, fmt.Errorf("ClientID is required")
|
|
}
|
|
if cfg.Host == "" {
|
|
cfg.Host = "localhost"
|
|
}
|
|
if cfg.BufferSize <= 0 {
|
|
cfg.BufferSize = 100
|
|
}
|
|
|
|
e := &Emitter{
|
|
config: cfg,
|
|
httpClient: &http.Client{Timeout: 30 * time.Second},
|
|
buffer: make([]Event, 0, cfg.BufferSize),
|
|
bufferSize: cfg.BufferSize,
|
|
}
|
|
|
|
if cfg.UseWebSocket {
|
|
wsURL := wsURLFromHTTP(cfg.ServerURL)
|
|
wsClient, err := NewWSClient(wsURL)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create WebSocket client: %w", err)
|
|
}
|
|
e.wsClient = wsClient
|
|
go e.wsClient.Run(context.Background())
|
|
}
|
|
|
|
return e, nil
|
|
}
|
|
|
|
// Emit sends a single event.
|
|
func (e *Emitter) Emit(ctx context.Context, event Event) error {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
if e.closed {
|
|
return fmt.Errorf("emitter is closed")
|
|
}
|
|
|
|
if e.config.UseWebSocket {
|
|
data, err := json.Marshal(event)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal event: %w", err)
|
|
}
|
|
return e.wsClient.Send(data)
|
|
}
|
|
|
|
e.buffer = append(e.buffer, event)
|
|
if len(e.buffer) >= e.bufferSize {
|
|
return e.flushLocked(ctx)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Flush sends all buffered events to the server.
|
|
func (e *Emitter) Flush(ctx context.Context) error {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
if e.closed {
|
|
return fmt.Errorf("emitter is closed")
|
|
}
|
|
|
|
return e.flushLocked(ctx)
|
|
}
|
|
|
|
// flushLocked sends buffered events. Caller must hold e.mu.
|
|
func (e *Emitter) flushLocked(ctx context.Context) error {
|
|
if len(e.buffer) == 0 {
|
|
return nil
|
|
}
|
|
|
|
if e.config.EnableLogging {
|
|
log.Printf("Flushing %d events", len(e.buffer))
|
|
}
|
|
|
|
events := make([]map[string]any, len(e.buffer))
|
|
for i, ev := range e.buffer {
|
|
events[i] = ev
|
|
}
|
|
|
|
resp, err := e.sendEvents(ctx, events)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to send events: %w", err)
|
|
}
|
|
|
|
e.buffer = e.buffer[:0]
|
|
|
|
if resp.Rejected > 0 && e.config.EnableLogging {
|
|
log.Printf("Rejected %d events", resp.Rejected)
|
|
if len(resp.Errors) > 0 {
|
|
log.Printf("Errors: %v", resp.Errors)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close flushes any remaining events and closes the emitter.
|
|
func (e *Emitter) Close(ctx context.Context) error {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
if e.closed {
|
|
return nil
|
|
}
|
|
|
|
e.closed = true
|
|
|
|
if e.wsClient != nil {
|
|
_ = e.wsClient.Close()
|
|
}
|
|
|
|
if len(e.buffer) > 0 {
|
|
_ = e.flushLocked(ctx)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type sendResponse struct {
|
|
Accepted int `json:"accepted"`
|
|
Rejected int `json:"rejected"`
|
|
Errors []struct {
|
|
Error string `json:"error"`
|
|
} `json:"errors,omitempty"`
|
|
}
|
|
|
|
func (e *Emitter) sendEvents(ctx context.Context, events []map[string]any) (*sendResponse, error) {
|
|
body, err := json.Marshal(events)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
req, err := http.NewRequestWithContext(ctx, "POST", e.config.ServerURL+"/v1/events", bytes.NewReader(body))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
req.Header.Set("Content-Type", "application/json")
|
|
if e.config.APIKey != "" {
|
|
req.Header.Set("Authorization", "Bearer "+e.config.APIKey)
|
|
}
|
|
|
|
resp, err := e.httpClient.Do(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusAccepted {
|
|
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
|
|
}
|
|
|
|
var result sendResponse
|
|
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &result, nil
|
|
}
|
|
|
|
func wsURLFromHTTP(httpURL string) string {
|
|
switch {
|
|
case len(httpURL) >= 8 && httpURL[:8] == "https://":
|
|
return "wss://" + httpURL[8:] + "/v1/ws"
|
|
case len(httpURL) >= 7 && httpURL[:7] == "http://":
|
|
return "ws://" + httpURL[7:] + "/v1/ws"
|
|
default:
|
|
return httpURL + "/v1/ws"
|
|
}
|
|
}
|
|
|
|
// generateID creates a new UUID-like identifier.
|
|
func generateID() string {
|
|
b := make([]byte, 16)
|
|
if _, err := rand.Read(b); err != nil {
|
|
log.Printf("Failed to generate random ID: %v", err)
|
|
return fmt.Sprintf("%d", time.Now().UnixNano())
|
|
}
|
|
return hex.EncodeToString(b)
|
|
}
|