136 lines
3.3 KiB
Go
136 lines
3.3 KiB
Go
package app
|
|
|
|
import (
|
|
"encoding/json"
|
|
"sync"
|
|
|
|
"github.com/hjbdev/patterm/internal/protocol"
|
|
)
|
|
|
|
// defaultClientSubscriberQueue is the capacity of the frames channel that
// newClientSubscriber falls back to when it is given a non-positive size.
const defaultClientSubscriberQueue = 256
|
|
|
|
// clientSubscriber is the daemon-to-client event bridge. Unlike daemon-local
|
|
// listeners such as timers, debug capture, and waiters, it never blocks the PTY
|
|
// pump: PTY chunks are copied before enqueue, and overflow marks the pane as
|
|
// needing a fresh snapshot.
|
|
type clientSubscriber struct {
|
|
projectKey string
|
|
project *Project
|
|
clientID string
|
|
frames chan protocol.Frame
|
|
|
|
mu sync.Mutex
|
|
snapshotRequired map[string]bool
|
|
lifecycleDirty bool
|
|
}
|
|
|
|
func newClientSubscriber(project *Project, clientID string, size int) *clientSubscriber {
|
|
if size <= 0 {
|
|
size = defaultClientSubscriberQueue
|
|
}
|
|
projectKey := ""
|
|
if project != nil {
|
|
projectKey = project.Key
|
|
}
|
|
return &clientSubscriber{
|
|
projectKey: projectKey,
|
|
project: project,
|
|
clientID: clientID,
|
|
frames: make(chan protocol.Frame, size),
|
|
snapshotRequired: make(map[string]bool),
|
|
lifecycleDirty: false,
|
|
}
|
|
}
|
|
|
|
func (s *clientSubscriber) Recv() (protocol.Frame, bool) {
|
|
f, ok := <-s.frames
|
|
return f, ok
|
|
}
|
|
|
|
func (s *clientSubscriber) SnapshotRequired(childID string) bool {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
return s.snapshotRequired[childID]
|
|
}
|
|
|
|
func (s *clientSubscriber) OnChildSpawned(c *Child) {
|
|
s.sendLifecycle(protocol.LifecycleSpawned, c, "")
|
|
}
|
|
|
|
func (s *clientSubscriber) OnChildExited(c *Child) {
|
|
s.sendLifecycle(protocol.LifecycleExited, c, "")
|
|
}
|
|
|
|
func (s *clientSubscriber) OnChildClosed(id string) {
|
|
s.sendFrame(protocol.Frame{Type: protocol.FrameLifecycle, Payload: mustJSON(protocol.Lifecycle{
|
|
Kind: protocol.LifecycleClosed,
|
|
ProjectKey: s.projectKey,
|
|
ChildID: id,
|
|
})})
|
|
}
|
|
|
|
func (s *clientSubscriber) OnChildStateChanged(id string, state IdleState) {
|
|
s.sendFrame(protocol.Frame{Type: protocol.FrameLifecycle, Payload: mustJSON(protocol.Lifecycle{
|
|
Kind: protocol.LifecycleStateChanged,
|
|
ProjectKey: s.projectKey,
|
|
ChildID: id,
|
|
State: string(state),
|
|
})})
|
|
}
|
|
|
|
func (s *clientSubscriber) OnPTYOut(childID string, chunk []byte) {
|
|
cp := append([]byte(nil), chunk...)
|
|
var size protocol.Size
|
|
var ownerID string
|
|
if s.project != nil {
|
|
size, ownerID, _ = s.project.PaneDisplay(childID)
|
|
}
|
|
f, err := protocol.NewFrame(protocol.FramePaneChunk, protocol.PaneChunk{PaneID: childID, Bytes: cp, Size: size, DisplayOwner: ownerID == "" || ownerID == s.clientID})
|
|
if err != nil {
|
|
return
|
|
}
|
|
select {
|
|
case s.frames <- f:
|
|
default:
|
|
s.mu.Lock()
|
|
s.snapshotRequired[childID] = true
|
|
s.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
func (s *clientSubscriber) sendLifecycle(kind protocol.LifecycleKind, c *Child, state string) {
|
|
var child json.RawMessage
|
|
if c != nil {
|
|
child = mustJSON(serializeChildModel(c))
|
|
}
|
|
childID := ""
|
|
if c != nil {
|
|
childID = c.ID
|
|
}
|
|
s.sendFrame(protocol.Frame{Type: protocol.FrameLifecycle, Payload: mustJSON(protocol.Lifecycle{
|
|
Kind: kind,
|
|
ProjectKey: s.projectKey,
|
|
ChildID: childID,
|
|
Child: child,
|
|
State: state,
|
|
})})
|
|
}
|
|
|
|
func (s *clientSubscriber) sendFrame(f protocol.Frame) {
|
|
select {
|
|
case s.frames <- f:
|
|
default:
|
|
s.mu.Lock()
|
|
s.lifecycleDirty = true
|
|
s.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
// mustJSON encodes v with json.Marshal and returns the raw bytes. Despite the
// name it never panics: if marshalling fails, the error is discarded and nil
// is returned.
func mustJSON(v any) json.RawMessage {
	encoded, err := json.Marshal(v)
	if err == nil {
		return encoded
	}
	return nil
}
|