personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

cortex: tolerate BrokenPipe during talent event emit

Keep writing JSONL sidecar events after stdout closes by marking the pipe dead on BrokenPipe/EPIPE and skipping subsequent stdout writes. This lets cortex-side teardown or stream closure avoid masking the talent's own cleanup path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+51 -2
+41
tests/test_talents_emit.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + import json 5 + 6 + from think.talents import JSONEventWriter 7 + 8 + 9 + class _BrokenPipeStdout: 10 + def __init__(self) -> None: 11 + self.writes: list[str] = [] 12 + 13 + def write(self, text: str) -> int: 14 + self.writes.append(text) 15 + return len(text) 16 + 17 + def flush(self) -> None: 18 + raise BrokenPipeError() 19 + 20 + 21 + def test_json_event_writer_emit_broken_pipe_keeps_sidecar(monkeypatch, tmp_path): 22 + stdout = _BrokenPipeStdout() 23 + monkeypatch.setattr("sys.stdout", stdout) 24 + sidecar = tmp_path / "events.jsonl" 25 + writer = JSONEventWriter(str(sidecar)) 26 + 27 + try: 28 + writer.emit({"event": "error", "error": "pipe closed"}) 29 + assert writer._pipe_dead is True 30 + first_write_count = len(stdout.writes) 31 + 32 + writer.emit({"event": "finish", "result": "sidecar only"}) 33 + assert len(stdout.writes) == first_write_count 34 + finally: 35 + writer.close() 36 + 37 + rows = [json.loads(line) for line in sidecar.read_text().splitlines()] 38 + assert rows == [ 39 + {"event": "error", "error": "pipe closed"}, 40 + {"event": "finish", "result": "sidecar only"}, 41 + ]
+10 -2
think/talents.py
··· 15 15 16 16 import argparse 17 17 import asyncio 18 + import errno 18 19 import json 19 20 import logging 20 21 import os ··· 72 73 def __init__(self, path: Optional[str] = None) -> None: 73 74 self.path = path 74 75 self.file = None 76 + self._pipe_dead = False 75 77 if path: 76 78 try: 77 79 Path(path).parent.mkdir(parents=True, exist_ok=True) ··· 81 83 82 84 def emit(self, data: Event) -> None: 83 85 line = json.dumps(data, ensure_ascii=False) 84 - print(line) 85 - sys.stdout.flush() # Ensure immediate output for cortex 86 + if not self._pipe_dead: 87 + try: 88 + print(line) 89 + sys.stdout.flush() # Ensure immediate output for cortex 90 + except (BrokenPipeError, OSError) as exc: 91 + if not isinstance(exc, BrokenPipeError) and exc.errno != errno.EPIPE: 92 + raise 93 + self._pipe_dead = True 86 94 if self.file: 87 95 try: 88 96 self.file.write(line + "\n")