- daemon_net: close the client transport on context cancellation so the per-connection Recv loop unblocks; otherwise wg.Wait() in the accept loop hung on a still-connected client and the daemon never stopped. - protocol: guard ConnTransport.Send with a mutex so the subscriber pump and command handlers can push frames concurrently without racing the bufio.Writer. Fixes TestDaemonDetachReattachPreservesProcess (now passes under -race).
81 lines
1.7 KiB
Go
81 lines
1.7 KiB
Go
package protocol
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"sync"
|
|
)
|
|
|
|
var ErrTransportClosed = errors.New("protocol: transport closed")
|
|
|
|
// Transport carries framed daemon/client protocol messages.
|
|
type Transport interface {
|
|
Send(Frame) error
|
|
Recv() (Frame, error)
|
|
Close() error
|
|
}
|
|
|
|
// ConnTransport implements a JSON-lines transport on top of a stream
// connection: every frame is one JSON document terminated by a newline.
//
// Writes performed by Send are serialized with a mutex, which lets the daemon
// push frames from its subscriber pump and from its command handlers at the
// same time. Close may be invoked from any goroutine (for instance when a
// context is cancelled) in order to unblock a Recv that is waiting for data.
type ConnTransport struct {
	// conn is the underlying stream; Close closes it.
	conn net.Conn

	// r buffers reads from conn for Recv.
	r *bufio.Reader

	// wmu serializes access to w so concurrent Send calls do not interleave.
	wmu sync.Mutex
	// w buffers writes to conn; Send flushes it after every frame.
	w *bufio.Writer
}
|
|
|
|
func NewConnTransport(conn net.Conn) *ConnTransport {
|
|
return &ConnTransport{
|
|
conn: conn,
|
|
r: bufio.NewReader(conn),
|
|
w: bufio.NewWriter(conn),
|
|
}
|
|
}
|
|
|
|
func (t *ConnTransport) Send(f Frame) error {
|
|
if t == nil || t.conn == nil {
|
|
return ErrTransportClosed
|
|
}
|
|
b, err := json.Marshal(f)
|
|
if err != nil {
|
|
return fmt.Errorf("protocol: encode frame: %w", err)
|
|
}
|
|
t.wmu.Lock()
|
|
defer t.wmu.Unlock()
|
|
if _, err := t.w.Write(append(b, '\n')); err != nil {
|
|
return err
|
|
}
|
|
return t.w.Flush()
|
|
}
|
|
|
|
func (t *ConnTransport) Recv() (Frame, error) {
|
|
if t == nil || t.conn == nil {
|
|
return Frame{}, ErrTransportClosed
|
|
}
|
|
line, err := t.r.ReadBytes('\n')
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
return Frame{}, ErrTransportClosed
|
|
}
|
|
return Frame{}, err
|
|
}
|
|
var f Frame
|
|
if err := json.Unmarshal(line, &f); err != nil {
|
|
return Frame{}, fmt.Errorf("protocol: decode frame: %w", err)
|
|
}
|
|
return f, nil
|
|
}
|
|
|
|
func (t *ConnTransport) Close() error {
|
|
if t == nil || t.conn == nil {
|
|
return nil
|
|
}
|
|
return t.conn.Close()
|
|
}
|