From 7b5a22618f8454d0b97a232c95ebd5915f8b4606 Mon Sep 17 00:00:00 2001 From: Harry Bayliss Date: Mon, 25 May 2026 12:39:31 +0100 Subject: [PATCH] Dispatch MCP requests concurrently per connection handleConn processed requests serially, so a slow tool (e.g. wait_for_pattern with a 300s timeout) monopolized the single per-agent MCP connection and every queued call timed out behind it. Handle each request in its own goroutine, serialize responses through a per-conn write mutex (full response written atomically, partial writes handled), copy the request line before handing it off (purely defensive: bufio.Reader.ReadBytes already returns a freshly allocated slice, unlike ReadSlice), and wait on a WaitGroup before closing the conn so in-flight handlers finish cleanly. Greeting stays sequential; notifications still get no response. Resolves the [MCP TIMEOUT] TODO item. --- CHANGELOG.md | 2 + TODO.md | 11 --- internal/mcp/mcp.go | 47 +++++++--- internal/mcp/mcp_test.go | 190 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 226 insertions(+), 24 deletions(-) create mode 100644 internal/mcp/mcp_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 8fdb4c2..13a1658 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ loosely follows [Semantic Versioning](https://semver.org/spec/v2.0.0.html). over MCP. ### Fixed +- Slow MCP tool calls such as `wait_for_pattern` no longer block later + tool calls on the same MCP connection. - Closing an agent now escalates from SIGTERM to SIGKILL when needed, so agents that ignore SIGTERM disappear from the running tab bar after one Close action while keeping their exited pane readable. diff --git a/TODO.md b/TODO.md index e832166..7f9b34a 100644 --- a/TODO.md +++ b/TODO.md @@ -1,5 +1,4 @@ - [ ] Codex idle detection seems to trigger too soon, see below [CODEX IDLE] -- [ ] Issue with mcp timing out [MCP TIMEOUT] - [ ] When opening a codex sub agent, the message gets input to the field, but the message is never submitted. - This appears to be inconsistent. Sometimes it works, sometimes it doesn't. 
Might be because of popups on codex sub agents? - Question: when it fails, is a Codex startup popup visible (trust/workspace, auth/model selection, permissions), or is the normal composer focused? @@ -60,13 +59,3 @@ Crunched for 1m 57s ✔ Set up worktree for issue 136 implementation │ ✔ Draft implementation plan │ … +2 completed │ - - -# [MCP TIMEOUT] -⚙ patterm_send_input [key=enter, kind=key, process_id=p_a6726d, submit=false, tail_mode=stream, text=, wait_ms=1000] │ - │ -⚙ patterm_wait_for_pattern [pattern=Findings|No findings|No issues|Residual risk, process_id=p_a6726d, scope=scrollback, timeout_seconds=300] │ -MCP error -32001: Request timed out │ - │ -⚙ patterm_get_process_status [process_id=p_a6726d] │ -MCP error -32001: Request timed out │ diff --git a/internal/mcp/mcp.go b/internal/mcp/mcp.go index c548ec3..c1df603 100644 --- a/internal/mcp/mcp.go +++ b/internal/mcp/mcp.go @@ -96,10 +96,34 @@ func (s *Server) acceptLoop() { // identity token (SPEC §10); we resolve it to a child id and stash that // as the caller for every subsequent tool call. func (s *Server) handleConn(conn net.Conn) { - defer conn.Close() + var writeMu sync.Mutex + var wg sync.WaitGroup + defer func() { + wg.Wait() + _ = conn.Close() + }() r := bufio.NewReader(conn) var callerID string + writeResp := func(resp []byte) bool { + if resp == nil { + return true + } + resp = append(resp, '\n') + writeMu.Lock() + defer writeMu.Unlock() + for len(resp) > 0 { + n, err := conn.Write(resp) + if err != nil { + return false + } + if n == 0 { + return false + } + resp = resp[n:] + } + return true + } greeting, err := r.ReadBytes('\n') if err != nil { @@ -115,24 +139,21 @@ func (s *Server) handleConn(conn net.Conn) { } else { // Treat as a real request from an unknown caller. 
resp := s.dispatch("", greeting) - if resp != nil { - resp = append(resp, '\n') - if _, werr := conn.Write(resp); werr != nil { - return - } + if !writeResp(resp) { + return } } for { line, err := r.ReadBytes('\n') if len(line) > 0 { - resp := s.dispatch(callerID, line) - if resp != nil { - resp = append(resp, '\n') - if _, werr := conn.Write(resp); werr != nil { - return - } - } + req := append([]byte(nil), line...) + wg.Add(1) + go func() { + defer wg.Done() + resp := s.dispatch(callerID, req) + _ = writeResp(resp) + }() } if err != nil { return diff --git a/internal/mcp/mcp_test.go b/internal/mcp/mcp_test.go new file mode 100644 index 0000000..066f080 --- /dev/null +++ b/internal/mcp/mcp_test.go @@ -0,0 +1,190 @@ +package mcp + +import ( + "bufio" + "encoding/json" + "fmt" + "net" + "sync" + "syscall" + "testing" + "time" + + "github.com/hjbdev/patterm/internal/scratchpad" +) + +func TestHandleConnDispatchesRequestsConcurrently(t *testing.T) { + serverConn, clientConn := net.Pipe() + t.Cleanup(func() { _ = clientConn.Close() }) + + host := &blockingToolHost{ + waitEntered: make(chan struct{}), + waitRelease: make(chan struct{}), + } + s := &Server{} + s.SetHost(host) + done := make(chan struct{}) + go func() { + s.handleConn(serverConn) + close(done) + }() + + reader := bufio.NewReader(clientConn) + writeLine(t, clientConn, `{"patterm_identity":"ident"}`) + writeLine(t, clientConn, `{"jsonrpc":"2.0","id":1,"method":"wait_for_pattern","params":{"process_id":"p_slow","pattern":"never","timeout_seconds":300}}`) + select { + case <-host.waitEntered: + case <-time.After(time.Second): + t.Fatal("wait_for_pattern did not enter fake host") + } + + writeLine(t, clientConn, `{"jsonrpc":"2.0","id":2,"method":"get_process_status","params":{"process_id":"p_fast"}}`) + fast := readJSONRPCResponse(t, clientConn, reader, time.Second) + if got := string(fast.ID); got != "2" { + t.Fatalf("first response id = %s, want 2; response=%s", got, fast.Raw) + } + if fast.Error != nil { + 
t.Fatalf("fast response returned error: %+v", fast.Error) + } + + _ = clientConn.SetReadDeadline(time.Now().Add(50 * time.Millisecond)) + if line, err := reader.ReadBytes('\n'); err == nil { + t.Fatalf("slow response arrived before release: %s", line) + } + + close(host.waitRelease) + slow := readJSONRPCResponse(t, clientConn, reader, time.Second) + if got := string(slow.ID); got != "1" { + t.Fatalf("second response id = %s, want 1; response=%s", got, slow.Raw) + } + if slow.Error != nil { + t.Fatalf("slow response returned error: %+v", slow.Error) + } + + _ = clientConn.Close() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("handleConn did not exit after client close") + } +} + +type jsonRPCResponse struct { + Raw string + ID json.RawMessage `json:"id"` + Result map[string]any `json:"result"` + Error *jsonRPCErrorShape `json:"error"` +} + +type jsonRPCErrorShape struct { + Code int `json:"code"` + Message string `json:"message"` +} + +func writeLine(t *testing.T, conn net.Conn, line string) { + t.Helper() + _ = conn.SetWriteDeadline(time.Now().Add(time.Second)) + if _, err := fmt.Fprintln(conn, line); err != nil { + t.Fatalf("write %s: %v", line, err) + } +} + +func readJSONRPCResponse(t *testing.T, conn net.Conn, reader *bufio.Reader, timeout time.Duration) jsonRPCResponse { + t.Helper() + _ = conn.SetReadDeadline(time.Now().Add(timeout)) + line, err := reader.ReadBytes('\n') + if err != nil { + t.Fatalf("read response: %v", err) + } + var resp jsonRPCResponse + resp.Raw = string(line) + if err := json.Unmarshal(line, &resp); err != nil { + t.Fatalf("parse response %s: %v", line, err) + } + return resp +} + +type blockingToolHost struct { + waitEntered chan struct{} + waitRelease chan struct{} + waitOnce sync.Once +} + +func (h *blockingToolHost) ResolveCallerIdentity(identity string) string { return "caller-" + identity } +func (h *blockingToolHost) CallerRole(string) CallerRole { return RoleOrchestrator } +func (h *blockingToolHost) 
SpawnAgent(string, SpawnAgentArgs) (ProcessInfo, error) { + return ProcessInfo{}, nil +} +func (h *blockingToolHost) SpawnProcess(string, SpawnProcessArgs) (ProcessInfo, error) { + return ProcessInfo{}, nil +} +func (h *blockingToolHost) StartProcess(string, string) (ProcessInfo, error) { + return ProcessInfo{}, nil +} +func (h *blockingToolHost) RestartProcess(string, string, syscall.Signal) (ProcessInfo, error) { + return ProcessInfo{}, nil +} +func (h *blockingToolHost) StopProcess(string, string, syscall.Signal) (ProcessInfo, error) { + return ProcessInfo{}, nil +} +func (h *blockingToolHost) CloseProcess(string, string) error { return nil } +func (h *blockingToolHost) RenameProcess(string, string, string) error { return nil } +func (h *blockingToolHost) SelectProcess(string, string) error { return nil } +func (h *blockingToolHost) ListProcesses(string, string) []ProcessInfo { return nil } +func (h *blockingToolHost) GetProcessStatus(string, string) (ProcessStatus, error) { + return ProcessStatus{ProcessInfo: ProcessInfo{ID: "p_fast", Status: "running"}}, nil +} +func (h *blockingToolHost) GetProjectStatus(string) (ProjectStatus, error) { + return ProjectStatus{}, nil +} +func (h *blockingToolHost) GetProcessOutput(string, string, string, int64) (ProcessOutput, error) { + return ProcessOutput{}, nil +} +func (h *blockingToolHost) GetProcessRawOutput(string, string, int64) (RawOutput, error) { + return RawOutput{}, nil +} +func (h *blockingToolHost) SearchOutput(string, string, string, string, int) (SearchResult, error) { + return SearchResult{}, nil +} +func (h *blockingToolHost) WaitForPattern(string, string, string, float64, string) (bool, string, error) { + h.waitOnce.Do(func() { close(h.waitEntered) }) + <-h.waitRelease + return true, "matched", nil +} +func (h *blockingToolHost) GetProcessPorts(string, string) ([]PortSighting, error) { + return nil, nil +} +func (h *blockingToolHost) SendInput(string, SendInputArgs) (SendInputResult, error) { + return 
SendInputResult{}, nil +} +func (h *blockingToolHost) SendMessage(string, string, string) error { return nil } +func (h *blockingToolHost) RequestHumanAttention(string, string, string) error { return nil } +func (h *blockingToolHost) TimerWait(string, float64, string) (string, error) { + return "", nil +} +func (h *blockingToolHost) TimerSet(string, TimerSetArgs) (TimerHandle, error) { + return TimerHandle{}, nil +} +func (h *blockingToolHost) TimerFireWhenIdleAny(string, TimerFireWhenIdleArgs) (TimerFireWhenIdleResponse, error) { + return TimerFireWhenIdleResponse{}, nil +} +func (h *blockingToolHost) TimerFireWhenIdleAll(string, TimerFireWhenIdleArgs) (TimerFireWhenIdleResponse, error) { + return TimerFireWhenIdleResponse{}, nil +} +func (h *blockingToolHost) TimerCancel(string, string) error { return nil } +func (h *blockingToolHost) TimerPause(string, string) error { return nil } +func (h *blockingToolHost) TimerResume(string, string) error { return nil } +func (h *blockingToolHost) TimerList(string) ([]TimerInfo, error) { + return nil, nil +} +func (h *blockingToolHost) ScratchpadList() ([]scratchpad.Entry, error) { return nil, nil } +func (h *blockingToolHost) ScratchpadRead(string) (string, string, error) { + return "", "", nil +} +func (h *blockingToolHost) ScratchpadWrite(string, string, string) (string, error) { + return "", nil +} +func (h *blockingToolHost) ScratchpadAppend(string, string) error { return nil } +func (h *blockingToolHost) ScratchpadDelete(string) error { return nil } +func (h *blockingToolHost) WhoAmI(string) WhoAmI { return WhoAmI{} } +func (h *blockingToolHost) Help(string, string) HelpResponse { return HelpResponse{} }