Add daemon client protocol frames
This commit is contained in:
67
internal/protocol/loopback.go
Normal file
67
internal/protocol/loopback.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package protocol
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
const defaultLoopbackBuffer = 64
|
||||
|
||||
// NewLoopbackPair returns connected in-process transports. Frames cross the
|
||||
// same Send/Recv boundary as network transports, but payload bytes are passed
|
||||
// directly without JSON re-encoding.
|
||||
func NewLoopbackPair() (client Transport, daemon Transport) {
|
||||
c2d := make(chan Frame, defaultLoopbackBuffer)
|
||||
d2c := make(chan Frame, defaultLoopbackBuffer)
|
||||
return &loopbackTransport{send: c2d, recv: d2c}, &loopbackTransport{send: d2c, recv: c2d}
|
||||
}
|
||||
|
||||
type loopbackTransport struct {
|
||||
send chan<- Frame
|
||||
recv <-chan Frame
|
||||
once sync.Once
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func (t *loopbackTransport) init() {
|
||||
if t.done == nil {
|
||||
t.done = make(chan struct{})
|
||||
}
|
||||
}
|
||||
|
||||
func (t *loopbackTransport) Send(f Frame) error {
|
||||
t.init()
|
||||
select {
|
||||
case <-t.done:
|
||||
return ErrTransportClosed
|
||||
case t.send <- cloneFrame(f):
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (t *loopbackTransport) Recv() (Frame, error) {
|
||||
t.init()
|
||||
select {
|
||||
case <-t.done:
|
||||
return Frame{}, ErrTransportClosed
|
||||
case f, ok := <-t.recv:
|
||||
if !ok {
|
||||
return Frame{}, ErrTransportClosed
|
||||
}
|
||||
return f, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (t *loopbackTransport) Close() error {
|
||||
t.init()
|
||||
t.once.Do(func() {
|
||||
close(t.done)
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func cloneFrame(f Frame) Frame {
|
||||
if len(f.Payload) > 0 {
|
||||
f.Payload = append([]byte(nil), f.Payload...)
|
||||
}
|
||||
return f
|
||||
}
|
||||
Reference in New Issue
Block a user