personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(dream): summarize pipeline health from JSONL logs

Add think.pipeline_health with summarize_pipeline_day(day) -> dict and
pipeline_status_message(summary) -> dict | None. The summarizer reads
dream health JSONL files written by DreamJSONLWriter (b8c4c2d3, 2efe9a44),
aggregates per-mode run and agent counts, flags four anomaly kinds
(agent_failure, activity_agents_missing, daily_agents_missing soft-miss
on today vs past days, segment_runs_missing as a soft signal), and
classifies status as healthy/warning/stale with stale taking precedence.
Never raises — malformed lines are skipped, missing dirs return an empty
healthy summary. pipeline_status_message returns None when healthy so
consumers can stay silent by default.

+504
+323
tests/test_pipeline_health.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Tests for think.pipeline_health.""" 5 + 6 + from __future__ import annotations 7 + 8 + import json 9 + from datetime import datetime 10 + from pathlib import Path 11 + 12 + import pytest 13 + 14 + from think.pipeline_health import pipeline_status_message, summarize_pipeline_day 15 + 16 + 17 + def _write_jsonl(path: Path, events: list[dict]) -> None: 18 + path.parent.mkdir(parents=True, exist_ok=True) 19 + with path.open("w", encoding="utf-8") as handle: 20 + for event in events: 21 + handle.write(json.dumps(event) + "\n") 22 + 23 + 24 + @pytest.fixture 25 + def pipeline_journal(tmp_path, monkeypatch): 26 + journal = tmp_path / "journal" 27 + journal.mkdir() 28 + monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal)) 29 + return journal 30 + 31 + 32 + def test_empty_day_is_healthy(pipeline_journal): 33 + summary = summarize_pipeline_day("20260101") 34 + 35 + assert summary["status"] == "healthy" 36 + assert summary["anomalies"] == [] 37 + assert summary["agents"] == { 38 + "dispatched": 0, 39 + "completed": 0, 40 + "failed": 0, 41 + "skipped": 0, 42 + "failed_list": [], 43 + "failed_list_truncated": False, 44 + } 45 + assert summary["activities"] == { 46 + "detected": 0, 47 + "persisted": 0, 48 + "agents_fired": False, 49 + } 50 + assert all(run == {"count": 0, "duration_ms_total": 0} for run in summary["runs"].values()) 51 + 52 + 53 + def test_missing_health_dir(pipeline_journal): 54 + (pipeline_journal / "20260101").mkdir() 55 + 56 + summary = summarize_pipeline_day("20260101") 57 + 58 + assert summary["status"] == "healthy" 59 + assert summary["anomalies"] == [] 60 + assert summary["runs"]["daily"]["count"] == 0 61 + 62 + 63 + def test_healthy_day_with_all_modes(pipeline_journal): 64 + day = "20990101" 65 + base = pipeline_journal / day / "health" 66 + _write_jsonl( 67 + base / "1_segment_dream.jsonl", 68 + [ 69 + {"event": "run.start", "mode": "segment"}, 70 + {"event": "agent.dispatch", "mode": "segment"}, 71 + {"event": "agent.complete", "mode": "segment"}, 72 + {"event": "run.complete", "mode": "segment", "duration_ms": 10}, 73 + ], 74 + ) 75 + _write_jsonl( 76 + base / "2_daily_dream.jsonl", 77 + [ 78 + {"event": "run.start", "mode": "daily"}, 79 + {"event": "agent.dispatch", "mode": "daily"}, 80 + {"event": "agent.complete", "mode": "daily"}, 81 + {"event": "run.complete", "mode": "daily", "duration_ms": 20}, 82 + ], 83 + ) 84 + _write_jsonl( 85 + base / "3_activity_dream.jsonl", 86 + [ 87 + {"event": "run.start", "mode": "activity"}, 88 + {"event": "agent.dispatch", "mode": "activity"}, 89 + {"event": "agent.complete", "mode": "activity"}, 90 + {"event": "run.complete", "mode": "activity", "duration_ms": 30}, 91 + ], 92 + ) 93 + 94 + summary = summarize_pipeline_day(day) 95 + 96 + assert summary["status"] == "healthy" 97 + assert summary["agents"]["dispatched"] == 3 98 + assert summary["agents"]["completed"] == 3 99 + assert summary["runs"]["segment"] == {"count": 1, "duration_ms_total": 10} 100 + assert summary["runs"]["daily"] == {"count": 1, "duration_ms_total": 20} 101 + assert summary["runs"]["activity"] == {"count": 1, "duration_ms_total": 30} 102 + assert summary["activities"]["agents_fired"] is True 103 + 104 + 105 + def test_agent_failure_promotes_warning(pipeline_journal): 106 + day = "20990102" 107 + _write_jsonl( 108 + pipeline_journal / day / "health" / "1_segment_dream.jsonl", 109 + [ 110 + { 111 + "event": "agent.fail", 112 + "mode": "segment", 113 + "name": "screen", 114 + "agent_id": "a-1", 115 + "state": "timeout", 116 + } 117 + ], 118 + ) 119 + 120 + summary = summarize_pipeline_day(day) 121 + 122 + assert summary["status"] == "warning" 123 + assert summary["agents"]["failed"] == 1 124 + assert summary["agents"]["failed_list"] == [ 125 + {"mode": "segment", "name": "screen", "agent_id": "a-1", "state": "timeout"} 126 + ] 127 + assert summary["anomalies"] == [ 128 + { 129 + "kind": "agent_failure", 130 + "mode": "segment", 131 + "name": "screen", 132 + "agent_id": "a-1", 133 + "state": "timeout", 134 + } 135 + ] 136 + 137 + 138 + def test_failed_list_truncates_at_20(pipeline_journal): 139 + day = "20990103" 140 + events = [ 141 + { 142 + "event": "agent.fail", 143 + "mode": "daily", 144 + "name": f"agent-{idx}", 145 + "agent_id": f"id-{idx}", 146 + "state": "error", 147 + } 148 + for idx in range(25) 149 + ] 150 + _write_jsonl(pipeline_journal / day / "health" / "1_daily_dream.jsonl", events) 151 + 152 + summary = summarize_pipeline_day(day) 153 + 154 + assert summary["agents"]["failed"] == 25 155 + assert len(summary["agents"]["failed_list"]) == 20 156 + assert summary["agents"]["failed_list_truncated"] is True 157 + assert sum(1 for a in summary["anomalies"] if a["kind"] == "agent_failure") == 20 158 + 159 + 160 + def test_activity_detected_without_run_is_stale(pipeline_journal): 161 + day = "20990104" 162 + _write_jsonl( 163 + pipeline_journal / day / "health" / "1_segment_dream.jsonl", 164 + [{"event": "activity.detected", "mode": "segment"}], 165 + ) 166 + 167 + summary = summarize_pipeline_day(day) 168 + 169 + assert summary["status"] == "stale" 170 + assert {"kind": "activity_agents_missing"} in summary["anomalies"] 171 + 172 + 173 + def test_past_day_without_daily_run_is_stale(pipeline_journal, monkeypatch): 174 + day = "20200101" 175 + _write_jsonl( 176 + pipeline_journal / day / "health" / "1_segment_dream.jsonl", 177 + [{"event": "run.start", "mode": "segment"}], 178 + ) 179 + monkeypatch.setattr( 180 + "think.pipeline_health._now", lambda: datetime(2020, 1, 2, 12, 0, 0) 181 + ) 182 + 183 + summary = summarize_pipeline_day(day) 184 + 185 + assert summary["status"] == "stale" 186 + assert {"kind": "daily_agents_missing"} in summary["anomalies"] 187 + 188 + 189 + def test_today_before_23h_no_daily_run_is_healthy(pipeline_journal, monkeypatch): 190 + current = datetime(2026, 4, 16, 12, 0, 0) 191 + monkeypatch.setattr("think.pipeline_health._now", lambda: current) 192 + (pipeline_journal / current.strftime("%Y%m%d") / "health").mkdir(parents=True) 193 + 194 + summary = summarize_pipeline_day(current.strftime("%Y%m%d")) 195 + 196 + assert summary["status"] == "healthy" 197 + assert {"kind": "daily_agents_missing"} not in summary["anomalies"] 198 + 199 + 200 + def test_today_after_23h_no_daily_run_is_stale(pipeline_journal, monkeypatch): 201 + current = datetime(2026, 4, 16, 23, 30, 0) 202 + monkeypatch.setattr("think.pipeline_health._now", lambda: current) 203 + (pipeline_journal / current.strftime("%Y%m%d") / "health").mkdir(parents=True) 204 + 205 + summary = summarize_pipeline_day(current.strftime("%Y%m%d")) 206 + 207 + assert summary["status"] == "stale" 208 + assert {"kind": "daily_agents_missing"} in summary["anomalies"] 209 + 210 + 211 + def test_segment_runs_missing_is_soft(pipeline_journal, monkeypatch): 212 + day = "20990105" 213 + (pipeline_journal / day / "health").mkdir(parents=True) 214 + monkeypatch.setattr( 215 + "think.pipeline_health.iter_segments", 216 + lambda value: [("default", "120000_300", Path("/tmp/fake"))], 217 + ) 218 + 219 + summary = summarize_pipeline_day(day) 220 + 221 + assert summary["status"] == "healthy" 222 + assert {"kind": "segment_runs_missing"} in summary["anomalies"] 223 + 224 + 225 + def test_invalid_day_returns_healthy_empty(pipeline_journal): 226 + summary = summarize_pipeline_day("not-a-date") 227 + 228 + assert summary["status"] == "healthy" 229 + assert summary["anomalies"] == [] 230 + assert summary["agents"] == { 231 + "dispatched": 0, 232 + "completed": 0, 233 + "failed": 0, 234 + "skipped": 0, 235 + "failed_list": [], 236 + "failed_list_truncated": False, 237 + } 238 + 239 + 240 + def test_malformed_json_lines_skipped(pipeline_journal): 241 + day = "20990106" 242 + path = pipeline_journal / day / "health" / "1_segment_dream.jsonl" 243 + path.parent.mkdir(parents=True, exist_ok=True) 244 + path.write_text( 245 + json.dumps({"event": "run.start", "mode": "segment"}) 246 + + "\nnot json at all\n" 247 + + json.dumps({"event": "agent.dispatch", "mode": "segment"}) 248 + + "\n", 249 + encoding="utf-8", 250 + ) 251 + 252 + summary = summarize_pipeline_day(day) 253 + 254 + assert summary["runs"]["segment"]["count"] == 1 255 + assert summary["agents"]["dispatched"] == 1 256 + 257 + 258 + @pytest.mark.parametrize( 259 + ("summary", "expected"), 260 + [ 261 + ({"status": "healthy", "anomalies": [], "agents": {"failed": 0}, "day": "20260101"}, None), 262 + ( 263 + { 264 + "status": "stale", 265 + "anomalies": [ 266 + {"kind": "activity_agents_missing"}, 267 + {"kind": "daily_agents_missing"}, 268 + {"kind": "agent_failure"}, 269 + ], 270 + "agents": {"failed": 3}, 271 + "day": "20260101", 272 + }, 273 + { 274 + "status": "stale", 275 + "message": "Activity agents didn't run — persisted activities untouched.", 276 + }, 277 + ), 278 + ( 279 + { 280 + "status": "stale", 281 + "anomalies": [ 282 + {"kind": "daily_agents_missing"}, 283 + {"kind": "agent_failure"}, 284 + ], 285 + "agents": {"failed": 2}, 286 + "day": "20260102", 287 + }, 288 + { 289 + "status": "stale", 290 + "message": "Daily dream didn't run for 20260102.", 291 + }, 292 + ), 293 + ( 294 + { 295 + "status": "warning", 296 + "anomalies": [{"kind": "agent_failure"}], 297 + "agents": {"failed": 1}, 298 + "day": "20260101", 299 + }, 300 + {"status": "warning", "message": "1 dream agent failed."}, 301 + ), 302 + ( 303 + { 304 + "status": "warning", 305 + "anomalies": [{"kind": "agent_failure"}] * 3, 306 + "agents": {"failed": 3}, 307 + "day": "20260101", 308 + }, 309 + {"status": "warning", "message": "3 dream agents failed."}, 310 + ), 311 + ( 312 + { 313 + "status": "healthy", 314 + "anomalies": [{"kind": "segment_runs_missing"}], 315 + "agents": {"failed": 0}, 316 + "day": "20260101", 317 + }, 318 + None, 319 + ), 320 + ], 321 + ) 322 + def test_status_message_priorities(summary, expected): 323 + assert pipeline_status_message(summary) == expected
+181
think/pipeline_health.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Summarize dream pipeline health from daily JSONL logs.""" 5 + 6 + from __future__ import annotations 7 + 8 + import json 9 + import logging 10 + from datetime import datetime 11 + 12 + from think.utils import day_path, iter_segments, now_ms 13 + 14 + logger = logging.getLogger(__name__) 15 + 16 + # Test indirection: tests monkeypatch this for time-sensitive branches. 17 + _now = datetime.now 18 + 19 + _MODES = ("segment", "daily", "activity", "weekly", "flush") 20 + _FAILED_LIST_CAP = 20 21 + 22 + 23 + def summarize_pipeline_day(day: str) -> dict: 24 + """Return a day-level summary of dream pipeline health.""" 25 + summary = { 26 + "day": day, 27 + "generated_at": now_ms(), 28 + "status": "healthy", 29 + "anomalies": [], 30 + "runs": { 31 + mode: {"count": 0, "duration_ms_total": 0} 32 + for mode in _MODES 33 + }, 34 + "agents": { 35 + "dispatched": 0, 36 + "completed": 0, 37 + "failed": 0, 38 + "skipped": 0, 39 + "failed_list": [], 40 + "failed_list_truncated": False, 41 + }, 42 + "activities": { 43 + "detected": 0, 44 + "persisted": 0, 45 + "agents_fired": False, 46 + }, 47 + } 48 + 49 + try: 50 + health_dir = day_path(day, create=False) / "health" 51 + if not health_dir.is_dir(): 52 + return summary 53 + 54 + for path in sorted(health_dir.glob("*.jsonl")): 55 + mode = None 56 + for candidate in _MODES: 57 + if path.name.endswith(f"_{candidate}_dream.jsonl"): 58 + mode = candidate 59 + break 60 + if mode is None: 61 + logger.debug("pipeline_health: skipping unrecognized file %s", path) 62 + continue 63 + 64 + summary["runs"][mode]["count"] += 1 65 + 66 + with path.open(encoding="utf-8") as handle: 67 + for raw_line in handle: 68 + line = raw_line.strip() 69 + if not line: 70 + continue 71 + try: 72 + rec = json.loads(line) 73 + except json.JSONDecodeError: 74 + logger.debug("malformed jsonl line in %s", path) 75 + continue 76 + 77 + if not isinstance(rec, dict) or "event" not in rec: 78 + logger.debug("pipeline_health: skipping invalid record in %s", path) 79 + continue 80 + 81 + event = rec["event"] 82 + if event == "agent.dispatch": 83 + summary["agents"]["dispatched"] += 1 84 + elif event == "agent.complete": 85 + summary["agents"]["completed"] += 1 86 + elif event == "agent.fail": 87 + summary["agents"]["failed"] += 1 88 + if len(summary["agents"]["failed_list"]) < _FAILED_LIST_CAP: 89 + summary["agents"]["failed_list"].append( 90 + { 91 + "mode": rec.get("mode") or mode, 92 + "name": rec.get("name"), 93 + "agent_id": rec.get("agent_id"), 94 + "state": rec.get("state"), 95 + } 96 + ) 97 + else: 98 + summary["agents"]["failed_list_truncated"] = True 99 + elif event == "agent.skip": 100 + summary["agents"]["skipped"] += 1 101 + elif event == "activity.detected": 102 + summary["activities"]["detected"] += 1 103 + elif event == "activity.persisted": 104 + summary["activities"]["persisted"] += 1 105 + elif event == "run.complete": 106 + try: 107 + duration_ms = int(rec.get("duration_ms", 0)) 108 + except (TypeError, ValueError): 109 + duration_ms = 0 110 + summary["runs"][mode]["duration_ms_total"] += duration_ms 111 + elif event == "run.start" and (rec.get("mode") or mode) == "activity": 112 + summary["activities"]["agents_fired"] = True 113 + except Exception: 114 + logger.warning( 115 + "pipeline_health: unexpected error summarizing %s", 116 + day, 117 + exc_info=True, 118 + ) 119 + return summary 120 + 121 + for failure in summary["agents"]["failed_list"]: 122 + summary["anomalies"].append({"kind": "agent_failure", **failure}) 123 + 124 + if summary["activities"]["detected"] > 0 and summary["runs"]["activity"]["count"] == 0: 125 + summary["anomalies"].append({"kind": "activity_agents_missing"}) 126 + 127 + current = _now() 128 + today = current.strftime("%Y%m%d") 129 + if day == today: 130 + if current.hour >= 23 and summary["runs"]["daily"]["count"] == 0: 131 + summary["anomalies"].append({"kind": "daily_agents_missing"}) 132 + elif day < today and summary["runs"]["daily"]["count"] == 0: 133 + summary["anomalies"].append({"kind": "daily_agents_missing"}) 134 + 135 + if summary["runs"]["segment"]["count"] == 0: 136 + try: 137 + segs = list(iter_segments(day)) 138 + except Exception: 139 + segs = [] 140 + if len(segs) >= 1: 141 + summary["anomalies"].append({"kind": "segment_runs_missing"}) 142 + 143 + has_stale = any( 144 + anomaly["kind"] in {"activity_agents_missing", "daily_agents_missing"} 145 + for anomaly in summary["anomalies"] 146 + ) 147 + has_failure = any( 148 + anomaly["kind"] == "agent_failure" for anomaly in summary["anomalies"] 149 + ) 150 + if has_stale: 151 + summary["status"] = "stale" 152 + elif has_failure: 153 + summary["status"] = "warning" 154 + 155 + return summary 156 + 157 + 158 + def pipeline_status_message(summary: dict) -> dict | None: 159 + """Return a short user-facing message for non-healthy summaries.""" 160 + if summary.get("status") == "healthy": 161 + return None 162 + 163 + anomalies = summary.get("anomalies", []) 164 + if any(anomaly.get("kind") == "activity_agents_missing" for anomaly in anomalies): 165 + return { 166 + "status": "stale", 167 + "message": "Activity agents didn't run — persisted activities untouched.", 168 + } 169 + if any(anomaly.get("kind") == "daily_agents_missing" for anomaly in anomalies): 170 + return { 171 + "status": "stale", 172 + "message": f"Daily dream didn't run for {summary['day']}.", 173 + } 174 + if any(anomaly.get("kind") == "agent_failure" for anomaly in anomalies): 175 + count = summary.get("agents", {}).get("failed", 0) 176 + plural = "s" if count != 1 else "" 177 + return { 178 + "status": "warning", 179 + "message": f"{count} dream agent{plural} failed.", 180 + } 181 + return None