package protocol import ( "bufio" "encoding/json" "errors" "fmt" "io" "net" "sync" ) var ErrTransportClosed = errors.New("protocol: transport closed") // Transport carries framed daemon/client protocol messages. type Transport interface { Send(Frame) error Recv() (Frame, error) Close() error } // ConnTransport is a JSON-lines implementation over a stream connection. Send // is guarded by a mutex so the daemon can push frames from its subscriber pump // and its command handlers concurrently; Close may be called from any goroutine // (e.g. on context cancellation) to unblock a pending Recv. type ConnTransport struct { conn net.Conn r *bufio.Reader wmu sync.Mutex w *bufio.Writer } func NewConnTransport(conn net.Conn) *ConnTransport { return &ConnTransport{ conn: conn, r: bufio.NewReader(conn), w: bufio.NewWriter(conn), } } func (t *ConnTransport) Send(f Frame) error { if t == nil || t.conn == nil { return ErrTransportClosed } b, err := json.Marshal(f) if err != nil { return fmt.Errorf("protocol: encode frame: %w", err) } t.wmu.Lock() defer t.wmu.Unlock() if _, err := t.w.Write(append(b, '\n')); err != nil { return err } return t.w.Flush() } func (t *ConnTransport) Recv() (Frame, error) { if t == nil || t.conn == nil { return Frame{}, ErrTransportClosed } line, err := t.r.ReadBytes('\n') if err != nil { if errors.Is(err, io.EOF) { return Frame{}, ErrTransportClosed } return Frame{}, err } var f Frame if err := json.Unmarshal(line, &f); err != nil { return Frame{}, fmt.Errorf("protocol: decode frame: %w", err) } return f, nil } func (t *ConnTransport) Close() error { if t == nil || t.conn == nil { return nil } return t.conn.Close() }