Unified Agent + reusable Go agent core.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: ignore closed pipe reads in bash streaming

Lyric 35f47502 2db1c730

+78
+14
tools/builtin/bash.go
··· 4 4 "bytes" 5 5 "context" 6 6 "encoding/json" 7 + "errors" 7 8 "fmt" 8 9 "io" 9 10 "os" ··· 264 265 if err == io.EOF { 265 266 return nil 266 267 } 268 + if isBenignCommandStreamReadError(err) { 269 + return nil 270 + } 267 271 select { 268 272 case <-ctx.Done(): 269 273 return nil ··· 283 287 out.WriteString("\n\nstderr:\n") 284 288 out.WriteString(payload.Stderr) 285 289 return out.String() 290 + } 291 + 292 + func isBenignCommandStreamReadError(err error) bool { 293 + if err == nil { 294 + return false 295 + } 296 + if errors.Is(err, os.ErrClosed) { 297 + return true 298 + } 299 + return strings.Contains(strings.ToLower(err.Error()), "file already closed") 286 300 } 287 301 288 302 func buildBashSubtaskResult(taskID string, payload bashExecutionPayload, execErr error) *agent.SubtaskResult {
+64
tools/builtin/bash_test.go
··· 3 3 import ( 4 4 "context" 5 5 "encoding/json" 6 + "errors" 7 + "fmt" 8 + "io" 6 9 "os" 7 10 "path/filepath" 8 11 "strings" ··· 28 31 events []agent.Event 29 32 } 30 33 34 + type closeAfterPayloadReader struct { 35 + payload []byte 36 + read bool 37 + } 38 + 31 39 func (s *recordingEventSink) HandleEvent(_ context.Context, event agent.Event) { 32 40 s.mu.Lock() 33 41 defer s.mu.Unlock() ··· 40 48 out := make([]agent.Event, len(s.events)) 41 49 copy(out, s.events) 42 50 return out 51 + } 52 + 53 + func (r *closeAfterPayloadReader) Read(p []byte) (int, error) { 54 + if r.read { 55 + return 0, io.EOF 56 + } 57 + r.read = true 58 + n := copy(p, r.payload) 59 + return n, os.ErrClosed 43 60 } 44 61 45 62 func TestContainsTokenBoundary(t *testing.T) { ··· 252 269 } 253 270 if !stderrSeen { 254 271 t.Fatalf("stderr stream event missing, events=%#v", events) 272 + } 273 + } 274 + 275 + func TestBashTool_CaptureCommandStream_IgnoresClosedPipeRead(t *testing.T) { 276 + tool := NewBashTool(true, 5*time.Second, 4096) 277 + sink := &recordingEventSink{} 278 + ctx := agent.WithEventSinkContext(context.Background(), sink) 279 + dst := &limitedBuffer{Limit: 4096} 280 + 281 + err := tool.captureCommandStream(ctx, "stdout", &closeAfterPayloadReader{ 282 + payload: []byte("alpha\n"), 283 + }, dst) 284 + if err != nil { 285 + t.Fatalf("captureCommandStream() error = %v", err) 286 + } 287 + if got := string(dst.Bytes()); got != "alpha\n" { 288 + t.Fatalf("captured output = %q, want %q", got, "alpha\n") 289 + } 290 + 291 + events := sink.snapshot() 292 + if len(events) != 1 { 293 + t.Fatalf("event count = %d, want 1", len(events)) 294 + } 295 + if events[0].Stream != "stdout" || !strings.Contains(events[0].Text, "alpha") { 296 + t.Fatalf("unexpected event: %#v", events[0]) 297 + } 298 + } 299 + 300 + func TestIsBenignCommandStreamReadError(t *testing.T) { 301 + cases := []struct { 302 + name string 303 + err error 304 + want bool 305 + }{ 306 + {name: "nil", err: nil, want: false}, 307 + {name: "os err closed", err: os.ErrClosed, want: true}, 308 + {name: "wrapped os err closed", err: fmt.Errorf("wrapped: %w", os.ErrClosed), want: true}, 309 + {name: "message only", err: errors.New("read |0: file already closed"), want: true}, 310 + {name: "other", err: errors.New("boom"), want: false}, 311 + } 312 + 313 + for _, tc := range cases { 314 + t.Run(tc.name, func(t *testing.T) { 315 + if got := isBenignCommandStreamReadError(tc.err); got != tc.want { 316 + t.Fatalf("isBenignCommandStreamReadError(%v) = %v, want %v", tc.err, got, tc.want) 317 + } 318 + }) 255 319 } 256 320 } 257 321