personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Fix activity record persistence, created_at type, and segment accumulation

Three bugs silently broke the activity agent pipeline after the segment-sense
rewrite (Apr 3):

1. Persistence gap: completed activity records were never written to disk
because dream.py never called append_activity_record(). Added persistence
in both the main path (before run_activity_prompts) and idle path. Only
records with a matching ended change in the current update are persisted,
avoiding orphaned writes from the cumulative completed list.

2. created_at type: _make_completed_record() emitted created_at as an ISO
string, but routes.py and all consumers expect integer milliseconds.
Changed to int(time.time() * 1000).

3. Segment accumulation: completed records only included the first segment.
Added _segments tracking throughout the activity lifecycle so completed
records include every segment the activity spanned.

+143 -5
+44
tests/test_activity_state_machine.py
··· 198 198 assert "description" in rec 199 199 assert "active_entities" in rec 200 200 assert "created_at" in rec 201 + assert isinstance(rec["created_at"], int) 202 + 203 + 204 + class TestSegmentAccumulation: 205 + def test_continuing_accumulates_segments(self): 206 + from think.activity_state_machine import ActivityStateMachine 207 + 208 + sm = ActivityStateMachine() 209 + sm.update(_sense(content_type="coding"), "090000_300", "20260304") 210 + sm.update(_sense(content_type="coding"), "090500_300", "20260304") 211 + sm.update(_sense(content_type="coding"), "091000_300", "20260304") 212 + # End by type change 213 + sm.update(_sense(content_type="meeting"), "091500_300", "20260304") 214 + 215 + completed = sm.get_completed_activities() 216 + assert len(completed) == 1 217 + rec = completed[0] 218 + assert rec["segments"] == ["090000_300", "090500_300", "091000_300"] 219 + 220 + def test_ten_segments_produces_ten_keys(self): 221 + from think.activity_state_machine import ActivityStateMachine 222 + 223 + sm = ActivityStateMachine() 224 + for i in range(10): 225 + minutes = i * 5 226 + seg = f"09{minutes:02d}00_300" 227 + sm.update(_sense(content_type="coding"), seg, "20260304") 228 + # End with idle 229 + sm.update(_sense(density="idle"), "095000_300", "20260304") 230 + 231 + completed = sm.get_completed_activities() 232 + assert len(completed) == 1 233 + assert len(completed[0]["segments"]) == 10 234 + 235 + def test_segments_not_in_current_state(self): 236 + from think.activity_state_machine import ActivityStateMachine 237 + 238 + sm = ActivityStateMachine() 239 + sm.update(_sense(content_type="coding"), "090000_300", "20260304") 240 + sm.update(_sense(content_type="coding"), "090500_300", "20260304") 241 + 242 + state = sm.get_current_state() 243 + assert len(state) == 1 244 + assert "_segments" not in state[0] 201 245 202 246 203 247 class TestPseudoFacet:
+67
tests/test_dream_activity.py
··· 378 378 assert "group_started" in events 379 379 assert "agent_started" in events 380 380 assert "agent_completed" in events 381 + 381 382 assert "group_completed" in events 382 383 assert "completed" in events 383 384 ··· 386 387 assert started_kw["mode"] == "activity" 387 388 assert started_kw["activity"] == "coding_100000_300" 388 389 assert started_kw["facet"] == "work" 390 + 391 + 392 + class TestActivityPersistence: 393 + """Verify state machine completed records persist and load correctly.""" 394 + 395 + def test_completed_record_persisted_and_found(self, monkeypatch): 396 + import tempfile 397 + 398 + from think.activities import append_activity_record, load_activity_records 399 + from think.activity_state_machine import ActivityStateMachine 400 + 401 + with tempfile.TemporaryDirectory() as tmpdir: 402 + monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", tmpdir) 403 + 404 + sm = ActivityStateMachine() 405 + sm.update( 406 + { 407 + "density": "active", 408 + "content_type": "coding", 409 + "activity_summary": "Writing tests", 410 + "entities": [], 411 + "facets": [ 412 + {"facet": "work", "activity": "coding", "level": "high"} 413 + ], 414 + "meeting_detected": False, 415 + "speakers": [], 416 + "recommend": {}, 417 + }, 418 + "090000_300", 419 + "20260304", 420 + ) 421 + changes = sm.update( 422 + { 423 + "density": "active", 424 + "content_type": "meeting", 425 + "activity_summary": "Stand-up", 426 + "entities": [], 427 + "facets": [ 428 + {"facet": "work", "activity": "meeting", "level": "medium"} 429 + ], 430 + "meeting_detected": True, 431 + "speakers": [], 432 + "recommend": {}, 433 + }, 434 + "090500_300", 435 + "20260304", 436 + ) 437 + 438 + # Find the ended change 439 + ended = [c for c in changes if c.get("state") == "ended"] 440 + assert len(ended) == 1 441 + facet = ended[0]["_facet"] 442 + 443 + # Persist completed record (what dream.py now does) 444 + completed = sm.get_completed_activities() 445 + assert len(completed) == 1 446 + rec = completed[0] 447 + assert isinstance(rec["created_at"], int) 448 + append_activity_record(facet, "20260304", rec) 449 + 450 + # Verify load finds it (what run_activity_prompts does) 451 + records = load_activity_records(facet, "20260304") 452 + assert len(records) == 1 453 + assert records[0]["id"] == rec["id"] 454 + assert records[0]["activity"] == "coding" 455 + assert isinstance(records[0]["created_at"], int) 389 456 390 457 391 458 # ---------------------------------------------------------------------------
+8 -3
think/activity_state_machine.py
··· 3 3 4 4 """Deterministic activity state machine replacing LLM-based activity tracking.""" 5 5 6 - from datetime import datetime, timezone 6 + import time 7 7 8 8 from think.activities import LEVEL_VALUES, make_activity_id 9 9 from think.utils import segment_parse ··· 75 75 return { 76 76 "id": entry["id"], 77 77 "activity": entry["activity"], 78 - "segments": [entry["since"]], 78 + "segments": entry.get("_segments", [entry["since"]]), 79 79 "level_avg": LEVEL_VALUES.get(entry.get("level", "medium"), 0.5), 80 80 "description": entry["description"], 81 81 "active_entities": entry.get("active_entities", []), 82 - "created_at": datetime.now(tz=timezone.utc).isoformat(), 82 + "created_at": int(time.time() * 1000), 83 83 } 84 84 85 85 def update( ··· 166 166 "_change": "new", 167 167 "_facet": facet, 168 168 "_segment": segment_key, 169 + "_segments": [segment_key], 169 170 } 170 171 self.state[facet] = new_entry 171 172 changes.append(dict(new_entry)) ··· 175 176 prior["active_entities"] = entity_names 176 177 prior["_change"] = "continuing" 177 178 prior["_segment"] = segment_key 179 + prior.setdefault("_segments", [prior["since"]]) 180 + if segment_key not in prior["_segments"]: 181 + prior["_segments"].append(segment_key) 178 182 changes.append(dict(prior)) 179 183 else: 180 184 new_entry = { ··· 188 192 "_change": "new", 189 193 "_facet": facet, 190 194 "_segment": segment_key, 195 + "_segments": [segment_key], 191 196 } 192 197 self.state[facet] = new_entry 193 198 changes.append(dict(new_entry))
+24 -2
think/dream.py
··· 20 20 from datetime import date, datetime, timedelta 21 21 from pathlib import Path 22 22 23 - from think.activities import get_activity_output_path, load_activity_records 23 + from think.activities import ( 24 + append_activity_record, 25 + get_activity_output_path, 26 + load_activity_records, 27 + ) 24 28 from think.activity_state_machine import ActivityStateMachine 25 29 from think.callosum import CallosumConnection 26 30 from think.cluster import cluster_segments ··· 537 541 write_idle_stubs(seg_dir) 538 542 logging.info("Segment %s is idle, skipping remaining agents", segment) 539 543 if state_machine is not None: 540 - state_machine.update(sense_json, segment, day) 544 + idle_changes = state_machine.update(sense_json, segment, day) 545 + # Persist completed activity records from idle transitions 546 + facet_by_id = { 547 + c["id"]: c.get("_facet", "__") 548 + for c in idle_changes 549 + if c.get("state") == "ended" 550 + } 551 + for rec in state_machine.get_completed_activities(): 552 + if rec["id"] in facet_by_id: 553 + append_activity_record(facet_by_id[rec["id"]], day, rec) 541 554 # Persist activity state even on idle segments 542 555 try: 543 556 awareness_dir = Path(get_journal()) / "awareness" ··· 645 658 646 659 if state_machine is not None: 647 660 changes = state_machine.update(sense_json, segment, day) 661 + # Persist completed activity records before running activity agents 662 + facet_by_id = { 663 + c["id"]: c.get("_facet", "__") 664 + for c in changes 665 + if c.get("state") == "ended" 666 + } 667 + for rec in state_machine.get_completed_activities(): 668 + if rec["id"] in facet_by_id: 669 + append_activity_record(facet_by_id[rec["id"]], day, rec) 648 670 # Persist activity state for awareness.md consumption 649 671 try: 650 672 awareness_dir = Path(get_journal()) / "awareness"