Fix daemon shutdown hang and concurrent-send race
- daemon_net: close the client transport on context cancellation so the per-connection Recv loop unblocks; otherwise wg.Wait() in the accept loop hung on a still-connected client and the daemon never stopped. - protocol: guard ConnTransport.Send with a mutex so the subscriber pump and command handlers can push frames concurrently without racing the bufio.Writer. Fixes TestDaemonDetachReattachPreservesProcess (now passes under -race).
This commit is contained in:
@@ -232,6 +232,14 @@ func handleDaemonAttach(ctx context.Context, registry *ProjectRegistry, t protoc
|
|||||||
_ = sendSnapshot(t, project, view.FocusedID)
|
_ = sendSnapshot(t, project, view.FocusedID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close the transport when the daemon context is cancelled (shutdown or
|
||||||
|
// `daemon stop`). Without this the t.Recv() loop below blocks forever on a
|
||||||
|
// still-connected client and the accept loop's wg.Wait() never returns.
|
||||||
|
go func() {
|
||||||
|
<-ctx.Done()
|
||||||
|
_ = t.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
defer close(done)
|
defer close(done)
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
var ErrTransportClosed = errors.New("protocol: transport closed")
|
var ErrTransportClosed = errors.New("protocol: transport closed")
|
||||||
@@ -18,10 +19,14 @@ type Transport interface {
|
|||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConnTransport is a JSON-lines implementation over a stream connection.
|
// ConnTransport is a JSON-lines implementation over a stream connection. Send
|
||||||
|
// is guarded by a mutex so the daemon can push frames from its subscriber pump
|
||||||
|
// and its command handlers concurrently; Close may be called from any goroutine
|
||||||
|
// (e.g. on context cancellation) to unblock a pending Recv.
|
||||||
type ConnTransport struct {
|
type ConnTransport struct {
|
||||||
conn net.Conn
|
conn net.Conn
|
||||||
r *bufio.Reader
|
r *bufio.Reader
|
||||||
|
wmu sync.Mutex
|
||||||
w *bufio.Writer
|
w *bufio.Writer
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -41,6 +46,8 @@ func (t *ConnTransport) Send(f Frame) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("protocol: encode frame: %w", err)
|
return fmt.Errorf("protocol: encode frame: %w", err)
|
||||||
}
|
}
|
||||||
|
t.wmu.Lock()
|
||||||
|
defer t.wmu.Unlock()
|
||||||
if _, err := t.w.Write(append(b, '\n')); err != nil {
|
if _, err := t.w.Write(append(b, '\n')); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user