handleConn processed requests serially, so a slow tool (e.g. wait_for_pattern with a 300s timeout) monopolized the single per-agent MCP connection and every queued call timed out behind it. Handle each request in its own goroutine, serialize responses through a per-conn write mutex (full response written atomically, partial writes handled), copy the request line before handing it off (bufio reuses its buffer), and wait on a WaitGroup before closing the conn so in-flight handlers finish cleanly. Greeting stays sequential; notifications still get no response. Resolves the [MCP TIMEOUT] TODO item.
191 lines
6.7 KiB
Go
191 lines
6.7 KiB
Go
package mcp
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net"
|
|
"sync"
|
|
"syscall"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/hjbdev/patterm/internal/scratchpad"
|
|
)
|
|
|
|
func TestHandleConnDispatchesRequestsConcurrently(t *testing.T) {
|
|
serverConn, clientConn := net.Pipe()
|
|
t.Cleanup(func() { _ = clientConn.Close() })
|
|
|
|
host := &blockingToolHost{
|
|
waitEntered: make(chan struct{}),
|
|
waitRelease: make(chan struct{}),
|
|
}
|
|
s := &Server{}
|
|
s.SetHost(host)
|
|
done := make(chan struct{})
|
|
go func() {
|
|
s.handleConn(serverConn)
|
|
close(done)
|
|
}()
|
|
|
|
reader := bufio.NewReader(clientConn)
|
|
writeLine(t, clientConn, `{"patterm_identity":"ident"}`)
|
|
writeLine(t, clientConn, `{"jsonrpc":"2.0","id":1,"method":"wait_for_pattern","params":{"process_id":"p_slow","pattern":"never","timeout_seconds":300}}`)
|
|
select {
|
|
case <-host.waitEntered:
|
|
case <-time.After(time.Second):
|
|
t.Fatal("wait_for_pattern did not enter fake host")
|
|
}
|
|
|
|
writeLine(t, clientConn, `{"jsonrpc":"2.0","id":2,"method":"get_process_status","params":{"process_id":"p_fast"}}`)
|
|
fast := readJSONRPCResponse(t, clientConn, reader, time.Second)
|
|
if got := string(fast.ID); got != "2" {
|
|
t.Fatalf("first response id = %s, want 2; response=%s", got, fast.Raw)
|
|
}
|
|
if fast.Error != nil {
|
|
t.Fatalf("fast response returned error: %+v", fast.Error)
|
|
}
|
|
|
|
_ = clientConn.SetReadDeadline(time.Now().Add(50 * time.Millisecond))
|
|
if line, err := reader.ReadBytes('\n'); err == nil {
|
|
t.Fatalf("slow response arrived before release: %s", line)
|
|
}
|
|
|
|
close(host.waitRelease)
|
|
slow := readJSONRPCResponse(t, clientConn, reader, time.Second)
|
|
if got := string(slow.ID); got != "1" {
|
|
t.Fatalf("second response id = %s, want 1; response=%s", got, slow.Raw)
|
|
}
|
|
if slow.Error != nil {
|
|
t.Fatalf("slow response returned error: %+v", slow.Error)
|
|
}
|
|
|
|
_ = clientConn.Close()
|
|
select {
|
|
case <-done:
|
|
case <-time.After(time.Second):
|
|
t.Fatal("handleConn did not exit after client close")
|
|
}
|
|
}
|
|
|
|
// jsonRPCResponse is the subset of a JSON-RPC response line that the tests
// inspect. Raw holds the undecoded wire line for failure messages; it is
// tagged `json:"-"` so that a "raw" key in the payload (encoding/json matches
// untagged exported fields case-insensitively) cannot overwrite it. ID is
// kept as raw JSON so it can be compared textually.
type jsonRPCResponse struct {
	Raw    string             `json:"-"`
	ID     json.RawMessage    `json:"id"`
	Result map[string]any     `json:"result"`
	Error  *jsonRPCErrorShape `json:"error"`
}

