personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(think): --skip-talents flag to gate segment-talent dispatch

Adds opt-in --skip-talents NAMES to suppress dispatch of named segment-scheduled
talents during --segments / --segment runs. Primary driver: realizer backfill
for the Apr 4-26 cliff, where awareness_tender + pulse account for ~80% of
per-segment wall-clock and aren't needed for realizer output. Default (flag
absent / empty) is byte-identical to current behavior; orthogonal to and composable with --no-activity-prompts.

Co-Authored-By: Codex <codex@openai.com>

+582 -43
+507
tests/test_think_skip_talents.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Tests for think --skip-talents behavior.""" 5 + 6 + import json 7 + from pathlib import Path 8 + 9 + import pytest 10 + 11 + DAY = "20240115" 12 + SEGMENT = "120000_300" 13 + STREAM = "default" 14 + FACET = "work" 15 + ACTIVITY_ID = "coding_120000_300" 16 + 17 + 18 + @pytest.fixture 19 + def segment_dir(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path: 20 + """Create a temporary journal with a segment directory.""" 21 + journal = tmp_path / "journal" 22 + segment_path = journal / "chronicle" / DAY / STREAM / SEGMENT 23 + (segment_path / "talents").mkdir(parents=True) 24 + 25 + monkeypatch.setenv("SOLSTONE_JOURNAL", str(journal)) 26 + monkeypatch.setenv("SOL_SKIP_SUPERVISOR_CHECK", "1") 27 + return segment_path 28 + 29 + 30 + def _segment_configs(*names: str) -> dict[str, dict]: 31 + configs = { 32 + "sense": { 33 + "priority": 10, 34 + "type": "generate", 35 + "output": "json", 36 + "schedule": "segment", 37 + }, 38 + "entities": { 39 + "priority": 20, 40 + "type": "cogitate", 41 + "schedule": "segment", 42 + }, 43 + "documents": { 44 + "priority": 20, 45 + "type": "cogitate", 46 + "schedule": "segment", 47 + }, 48 + "screen": { 49 + "priority": 20, 50 + "type": "generate", 51 + "output": "md", 52 + "schedule": "segment", 53 + }, 54 + "speaker_attribution": { 55 + "priority": 20, 56 + "type": "cogitate", 57 + "schedule": "segment", 58 + }, 59 + "awareness_tender": { 60 + "priority": 30, 61 + "type": "cogitate", 62 + "schedule": "segment", 63 + }, 64 + "pulse": { 65 + "priority": 30, 66 + "type": "cogitate", 67 + "schedule": "segment", 68 + }, 69 + } 70 + return {name: dict(configs[name]) for name in names} 71 + 72 + 73 + def _all_segment_configs() -> dict[str, dict]: 74 + return _segment_configs( 75 + "sense", 76 + "entities", 77 + "documents", 78 + "screen", 79 + "speaker_attribution", 80 + "awareness_tender", 81 + "pulse", 82 + ) 83 + 84 + 85 + def _active_sense_output() -> dict: 86 + return { 87 + "density": "active", 88 + "recommend": { 89 + "screen_record": True, 90 + "speaker_attribution": True, 91 + "pulse_update": True, 92 + }, 93 + "facets": [], 94 + } 95 + 96 + 97 + def _write_sense_output(segment_dir: Path, sense_json: dict) -> None: 98 + (segment_dir / "talents" / "sense.json").write_text( 99 + json.dumps(sense_json), 100 + encoding="utf-8", 101 + ) 102 + 103 + 104 + def _read_events(path: Path) -> list[dict]: 105 + return [ 106 + json.loads(line) 107 + for line in path.read_text(encoding="utf-8").splitlines() 108 + if line.strip() 109 + ] 110 + 111 + 112 + def _skip_events(events: list[dict]) -> list[dict]: 113 + return [ 114 + event 115 + for event in events 116 + if event["event"] == "talent.skip" 117 + and event.get("reason") == "skip_talents_flag" 118 + ] 119 + 120 + 121 + def _patch_segment_dependencies( 122 + monkeypatch: pytest.MonkeyPatch, 123 + spawned: list[str], 124 + append_calls: list[tuple] | None = None, 125 + activity_calls: list[dict] | None = None, 126 + ) -> None: 127 + from think import thinking as think 128 + 129 + monkeypatch.setattr( 130 + think, 131 + "get_talent_configs", 132 + lambda schedule=None, **kwargs: _all_segment_configs(), 133 + ) 134 + monkeypatch.setattr( 135 + think, 136 + "cortex_request", 137 + lambda prompt, name, config=None: spawned.append(name) or f"agent-{name}", 138 + ) 139 + monkeypatch.setattr( 140 + think, 141 + "wait_for_uses", 142 + lambda agent_ids, timeout=600: ({aid: "finish" for aid in agent_ids}, []), 143 + ) 144 + monkeypatch.setattr( 145 + think, 146 + "append_activity_record", 147 + lambda *args: append_calls.append(args) if append_calls is not None else None, 148 + ) 149 + monkeypatch.setattr( 150 + think, 151 + "run_activity_prompts", 152 + lambda **kwargs: ( 153 + activity_calls.append(kwargs) or True 154 + if activity_calls is not None 155 + else True 156 + ), 157 + ) 158 + monkeypatch.setattr(think, "_callosum", None) 159 + 160 + 161 + class EndedActivityStateMachine: 162 + def __init__(self, journal_root: Path) -> None: 163 + self.state: dict = {} 164 + self.last_segment_key: str | None = None 165 + self.last_segment_day: str | None = None 166 + self.journal_root = journal_root 167 + 168 + def update(self, sense_output: dict, segment: str, day: str) -> list[dict]: 169 + self.last_segment_key = segment 170 + self.last_segment_day = day 171 + self.state = { 172 + FACET: { 173 + "facet": FACET, 174 + "state": "active", 175 + "id": ACTIVITY_ID, 176 + } 177 + } 178 + return [{"state": "ended", "id": ACTIVITY_ID, "facet": FACET}] 179 + 180 + def get_completed_activities(self) -> list[dict]: 181 + return [ 182 + { 183 + "id": ACTIVITY_ID, 184 + "activity": "coding", 185 + "segments": [SEGMENT], 186 + "level_avg": 0.5, 187 + "description": "coding", 188 + "active_entities": [], 189 + "created_at": 1713200000000, 190 + } 191 + ] 192 + 193 + 194 + class MockCallosumConnection: 195 + def __init__(self, *args, **kwargs) -> None: 196 + pass 197 + 198 + def start(self, callback=None) -> None: 199 + return None 200 + 201 + def emit(self, *args, **kwargs) -> None: 202 + return None 203 + 204 + def stop(self) -> None: 205 + return None 206 + 207 + 208 + def _patch_main_dependencies( 209 + monkeypatch: pytest.MonkeyPatch, 210 + segment_dir: Path, 211 + calls: list[dict], 212 + ) -> None: 213 + from think import thinking as think 214 + 215 + def mock_run_segment_sense(day, segment, refresh, verbose, **kwargs): 216 + calls.append( 217 + { 218 + "day": day, 219 + "segment": segment, 220 + "refresh": refresh, 221 + "verbose": verbose, 222 + **kwargs, 223 + } 224 + ) 225 + return (1, 0, []) 226 + 227 + monkeypatch.setattr( 228 + think, 229 + "iter_segments", 230 + lambda day: [(STREAM, SEGMENT, segment_dir)], 231 + ) 232 + monkeypatch.setattr(think, "run_segment_sense", mock_run_segment_sense) 233 + monkeypatch.setattr(think, "check_callosum_available", lambda: True) 234 + monkeypatch.setattr(think, "CallosumConnection", MockCallosumConnection) 235 + 236 + 237 + def test_parser_forwards_skip_talents( 238 + segment_dir: Path, 239 + monkeypatch: pytest.MonkeyPatch, 240 + ) -> None: 241 + from think import thinking as think 242 + 243 + calls: list[dict] = [] 244 + _patch_main_dependencies(monkeypatch, segment_dir, calls) 245 + monkeypatch.setattr( 246 + "sys.argv", 247 + [ 248 + "sol think", 249 + "--day", 250 + DAY, 251 + "--segment", 252 + SEGMENT, 253 + "--skip-talents", 254 + "awareness_tender,pulse", 255 + ], 256 + ) 257 + 258 + think.main() 259 + 260 + assert len(calls) == 1 261 + assert calls[0]["skip_talents"] == frozenset({"awareness_tender", "pulse"}) 262 + 263 + 264 + def test_empty_flag_forwards_empty_set( 265 + segment_dir: Path, 266 + monkeypatch: pytest.MonkeyPatch, 267 + ) -> None: 268 + from think import thinking as think 269 + 270 + calls: list[dict] = [] 271 + _patch_main_dependencies(monkeypatch, segment_dir, calls) 272 + monkeypatch.setattr( 273 + "sys.argv", 274 + [ 275 + "sol think", 276 + "--day", 277 + DAY, 278 + "--segment", 279 + SEGMENT, 280 + "--skip-talents", 281 + "", 282 + ], 283 + ) 284 + 285 + think.main() 286 + 287 + assert len(calls) == 1 288 + assert calls[0]["skip_talents"] == frozenset() 289 + 290 + 291 + def test_segment_batch_skip_does_not_dispatch_or_fail( 292 + segment_dir: Path, 293 + monkeypatch: pytest.MonkeyPatch, 294 + ) -> None: 295 + from think import thinking as think 296 + from think.thinking import ThinkingJSONLWriter 297 + 298 + spawned: list[str] = [] 299 + jsonl_path = segment_dir.parent.parent / "health" / "test_skip_entities.jsonl" 300 + writer = ThinkingJSONLWriter(str(jsonl_path)) 301 + _write_sense_output(segment_dir, _active_sense_output()) 302 + (segment_dir / "audio.npz").touch() 303 + _patch_segment_dependencies(monkeypatch, spawned) 304 + monkeypatch.setattr(think, "_jsonl", writer) 305 + 306 + success, failed, failed_names = think.run_segment_sense( 307 + DAY, 308 + SEGMENT, 309 + refresh=False, 310 + verbose=False, 311 + stream=STREAM, 312 + skip_talents=frozenset({"entities"}), 313 + ) 314 + writer.close() 315 + monkeypatch.setattr(think, "_jsonl", None) 316 + 317 + assert spawned == [ 318 + "sense", 319 + "documents", 320 + "screen", 321 + "speaker_attribution", 322 + "awareness_tender", 323 + "pulse", 324 + ] 325 + assert success == 6 326 + assert failed == 0 327 + assert failed_names == [] 328 + 329 + skip_events = _skip_events(_read_events(jsonl_path)) 330 + assert len(skip_events) == 1 331 + assert skip_events[0]["name"] == "entities" 332 + 333 + 334 + def test_tail_talent_skip_does_not_dispatch_or_fail( 335 + segment_dir: Path, 336 + monkeypatch: pytest.MonkeyPatch, 337 + ) -> None: 338 + from think import thinking as think 339 + from think.thinking import ThinkingJSONLWriter 340 + 341 + spawned: list[str] = [] 342 + jsonl_path = segment_dir.parent.parent / "health" / "test_skip_pulse.jsonl" 343 + writer = ThinkingJSONLWriter(str(jsonl_path)) 344 + _write_sense_output(segment_dir, _active_sense_output()) 345 + (segment_dir / "audio.npz").touch() 346 + _patch_segment_dependencies(monkeypatch, spawned) 347 + monkeypatch.setattr(think, "_jsonl", writer) 348 + 349 + success, failed, failed_names = think.run_segment_sense( 350 + DAY, 351 + SEGMENT, 352 + refresh=False, 353 + verbose=False, 354 + stream=STREAM, 355 + skip_talents=frozenset({"pulse"}), 356 + ) 357 + writer.close() 358 + monkeypatch.setattr(think, "_jsonl", None) 359 + 360 + assert "pulse" not in spawned 361 + assert spawned == [ 362 + "sense", 363 + "entities", 364 + "documents", 365 + "screen", 366 + "speaker_attribution", 367 + "awareness_tender", 368 + ] 369 + assert success == 6 370 + assert failed == 0 371 + assert failed_names == [] 372 + 373 + skip_events = _skip_events(_read_events(jsonl_path)) 374 + assert len(skip_events) == 1 375 + assert skip_events[0]["name"] == "pulse" 376 + 377 + 378 + def test_sense_skip_uses_cached_output_for_downstream( 379 + segment_dir: Path, 380 + monkeypatch: pytest.MonkeyPatch, 381 + ) -> None: 382 + from think import thinking as think 383 + from think.thinking import ThinkingJSONLWriter 384 + 385 + spawned: list[str] = [] 386 + jsonl_path = segment_dir.parent.parent / "health" / "test_skip_sense.jsonl" 387 + writer = ThinkingJSONLWriter(str(jsonl_path)) 388 + _write_sense_output(segment_dir, _active_sense_output()) 389 + (segment_dir / "audio.npz").touch() 390 + _patch_segment_dependencies(monkeypatch, spawned) 391 + monkeypatch.setattr(think, "_jsonl", writer) 392 + 393 + success, failed, failed_names = think.run_segment_sense( 394 + DAY, 395 + SEGMENT, 396 + refresh=False, 397 + verbose=False, 398 + stream=STREAM, 399 + skip_talents=frozenset({"sense"}), 400 + ) 401 + writer.close() 402 + monkeypatch.setattr(think, "_jsonl", None) 403 + 404 + assert "sense" not in spawned 405 + assert spawned == [ 406 + "entities", 407 + "documents", 408 + "screen", 409 + "speaker_attribution", 410 + "awareness_tender", 411 + "pulse", 412 + ] 413 + assert success == 6 414 + assert failed == 0 415 + assert failed_names == [] 416 + 417 + skip_events = _skip_events(_read_events(jsonl_path)) 418 + assert len(skip_events) == 1 419 + assert skip_events[0]["name"] == "sense" 420 + 421 + 422 + def test_skip_talents_composes_with_no_activity_prompts( 423 + segment_dir: Path, 424 + monkeypatch: pytest.MonkeyPatch, 425 + ) -> None: 426 + from think import thinking as think 427 + from think.thinking import ThinkingJSONLWriter 428 + 429 + spawned: list[str] = [] 430 + append_calls: list[tuple] = [] 431 + activity_calls: list[dict] = [] 432 + jsonl_path = segment_dir.parent.parent / "health" / "test_composed.jsonl" 433 + writer = ThinkingJSONLWriter(str(jsonl_path)) 434 + _write_sense_output(segment_dir, _active_sense_output()) 435 + (segment_dir / "audio.npz").touch() 436 + _patch_segment_dependencies(monkeypatch, spawned, append_calls, activity_calls) 437 + monkeypatch.setattr(think, "_jsonl", writer) 438 + 439 + success, failed, failed_names = think.run_segment_sense( 440 + DAY, 441 + SEGMENT, 442 + refresh=False, 443 + verbose=False, 444 + stream=STREAM, 445 + state_machine=EndedActivityStateMachine(segment_dir.parents[3]), 446 + skip_activity_prompts=True, 447 + skip_talents=frozenset({"entities"}), 448 + ) 449 + writer.close() 450 + monkeypatch.setattr(think, "_jsonl", None) 451 + 452 + assert "entities" not in spawned 453 + assert len(append_calls) >= 1 454 + assert activity_calls == [] 455 + assert success == 6 456 + assert failed == 0 457 + assert failed_names == [] 458 + 459 + events = _read_events(jsonl_path) 460 + assert [event["name"] for event in _skip_events(events)] == ["entities"] 461 + assert any( 462 + event["event"] == "activity.prompts_skipped" 463 + and event["activity"] == ACTIVITY_ID 464 + and event["facet"] == FACET 465 + for event in events 466 + ) 467 + 468 + 469 + def test_unknown_name_is_silent_noop( 470 + segment_dir: Path, 471 + monkeypatch: pytest.MonkeyPatch, 472 + ) -> None: 473 + from think import thinking as think 474 + from think.thinking import ThinkingJSONLWriter 475 + 476 + spawned: list[str] = [] 477 + jsonl_path = segment_dir.parent.parent / "health" / "test_unknown_noop.jsonl" 478 + writer = ThinkingJSONLWriter(str(jsonl_path)) 479 + _write_sense_output(segment_dir, _active_sense_output()) 480 + (segment_dir / "audio.npz").touch() 481 + _patch_segment_dependencies(monkeypatch, spawned) 482 + monkeypatch.setattr(think, "_jsonl", writer) 483 + 484 + success, failed, failed_names = think.run_segment_sense( 485 + DAY, 486 + SEGMENT, 487 + refresh=False, 488 + verbose=False, 489 + stream=STREAM, 490 + skip_talents=frozenset({"bogus_name"}), 491 + ) 492 + writer.close() 493 + monkeypatch.setattr(think, "_jsonl", None) 494 + 495 + assert spawned == [ 496 + "sense", 497 + "entities", 498 + "documents", 499 + "screen", 500 + "speaker_attribution", 501 + "awareness_tender", 502 + "pulse", 503 + ] 504 + assert success == 7 505 + assert failed == 0 506 + assert failed_names == [] 507 + assert _skip_events(_read_events(jsonl_path)) == []
+75 -43
think/thinking.py
··· 227 227 return socket_path.exists() 228 228 229 229 230 + _SKIPPED: object = object() 230 231 _SEND_RETRY_DELAYS = (0.5, 1.0) # seconds between retries (3 attempts total) 231 232 232 233 ··· 470 471 state_machine: ActivityStateMachine | None = None, 471 472 *, 472 473 skip_activity_prompts: bool = False, 474 + skip_talents: frozenset[str] = frozenset(), 473 475 ) -> tuple[int, int, list[str]]: 474 476 """Run Sense-first linear orchestrator for a single segment. 475 477 ··· 486 488 def _cfg(name: str) -> dict | None: 487 489 return all_prompts.get(name) 488 490 489 - def _dispatch_agent(name: str, config: dict) -> str | None: 491 + def _dispatch_agent(name: str, config: dict) -> str | None | object: 492 + if name in skip_talents: 493 + _log_skip( 494 + name, 495 + "skip_talents_flag", 496 + "Skipped by --skip-talents", 497 + day=day, 498 + segment=segment, 499 + ) 500 + return _SKIPPED 501 + 490 502 is_generate = config["type"] == "generate" 491 503 request_config: dict = {"day": day, "segment": segment} 492 504 if is_generate: ··· 575 587 duration_ms=duration_ms, 576 588 ) 577 589 return (0, 1, ["sense (send)"]) 578 - 579 - emit( 580 - "talent_started", 581 - mode=target_schedule, 582 - day=day, 583 - segment=segment, 584 - name="sense", 585 - use_id=sense_agent_id, 586 - ) 587 - _jsonl_log( 588 - "talent.dispatch", 589 - mode=target_schedule, 590 - day=day, 591 - segment=segment, 592 - name="sense", 593 - use_id=sense_agent_id, 594 - ) 595 - _update_status(current_agents=["sense"]) 596 - 597 - s, f, fn = _drain_priority_batch( 598 - [(sense_agent_id, "sense", sense_config, None)], 599 - target_schedule, 600 - day, 601 - segment, 602 - stream, 603 - timeout, 604 - ) 605 - total_success += s 606 - total_failed += f 607 - all_failed_names.extend(fn) 608 - _update_status(agents_completed=total_success + total_failed, current_agents=[]) 609 - 610 - if f > 0: 611 - duration_ms = int((time.time() - start_time) * 1000) 590 + elif sense_agent_id is not _SKIPPED: 612 591 emit( 613 - "completed", 592 + "talent_started", 614 593 mode=target_schedule, 615 594 day=day, 616 595 segment=segment, 617 - success=total_success, 618 - failed=total_failed, 619 - failed_names=all_failed_names, 620 - duration_ms=duration_ms, 596 + name="sense", 597 + use_id=sense_agent_id, 598 + ) 599 + _jsonl_log( 600 + "talent.dispatch", 601 + mode=target_schedule, 602 + day=day, 603 + segment=segment, 604 + name="sense", 605 + use_id=sense_agent_id, 621 606 ) 622 - return (total_success, total_failed, all_failed_names) 607 + _update_status(current_agents=["sense"]) 608 + 609 + s, f, fn = _drain_priority_batch( 610 + [(sense_agent_id, "sense", sense_config, None)], 611 + target_schedule, 612 + day, 613 + segment, 614 + stream, 615 + timeout, 616 + ) 617 + total_success += s 618 + total_failed += f 619 + all_failed_names.extend(fn) 620 + _update_status(agents_completed=total_success + total_failed, current_agents=[]) 621 + 622 + if f > 0: 623 + duration_ms = int((time.time() - start_time) * 1000) 624 + emit( 625 + "completed", 626 + mode=target_schedule, 627 + day=day, 628 + segment=segment, 629 + success=total_success, 630 + failed=total_failed, 631 + failed_names=all_failed_names, 632 + duration_ms=duration_ms, 633 + ) 634 + return (total_success, total_failed, all_failed_names) 623 635 624 636 sense_output_path = get_output_path( 625 637 day_dir, ··· 853 865 spawned: list[tuple[str, str, dict, str | None]] = [] 854 866 for agent_name, config in agents_to_run: 855 867 use_id = _dispatch_agent(agent_name, config) 868 + if use_id is _SKIPPED: 869 + continue 856 870 if use_id is None: 857 871 _log_skip( 858 872 agent_name, ··· 1016 1030 total_failed += 1 1017 1031 all_failed_names.append("awareness_tender (send)") 1018 1032 _update_status(agents_completed=total_success + total_failed) 1019 - else: 1033 + elif at_agent_id is not _SKIPPED: 1020 1034 emit( 1021 1035 "talent_started", 1022 1036 mode=target_schedule, ··· 1064 1078 total_failed += 1 1065 1079 all_failed_names.append("pulse (send)") 1066 1080 _update_status(agents_completed=total_success + total_failed) 1067 - else: 1081 + elif pulse_agent_id is not _SKIPPED: 1068 1082 emit( 1069 1083 "talent_started", 1070 1084 mode=target_schedule, ··· 2835 2849 ), 2836 2850 ) 2837 2851 parser.add_argument( 2852 + "--skip-talents", 2853 + type=str, 2854 + default="", 2855 + help=( 2856 + "Comma-separated segment-scheduled talent names to suppress during " 2857 + "--segments/--segment runs (e.g., 'awareness_tender,pulse' for " 2858 + "realizer-backfill speedup). Recognized: sense, entities, documents, " 2859 + "screen, speaker_attribution, awareness_tender, pulse. Skipping 'sense' " 2860 + "relies on a cached talents/sense.json from a prior run." 2861 + ), 2862 + ) 2863 + parser.add_argument( 2838 2864 "--updated", 2839 2865 action="store_true", 2840 2866 help="List days with pending daily processing and exit", ··· 2916 2942 if args.no_activity_prompts and args.activity: 2917 2943 parser.error("--no-activity-prompts cannot be combined with --activity") 2918 2944 2945 + skip_talents: frozenset[str] = frozenset( 2946 + name.strip() for name in (args.skip_talents or "").split(",") if name.strip() 2947 + ) 2948 + 2919 2949 if args.activity and (args.segment or args.segments or args.flush): 2920 2950 parser.error( 2921 2951 "--activity is incompatible with --segment, --segments, and --flush" ··· 3042 3072 timeout=None if args.no_timeout else 610, 3043 3073 state_machine=batch_state_machine, 3044 3074 skip_activity_prompts=args.no_activity_prompts, 3075 + skip_talents=skip_talents, 3045 3076 ) 3046 3077 # Touch stream.updated marker after each segment 3047 3078 try: ··· 3156 3187 timeout=None if args.no_timeout else 610, 3157 3188 state_machine=ActivityStateMachine(journal_root=Path(get_journal())), 3158 3189 skip_activity_prompts=args.no_activity_prompts, 3190 + skip_talents=skip_talents, 3159 3191 ) 3160 3192 else: 3161 3193 success_count, fail_count, failed_names = run_daily_prompts(