wip
This commit is contained in:
@@ -12,9 +12,11 @@ import (
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/hjbdev/patterm/internal/persist"
|
||||
"github.com/hjbdev/patterm/internal/vt"
|
||||
)
|
||||
|
||||
@@ -38,8 +40,25 @@ type Session struct {
|
||||
|
||||
// listeners is the set of UI listeners that want to hear about child
|
||||
// lifecycle events (spawn/exit) — exactly one (the TUI) in v1.
|
||||
// listeners is an atomic.Pointer to a frozen slice. Subscribe
|
||||
// copy-on-writes the slice; emit* paths use a single atomic Load.
|
||||
// This drops one mutex acquisition per PTY chunk on the hot path.
|
||||
listenersMu sync.Mutex
|
||||
listeners []ChildEventListener
|
||||
listeners atomic.Pointer[[]ChildEventListener]
|
||||
|
||||
// persistStore records top-level command entries to a per-project
|
||||
// JSON file so they can be re-spawned after patterm restarts.
|
||||
// Optional; nil means "no persistence" (used by unit tests).
|
||||
persistStore *persist.Store
|
||||
}
|
||||
|
||||
// SetPersistStore attaches a process-persistence store. Future Spawn /
|
||||
// Close / Rename / SetAutoRestart calls on top-level command entries
|
||||
// will mirror the change into the store.
|
||||
func (s *Session) SetPersistStore(p *persist.Store) {
|
||||
s.mu.Lock()
|
||||
s.persistStore = p
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
// ChildEventListener is implemented by the TUI to react to lifecycle
|
||||
@@ -65,32 +84,58 @@ func NewSession(projectDir, projectKey string) *Session {
|
||||
func (s *Session) Subscribe(l ChildEventListener) {
|
||||
s.listenersMu.Lock()
|
||||
defer s.listenersMu.Unlock()
|
||||
s.listeners = append(s.listeners, l)
|
||||
prev := s.listenersSnapshot()
|
||||
next := make([]ChildEventListener, 0, len(prev)+1)
|
||||
next = append(next, prev...)
|
||||
next = append(next, l)
|
||||
s.listeners.Store(&next)
|
||||
}
|
||||
|
||||
// Unsubscribe removes a previously-registered listener. Safe to call
|
||||
// with a listener that wasn't registered (no-op).
|
||||
func (s *Session) Unsubscribe(l ChildEventListener) {
|
||||
s.listenersMu.Lock()
|
||||
defer s.listenersMu.Unlock()
|
||||
prev := s.listenersSnapshot()
|
||||
if len(prev) == 0 {
|
||||
return
|
||||
}
|
||||
next := make([]ChildEventListener, 0, len(prev))
|
||||
for _, e := range prev {
|
||||
if e != l {
|
||||
next = append(next, e)
|
||||
}
|
||||
}
|
||||
s.listeners.Store(&next)
|
||||
}
|
||||
|
||||
// listenersSnapshot returns the frozen listener slice. Safe to call
|
||||
// without the listeners mutex.
|
||||
func (s *Session) listenersSnapshot() []ChildEventListener {
|
||||
p := s.listeners.Load()
|
||||
if p == nil {
|
||||
return nil
|
||||
}
|
||||
return *p
|
||||
}
|
||||
|
||||
func (s *Session) emitSpawn(c *Child) {
|
||||
s.listenersMu.Lock()
|
||||
ls := append([]ChildEventListener(nil), s.listeners...)
|
||||
s.listenersMu.Unlock()
|
||||
for _, l := range ls {
|
||||
for _, l := range s.listenersSnapshot() {
|
||||
l.OnChildSpawned(c)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Session) emitExit(c *Child) {
|
||||
s.listenersMu.Lock()
|
||||
ls := append([]ChildEventListener(nil), s.listeners...)
|
||||
s.listenersMu.Unlock()
|
||||
for _, l := range ls {
|
||||
for _, l := range s.listenersSnapshot() {
|
||||
l.OnChildExited(c)
|
||||
}
|
||||
}
|
||||
|
||||
// emitPTYOut dispatches a fresh PTY chunk to every listener. Listeners
|
||||
// MUST NOT retain `chunk` past return — the slice is owned by the
|
||||
// pumpChild read buffer and is overwritten on the next read.
|
||||
func (s *Session) emitPTYOut(id string, chunk []byte) {
|
||||
s.listenersMu.Lock()
|
||||
ls := append([]ChildEventListener(nil), s.listeners...)
|
||||
s.listenersMu.Unlock()
|
||||
for _, l := range ls {
|
||||
for _, l := range s.listenersSnapshot() {
|
||||
l.OnPTYOut(id, chunk)
|
||||
}
|
||||
}
|
||||
@@ -162,14 +207,67 @@ func (s *Session) Spawn(spec SpawnSpec, cols, rows uint16) (*Child, error) {
|
||||
s.mu.Lock()
|
||||
s.children[id] = c
|
||||
s.order = append(s.order, id)
|
||||
store := s.persistStore
|
||||
s.mu.Unlock()
|
||||
|
||||
// Wire persistence callback BEFORE registering so SetName /
|
||||
// SetAutoRestart calls that race the listener still hit the store.
|
||||
if store != nil {
|
||||
c.setPersistFn(func(ch *Child) {
|
||||
s.persistEntry(ch)
|
||||
})
|
||||
s.persistEntry(c)
|
||||
}
|
||||
|
||||
s.emitSpawn(c)
|
||||
go s.pumpChild(c, runID)
|
||||
go s.reapChild(c, runID)
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// persistEntry writes (or refreshes) the persist record for c if it
|
||||
// qualifies — top-level command entries only. No-op when no store is
|
||||
// attached.
|
||||
func (s *Session) persistEntry(c *Child) {
|
||||
s.mu.Lock()
|
||||
store := s.persistStore
|
||||
s.mu.Unlock()
|
||||
if store == nil || !shouldPersist(c) {
|
||||
return
|
||||
}
|
||||
e := persist.Entry{
|
||||
ID: c.ID,
|
||||
Name: c.DisplayName(),
|
||||
Argv: append([]string(nil), c.Argv...),
|
||||
WorkDir: c.WorkDir,
|
||||
PresetRef: c.PresetRef,
|
||||
AutoRestart: c.AutoRestart(),
|
||||
}
|
||||
if err := store.Save(e); err != nil {
|
||||
logf("persist save %s: %v", c.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Session) forgetPersisted(id string) {
|
||||
s.mu.Lock()
|
||||
store := s.persistStore
|
||||
s.mu.Unlock()
|
||||
if store == nil {
|
||||
return
|
||||
}
|
||||
if err := store.Remove(id); err != nil {
|
||||
logf("persist remove %s: %v", id, err)
|
||||
}
|
||||
}
|
||||
|
||||
// shouldPersist gates which Child entries get mirrored into the
|
||||
// persist store. v1 only restores top-level command entries — agents
|
||||
// and terminals are ephemeral by design, and sub-agent-spawned
|
||||
// commands belong to their orchestrator's lifecycle.
|
||||
func shouldPersist(c *Child) bool {
|
||||
return c != nil && c.Kind == KindCommand && c.ParentID == ""
|
||||
}
|
||||
|
||||
// Start (re)attaches a PTY to an entry that is currently stopped or
|
||||
// exited. Errors if the entry is already live.
|
||||
func (s *Session) Start(id string, cols, rows uint16) error {
|
||||
@@ -238,6 +336,7 @@ func (s *Session) Close(id string, sig syscall.Signal) error {
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
s.forgetPersisted(id)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -257,6 +356,12 @@ func (s *Session) pumpChild(c *Child, runID uint64) {
|
||||
if pty == nil {
|
||||
return
|
||||
}
|
||||
// One PTY read buffer per pump goroutine. Consumers downstream
|
||||
// (em.Write is synchronous through CGO; recordWrite append-copies
|
||||
// into the ring; renderer.Render copies into its pending buffer)
|
||||
// all complete or copy before returning, so the buffer can be
|
||||
// reused without aliasing live data. See ChildEventListener.OnPTYOut
|
||||
// docstring — listeners must not retain `chunk`.
|
||||
buf := make([]byte, 64*1024)
|
||||
for {
|
||||
n, err := pty.Read(buf)
|
||||
@@ -264,8 +369,7 @@ func (s *Session) pumpChild(c *Child, runID uint64) {
|
||||
if !c.isCurrentRun(runID) {
|
||||
return
|
||||
}
|
||||
chunk := make([]byte, n)
|
||||
copy(chunk, buf[:n])
|
||||
chunk := buf[:n]
|
||||
if em := c.Emulator(); em != nil {
|
||||
if _, werr := em.Write(chunk); werr != nil {
|
||||
logf("emulator.Write(child %s): %v", c.ID, werr)
|
||||
|
||||
Reference in New Issue
Block a user