package mcp import ( "bufio" "encoding/json" "fmt" "net" "sync" "syscall" "testing" "time" "github.com/hjbdev/patterm/internal/scratchpad" ) func TestHandleConnDispatchesRequestsConcurrently(t *testing.T) { serverConn, clientConn := net.Pipe() t.Cleanup(func() { _ = clientConn.Close() }) host := &blockingToolHost{ waitEntered: make(chan struct{}), waitRelease: make(chan struct{}), } s := &Server{} s.SetHost(host) done := make(chan struct{}) go func() { s.handleConn(serverConn) close(done) }() reader := bufio.NewReader(clientConn) writeLine(t, clientConn, `{"patterm_identity":"ident"}`) writeLine(t, clientConn, `{"jsonrpc":"2.0","id":1,"method":"wait_for_pattern","params":{"process_id":"p_slow","pattern":"never","timeout_seconds":300}}`) select { case <-host.waitEntered: case <-time.After(time.Second): t.Fatal("wait_for_pattern did not enter fake host") } writeLine(t, clientConn, `{"jsonrpc":"2.0","id":2,"method":"get_process_status","params":{"process_id":"p_fast"}}`) fast := readJSONRPCResponse(t, clientConn, reader, time.Second) if got := string(fast.ID); got != "2" { t.Fatalf("first response id = %s, want 2; response=%s", got, fast.Raw) } if fast.Error != nil { t.Fatalf("fast response returned error: %+v", fast.Error) } _ = clientConn.SetReadDeadline(time.Now().Add(50 * time.Millisecond)) if line, err := reader.ReadBytes('\n'); err == nil { t.Fatalf("slow response arrived before release: %s", line) } close(host.waitRelease) slow := readJSONRPCResponse(t, clientConn, reader, time.Second) if got := string(slow.ID); got != "1" { t.Fatalf("second response id = %s, want 1; response=%s", got, slow.Raw) } if slow.Error != nil { t.Fatalf("slow response returned error: %+v", slow.Error) } _ = clientConn.Close() select { case <-done: case <-time.After(time.Second): t.Fatal("handleConn did not exit after client close") } } type jsonRPCResponse struct { Raw string ID json.RawMessage `json:"id"` Result map[string]any `json:"result"` Error *jsonRPCErrorShape `json:"error"` } type jsonRPCErrorShape struct { Code int `json:"code"` Message string `json:"message"` } func writeLine(t *testing.T, conn net.Conn, line string) { t.Helper() _ = conn.SetWriteDeadline(time.Now().Add(time.Second)) if _, err := fmt.Fprintln(conn, line); err != nil { t.Fatalf("write %s: %v", line, err) } } func readJSONRPCResponse(t *testing.T, conn net.Conn, reader *bufio.Reader, timeout time.Duration) jsonRPCResponse { t.Helper() _ = conn.SetReadDeadline(time.Now().Add(timeout)) line, err := reader.ReadBytes('\n') if err != nil { t.Fatalf("read response: %v", err) } var resp jsonRPCResponse resp.Raw = string(line) if err := json.Unmarshal(line, &resp); err != nil { t.Fatalf("parse response %s: %v", line, err) } return resp } type blockingToolHost struct { waitEntered chan struct{} waitRelease chan struct{} waitOnce sync.Once } func (h *blockingToolHost) ResolveCallerIdentity(identity string) string { return "caller-" + identity } func (h *blockingToolHost) CallerRole(string) CallerRole { return RoleOrchestrator } func (h *blockingToolHost) SpawnAgent(string, SpawnAgentArgs) (ProcessInfo, error) { return ProcessInfo{}, nil } func (h *blockingToolHost) SpawnProcess(string, SpawnProcessArgs) (ProcessInfo, error) { return ProcessInfo{}, nil } func (h *blockingToolHost) StartProcess(string, string) (ProcessInfo, error) { return ProcessInfo{}, nil } func (h *blockingToolHost) RestartProcess(string, string, syscall.Signal) (ProcessInfo, error) { return ProcessInfo{}, nil } func (h *blockingToolHost) StopProcess(string, string, syscall.Signal) (ProcessInfo, error) { return ProcessInfo{}, nil } func (h *blockingToolHost) CloseProcess(string, string) error { return nil } func (h *blockingToolHost) RenameProcess(string, string, string) error { return nil } func (h *blockingToolHost) SelectProcess(string, string) error { return nil } func (h *blockingToolHost) ListProcesses(string, string) []ProcessInfo { return nil } func (h *blockingToolHost) GetProcessStatus(string, string) (ProcessStatus, error) { return ProcessStatus{ProcessInfo: ProcessInfo{ID: "p_fast", Status: "running"}}, nil } func (h *blockingToolHost) GetProjectStatus(string, bool) (ProjectStatus, error) { return ProjectStatus{}, nil } func (h *blockingToolHost) GetProcessOutput(string, ProcessOutputArgs) (ProcessOutput, error) { return ProcessOutput{}, nil } func (h *blockingToolHost) GetProcessRawOutput(string, RawOutputArgs) (RawOutput, error) { return RawOutput{}, nil } func (h *blockingToolHost) SearchOutput(string, SearchOutputArgs) (SearchResult, error) { return SearchResult{}, nil } func (h *blockingToolHost) WaitForPattern(string, string, string, float64, string) (bool, string, error) { h.waitOnce.Do(func() { close(h.waitEntered) }) <-h.waitRelease return true, "matched", nil } func (h *blockingToolHost) GetProcessPorts(string, string) ([]PortSighting, error) { return nil, nil } func (h *blockingToolHost) SendInput(string, SendInputArgs) (SendInputResult, error) { return SendInputResult{}, nil } func (h *blockingToolHost) SendMessage(string, string, string) error { return nil } func (h *blockingToolHost) RequestHumanAttention(string, string, string) error { return nil } func (h *blockingToolHost) TimerWait(string, float64, string) (string, error) { return "", nil } func (h *blockingToolHost) TimerSet(string, TimerSetArgs) (TimerHandle, error) { return TimerHandle{}, nil } func (h *blockingToolHost) TimerFireWhenIdleAny(string, TimerFireWhenIdleArgs) (TimerFireWhenIdleResponse, error) { return TimerFireWhenIdleResponse{}, nil } func (h *blockingToolHost) TimerFireWhenIdleAll(string, TimerFireWhenIdleArgs) (TimerFireWhenIdleResponse, error) { return TimerFireWhenIdleResponse{}, nil } func (h *blockingToolHost) TimerCancel(string, string) error { return nil } func (h *blockingToolHost) TimerPause(string, string) error { return nil } func (h *blockingToolHost) TimerResume(string, string) error { return nil } func (h *blockingToolHost) TimerList(string) ([]TimerInfo, error) { return nil, nil } func (h *blockingToolHost) ScratchpadList() ([]scratchpad.Entry, error) { return nil, nil } func (h *blockingToolHost) ScratchpadRead(ScratchpadReadArgs) (ScratchpadReadResult, error) { return ScratchpadReadResult{}, nil } func (h *blockingToolHost) ScratchpadWrite(string, string, string) (string, error) { return "", nil } func (h *blockingToolHost) ScratchpadAppend(string, string) error { return nil } func (h *blockingToolHost) ScratchpadDelete(string) error { return nil } func (h *blockingToolHost) WhoAmI(string, bool) WhoAmI { return WhoAmI{} } func (h *blockingToolHost) Help(string, string) HelpResponse { return HelpResponse{} }