Scaffold loopback daemon client split
This commit is contained in:
122
internal/app/client_subscriber.go
Normal file
122
internal/app/client_subscriber.go
Normal file
@@ -0,0 +1,122 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"sync"
|
||||
|
||||
"github.com/hjbdev/patterm/internal/protocol"
|
||||
)
|
||||
|
||||
const defaultClientSubscriberQueue = 256
|
||||
|
||||
// clientSubscriber is the daemon-to-client event bridge. Unlike daemon-local
|
||||
// listeners such as timers, debug capture, and waiters, it never blocks the PTY
|
||||
// pump: PTY chunks are copied before enqueue, and overflow marks the pane as
|
||||
// needing a fresh snapshot.
|
||||
type clientSubscriber struct {
|
||||
projectKey string
|
||||
frames chan protocol.Frame
|
||||
|
||||
mu sync.Mutex
|
||||
snapshotRequired map[string]bool
|
||||
lifecycleDirty bool
|
||||
}
|
||||
|
||||
func newClientSubscriber(projectKey string, size int) *clientSubscriber {
|
||||
if size <= 0 {
|
||||
size = defaultClientSubscriberQueue
|
||||
}
|
||||
return &clientSubscriber{
|
||||
projectKey: projectKey,
|
||||
frames: make(chan protocol.Frame, size),
|
||||
snapshotRequired: make(map[string]bool),
|
||||
lifecycleDirty: false,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *clientSubscriber) Recv() (protocol.Frame, bool) {
|
||||
f, ok := <-s.frames
|
||||
return f, ok
|
||||
}
|
||||
|
||||
func (s *clientSubscriber) SnapshotRequired(childID string) bool {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.snapshotRequired[childID]
|
||||
}
|
||||
|
||||
func (s *clientSubscriber) OnChildSpawned(c *Child) {
|
||||
s.sendLifecycle(protocol.LifecycleSpawned, c, "")
|
||||
}
|
||||
|
||||
func (s *clientSubscriber) OnChildExited(c *Child) {
|
||||
s.sendLifecycle(protocol.LifecycleExited, c, "")
|
||||
}
|
||||
|
||||
func (s *clientSubscriber) OnChildClosed(id string) {
|
||||
s.sendFrame(protocol.Frame{Type: protocol.FrameLifecycle, Payload: mustJSON(protocol.Lifecycle{
|
||||
Kind: protocol.LifecycleClosed,
|
||||
ProjectKey: s.projectKey,
|
||||
ChildID: id,
|
||||
})})
|
||||
}
|
||||
|
||||
func (s *clientSubscriber) OnChildStateChanged(id string, state IdleState) {
|
||||
s.sendFrame(protocol.Frame{Type: protocol.FrameLifecycle, Payload: mustJSON(protocol.Lifecycle{
|
||||
Kind: protocol.LifecycleStateChanged,
|
||||
ProjectKey: s.projectKey,
|
||||
ChildID: id,
|
||||
State: string(state),
|
||||
})})
|
||||
}
|
||||
|
||||
func (s *clientSubscriber) OnPTYOut(childID string, chunk []byte) {
|
||||
cp := append([]byte(nil), chunk...)
|
||||
f, err := protocol.NewFrame(protocol.FramePaneChunk, protocol.PaneChunk{PaneID: childID, Bytes: cp})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case s.frames <- f:
|
||||
default:
|
||||
s.mu.Lock()
|
||||
s.snapshotRequired[childID] = true
|
||||
s.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *clientSubscriber) sendLifecycle(kind protocol.LifecycleKind, c *Child, state string) {
|
||||
var child json.RawMessage
|
||||
if c != nil {
|
||||
child = mustJSON(serializeChildModel(c))
|
||||
}
|
||||
childID := ""
|
||||
if c != nil {
|
||||
childID = c.ID
|
||||
}
|
||||
s.sendFrame(protocol.Frame{Type: protocol.FrameLifecycle, Payload: mustJSON(protocol.Lifecycle{
|
||||
Kind: kind,
|
||||
ProjectKey: s.projectKey,
|
||||
ChildID: childID,
|
||||
Child: child,
|
||||
State: state,
|
||||
})})
|
||||
}
|
||||
|
||||
func (s *clientSubscriber) sendFrame(f protocol.Frame) {
|
||||
select {
|
||||
case s.frames <- f:
|
||||
default:
|
||||
s.mu.Lock()
|
||||
s.lifecycleDirty = true
|
||||
s.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func mustJSON(v any) json.RawMessage {
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return b
|
||||
}
|
||||
Reference in New Issue
Block a user