Files
patterm/internal/protocol/transport.go
Harry Bayliss 95b1967e9b Fix daemon shutdown hang and concurrent-send race
- daemon_net: close the client transport on context cancellation so the
  per-connection Recv loop unblocks; otherwise wg.Wait() in the accept loop
  hung on a still-connected client and the daemon never stopped.
- protocol: guard ConnTransport.Send with a mutex so the subscriber pump and
  command handlers can push frames concurrently without racing the bufio.Writer.

Fixes TestDaemonDetachReattachPreservesProcess (now passes under -race).
2026-05-27 13:59:47 +01:00

81 lines
1.7 KiB
Go

package protocol
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"sync"
)
var ErrTransportClosed = errors.New("protocol: transport closed")
// Transport carries framed daemon/client protocol messages.
type Transport interface {
Send(Frame) error
Recv() (Frame, error)
Close() error
}
// ConnTransport is a JSON-lines implementation over a stream connection. Send
// is guarded by a mutex so the daemon can push frames from its subscriber pump
// and its command handlers concurrently; Close may be called from any goroutine
// (e.g. on context cancellation) to unblock a pending Recv.
type ConnTransport struct {
conn net.Conn
r *bufio.Reader
wmu sync.Mutex
w *bufio.Writer
}
func NewConnTransport(conn net.Conn) *ConnTransport {
return &ConnTransport{
conn: conn,
r: bufio.NewReader(conn),
w: bufio.NewWriter(conn),
}
}
func (t *ConnTransport) Send(f Frame) error {
if t == nil || t.conn == nil {
return ErrTransportClosed
}
b, err := json.Marshal(f)
if err != nil {
return fmt.Errorf("protocol: encode frame: %w", err)
}
t.wmu.Lock()
defer t.wmu.Unlock()
if _, err := t.w.Write(append(b, '\n')); err != nil {
return err
}
return t.w.Flush()
}
func (t *ConnTransport) Recv() (Frame, error) {
if t == nil || t.conn == nil {
return Frame{}, ErrTransportClosed
}
line, err := t.r.ReadBytes('\n')
if err != nil {
if errors.Is(err, io.EOF) {
return Frame{}, ErrTransportClosed
}
return Frame{}, err
}
var f Frame
if err := json.Unmarshal(line, &f); err != nil {
return Frame{}, fmt.Errorf("protocol: decode frame: %w", err)
}
return f, nil
}
func (t *ConnTransport) Close() error {
if t == nil || t.conn == nil {
return nil
}
return t.conn.Close()
}