// jsonRPCErrorShape mirrors the "error" object of a JSON-RPC response:
// a numeric code and a message string.
type jsonRPCErrorShape struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}
|
|
|
|
// writeLine sends line followed by a newline on conn, allowing one second for
// the write to complete. A write error fails the test immediately.
func writeLine(t *testing.T, conn net.Conn, line string) {
	t.Helper()

	// Best effort: a failure to arm the deadline surfaces as a write error.
	deadline := time.Now().Add(time.Second)
	_ = conn.SetWriteDeadline(deadline)

	_, err := fmt.Fprintf(conn, "%s\n", line)
	if err != nil {
		t.Fatalf("write %s: %v", line, err)
	}
}
|
|
|
|
func readJSONRPCResponse(t *testing.T, conn net.Conn, reader *bufio.Reader, timeout time.Duration) jsonRPCResponse {
|
|
t.Helper()
|
|
_ = conn.SetReadDeadline(time.Now().Add(timeout))
|
|
line, err := reader.ReadBytes('\n')
|
|
if err != nil {
|
|
t.Fatalf("read response: %v", err)
|
|
}
|
|
var resp jsonRPCResponse
|
|
resp.Raw = string(line)
|
|
if err := json.Unmarshal(line, &resp); err != nil {
|
|
t.Fatalf("parse response %s: %v", line, err)
|
|
}
|
|
return resp
|
|
}
|
|
|
|
type blockingToolHost struct {
|
|
waitEntered chan struct{}
|
|
waitRelease chan struct{}
|
|
waitOnce sync.Once
|
|
}
|
|
|
|
func (h *blockingToolHost) ResolveCallerIdentity(identity string) string { return "caller-" + identity }
|
|
func (h *blockingToolHost) CallerRole(string) CallerRole { return RoleOrchestrator }
|
|
func (h *blockingToolHost) SpawnAgent(string, SpawnAgentArgs) (ProcessInfo, error) {
|
|
return ProcessInfo{}, nil
|
|
}
|
|
func (h *blockingToolHost) SpawnProcess(string, SpawnProcessArgs) (ProcessInfo, error) {
|
|
return ProcessInfo{}, nil
|
|
}
|
|
func (h *blockingToolHost) StartProcess(string, string) (ProcessInfo, error) {
|
|
return ProcessInfo{}, nil
|
|
}
|
|
func (h *blockingToolHost) RestartProcess(string, string, syscall.Signal) (ProcessInfo, error) {
|
|
return ProcessInfo{}, nil
|
|
}
|
|
func (h *blockingToolHost) StopProcess(string, string, syscall.Signal) (ProcessInfo, error) {
|
|
return ProcessInfo{}, nil
|
|
}
|
|
func (h *blockingToolHost) CloseProcess(string, string) error { return nil }
|
|
func (h *blockingToolHost) RenameProcess(string, string, string) error { return nil }
|
|
func (h *blockingToolHost) SelectProcess(string, string) error { return nil }
|
|
func (h *blockingToolHost) ListProcesses(string, string) []ProcessInfo { return nil }
|
|
func (h *blockingToolHost) GetProcessStatus(string, string) (ProcessStatus, error) {
|
|
return ProcessStatus{ProcessInfo: ProcessInfo{ID: "p_fast", Status: "running"}}, nil
|
|
}
|
|
func (h *blockingToolHost) GetProjectStatus(string) (ProjectStatus, error) {
|
|
return ProjectStatus{}, nil
|
|
}
|
|
func (h *blockingToolHost) GetProcessOutput(string, string, string, int64) (ProcessOutput, error) {
|
|
return ProcessOutput{}, nil
|
|
}
|
|
func (h *blockingToolHost) GetProcessRawOutput(string, string, int64) (RawOutput, error) {
|
|
return RawOutput{}, nil
|
|
}
|
|
func (h *blockingToolHost) SearchOutput(string, string, string, string, int) (SearchResult, error) {
|
|
return SearchResult{}, nil
|
|
}
|
|
func (h *blockingToolHost) WaitForPattern(string, string, string, float64, string) (bool, string, error) {
|
|
h.waitOnce.Do(func() { close(h.waitEntered) })
|
|
<-h.waitRelease
|
|
return true, "matched", nil
|
|
}
|
|
func (h *blockingToolHost) GetProcessPorts(string, string) ([]PortSighting, error) {
|
|
return nil, nil
|
|
}
|
|
func (h *blockingToolHost) SendInput(string, SendInputArgs) (SendInputResult, error) {
|
|
return SendInputResult{}, nil
|
|
}
|
|
func (h *blockingToolHost) SendMessage(string, string, string) error { return nil }
|
|
func (h *blockingToolHost) RequestHumanAttention(string, string, string) error { return nil }
|
|
func (h *blockingToolHost) TimerWait(string, float64, string) (string, error) {
|
|
return "", nil
|
|
}
|
|
func (h *blockingToolHost) TimerSet(string, TimerSetArgs) (TimerHandle, error) {
|
|
return TimerHandle{}, nil
|
|
}
|
|
func (h *blockingToolHost) TimerFireWhenIdleAny(string, TimerFireWhenIdleArgs) (TimerFireWhenIdleResponse, error) {
|
|
return TimerFireWhenIdleResponse{}, nil
|
|
}
|
|
func (h *blockingToolHost) TimerFireWhenIdleAll(string, TimerFireWhenIdleArgs) (TimerFireWhenIdleResponse, error) {
|
|
return TimerFireWhenIdleResponse{}, nil
|
|
}
|
|
func (h *blockingToolHost) TimerCancel(string, string) error { return nil }
|
|
func (h *blockingToolHost) TimerPause(string, string) error { return nil }
|
|
func (h *blockingToolHost) TimerResume(string, string) error { return nil }
|
|
func (h *blockingToolHost) TimerList(string) ([]TimerInfo, error) {
|
|
return nil, nil
|
|
}
|
|
func (h *blockingToolHost) ScratchpadList() ([]scratchpad.Entry, error) { return nil, nil }
|
|
func (h *blockingToolHost) ScratchpadRead(string) (string, string, error) {
|
|
return "", "", nil
|
|
}
|
|
func (h *blockingToolHost) ScratchpadWrite(string, string, string) (string, error) {
|
|
return "", nil
|
|
}
|
|
func (h *blockingToolHost) ScratchpadAppend(string, string) error { return nil }
|
|
func (h *blockingToolHost) ScratchpadDelete(string) error { return nil }
|
|
func (h *blockingToolHost) WhoAmI(string) WhoAmI { return WhoAmI{} }
|
|
func (h *blockingToolHost) Help(string, string) HelpResponse { return HelpResponse{} }
|