personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(dream): add JSONL structured logging

Add file-backed JSONL logging for dream runs, including run lifecycle events, phase timing, agent dispatch/completion/failure records, skip logging, and segment sense activity tracking.

Cover the new writer and segment JSONL behavior with focused dream tests.

+785 -12
+177 -1
tests/test_dream_segment.py
··· 500 500 return [{"facet": "work", "state": "active", "id": "coding_120000_300"}] 501 501 502 502 def get_completed_activities(self): 503 - return [{"id": "coding_120000_300", "activity": "coding", "segments": ["120000_300"], "level_avg": 0.5, "description": "coding", "active_entities": [], "created_at": 1713200000000}] 503 + return [ 504 + { 505 + "id": "coding_120000_300", 506 + "activity": "coding", 507 + "segments": ["120000_300"], 508 + "level_avg": 0.5, 509 + "description": "coding", 510 + "active_entities": [], 511 + "created_at": 1713200000000, 512 + } 513 + ] 504 514 505 515 _write_sense_output( 506 516 segment_dir, ··· 730 740 def start(self, callback=None): 731 741 return None 732 742 743 + def emit(self, *args, **kwargs): 744 + return None 745 + 733 746 def stop(self): 734 747 return None 735 748 ··· 777 790 def start(self, callback=None): 778 791 return None 779 792 793 + def emit(self, *args, **kwargs): 794 + return None 795 + 780 796 def stop(self): 781 797 return None 782 798 ··· 807 823 pass 808 824 809 825 def start(self, callback=None): 826 + return None 827 + 828 + def emit(self, *args, **kwargs): 810 829 return None 811 830 812 831 def stop(self): ··· 847 866 assert iter_calls == 0 848 867 assert len(calls) == 1 849 868 assert calls[0]["stream"] == "explicit_stream" 869 + 870 + 871 + class TestDreamJSONLWriter: 872 + """Tests for DreamJSONLWriter.""" 873 + 874 + def test_noop_when_no_path(self): 875 + from think.dream import DreamJSONLWriter 876 + 877 + writer = DreamJSONLWriter(None) 878 + writer.log("test.event", foo="bar") 879 + writer.close() 880 + 881 + assert writer.skip_count == 0 882 + 883 + def test_writes_jsonl_to_file(self, tmp_path): 884 + from think.dream import DreamJSONLWriter 885 + 886 + path = tmp_path / "test.jsonl" 887 + writer = DreamJSONLWriter(str(path)) 888 + writer.log("run.start", mode="segment", day="20240115") 889 + writer.log("agent.skip", name="screen", reason="not_recommended", detail="test") 890 + writer.close() 891 + 892 + lines = path.read_text().strip().split("\n") 893 + assert len(lines) == 2 894 + 895 + first = json.loads(lines[0]) 896 + assert first["event"] == "run.start" 897 + assert "ts" in first 898 + assert isinstance(first["ts"], int) 899 + assert first["mode"] == "segment" 900 + 901 + second = json.loads(lines[1]) 902 + assert second["event"] == "agent.skip" 903 + assert writer.skip_count == 1 904 + 905 + def test_creates_parent_dirs(self, tmp_path): 906 + from think.dream import DreamJSONLWriter 907 + 908 + path = tmp_path / "nested" / "dir" / "test.jsonl" 909 + writer = DreamJSONLWriter(str(path)) 910 + writer.log("test.event") 911 + writer.close() 912 + 913 + assert path.exists() 914 + 915 + 916 + class TestDreamJSONLEvents: 917 + """Tests for JSONL event emission during segment orchestration.""" 918 + 919 + def test_density_idle_skip_event(self, segment_dir, monkeypatch): 920 + """JSONL emits agent.skip with reason=density_idle for idle segments.""" 921 + from think import dream 922 + from think.dream import DreamJSONLWriter 923 + 924 + jsonl_path = segment_dir.parent.parent / "health" / "test_idle.jsonl" 925 + writer = DreamJSONLWriter(str(jsonl_path)) 926 + 927 + _write_sense_output( 928 + segment_dir, 929 + {"density": "idle", "recommend": {}, "facets": []}, 930 + ) 931 + 932 + monkeypatch.setattr( 933 + dream, 934 + "get_talent_configs", 935 + lambda schedule=None, **kwargs: _segment_configs("sense"), 936 + ) 937 + monkeypatch.setattr( 938 + dream, 939 + "cortex_request", 940 + lambda prompt, name, config=None: "agent-sense", 941 + ) 942 + monkeypatch.setattr( 943 + dream, 944 + "wait_for_agents", 945 + lambda agent_ids, timeout=600: ({aid: "finish" for aid in agent_ids}, []), 946 + ) 947 + monkeypatch.setattr(dream, "_callosum", None) 948 + monkeypatch.setattr(dream, "_jsonl", writer) 949 + 950 + dream.run_segment_sense( 951 + "20240115", 952 + "120000_300", 953 + refresh=False, 954 + verbose=False, 955 + stream="default", 956 + ) 957 + writer.close() 958 + 959 + events = [ 960 + json.loads(line) 961 + for line in jsonl_path.read_text(encoding="utf-8").strip().splitlines() 962 + ] 963 + skips = [event for event in events if event["event"] == "agent.skip"] 964 + 965 + assert any(skip["reason"] == "density_idle" for skip in skips) 966 + 967 + def test_sense_complete_and_skip_events(self, segment_dir, monkeypatch): 968 + from think import dream 969 + from think.dream import DreamJSONLWriter 970 + 971 + jsonl_path = segment_dir.parent.parent / "health" / "test_dream.jsonl" 972 + writer = DreamJSONLWriter(str(jsonl_path)) 973 + 974 + _write_sense_output( 975 + segment_dir, 976 + { 977 + "density": "active", 978 + "recommend": { 979 + "screen_record": False, 980 + "speaker_attribution": False, 981 + "pulse_update": False, 982 + }, 983 + "facets": [], 984 + }, 985 + ) 986 + 987 + monkeypatch.setattr( 988 + dream, 989 + "get_talent_configs", 990 + lambda schedule=None, **kwargs: _segment_configs("sense", "entities"), 991 + ) 992 + monkeypatch.setattr( 993 + dream, 994 + "cortex_request", 995 + lambda prompt, name, config=None: f"agent-{name}", 996 + ) 997 + monkeypatch.setattr( 998 + dream, 999 + "wait_for_agents", 1000 + lambda agent_ids, timeout=600: ({aid: "finish" for aid in agent_ids}, []), 1001 + ) 1002 + monkeypatch.setattr(dream, "_callosum", None) 1003 + monkeypatch.setattr(dream, "_jsonl", writer) 1004 + 1005 + dream.run_segment_sense( 1006 + "20240115", 1007 + "120000_300", 1008 + refresh=False, 1009 + verbose=False, 1010 + stream="default", 1011 + ) 1012 + writer.close() 1013 + 1014 + events = [ 1015 + json.loads(line) 1016 + for line in jsonl_path.read_text(encoding="utf-8").strip().splitlines() 1017 + ] 1018 + assert "sense.complete" in [event["event"] for event in events] 1019 + 1020 + skips = [event for event in events if event["event"] == "agent.skip"] 1021 + skip_pairs = {(event["name"], event["reason"]) for event in skips} 1022 + assert ("documents", "no_config") in skip_pairs 1023 + assert ("screen", "not_recommended") in skip_pairs 1024 + assert ("speaker_attribution", "not_recommended") in skip_pairs 1025 + assert ("pulse", "not_recommended") in skip_pairs
+21 -3
tests/test_runner.py
··· 331 331 ("cmd", "expected_name"), 332 332 [ 333 333 (["sol", "dream", "--day", "20240115"], "daily_dream"), 334 - (["sol", "dream", "--day", "20240115", "--segment", "120000_300"], "segment_dream"), 334 + ( 335 + ["sol", "dream", "--day", "20240115", "--segment", "120000_300"], 336 + "segment_dream", 337 + ), 335 338 (["sol", "dream", "--weekly"], "weekly_dream"), 336 - (["sol", "dream", "--activity", "id", "--facet", "work", "--day", "20240115"], "activity_dream"), 337 - (["sol", "dream", "--day", "20240115", "--segment", "120000_300", "--flush"], "flush_dream"), 339 + ( 340 + [ 341 + "sol", 342 + "dream", 343 + "--activity", 344 + "id", 345 + "--facet", 346 + "work", 347 + "--day", 348 + "20240115", 349 + ], 350 + "activity_dream", 351 + ), 352 + ( 353 + ["sol", "dream", "--day", "20240115", "--segment", "120000_300", "--flush"], 354 + "flush_dream", 355 + ), 338 356 (["sol", "dream", "--day", "20240115", "--segments"], "segment_dream"), 339 357 ], 340 358 )
+587 -8
think/dream.py
··· 45 45 get_rev, 46 46 iso_date, 47 47 iter_segments, 48 + now_ms, 48 49 setup_cli, 49 50 updated_days, 50 51 ) ··· 55 56 _status: dict = {} 56 57 _status_lock = threading.Lock() 57 58 _stop_status = threading.Event() 59 + 60 + 61 + class DreamJSONLWriter: 62 + """Write JSONL events to a file. File-only, fail-silent.""" 63 + 64 + def __init__(self, path: str | None = None) -> None: 65 + self.file = None 66 + self.skip_count = 0 67 + if path: 68 + try: 69 + Path(path).parent.mkdir(parents=True, exist_ok=True) 70 + self.file = open(path, "a", encoding="utf-8") 71 + except Exception: 72 + pass 73 + 74 + def log(self, event: str, **fields) -> None: 75 + if not self.file: 76 + return 77 + data = {"event": event, "ts": now_ms(), **fields} 78 + if event == "agent.skip": 79 + self.skip_count += 1 80 + try: 81 + self.file.write(json.dumps(data, ensure_ascii=False) + "\n") 82 + self.file.flush() 83 + except Exception: 84 + pass 85 + 86 + def close(self) -> None: 87 + if self.file: 88 + try: 89 + self.file.close() 90 + except Exception: 91 + pass 92 + 93 + 94 + _jsonl: DreamJSONLWriter | None = None 95 + 96 + 97 + def _jsonl_log(event: str, **fields) -> None: 98 + """Write a JSONL event if the writer is active.""" 99 + if _jsonl: 100 + _jsonl.log(event, **fields) 101 + 102 + 103 + def _log_skip(name: str, reason: str, detail: str, **extra) -> None: 104 + """Emit an agent.skip JSONL event.""" 105 + _jsonl_log("agent.skip", name=name, reason=reason, detail=detail, **extra) 58 106 59 107 60 108 def _update_status(**fields) -> None: ··· 253 301 state="timeout", 254 302 **({"facet": timed_facet} if timed_facet else {}), 255 303 ) 304 + _jsonl_log( 305 + "agent.fail", 306 + mode=target_schedule, 307 + day=day, 308 + segment=segment, 309 + name=timed_name, 310 + agent_id=agent_id, 311 + state="timeout", 312 + **({"facet": timed_facet} if timed_facet else {}), 313 + ) 256 314 257 315 for agent_id, prompt_name, config, agent_facet in spawned: 258 316 if agent_id in timed_out: ··· 264 322 success += 1 265 323 emit( 266 324 "agent_completed", 325 + mode=target_schedule, 326 + day=day, 327 + segment=segment, 328 + name=prompt_name, 329 + agent_id=agent_id, 330 + state="finish", 331 + **({"facet": agent_facet} if agent_facet else {}), 332 + ) 333 + _jsonl_log( 334 + "agent.complete", 267 335 mode=target_schedule, 268 336 day=day, 269 337 segment=segment, ··· 308 376 state=end_state, 309 377 **({"facet": agent_facet} if agent_facet else {}), 310 378 ) 379 + _jsonl_log( 380 + "agent.fail", 381 + mode=target_schedule, 382 + day=day, 383 + segment=segment, 384 + name=prompt_name, 385 + agent_id=agent_id, 386 + state=end_state, 387 + **({"facet": agent_facet} if agent_facet else {}), 388 + ) 311 389 312 390 return (success, failed, failed_names) 313 391 ··· 427 505 sense_config = _cfg("sense") 428 506 if sense_config is None: 429 507 logging.error("Sense agent not found in segment configs") 508 + _log_skip( 509 + "sense", 510 + "no_config", 511 + "Sense agent not found in segment configs", 512 + mode=target_schedule, 513 + day=day, 514 + segment=segment, 515 + ) 430 516 return (0, 1, ["sense (not_configured)"]) 431 517 432 518 day_dir = day_path(day) ··· 459 545 460 546 sense_agent_id = _dispatch_agent("sense", sense_config) 461 547 if sense_agent_id is None: 548 + _log_skip( 549 + "sense", 550 + "send_failed", 551 + "All cortex request attempts failed", 552 + mode=target_schedule, 553 + day=day, 554 + segment=segment, 555 + ) 462 556 duration_ms = int((time.time() - start_time) * 1000) 463 557 emit( 464 558 "completed", ··· 480 574 name="sense", 481 575 agent_id=sense_agent_id, 482 576 ) 577 + _jsonl_log( 578 + "agent.dispatch", 579 + mode=target_schedule, 580 + day=day, 581 + segment=segment, 582 + name="sense", 583 + agent_id=sense_agent_id, 584 + ) 483 585 _update_status(current_agents=["sense"]) 484 586 485 587 s, f, fn = _drain_priority_batch( ··· 536 638 537 639 write_sense_outputs(sense_json, seg_dir, stream=stream) 538 640 density = sense_json.get("density") or "active" 641 + _jsonl_log( 642 + "sense.complete", 643 + mode=target_schedule, 644 + day=day, 645 + segment=segment, 646 + density=density, 647 + recommend=sense_json.get("recommend") or {}, 648 + ) 539 649 540 650 if density == "idle" and not refresh: 541 651 write_idle_stubs(seg_dir) 542 652 logging.info("Segment %s is idle, skipping remaining agents", segment) 653 + _log_skip( 654 + "*", 655 + "density_idle", 656 + f"Segment {segment} is idle, skipping remaining agents", 657 + mode=target_schedule, 658 + day=day, 659 + segment=segment, 660 + ) 543 661 if state_machine is not None: 544 662 idle_changes = state_machine.update(sense_json, segment, day) 545 663 # Persist completed activity records from idle transitions ··· 552 670 for rec in state_machine.get_completed_activities(): 553 671 completed_lookup.setdefault(rec["id"], rec) 554 672 for activity_id, facet in ended_pairs: 673 + _jsonl_log( 674 + "activity.detected", 675 + mode=target_schedule, 676 + day=day, 677 + segment=segment, 678 + activity=str(activity_id), 679 + facet=str(facet), 680 + state="ended", 681 + ) 555 682 rec = completed_lookup.get(activity_id) 556 683 if rec: 557 684 append_activity_record(facet, day, rec) 685 + _jsonl_log( 686 + "activity.persisted", 687 + mode=target_schedule, 688 + day=day, 689 + segment=segment, 690 + activity=str(activity_id), 691 + facet=str(facet), 692 + ) 558 693 # Run activity agents for completed activities 559 694 for activity_id, facet in ended_pairs: 560 695 logging.info( ··· 594 729 return (total_success, total_failed, all_failed_names) 595 730 596 731 recommend = sense_json.get("recommend") or {} 732 + has_audio_embeddings = _has_audio_embeddings(seg_dir) 597 733 agents_to_run: list[tuple[str, dict]] = [] 598 734 599 735 entities_config = _cfg("entities") 600 736 if entities_config: 601 737 agents_to_run.append(("entities", entities_config)) 738 + else: 739 + _log_skip( 740 + "entities", 741 + "no_config", 742 + "entities config not found", 743 + mode=target_schedule, 744 + day=day, 745 + segment=segment, 746 + ) 602 747 603 748 documents_config = _cfg("documents") 604 749 if documents_config: 605 750 agents_to_run.append(("documents", documents_config)) 751 + else: 752 + _log_skip( 753 + "documents", 754 + "no_config", 755 + "documents config not found", 756 + mode=target_schedule, 757 + day=day, 758 + segment=segment, 759 + ) 606 760 607 761 if recommend.get("screen_record"): 608 762 screen_config = _cfg("screen") 609 763 if screen_config: 610 764 agents_to_run.append(("screen", screen_config)) 765 + else: 766 + _log_skip( 767 + "screen", 768 + "no_config", 769 + "screen config not found", 770 + mode=target_schedule, 771 + day=day, 772 + segment=segment, 773 + ) 774 + else: 775 + _log_skip( 776 + "screen", 777 + "not_recommended", 778 + "screen_record not recommended by sense", 779 + mode=target_schedule, 780 + day=day, 781 + segment=segment, 782 + ) 611 783 612 - if recommend.get("speaker_attribution") and _has_audio_embeddings(seg_dir): 784 + if recommend.get("speaker_attribution") and has_audio_embeddings: 613 785 speaker_config = _cfg("speaker_attribution") 614 786 if speaker_config: 615 787 agents_to_run.append(("speaker_attribution", speaker_config)) 788 + else: 789 + _log_skip( 790 + "speaker_attribution", 791 + "no_config", 792 + "speaker_attribution config not found", 793 + mode=target_schedule, 794 + day=day, 795 + segment=segment, 796 + ) 797 + else: 798 + if not recommend.get("speaker_attribution"): 799 + _log_skip( 800 + "speaker_attribution", 801 + "not_recommended", 802 + "speaker_attribution not recommended by sense", 803 + mode=target_schedule, 804 + day=day, 805 + segment=segment, 806 + ) 807 + elif not has_audio_embeddings: 808 + _log_skip( 809 + "speaker_attribution", 810 + "not_recommended", 811 + "no audio embeddings available", 812 + mode=target_schedule, 813 + day=day, 814 + segment=segment, 815 + ) 616 816 617 817 total_expected = 1 + len(agents_to_run) 618 818 if recommend.get("pulse_update") and pulse_config: ··· 623 823 for agent_name, config in agents_to_run: 624 824 agent_id = _dispatch_agent(agent_name, config) 625 825 if agent_id is None: 826 + _log_skip( 827 + agent_name, 828 + "send_failed", 829 + f"All cortex request attempts failed for {agent_name}", 830 + mode=target_schedule, 831 + day=day, 832 + segment=segment, 833 + ) 626 834 total_failed += 1 627 835 all_failed_names.append(f"{agent_name} (send)") 628 836 _update_status(agents_completed=total_success + total_failed) ··· 631 839 spawned.append((agent_id, agent_name, config, None)) 632 840 emit( 633 841 "agent_started", 842 + mode=target_schedule, 843 + day=day, 844 + segment=segment, 845 + name=agent_name, 846 + agent_id=agent_id, 847 + ) 848 + _jsonl_log( 849 + "agent.dispatch", 634 850 mode=target_schedule, 635 851 day=day, 636 852 segment=segment, ··· 687 903 for rec in state_machine.get_completed_activities(): 688 904 completed_lookup.setdefault(rec["id"], rec) 689 905 for activity_id, facet in ended_pairs: 906 + _jsonl_log( 907 + "activity.detected", 908 + mode=target_schedule, 909 + day=day, 910 + segment=segment, 911 + activity=str(activity_id), 912 + facet=str(facet), 913 + state="ended", 914 + ) 690 915 rec = completed_lookup.get(activity_id) 691 916 if rec: 692 917 append_activity_record(facet, day, rec) 918 + _jsonl_log( 919 + "activity.persisted", 920 + mode=target_schedule, 921 + day=day, 922 + segment=segment, 923 + activity=str(activity_id), 924 + facet=str(facet), 925 + ) 693 926 # Persist activity state for awareness.md consumption 694 927 try: 695 928 awareness_dir = Path(get_journal()) / "awareness" ··· 724 957 if awareness_tender_config: 725 958 at_agent_id = _dispatch_agent("awareness_tender", awareness_tender_config) 726 959 if at_agent_id is None: 960 + _log_skip( 961 + "awareness_tender", 962 + "send_failed", 963 + "All cortex request attempts failed for awareness_tender", 964 + mode=target_schedule, 965 + day=day, 966 + segment=segment, 967 + ) 727 968 total_failed += 1 728 969 all_failed_names.append("awareness_tender (send)") 729 970 _update_status(agents_completed=total_success + total_failed) ··· 736 977 name="awareness_tender", 737 978 agent_id=at_agent_id, 738 979 ) 980 + _jsonl_log( 981 + "agent.dispatch", 982 + mode=target_schedule, 983 + day=day, 984 + segment=segment, 985 + name="awareness_tender", 986 + agent_id=at_agent_id, 987 + ) 739 988 _update_status(current_agents=["awareness_tender"]) 740 989 s, f, fn = _drain_priority_batch( 741 990 [(at_agent_id, "awareness_tender", awareness_tender_config, None)], ··· 756 1005 if recommend.get("pulse_update") and pulse_config: 757 1006 pulse_agent_id = _dispatch_agent("pulse", pulse_config) 758 1007 if pulse_agent_id is None: 1008 + _log_skip( 1009 + "pulse", 1010 + "send_failed", 1011 + "All cortex request attempts failed for pulse", 1012 + mode=target_schedule, 1013 + day=day, 1014 + segment=segment, 1015 + ) 759 1016 total_failed += 1 760 1017 all_failed_names.append("pulse (send)") 761 1018 _update_status(agents_completed=total_success + total_failed) ··· 768 1025 name="pulse", 769 1026 agent_id=pulse_agent_id, 770 1027 ) 1028 + _jsonl_log( 1029 + "agent.dispatch", 1030 + mode=target_schedule, 1031 + day=day, 1032 + segment=segment, 1033 + name="pulse", 1034 + agent_id=pulse_agent_id, 1035 + ) 771 1036 _update_status(current_agents=["pulse"]) 772 1037 s, f, fn = _drain_priority_batch( 773 1038 [(pulse_agent_id, "pulse", pulse_config, None)], ··· 784 1049 agents_completed=total_success + total_failed, 785 1050 current_agents=[], 786 1051 ) 1052 + elif not recommend.get("pulse_update"): 1053 + _log_skip( 1054 + "pulse", 1055 + "not_recommended", 1056 + "pulse_update not recommended by sense", 1057 + mode=target_schedule, 1058 + day=day, 1059 + segment=segment, 1060 + ) 1061 + elif not pulse_config: 1062 + _log_skip( 1063 + "pulse", 1064 + "no_config", 1065 + "pulse config not found", 1066 + mode=target_schedule, 1067 + day=day, 1068 + segment=segment, 1069 + ) 787 1070 788 1071 duration_ms = int((time.time() - start_time) * 1000) 789 1072 emit( ··· 893 1176 priority=priority, 894 1177 count=len(prompts_list), 895 1178 ) 1179 + _jsonl_log( 1180 + "group.start", 1181 + mode=target_schedule, 1182 + day=day, 1183 + priority=priority, 1184 + count=len(prompts_list), 1185 + ) 896 1186 897 1187 spawned: list[ 898 1188 tuple[str, str, dict, str | None] ··· 910 1200 logging.info( 911 1201 f"Skipping {prompt_name}: stream '{stream}' matches exclude_streams" 912 1202 ) 1203 + _log_skip( 1204 + prompt_name, 1205 + "stream_excluded", 1206 + f"stream '{stream}' matches exclude_streams", 1207 + mode=target_schedule, 1208 + day=day, 1209 + ) 913 1210 continue 914 1211 915 1212 try: ··· 921 1218 logging.info( 922 1219 f"Skipping {prompt_name} for {facet_name}: " 923 1220 f"no activity on {day_formatted}" 1221 + ) 1222 + _log_skip( 1223 + prompt_name, 1224 + "no_active_facets", 1225 + f"no activity on {iso_date(day)}", 1226 + mode=target_schedule, 1227 + day=day, 1228 + facet=facet_name, 924 1229 ) 925 1230 continue 926 1231 ··· 954 1259 config=request_config, 955 1260 ) 956 1261 if agent_id is None: 1262 + _log_skip( 1263 + prompt_name, 1264 + "send_failed", 1265 + f"All cortex request attempts failed for {prompt_name}", 1266 + mode=target_schedule, 1267 + day=day, 1268 + facet=facet_name, 1269 + ) 957 1270 group_failed += 1 958 1271 all_failed_names.append( 959 1272 f"{prompt_name}/{facet_name} (send)" ··· 968 1281 agent_id=agent_id, 969 1282 facet=facet_name, 970 1283 ) 1284 + _jsonl_log( 1285 + "agent.dispatch", 1286 + mode=target_schedule, 1287 + day=day, 1288 + name=prompt_name, 1289 + agent_id=agent_id, 1290 + facet=facet_name, 1291 + ) 971 1292 logging.info( 972 1293 f"Started {prompt_name} for {facet_name} (ID: {agent_id})" 973 1294 ) ··· 1022 1343 config=request_config, 1023 1344 ) 1024 1345 if agent_id is None: 1346 + _log_skip( 1347 + prompt_name, 1348 + "send_failed", 1349 + f"All cortex request attempts failed for {prompt_name}", 1350 + mode=target_schedule, 1351 + day=day, 1352 + ) 1025 1353 group_failed += 1 1026 1354 all_failed_names.append(f"{prompt_name} (send)") 1027 1355 continue ··· 1033 1361 name=prompt_name, 1034 1362 agent_id=agent_id, 1035 1363 ) 1364 + _jsonl_log( 1365 + "agent.dispatch", 1366 + mode=target_schedule, 1367 + day=day, 1368 + name=prompt_name, 1369 + agent_id=agent_id, 1370 + ) 1036 1371 logging.info(f"Started {prompt_name} (ID: {agent_id})") 1037 1372 1038 1373 # Drain batch when concurrency limit reached ··· 1081 1416 1082 1417 emit( 1083 1418 "group_completed", 1419 + mode=target_schedule, 1420 + day=day, 1421 + priority=priority, 1422 + success=group_success, 1423 + failed=group_failed, 1424 + ) 1425 + _jsonl_log( 1426 + "group.complete", 1084 1427 mode=target_schedule, 1085 1428 day=day, 1086 1429 priority=priority, ··· 1189 1532 priority=priority, 1190 1533 count=len(prompts_list), 1191 1534 ) 1535 + _jsonl_log( 1536 + "group.start", 1537 + mode=target_schedule, 1538 + day=day, 1539 + priority=priority, 1540 + count=len(prompts_list), 1541 + ) 1192 1542 1193 1543 spawned: list[ 1194 1544 tuple[str, str, dict, str | None] ··· 1206 1556 logging.info( 1207 1557 f"Skipping {prompt_name}: stream '{stream}' matches exclude_streams" 1208 1558 ) 1559 + _log_skip( 1560 + prompt_name, 1561 + "stream_excluded", 1562 + f"stream '{stream}' matches exclude_streams", 1563 + mode=target_schedule, 1564 + day=day, 1565 + ) 1209 1566 continue 1210 1567 1211 1568 try: ··· 1217 1574 logging.info( 1218 1575 f"Skipping {prompt_name} for {facet_name}: " 1219 1576 f"no activity on {day_formatted}" 1577 + ) 1578 + _log_skip( 1579 + prompt_name, 1580 + "no_active_facets", 1581 + f"no activity on {iso_date(day)}", 1582 + mode=target_schedule, 1583 + day=day, 1584 + facet=facet_name, 1220 1585 ) 1221 1586 continue 1222 1587 ··· 1250 1615 config=request_config, 1251 1616 ) 1252 1617 if agent_id is None: 1618 + _log_skip( 1619 + prompt_name, 1620 + "send_failed", 1621 + f"All cortex request attempts failed for {prompt_name}", 1622 + mode=target_schedule, 1623 + day=day, 1624 + facet=facet_name, 1625 + ) 1253 1626 group_failed += 1 1254 1627 all_failed_names.append( 1255 1628 f"{prompt_name}/{facet_name} (send)" ··· 1258 1631 spawned.append((agent_id, prompt_name, config, facet_name)) 1259 1632 emit( 1260 1633 "agent_started", 1634 + mode=target_schedule, 1635 + day=day, 1636 + name=prompt_name, 1637 + agent_id=agent_id, 1638 + facet=facet_name, 1639 + ) 1640 + _jsonl_log( 1641 + "agent.dispatch", 1261 1642 mode=target_schedule, 1262 1643 day=day, 1263 1644 name=prompt_name, ··· 1318 1699 config=request_config, 1319 1700 ) 1320 1701 if agent_id is None: 1702 + _log_skip( 1703 + prompt_name, 1704 + "send_failed", 1705 + f"All cortex request attempts failed for {prompt_name}", 1706 + mode=target_schedule, 1707 + day=day, 1708 + ) 1321 1709 group_failed += 1 1322 1710 all_failed_names.append(f"{prompt_name} (send)") 1323 1711 continue ··· 1329 1717 name=prompt_name, 1330 1718 agent_id=agent_id, 1331 1719 ) 1720 + _jsonl_log( 1721 + "agent.dispatch", 1722 + mode=target_schedule, 1723 + day=day, 1724 + name=prompt_name, 1725 + agent_id=agent_id, 1726 + ) 1332 1727 logging.info(f"Started {prompt_name} (ID: {agent_id})") 1333 1728 1334 1729 # Drain batch when concurrency limit reached ··· 1377 1772 1378 1773 emit( 1379 1774 "group_completed", 1775 + mode=target_schedule, 1776 + day=day, 1777 + priority=priority, 1778 + success=group_success, 1779 + failed=group_failed, 1780 + ) 1781 + _jsonl_log( 1782 + "group.complete", 1380 1783 mode=target_schedule, 1381 1784 day=day, 1382 1785 priority=priority, ··· 1528 1931 priority=priority, 1529 1932 count=len(prompts_list), 1530 1933 ) 1934 + _jsonl_log( 1935 + "group.start", 1936 + mode="activity", 1937 + day=day, 1938 + activity=activity_id, 1939 + facet=facet, 1940 + priority=priority, 1941 + count=len(prompts_list), 1942 + ) 1531 1943 1532 1944 spawned: list[tuple[str, str, dict]] = [] # (agent_id, name, config) 1533 1945 group_success = 0 ··· 1553 1965 ) 1554 1966 emit( 1555 1967 "agent_completed", 1968 + mode="activity", 1969 + day=day, 1970 + activity=activity_id, 1971 + facet=facet, 1972 + name=timed_name, 1973 + agent_id=agent_id, 1974 + state="timeout", 1975 + ) 1976 + _jsonl_log( 1977 + "agent.fail", 1556 1978 mode="activity", 1557 1979 day=day, 1558 1980 activity=activity_id, ··· 1603 2025 agent_id=agent_id, 1604 2026 state=end_state, 1605 2027 ) 2028 + _jsonl_log( 2029 + "agent.complete" if end_state == "finish" else "agent.fail", 2030 + mode="activity", 2031 + day=day, 2032 + activity=activity_id, 2033 + facet=facet, 2034 + name=prompt_name, 2035 + agent_id=agent_id, 2036 + state=end_state, 2037 + ) 1606 2038 1607 2039 spawned = [] 1608 2040 ··· 1651 2083 config=request_config, 1652 2084 ) 1653 2085 if agent_id is None: 2086 + _log_skip( 2087 + prompt_name, 2088 + "send_failed", 2089 + f"All cortex request attempts failed for {prompt_name}", 2090 + mode="activity", 2091 + day=day, 2092 + activity=activity_id, 2093 + facet=facet, 2094 + ) 1654 2095 total_failed += 1 1655 2096 continue 1656 2097 spawned.append((agent_id, prompt_name, config)) ··· 1663 2104 name=prompt_name, 1664 2105 agent_id=agent_id, 1665 2106 ) 2107 + _jsonl_log( 2108 + "agent.dispatch", 2109 + mode="activity", 2110 + day=day, 2111 + activity=activity_id, 2112 + facet=facet, 2113 + name=prompt_name, 2114 + agent_id=agent_id, 2115 + ) 1666 2116 logging.info(f"Started {prompt_name} (ID: {agent_id})") 1667 2117 1668 2118 # Drain batch when concurrency limit reached ··· 1697 2147 1698 2148 emit( 1699 2149 "group_completed", 2150 + mode="activity", 2151 + day=day, 2152 + activity=activity_id, 2153 + facet=facet, 2154 + priority=priority, 2155 + success=group_success, 2156 + failed=group_failed, 2157 + ) 2158 + _jsonl_log( 2159 + "group.complete", 1700 2160 mode="activity", 1701 2161 day=day, 1702 2162 activity=activity_id, ··· 1813 2273 config=request_config, 1814 2274 ) 1815 2275 if agent_id is None: 2276 + _log_skip( 2277 + prompt_name, 2278 + "send_failed", 2279 + f"All cortex request attempts failed for {prompt_name}", 2280 + mode="flush", 2281 + day=day, 2282 + segment=segment, 2283 + ) 1816 2284 total_failed += 1 1817 2285 continue 1818 2286 spawned.append((agent_id, prompt_name, config)) ··· 1824 2292 name=prompt_name, 1825 2293 agent_id=agent_id, 1826 2294 ) 2295 + _jsonl_log( 2296 + "agent.dispatch", 2297 + mode="flush", 2298 + day=day, 2299 + segment=segment, 2300 + name=prompt_name, 2301 + agent_id=agent_id, 2302 + ) 1827 2303 logging.info(f"Started flush agent {prompt_name} (ID: {agent_id})") 1828 2304 1829 2305 except Exception as e: ··· 1838 2314 if timed_out: 1839 2315 logging.warning(f"Flush: {len(timed_out)} agents timed out") 1840 2316 total_failed += len(timed_out) 2317 + for agent_id in timed_out: 2318 + timed_name = next( 2319 + (n for aid, n, _ in spawned if aid == agent_id), "unknown" 2320 + ) 2321 + _jsonl_log( 2322 + "agent.fail", 2323 + mode="flush", 2324 + day=day, 2325 + segment=segment, 2326 + name=timed_name, 2327 + agent_id=agent_id, 2328 + state="timeout", 2329 + ) 1841 2330 1842 2331 for agent_id, prompt_name, config in spawned: 1843 2332 if agent_id in timed_out: ··· 1854 2343 1855 2344 emit( 1856 2345 "agent_completed", 2346 + mode="flush", 2347 + day=day, 2348 + segment=segment, 2349 + name=prompt_name, 2350 + agent_id=agent_id, 2351 + state=end_state, 2352 + ) 2353 + _jsonl_log( 2354 + "agent.complete" if end_state == "finish" else "agent.fail", 1857 2355 mode="flush", 1858 2356 day=day, 1859 2357 segment=segment, ··· 2262 2760 2263 2761 2264 2762 def main() -> None: 2265 - global _callosum 2763 + global _callosum, _jsonl 2266 2764 2267 2765 parser = parse_args() 2268 2766 args = setup_cli(parser) ··· 2353 2851 weekly=args.weekly, 2354 2852 ) 2355 2853 sys.exit(0) 2854 + 2855 + if args.activity: 2856 + _run_mode = "activity" 2857 + elif args.flush: 2858 + _run_mode = "flush" 2859 + elif args.segments: 2860 + _run_mode = "segment" 2861 + elif args.weekly: 2862 + _run_mode = "weekly" 2863 + elif args.segment: 2864 + _run_mode = "segment" 2865 + else: 2866 + _run_mode = "daily" 2867 + 2868 + _run_ref = str(now_ms()) 2869 + _run_start_time = time.time() 2870 + _run_result = {"success": 0, "failed": 0} 2871 + jsonl_path = str(day_path(day) / "health" / f"{_run_ref}_{_run_mode}_dream.jsonl") 2872 + _jsonl = DreamJSONLWriter(jsonl_path) 2356 2873 2357 2874 # Start callosum connection 2358 2875 _callosum = CallosumConnection(defaults={"rev": get_rev()}) ··· 2360 2877 _stop_status.clear() 2361 2878 status_thread = threading.Thread(target=_emit_periodic_status, daemon=True) 2362 2879 status_thread.start() 2880 + _jsonl_log("run.start", mode=_run_mode, day=day, ref=_run_ref) 2363 2881 2364 2882 try: 2365 2883 # Handle activity-triggered execution mode ··· 2372 2890 verbose=args.verbose, 2373 2891 max_concurrency=args.jobs, 2374 2892 ) 2893 + _run_result["success"] = 1 if success else 0 2894 + _run_result["failed"] = 0 if success else 1 2375 2895 sys.exit(0 if success else 1) 2376 2896 2377 2897 # Handle flush mode ··· 2384 2904 verbose=args.verbose, 2385 2905 stream=args.stream, 2386 2906 ) 2907 + _run_result["success"] = 1 if success else 0 2908 + _run_result["failed"] = 0 if success else 1 2387 2909 sys.exit(0 if success else 1) 2388 2910 2389 2911 # Handle batch segment re-processing mode ··· 2457 2979 else: 2458 2980 day_log(day, f"dream --segments failed={batch_failed}") 2459 2981 2982 + _run_result["success"] = batch_success 2983 + _run_result["failed"] = batch_failed 2460 2984 if batch_failed > 0: 2461 2985 sys.exit(1) 2462 2986 sys.exit(0) ··· 2483 3007 f"{success_count} succeeded, {fail_count} failed" 2484 3008 ) 2485 3009 day_log(day, f"dream --weekly failed={fail_count}") 3010 + _run_result["success"] = success_count 3011 + _run_result["failed"] = fail_count 2486 3012 2487 3013 if fail_count > 0: 2488 3014 names = ", ".join(failed_names) ··· 2497 3023 if args.verbose: 2498 3024 cmd.append("-v") 2499 3025 day_log(day, f"starting: {' '.join(cmd)}") 2500 - if not run_command(cmd, day): 3026 + _jsonl_log("phase.start", mode=_run_mode, day=day, phase="sense_repair") 3027 + _phase_start = time.time() 3028 + phase_ok = run_command(cmd, day) 3029 + _jsonl_log( 3030 + "phase.complete", 3031 + mode=_run_mode, 3032 + day=day, 3033 + phase="sense_repair", 3034 + success=phase_ok, 3035 + duration_ms=int((time.time() - _phase_start) * 1000), 3036 + ) 3037 + if not phase_ok: 2501 3038 logging.warning("Sense repair failed, continuing anyway") 2502 3039 2503 3040 # MAIN PHASE: Run prompts ··· 2529 3066 max_concurrency=args.jobs, 2530 3067 stream=resolved_stream, 2531 3068 ) 3069 + _run_result["success"] = success_count 3070 + _run_result["failed"] = fail_count 2532 3071 2533 3072 # Touch stream.updated marker after segment processing 2534 3073 if args.segment: ··· 2545 3084 rescan_cmd = ["sol", "indexer", "--rescan"] 2546 3085 if args.verbose: 2547 3086 rescan_cmd.append("--verbose") 2548 - run_queued_command(rescan_cmd, day, timeout=3600) 3087 + _jsonl_log("phase.start", mode=_run_mode, day=day, phase="indexer_rescan") 3088 + _phase_start = time.time() 3089 + rescan_ok = run_queued_command(rescan_cmd, day, timeout=3600) 3090 + _jsonl_log( 3091 + "phase.complete", 3092 + mode=_run_mode, 3093 + day=day, 3094 + phase="indexer_rescan", 3095 + success=rescan_ok, 3096 + duration_ms=int((time.time() - _phase_start) * 1000), 3097 + ) 2549 3098 2550 3099 logging.info("Running post-phase: journal stats") 2551 3100 stats_cmd = ["sol", "journal-stats"] 2552 3101 if args.verbose: 2553 3102 stats_cmd.append("--verbose") 2554 - run_command(stats_cmd, day) 3103 + _jsonl_log("phase.start", mode=_run_mode, day=day, phase="journal_stats") 3104 + _phase_start = time.time() 3105 + stats_ok = run_command(stats_cmd, day) 3106 + _jsonl_log( 3107 + "phase.complete", 3108 + mode=_run_mode, 3109 + day=day, 3110 + phase="journal_stats", 3111 + success=stats_ok, 3112 + duration_ms=int((time.time() - _phase_start) * 1000), 3113 + ) 2555 3114 2556 3115 # Check storage health and emit warnings 2557 3116 try: 2558 - from think.retention import check_storage_health, compute_storage_summary 2559 3117 from think.callosum import callosum_send 3118 + from think.retention import ( 3119 + check_storage_health, 3120 + compute_storage_summary, 3121 + ) 2560 3122 2561 3123 storage_summary = compute_storage_summary() 2562 3124 journal_path = get_journal() ··· 2581 3143 action="/app/settings#storage", 2582 3144 ) 2583 3145 except Exception: 2584 - logging.debug("Storage health check failed in post-phase", exc_info=True) 3146 + logging.debug( 3147 + "Storage health check failed in post-phase", exc_info=True 3148 + ) 2585 3149 2586 3150 # Touch daily.updated marker after daily schedule completion 2587 3151 try: ··· 2640 3204 _clear_status() 2641 3205 _stop_status.set() 2642 3206 status_thread.join(timeout=2) 2643 - _callosum.stop() 3207 + _run_duration_ms = int((time.time() - _run_start_time) * 1000) 3208 + _jsonl_log( 3209 + "run.complete", 3210 + mode=_run_mode, 3211 + day=day, 3212 + ref=_run_ref, 3213 + success=_run_result["success"], 3214 + failed=_run_result["failed"], 3215 + skipped=_jsonl.skip_count if _jsonl else 0, 3216 + duration_ms=_run_duration_ms, 3217 + ) 3218 + if _jsonl: 3219 + _jsonl.close() 3220 + _jsonl = None 3221 + if _callosum: 3222 + _callosum.stop() 2644 3223 2645 3224 2646 3225 if __name__ == "__main__":