personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add day+segment fields to observe events and log to events.jsonl

Adds day and segment fields consistently to all segment-related observe
events (detected, described, transcribed). Sense now creates segment
directory before emitting detected event. Supervisor logs any observe
event with day+segment to <day>/<segment>/events.jsonl if dir exists.

- sense.py: Create segment dir, add day/segment/remote to detected
- describe.py: Add day/segment/remote to described event
- transcribe.py: Add day/segment/remote to transcribed event
- supervisor.py: Add segment event logger using dir existence as guard
- CALLOSUM.md: Document new fields and event logging
- AGENTS.md: Add robustness principle about minimizing sync assumptions

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+89 -23
+1
AGENTS.md
··· 208 208 * **DRY, KISS, YAGNI**: Extract common logic, prefer simple solutions, don't over-engineer 209 209 * **Single Responsibility**: Functions/classes do one thing well 210 210 * **Conciseness & Maintainability**: Clear code over clever code 211 + * **Robustness**: Minimize assumptions that must be kept in sync across the codebase, avoid fragility and increasing maintenance burden. 211 212 * **Security**: Never expose secrets, validate/sanitize all inputs 212 213 * **Performance**: Profile before optimizing 213 214 * **Git**: Small focused commits, descriptive branch names. Run git commands directly (not `git -C`) since you're already in the repo.
+3 -2
docs/CALLOSUM.md
··· 62 62 - From `sense.py`: `describe`, `transcribe` - Processing pipeline state (with `running`/`queued` sub-fields) 63 63 - `observing`: `day`, `segment`, `files` - Recording window boundary crossed with saved files 64 64 - Remote events include `remote` (remote name) from `apps/remote/routes.py` 65 - - `detected`: `file`, `handler`, `ref` - File detected and handler spawned 66 - - `described`/`transcribed`: `input`, `output`, `duration_ms` - Processing complete 65 + - `detected`: `day`, `segment`, `file`, `handler`, `ref`, `remote` - File detected and handler spawned 66 + - `described`/`transcribed`: `day`, `segment`, `input`, `output`, `duration_ms`, `remote` - Processing complete 67 67 - `observed`: `day`, `segment`, `duration` - All files for segment fully processed 68 68 - Batch mode (--day) events include `batch=true` to indicate non-live origin 69 69 - Remote events include `remote` (remote name) ··· 72 72 **Health Model:** Fail-fast - observers exit if capture stalls (e.g., files not growing). Supervisor checks event freshness only. 73 73 **Path Format:** Relative to `JOURNAL_PATH` (e.g., `20251102/163045_300_center_DP-3_screen.webm` for multi-monitor recordings) 74 74 **Correlation:** `detected.ref` matches `logs.exec.ref` for the same handler process; `observed.segment` groups all files from same capture window 75 + **Event Log:** Any observe event with `day` + `segment` fields is logged to `<day>/<segment>/events.jsonl` by supervisor (if directory exists) 75 76 76 77 ### `importer` - Media import and transcription processing 77 78 **Source:** `think/importer.py`
+16 -7
observe/describe.py
··· 814 814 815 815 duration_ms = int((time.time() - start_time) * 1000) 816 816 817 - callosum_send( 818 - "observe", 819 - "described", 820 - input=str(rel_input), 821 - output=str(rel_output), 822 - duration_ms=duration_ms, 823 - ) 817 + # Extract day from video path (video_path.parent is day dir) 818 + day = video_path.parent.name 819 + 820 + event_fields = { 821 + "input": str(rel_input), 822 + "output": str(rel_output), 823 + "duration_ms": duration_ms, 824 + } 825 + if day: 826 + event_fields["day"] = day 827 + if segment: 828 + event_fields["segment"] = segment 829 + remote = os.getenv("REMOTE_NAME") 830 + if remote: 831 + event_fields["remote"] = remote 832 + callosum_send("observe", "described", **event_fields) 824 833 except Exception as e: 825 834 logger.error(f"Failed to process {video_path}: {e}", exc_info=True) 826 835 raise
+18 -7
observe/sense.py
··· 185 185 # Generate correlation ID for this handler run 186 186 ref = str(int(time.time() * 1000)) 187 187 188 + # Create segment directory before emitting detected event 189 + # This ensures the directory exists for event logging 190 + if day and segment: 191 + segment_dir = self.journal_dir / day / segment 192 + segment_dir.mkdir(exist_ok=True) 193 + 188 194 # Emit detected event with file and ref 189 195 if self.callosum: 190 196 try: ··· 192 198 except ValueError: 193 199 rel_file = file_path 194 200 195 - self.callosum.emit( 196 - "observe", 197 - "detected", 198 - file=str(rel_file), 199 - handler=handler_name, 200 - ref=ref, 201 - ) 201 + event_fields = { 202 + "file": str(rel_file), 203 + "handler": handler_name, 204 + "ref": ref, 205 + } 206 + if day: 207 + event_fields["day"] = day 208 + if segment: 209 + event_fields["segment"] = segment 210 + if remote: 211 + event_fields["remote"] = remote 212 + self.callosum.emit("observe", "detected", **event_fields) 202 213 203 214 # Replace {file} placeholder with actual file path 204 215 cmd = [str(file_path) if arg == "{file}" else arg for arg in command]
+16 -7
observe/transcribe.py
··· 567 567 rel_input = final_path 568 568 rel_output = json_path 569 569 570 - callosum_send( 571 - "observe", 572 - "transcribed", 573 - input=str(rel_input), 574 - output=str(rel_output), 575 - duration_ms=duration_ms, 576 - ) 570 + # Extract day from audio path (raw_path.parent is day dir) 571 + day = raw_path.parent.name 572 + 573 + event_fields = { 574 + "input": str(rel_input), 575 + "output": str(rel_output), 576 + "duration_ms": duration_ms, 577 + } 578 + if day: 579 + event_fields["day"] = day 580 + if segment: 581 + event_fields["segment"] = segment 582 + remote = os.getenv("REMOTE_NAME") 583 + if remote: 584 + event_fields["remote"] = remote 585 + callosum_send("observe", "transcribed", **event_fields) 577 586 578 587 579 588 def main():
+35
think/supervisor.py
··· 5 5 6 6 import argparse 7 7 import asyncio 8 + import json 8 9 import logging 9 10 import os 10 11 import signal ··· 1047 1048 _observe_status_state["ever_received"] = True 1048 1049 1049 1050 1051 + def _handle_segment_event_log(message: dict) -> None: 1052 + """Log observe events with day+segment to segment/events.jsonl. 1053 + 1054 + Any observe tract message with both day and segment fields gets logged 1055 + to JOURNAL_PATH/day/segment/events.jsonl if that directory exists. 1056 + """ 1057 + if message.get("tract") != "observe": 1058 + return 1059 + 1060 + day = message.get("day") 1061 + segment = message.get("segment") 1062 + 1063 + if not day or not segment: 1064 + return 1065 + 1066 + try: 1067 + journal_path = _get_journal_path() 1068 + segment_dir = journal_path / day / segment 1069 + 1070 + # Only log if segment directory exists 1071 + if not segment_dir.is_dir(): 1072 + return 1073 + 1074 + events_file = segment_dir / "events.jsonl" 1075 + 1076 + # Append event as JSON line 1077 + with open(events_file, "a", encoding="utf-8") as f: 1078 + f.write(json.dumps(message, ensure_ascii=False) + "\n") 1079 + 1080 + except Exception as e: 1081 + logging.debug(f"Failed to log segment event: {e}") 1082 + 1083 + 1050 1084 def _handle_callosum_message(message: dict) -> None: 1051 1085 """Dispatch incoming Callosum messages to appropriate handlers.""" 1052 1086 _handle_task_request(message) 1053 1087 _handle_supervisor_request(message) 1054 1088 _handle_segment_observed(message) 1055 1089 _handle_observe_status(message) 1090 + _handle_segment_event_log(message) 1056 1091 1057 1092 1058 1093 async def supervise(