personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(activities): durable state machine across segment subprocesses

Realizer has produced zero records since 2026-04-04 because
ActivityStateMachine was constructed fresh per `sol think --segment`
subprocess, so self.state was always empty and the four ended-change
paths (gap-reset, idle, facet-gone, type-change) never fired.

Make ActivityStateMachine durable across subprocess boundaries: hydrate
self.state, last_segment_key, and last_segment_day from
<journal>/awareness/activity_state.json at __init__ when journal_root is
provided, and rewrite the snapshot in-place after each finalization.
Live single-segment construction passes journal_root=Path(get_journal());
batch (--segments) construction stays in-memory and does not touch the
awareness file.

Also: rename state-entry keys (_facet → facet, _segment → segment,
_segments → segments) consistently across stored state and the runtime
changes stream; _change stays runtime-only. Capture routing_day =
state_machine.last_segment_day or day before update() at both
finalization blocks so cross-midnight ended records land in the prior
day's <facet>/activities/<D>.jsonl. Tolerate the production legacy
flat-list shape on first hydration (one-shot promotion).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+456 -97
+16 -15
tests/test_activity_state_machine.py
··· 40 40 assert changes[0]["activity"] == "coding" 41 41 assert changes[0]["state"] == "active" 42 42 assert changes[0]["since"] == "090000_300" 43 - assert changes[0]["_facet"] == "work" 43 + assert changes[0]["facet"] == "work" 44 44 45 45 46 46 class TestContinuation: ··· 87 87 ended = [c for c in changes if c["state"] == "ended"] 88 88 assert len(ended) == 1 89 89 assert ended[0]["_change"] == "ended_idle" 90 - assert sm.get_current_state() == [] 90 + assert sm.state == {} 91 91 92 92 93 93 class TestTimeGap: ··· 137 137 changes = sm.update(_sense(facets=facets), "090000_300", "20260304") 138 138 139 139 assert len(changes) == 2 140 - facet_names = {c["_facet"] for c in changes} 140 + facet_names = {c["facet"] for c in changes} 141 141 assert facet_names == {"work", "personal"} 142 142 143 143 ··· 156 156 157 157 ended = [c for c in changes if c["_change"] == "ended_facet_gone"] 158 158 assert len(ended) == 1 159 - assert ended[0]["_facet"] == "personal" 159 + assert ended[0]["facet"] == "personal" 160 160 161 161 162 - class TestGetCurrentState: 163 - def test_returns_clean_entries(self): 162 + class TestStateShape: 163 + def test_active_entries_use_persisted_field_names(self): 164 164 from think.activity_state_machine import ActivityStateMachine 165 165 166 166 sm = ActivityStateMachine() 167 167 sm.update(_sense(), "090000_300", "20260304") 168 - state = sm.get_current_state() 169 168 170 - assert len(state) == 1 171 - entry = state[0] 169 + assert set(sm.state) == {"work"} 170 + entry = sm.state["work"] 172 171 assert "id" in entry 173 172 assert "activity" in entry 174 173 assert "state" in entry and entry["state"] == "active" 175 174 assert "since" in entry 176 175 assert "level" in entry 177 176 assert "active_entities" in entry 178 - assert "_change" not in entry 177 + assert entry["facet"] == "work" 178 + assert entry["segment"] == "090000_300" 179 + assert entry["segments"] == ["090000_300"] 180 + assert entry["_change"] == "new" 179 181 assert "_facet" not in entry 180 182 assert "_segment" not in entry 183 + assert "_segments" not in entry 181 184 182 185 183 186 class TestGetCompletedActivities: ··· 232 235 assert len(completed) == 1 233 236 assert len(completed[0]["segments"]) == 10 234 237 235 - def test_segments_not_in_current_state(self): 238 + def test_segments_accumulate_in_state(self): 236 239 from think.activity_state_machine import ActivityStateMachine 237 240 238 241 sm = ActivityStateMachine() 239 242 sm.update(_sense(content_type="coding"), "090000_300", "20260304") 240 243 sm.update(_sense(content_type="coding"), "090500_300", "20260304") 241 244 242 - state = sm.get_current_state() 243 - assert len(state) == 1 244 - assert "_segments" not in state[0] 245 + assert sm.state["work"]["segments"] == ["090000_300", "090500_300"] 245 246 246 247 247 248 class TestPseudoFacet: ··· 252 253 changes = sm.update(_sense(facets=[]), "090000_300", "20260304") 253 254 254 255 assert len(changes) == 1 255 - assert changes[0]["_facet"] == "__" 256 + assert changes[0]["facet"] == "__" 256 257 257 258 258 259 class TestEntityTracking:
+276
tests/test_activity_state_machine_durability.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Durability tests for ActivityStateMachine snapshots.""" 5 + 6 + import json 7 + from pathlib import Path 8 + 9 + from think.activities import ( 10 + append_activity_record, 11 + load_activity_records, 12 + make_activity_id, 13 + ) 14 + from think.activity_state_machine import ActivityStateMachine 15 + from think.thinking import _write_json_atomic 16 + 17 + 18 + def _sense(content_type: str = "coding", density: str = "active", facet: str = "test"): 19 + return { 20 + "density": density, 21 + "content_type": content_type, 22 + "activity_summary": f"{content_type} work", 23 + "entities": [], 24 + "facets": [{"facet": facet, "activity": content_type, "level": "high"}], 25 + "meeting_detected": content_type == "meeting", 26 + "speakers": [], 27 + "recommend": {}, 28 + } 29 + 30 + 31 + def _persist_snapshot(journal_root: Path, state_machine: ActivityStateMachine) -> None: 32 + snapshot = { 33 + "last_segment_key": state_machine.last_segment_key, 34 + "last_segment_day": state_machine.last_segment_day, 35 + "active": { 36 + facet: {k: v for k, v in entry.items() if k != "_change"} 37 + for facet, entry in state_machine.state.items() 38 + }, 39 + } 40 + _write_json_atomic(journal_root / "awareness" / "activity_state.json", snapshot) 41 + 42 + 43 + def _append_ended_records( 44 + state_machine: ActivityStateMachine, changes: list[dict], day: str 45 + ) -> None: 46 + completed_lookup = {} 47 + for record in state_machine.get_completed_activities(): 48 + completed_lookup.setdefault(record["id"], record) 49 + for change in changes: 50 + if change.get("state") != "ended": 51 + continue 52 + record = completed_lookup.get(change["id"]) 53 + if record: 54 + append_activity_record(change.get("facet", "__"), day, record) 55 + 56 + 57 + def test_state_survives_subprocess_boundary(tmp_path: Path, monkeypatch): 58 + monkeypatch.setenv("SOLSTONE_JOURNAL", str(tmp_path)) 59 + 60 + sm1 = ActivityStateMachine(journal_root=tmp_path) 61 + sm1.update(_sense(), "090000_300", "20260427") 62 + assert set(sm1.state) == {"test"} 63 + _persist_snapshot(tmp_path, sm1) 64 + 65 + sm2 = ActivityStateMachine(journal_root=tmp_path) 66 + assert sm2.last_segment_key == "090000_300" 67 + assert sm2.last_segment_day == "20260427" 68 + assert sm2.state["test"]["segments"] == ["090000_300"] 69 + 70 + changes = sm2.update(_sense(density="idle"), "090500_300", "20260427") 71 + ended = [change for change in changes if change.get("state") == "ended"] 72 + assert len(ended) == 1 73 + _append_ended_records(sm2, changes, "20260427") 74 + _persist_snapshot(tmp_path, sm2) 75 + 76 + records = load_activity_records("test", "20260427") 77 + assert len(records) == 1 78 + assert records[0]["id"] == make_activity_id("coding", "090000_300") 79 + assert records[0]["segments"] == ["090000_300"] 80 + 81 + sm3 = ActivityStateMachine(journal_root=tmp_path) 82 + assert sm3.state == {} 83 + assert sm3.last_segment_key == "090500_300" 84 + assert sm3.last_segment_day == "20260427" 85 + 86 + 87 + def test_day_boundary_routes_ended_record_to_prior_day(tmp_path: Path, monkeypatch): 88 + monkeypatch.setenv("SOLSTONE_JOURNAL", str(tmp_path)) 89 + 90 + sm1 = ActivityStateMachine(journal_root=tmp_path) 91 + sm1.update(_sense(), "233000_300", "20260304") 92 + _persist_snapshot(tmp_path, sm1) 93 + 94 + sm2 = ActivityStateMachine(journal_root=tmp_path) 95 + routing_day = sm2.last_segment_day or "20260305" 96 + changes = sm2.update(_sense(), "001500_300", "20260305") 97 + 98 + ended = [change for change in changes if change.get("state") == "ended"] 99 + active = [change for change in changes if change.get("state") == "active"] 100 + assert len(ended) == 1 101 + assert len(active) == 1 102 + _append_ended_records(sm2, changes, routing_day) 103 + _persist_snapshot(tmp_path, sm2) 104 + 105 + prior_day_records = load_activity_records("test", "20260304") 106 + current_day_records = load_activity_records("test", "20260305") 107 + assert len(prior_day_records) == 1 108 + assert current_day_records == [] 109 + assert prior_day_records[0]["segments"] == ["233000_300"] 110 + assert sm2.state["test"]["since"] == "001500_300" 111 + assert sm2.last_segment_day == "20260305" 112 + 113 + 114 + def test_legacy_flat_list_format_promotes(tmp_path: Path, monkeypatch): 115 + monkeypatch.setenv("SOLSTONE_JOURNAL", str(tmp_path)) 116 + legacy_state = [ 117 + { 118 + "id": make_activity_id("coding", "090000_300"), 119 + "activity": "coding", 120 + "state": "active", 121 + "since": "090000_300", 122 + "description": "legacy coding", 123 + "level": "medium", 124 + "active_entities": [], 125 + } 126 + ] 127 + _write_json_atomic(tmp_path / "awareness" / "activity_state.json", legacy_state) 128 + 129 + sm = ActivityStateMachine(journal_root=tmp_path) 130 + assert set(sm.state) == {"__"} 131 + assert sm.last_segment_key is None 132 + assert sm.last_segment_day is None 133 + assert sm.state["__"]["facet"] == "__" 134 + assert sm.state["__"]["segments"] == ["090000_300"] 135 + 136 + changes = sm.update(_sense(content_type="meeting"), "090500_300", "20260427") 137 + ended = [change for change in changes if change.get("state") == "ended"] 138 + assert len(ended) == 1 139 + _append_ended_records(sm, changes, "20260427") 140 + 141 + records = load_activity_records("__", "20260427") 142 + assert len(records) == 1 143 + assert records[0]["segments"] == ["090000_300"] 144 + assert sm.state["test"]["since"] == "090500_300" 145 + 146 + 147 + def test_three_active_segments_then_idle_writes_one_record(tmp_path: Path, monkeypatch): 148 + monkeypatch.setenv("SOLSTONE_JOURNAL", str(tmp_path)) 149 + day = "20260427" 150 + 151 + for segment in ("090000_300", "090505_300", "091010_300"): 152 + state_machine = ActivityStateMachine(journal_root=tmp_path) 153 + state_machine.update(_sense(facet="work"), segment, day) 154 + _persist_snapshot(tmp_path, state_machine) 155 + 156 + state_machine = ActivityStateMachine(journal_root=tmp_path) 157 + routing_day = state_machine.last_segment_day or day 158 + changes = state_machine.update( 159 + _sense(density="idle", facet="work"), "091515_300", day 160 + ) 161 + ended = [change for change in changes if change.get("state") == "ended"] 162 + assert len(ended) == 1 163 + _append_ended_records(state_machine, changes, routing_day) 164 + _persist_snapshot(tmp_path, state_machine) 165 + 166 + records = load_activity_records("work", day) 167 + assert len(records) == 1 168 + assert records[0]["segments"] == ["090000_300", "090505_300", "091010_300"] 169 + assert len(records[0]["segments"]) == 3 170 + assert state_machine.state == {} 171 + 172 + 173 + def test_crash_between_append_and_snapshot_is_idempotent(tmp_path: Path, monkeypatch): 174 + monkeypatch.setenv("SOLSTONE_JOURNAL", str(tmp_path)) 175 + day = "20260427" 176 + 177 + sm1 = ActivityStateMachine(journal_root=tmp_path) 178 + sm1.update(_sense(facet="work"), "090000_300", day) 179 + _persist_snapshot(tmp_path, sm1) 180 + 181 + sm2 = ActivityStateMachine(journal_root=tmp_path) 182 + routing_day = sm2.last_segment_day or day 183 + changes = sm2.update(_sense(density="idle", facet="work"), "090505_300", day) 184 + _append_ended_records(sm2, changes, routing_day) 185 + assert len(load_activity_records("work", day)) == 1 186 + 187 + sm3 = ActivityStateMachine(journal_root=tmp_path) 188 + routing_day = sm3.last_segment_day or day 189 + retry_changes = sm3.update(_sense(density="idle", facet="work"), "090505_300", day) 190 + _append_ended_records(sm3, retry_changes, routing_day) 191 + records = load_activity_records("work", day) 192 + assert len(records) == 1 193 + assert records[0]["id"] == make_activity_id("coding", "090000_300") 194 + 195 + _persist_snapshot(tmp_path, sm3) 196 + sm4 = ActivityStateMachine(journal_root=tmp_path) 197 + assert sm4.state == {} 198 + assert sm4.last_segment_key == "090505_300" 199 + assert sm4.last_segment_day == day 200 + 201 + 202 + def test_batch_construction_has_no_journal_root_and_skips_snapshot( 203 + tmp_path: Path, monkeypatch 204 + ): 205 + monkeypatch.setenv("SOLSTONE_JOURNAL", str(tmp_path)) 206 + marker = { 207 + "last_segment_key": "marker", 208 + "last_segment_day": "20990101", 209 + "active": {}, 210 + } 211 + state_path = tmp_path / "awareness" / "activity_state.json" 212 + _write_json_atomic(state_path, marker) 213 + mtime_before = state_path.stat().st_mtime_ns 214 + 215 + state_machine = ActivityStateMachine() 216 + assert state_machine.journal_root is None 217 + state_machine.update(_sense(facet="work"), "090000_300", "20260427") 218 + state_machine.update(_sense(facet="work"), "090505_300", "20260427") 219 + state_machine.update(_sense(density="idle", facet="work"), "091010_300", "20260427") 220 + 221 + if state_machine.journal_root is not None: 222 + _persist_snapshot(state_machine.journal_root, state_machine) 223 + 224 + assert state_path.stat().st_mtime_ns == mtime_before 225 + assert json.loads(state_path.read_text(encoding="utf-8")) == marker 226 + 227 + 228 + def test_run_segment_sense_emits_activity_events(tmp_path: Path, monkeypatch): 229 + from think import thinking as think 230 + 231 + monkeypatch.setenv("SOLSTONE_JOURNAL", str(tmp_path)) 232 + day = "20260427" 233 + stream = "default" 234 + segment = "090505_300" 235 + 236 + sm1 = ActivityStateMachine(journal_root=tmp_path) 237 + sm1.update(_sense(facet="work"), "090000_300", day) 238 + _persist_snapshot(tmp_path, sm1) 239 + 240 + talents_dir = tmp_path / "chronicle" / day / stream / segment / "talents" 241 + talents_dir.mkdir(parents=True) 242 + (talents_dir / "sense.json").write_text( 243 + json.dumps(_sense(density="idle", facet="work")), 244 + encoding="utf-8", 245 + ) 246 + 247 + events = [] 248 + monkeypatch.setattr( 249 + think, 250 + "get_talent_configs", 251 + lambda schedule=None, **kwargs: { 252 + "sense": {"priority": 10, "type": "generate", "output": "json"} 253 + }, 254 + ) 255 + monkeypatch.setattr(think, "_cortex_request_with_retry", lambda **kwargs: "sense-1") 256 + monkeypatch.setattr( 257 + think, "_drain_priority_batch", lambda *args, **kwargs: (1, 0, []) 258 + ) 259 + monkeypatch.setattr( 260 + think, "_jsonl_log", lambda event, **fields: events.append(event) 261 + ) 262 + monkeypatch.setattr(think, "run_activity_prompts", lambda **kwargs: True) 263 + monkeypatch.setattr(think, "_callosum", None) 264 + 265 + success, failed, failed_names = think.run_segment_sense( 266 + day, 267 + segment, 268 + refresh=False, 269 + verbose=False, 270 + stream=stream, 271 + state_machine=ActivityStateMachine(journal_root=tmp_path), 272 + ) 273 + 274 + assert (success, failed, failed_names) == (1, 0, []) 275 + assert "activity.detected" in events 276 + assert "activity.persisted" in events
+8 -5
tests/test_pipeline_smoke.py
··· 169 169 journal = tmp_path / "journal" 170 170 monkeypatch.setenv("SOLSTONE_JOURNAL", str(journal)) 171 171 172 - state_machine = ActivityStateMachine() 172 + state_machine = ActivityStateMachine(journal_root=journal) 173 173 activity_calls = [] 174 174 175 175 def _segment_configs(): ··· 328 328 state_path = journal / "awareness" / "activity_state.json" 329 329 assert state_path.exists() 330 330 state = json.loads(state_path.read_text()) 331 - assert len(state) == 1 332 - assert state[0]["activity"] == "coding" 333 - assert state[0]["state"] == "active" 334 - assert state[0]["id"] == make_activity_id("coding", "100000_300") 331 + assert state["last_segment_key"] == "100000_300" 332 + assert state["last_segment_day"] == DAY 333 + assert set(state["active"]) == {"work"} 334 + active = state["active"]["work"] 335 + assert active["activity"] == "coding" 336 + assert active["state"] == "active" 337 + assert active["id"] == make_activity_id("coding", "100000_300")
+9 -9
tests/test_think_activity.py
··· 714 714 # Find the ended change 715 715 ended = [c for c in changes if c.get("state") == "ended"] 716 716 assert len(ended) == 1 717 - facet = ended[0]["_facet"] 717 + facet = ended[0]["facet"] 718 718 719 719 # Persist completed record (what thinking.py now does) 720 720 completed = sm.get_completed_activities() ··· 777 777 778 778 # Simulate thinking.py facet_by_id logic 779 779 facet_by_id = { 780 - c["id"]: c.get("_facet", "__") 780 + c["id"]: c.get("facet", "__") 781 781 for c in changes 782 782 if c.get("state") == "ended" 783 783 } ··· 808 808 changes = sm.update(self._sense(density="idle"), "091000_300", "20260304") 809 809 810 810 facet_by_id = { 811 - c["id"]: c.get("_facet", "__") 811 + c["id"]: c.get("facet", "__") 812 812 for c in changes 813 813 if c.get("state") == "ended" 814 814 } ··· 859 859 self._sense(content_type="meeting"), "090500_300", "20260304" 860 860 ) 861 861 facet_by_id = { 862 - c["id"]: c.get("_facet", "__") 862 + c["id"]: c.get("facet", "__") 863 863 for c in changes1 864 864 if c.get("state") == "ended" 865 865 } ··· 873 873 ) 874 874 # No ended changes in this update 875 875 facet_by_id2 = { 876 - c["id"]: c.get("_facet", "__") 876 + c["id"]: c.get("facet", "__") 877 877 for c in changes2 878 878 if c.get("state") == "ended" 879 879 } ··· 938 938 changes = sm.update(self._sense(density="idle"), "090500_300", "20260304") 939 939 940 940 facet_by_id = { 941 - c["id"]: c.get("_facet", "__") 941 + c["id"]: c.get("facet", "__") 942 942 for c in changes 943 943 if c.get("state") == "ended" 944 944 } ··· 976 976 977 977 # Use the fixed ended_pairs approach (matches thinking.py) 978 978 ended_pairs = [ 979 - (c["id"], c.get("_facet", "__")) 979 + (c["id"], c.get("facet", "__")) 980 980 for c in changes 981 981 if c.get("state") == "ended" 982 982 ] ··· 1111 1111 ) 1112 1112 # Persist first completed 1113 1113 facet_by_id = { 1114 - c["id"]: c.get("_facet", "__") 1114 + c["id"]: c.get("facet", "__") 1115 1115 for c in changes1 1116 1116 if c.get("state") == "ended" 1117 1117 } ··· 1139 1139 "20260304", 1140 1140 ) 1141 1141 facet_by_id2 = { 1142 - c["id"]: c.get("_facet", "__") 1142 + c["id"]: c.get("facet", "__") 1143 1143 for c in changes2 1144 1144 if c.get("state") == "ended" 1145 1145 }
+36 -11
tests/test_think_segment.py
··· 168 168 updates = [] 169 169 170 170 class StubStateMachine: 171 + def __init__(self): 172 + self.state = {} 173 + self.last_segment_key = None 174 + self.last_segment_day = None 175 + self.journal_root = segment_dir.parents[3] 176 + 171 177 def update(self, sense_output, segment, day): 172 178 updates.append((sense_output, segment, day)) 173 - return [] 174 - 175 - def get_current_state(self): 179 + self.last_segment_key = segment 180 + self.last_segment_day = day 176 181 return [] 177 182 178 183 def get_completed_activities(self): ··· 230 235 ) 231 236 assert activity_state_path.exists() 232 237 state_data = json.loads(activity_state_path.read_text()) 233 - assert state_data == [] 238 + assert state_data == { 239 + "last_segment_key": "120000_300", 240 + "last_segment_day": "20240115", 241 + "active": {}, 242 + } 234 243 235 244 def test_conditional_screen_dispatch(self, segment_dir, monkeypatch): 236 245 from think import thinking as think ··· 492 501 activity_calls = [] 493 502 494 503 class StubStateMachine: 504 + def __init__(self): 505 + self.state = {} 506 + self.last_segment_key = None 507 + self.last_segment_day = None 508 + self.journal_root = segment_dir.parents[3] 509 + 495 510 def update(self, sense_output, segment, day): 496 511 updates.append((sense_output, segment, day)) 497 - return [{"state": "ended", "id": "coding_120000_300", "_facet": "work"}] 498 - 499 - def get_current_state(self): 500 - return [{"facet": "work", "state": "active", "id": "coding_120000_300"}] 512 + self.last_segment_key = segment 513 + self.last_segment_day = day 514 + self.state = { 515 + "work": { 516 + "facet": "work", 517 + "state": "active", 518 + "id": "coding_120000_300", 519 + } 520 + } 521 + return [{"state": "ended", "id": "coding_120000_300", "facet": "work"}] 501 522 502 523 def get_completed_activities(self): 503 524 return [ ··· 570 591 ) 571 592 assert activity_state_path.exists() 572 593 state_data = json.loads(activity_state_path.read_text()) 573 - assert state_data == [ 574 - {"facet": "work", "state": "active", "id": "coding_120000_300"} 575 - ] 594 + assert state_data == { 595 + "last_segment_key": "120000_300", 596 + "last_segment_day": "20240115", 597 + "active": { 598 + "work": {"facet": "work", "state": "active", "id": "coding_120000_300"} 599 + }, 600 + } 576 601 577 602 def test_generator_triggers_incremental_indexing(self, segment_dir, monkeypatch): 578 603 from think import thinking as think
+65 -31
think/activity_state_machine.py
··· 3 3 4 4 """Deterministic activity state machine replacing LLM-based activity tracking.""" 5 5 6 + import json 7 + import logging 6 8 import time 9 + from pathlib import Path 7 10 8 11 from think.activities import LEVEL_VALUES, make_activity_id 9 12 from think.utils import segment_parse ··· 13 16 14 17 15 18 class ActivityStateMachine: 16 - def __init__(self) -> None: 19 + def __init__(self, journal_root: Path | None = None) -> None: 17 20 self.state: dict[str, dict] = {} 18 21 self.last_segment_key: str | None = None 19 22 self.last_segment_day: str | None = None 20 - self.history: list[dict] = [] 21 23 self._completed: list[dict] = [] 24 + self.journal_root = Path(journal_root) if journal_root else None 25 + 26 + if self.journal_root is None: 27 + return 28 + 29 + state_path = self.journal_root / "awareness" / "activity_state.json" 30 + try: 31 + data = json.loads(state_path.read_text(encoding="utf-8")) 32 + except FileNotFoundError: 33 + return 34 + except (OSError, json.JSONDecodeError) as exc: 35 + logging.debug("Failed to hydrate activity state: %s", exc) 36 + return 37 + 38 + if isinstance(data, list): 39 + active = { 40 + str(entry.get("facet") or "__"): entry 41 + for entry in data 42 + if isinstance(entry, dict) 43 + } 44 + elif isinstance(data, dict): 45 + raw_active = data.get("active") 46 + active = raw_active if isinstance(raw_active, dict) else {} 47 + self.last_segment_key = data.get("last_segment_key") 48 + self.last_segment_day = data.get("last_segment_day") 49 + else: 50 + logging.debug("Ignoring unexpected activity state shape") 51 + return 52 + 53 + for facet, raw_entry in active.items(): 54 + if not isinstance(raw_entry, dict): 55 + continue 56 + entry = dict(raw_entry) 57 + if not all( 58 + key in entry for key in ("id", "activity", "since", "description") 59 + ): 60 + logging.debug( 61 + "Ignoring incomplete activity state entry for facet %s", facet 62 + ) 63 + continue 64 + entry.setdefault("facet", facet) 65 + entry.setdefault("segment", entry.get("since")) 66 + entry.setdefault("segments", [entry["since"]]) 67 + self.state[str(facet)] = entry 22 68 23 69 def _parse_segment_seconds(self, segment_key: str) -> int | None: 24 70 start_time, _end_time = segment_parse(segment_key) ··· 62 108 "since": prior["since"], 63 109 "description": prior["description"], 64 110 "_change": change, 65 - "_facet": facet, 66 - "_segment": segment_key, 111 + "facet": facet, 112 + "segment": segment_key, 67 113 } 68 114 changes.append(entry) 69 115 self._completed.append(self._make_completed_record(prior)) ··· 75 121 return { 76 122 "id": entry["id"], 77 123 "activity": entry["activity"], 78 - "segments": entry.get("_segments", [entry["since"]]), 124 + "segments": entry.get("segments", [entry["since"]]), 79 125 "level_avg": LEVEL_VALUES.get(entry.get("level", "medium"), 0.5), 80 126 "description": entry["description"], 81 127 "active_entities": entry.get("active_entities", []), ··· 109 155 changes.extend(self._end_all(segment_key, "ended_idle")) 110 156 self.last_segment_key = segment_key 111 157 self.last_segment_day = day 112 - self.history.extend(dict(change) for change in changes) 113 158 return changes 114 159 115 160 facet_map = {} ··· 127 172 "since": prior["since"], 128 173 "description": prior["description"], 129 174 "_change": "ended_facet_gone", 130 - "_facet": facet, 131 - "_segment": segment_key, 175 + "facet": facet, 176 + "segment": segment_key, 132 177 } 133 178 changes.append(entry) 134 179 self._completed.append(self._make_completed_record(prior)) ··· 149 194 "since": prior["since"], 150 195 "description": prior["description"], 151 196 "_change": "ended_type_change", 152 - "_facet": facet, 153 - "_segment": segment_key, 197 + "facet": facet, 198 + "segment": segment_key, 154 199 } 155 200 changes.append(ended) 156 201 self._completed.append(self._make_completed_record(prior)) ··· 164 209 "level": level, 165 210 "active_entities": entity_names, 166 211 "_change": "new", 167 - "_facet": facet, 168 - "_segment": segment_key, 169 - "_segments": [segment_key], 212 + "facet": facet, 213 + "segment": segment_key, 214 + "segments": [segment_key], 170 215 } 171 216 self.state[facet] = new_entry 172 217 changes.append(dict(new_entry)) ··· 175 220 prior["level"] = level 176 221 prior["active_entities"] = entity_names 177 222 prior["_change"] = "continuing" 178 - prior["_segment"] = segment_key 179 - prior.setdefault("_segments", [prior["since"]]) 180 - if segment_key not in prior["_segments"]: 181 - prior["_segments"].append(segment_key) 223 + prior["segment"] = segment_key 224 + prior.setdefault("segments", [prior["since"]]) 225 + if segment_key not in prior["segments"]: 226 + prior["segments"].append(segment_key) 182 227 changes.append(dict(prior)) 183 228 else: 184 229 new_entry = { ··· 190 235 "level": level, 191 236 "active_entities": entity_names, 192 237 "_change": "new", 193 - "_facet": facet, 194 - "_segment": segment_key, 195 - "_segments": [segment_key], 238 + "facet": facet, 239 + "segment": segment_key, 240 + "segments": [segment_key], 196 241 } 197 242 self.state[facet] = new_entry 198 243 changes.append(dict(new_entry)) 199 244 200 245 self.last_segment_key = segment_key 201 246 self.last_segment_day = day 202 - self.history.extend(dict(change) for change in changes) 203 247 return changes 204 - 205 - def get_current_state(self) -> list[dict]: 206 - result = [] 207 - for facet in sorted(self.state): 208 - entry = self.state[facet] 209 - clean = {k: v for k, v in entry.items() if not k.startswith("_")} 210 - if "active_entities" in clean: 211 - clean["active_entities"] = list(clean["active_entities"]) 212 - result.append(clean) 213 - return result 214 248 215 249 def get_completed_activities(self) -> list[dict]: 216 250 return list(self._completed)
+46 -26
think/thinking.py
··· 667 667 segment=segment, 668 668 ) 669 669 if state_machine is not None: 670 + routing_day = state_machine.last_segment_day or day 670 671 idle_changes = state_machine.update(sense_json, segment, day) 671 672 # Persist completed activity records from idle transitions 672 673 ended_pairs = [ 673 - (c["id"], c.get("_facet", "__")) 674 + (c["id"], c.get("facet", "__")) 674 675 for c in idle_changes 675 676 if c.get("state") == "ended" 676 677 ] ··· 689 690 ) 690 691 rec = completed_lookup.get(activity_id) 691 692 if rec: 692 - append_activity_record(facet, day, rec) 693 + append_activity_record(facet, routing_day, rec) 693 694 _jsonl_log( 694 695 "activity.persisted", 695 696 mode=target_schedule, ··· 706 707 facet, 707 708 ) 708 709 run_activity_prompts( 709 - day=day, 710 + day=routing_day, 710 711 activity_id=str(activity_id), 711 712 facet=str(facet), 712 713 refresh=refresh, 713 714 verbose=verbose, 714 715 max_concurrency=max_concurrency, 715 716 ) 716 - # Persist activity state even on idle segments 717 - try: 718 - awareness_dir = Path(get_journal()) / "awareness" 719 - _write_json_atomic( 720 - awareness_dir / "activity_state.json", 721 - state_machine.get_current_state(), 722 - ) 723 - except Exception: 724 - logging.debug("Failed to persist activity state", exc_info=True) 717 + if state_machine.journal_root is not None: 718 + try: 719 + snapshot = { 720 + "last_segment_key": state_machine.last_segment_key, 721 + "last_segment_day": state_machine.last_segment_day, 722 + "active": { 723 + facet: {k: v for k, v in entry.items() if k != "_change"} 724 + for facet, entry in state_machine.state.items() 725 + }, 726 + } 727 + _write_json_atomic( 728 + state_machine.journal_root 729 + / "awareness" 730 + / "activity_state.json", 731 + snapshot, 732 + ) 733 + except Exception: 734 + logging.debug( 735 + "Failed to write activity state snapshot", exc_info=True 736 + ) 725 737 726 738 duration_ms = int((time.time() - start_time) * 1000) 727 739 emit( ··· 900 912 ) 901 913 902 914 if state_machine is not None: 915 + routing_day = state_machine.last_segment_day or day 903 916 changes = state_machine.update(sense_json, segment, day) 904 917 # Persist completed activity records before running activity agents 905 918 ended_pairs = [ 906 - (c["id"], c.get("_facet", "__")) 919 + (c["id"], c.get("facet", "__")) 907 920 for c in changes 908 921 if c.get("state") == "ended" 909 922 ] ··· 922 935 ) 923 936 rec = completed_lookup.get(activity_id) 924 937 if rec: 925 - append_activity_record(facet, day, rec) 938 + append_activity_record(facet, routing_day, rec) 926 939 _jsonl_log( 927 940 "activity.persisted", 928 941 mode=target_schedule, ··· 931 944 activity=str(activity_id), 932 945 facet=str(facet), 933 946 ) 934 - # Persist activity state for awareness.md consumption 935 - try: 936 - awareness_dir = Path(get_journal()) / "awareness" 937 - _write_json_atomic( 938 - awareness_dir / "activity_state.json", 939 - state_machine.get_current_state(), 940 - ) 941 - except Exception: 942 - logging.debug("Failed to persist activity state", exc_info=True) 947 + if state_machine.journal_root is not None: 948 + try: 949 + snapshot = { 950 + "last_segment_key": state_machine.last_segment_key, 951 + "last_segment_day": state_machine.last_segment_day, 952 + "active": { 953 + facet: {k: v for k, v in entry.items() if k != "_change"} 954 + for facet, entry in state_machine.state.items() 955 + }, 956 + } 957 + _write_json_atomic( 958 + state_machine.journal_root / "awareness" / "activity_state.json", 959 + snapshot, 960 + ) 961 + except Exception: 962 + logging.debug("Failed to write activity state snapshot", exc_info=True) 943 963 for change in changes: 944 964 if change.get("state") != "ended": 945 965 continue 946 - facet = change.get("_facet") 966 + facet = change.get("facet") 947 967 activity_id = change.get("id") 948 968 if not facet or not activity_id: 949 969 continue ··· 953 973 facet, 954 974 ) 955 975 run_activity_prompts( 956 - day=day, 976 + day=routing_day, 957 977 activity_id=str(activity_id), 958 978 facet=str(facet), 959 979 refresh=refresh, ··· 3100 3120 max_concurrency=args.jobs, 3101 3121 stream=resolved_stream, 3102 3122 timeout=None if args.no_timeout else 610, 3103 - state_machine=ActivityStateMachine(), 3123 + state_machine=ActivityStateMachine(journal_root=Path(get_journal())), 3104 3124 ) 3105 3125 else: 3106 3126 success_count, fail_count, failed_names = run_daily_prompts(