personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add clock-aligned task scheduler for supervisor

Introduces think/scheduler.py — a flat module that reads schedule
definitions from config/schedules.json and submits tasks via Callosum
at hour and day boundaries. State persists to health/scheduler.json
across restarts. Includes `sol schedule` CLI for viewing status.

Integrates with supervisor tick loop (--no-schedule to disable).
Adds 26 tests covering config loading, state persistence, boundary
detection, task submission, and CLI output.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

+933 -1
+2
sol.py
··· 43 43 "planner": "think.planner", 44 44 "indexer": "think.indexer", 45 45 "supervisor": "think.supervisor", 46 + "schedule": "think.scheduler", 46 47 "detect-created": "think.detect_created", 47 48 "top": "think.top", 48 49 "logs": "think.logs_cli", ··· 95 96 "planner", 96 97 "indexer", 97 98 "supervisor", 99 + "schedule", 98 100 "top", 99 101 "logs", 100 102 "callosum",
+15
tests/fixtures/journal/config/schedules.json
··· 1 + { 2 + "test:echo": { 3 + "cmd": ["sol", "echo", "-v"], 4 + "every": "hourly" 5 + }, 6 + "test:daily": { 7 + "cmd": ["sol", "dream", "-v"], 8 + "every": "daily" 9 + }, 10 + "test:disabled": { 11 + "cmd": ["sol", "noop"], 12 + "every": "hourly", 13 + "enabled": false 14 + } 15 + }
+507
tests/test_scheduler.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Tests for think.scheduler — clock-aligned task scheduler.""" 5 + 6 + import json 7 + import time 8 + from contextlib import contextmanager 9 + from datetime import datetime 10 + from pathlib import Path 11 + from unittest.mock import Mock 12 + 13 + import pytest 14 + 15 + import think.scheduler 16 + 17 + 18 + @contextmanager 19 + def _fake_now(dt: datetime): 20 + """Temporarily replace think.scheduler.datetime with a fake that returns dt.""" 21 + 22 + class _FakeDatetime: 23 + min = datetime.min 24 + 25 + @staticmethod 26 + def now(): 27 + return dt 28 + 29 + @staticmethod 30 + def fromtimestamp(ts): 31 + return datetime.fromtimestamp(ts) 32 + 33 + @staticmethod 34 + def combine(*a, **k): 35 + return datetime.combine(*a, **k) 36 + 37 + think.scheduler.datetime = _FakeDatetime 38 + try: 39 + yield 40 + finally: 41 + think.scheduler.datetime = datetime 42 + 43 + 44 + @pytest.fixture(autouse=True) 45 + def reset_scheduler_state(): 46 + """Reset scheduler module state between tests.""" 47 + import think.scheduler as mod 48 + 49 + mod._entries = {} 50 + mod._state = {} 51 + mod._callosum = None 52 + mod._last_hour = None 53 + mod._last_day = None 54 + yield 55 + mod._entries = {} 56 + mod._state = {} 57 + mod._callosum = None 58 + mod._last_hour = None 59 + mod._last_day = None 60 + 61 + 62 + @pytest.fixture 63 + def journal_path(tmp_path, monkeypatch): 64 + """Create a temp journal with config/ and health/ dirs.""" 65 + monkeypatch.setenv("JOURNAL_PATH", str(tmp_path)) 66 + (tmp_path / "config").mkdir() 67 + (tmp_path / "health").mkdir() 68 + return tmp_path 69 + 70 + 71 + def _write_config(journal: Path, config: dict) -> None: 72 + with open(journal / "config" / "schedules.json", "w") as f: 73 + json.dump(config, f) 74 + 75 + 76 + def _write_state(journal: Path, state: dict) -> None: 77 + with open(journal / "health" / "scheduler.json", "w") as f: 78 + json.dump(state, f) 79 + 80 + 81 + def _read_state(journal: Path) -> dict: 82 + with open(journal / "health" / "scheduler.json") as f: 83 + return json.load(f) 84 + 85 + 86 + # --------------------------------------------------------------------------- 87 + # load_config 88 + # --------------------------------------------------------------------------- 89 + 90 + 91 + class TestLoadConfig: 92 + def test_valid_config(self, journal_path): 93 + _write_config( 94 + journal_path, 95 + { 96 + "sync:plaud": { 97 + "cmd": ["sol", "import", "--sync", "plaud"], 98 + "every": "hourly", 99 + }, 100 + }, 101 + ) 102 + from think.scheduler import load_config 103 + 104 + entries = load_config() 105 + assert "sync:plaud" in entries 106 + assert entries["sync:plaud"]["every"] == "hourly" 107 + assert entries["sync:plaud"]["cmd"] == ["sol", "import", "--sync", "plaud"] 108 + 109 + def test_missing_file_returns_empty(self, journal_path): 110 + from think.scheduler import load_config 111 + 112 + assert load_config() == {} 113 + 114 + def test_invalid_json_returns_empty(self, journal_path): 115 + (journal_path / "config" / "schedules.json").write_text("not json{") 116 + from think.scheduler import load_config 117 + 118 + assert load_config() == {} 119 + 120 + def test_unknown_every_skipped(self, journal_path): 121 + _write_config( 122 + journal_path, 123 + { 124 + "bad": {"cmd": ["sol", "noop"], "every": "weekly"}, 125 + }, 126 + ) 127 + from think.scheduler import load_config 128 + 129 + assert load_config() == {} 130 + 131 + def test_missing_cmd_skipped(self, journal_path): 132 + _write_config( 133 + journal_path, 134 + { 135 + "bad": {"every": "hourly"}, 136 + }, 137 + ) 138 + from think.scheduler import load_config 139 + 140 + assert load_config() == {} 141 + 142 + def test_disabled_entry_excluded(self, journal_path): 143 + _write_config( 144 + journal_path, 145 + { 146 + "off": {"cmd": ["sol", "noop"], "every": "hourly", "enabled": False}, 147 + }, 148 + ) 149 + from think.scheduler import load_config 150 + 151 + assert load_config() == {} 152 + 153 + 154 + # --------------------------------------------------------------------------- 155 + # load_state / save_state 156 + # --------------------------------------------------------------------------- 157 + 158 + 159 + class TestState: 160 + def test_round_trip(self, journal_path): 161 + import think.scheduler as mod 162 + 163 + mod._state = {"sync:plaud": {"last_run": 1700000000.0}} 164 + mod.save_state() 165 + 166 + loaded = mod.load_state() 167 + assert loaded["sync:plaud"]["last_run"] == 1700000000.0 168 + 169 + def test_missing_file_returns_empty(self, journal_path): 170 + from think.scheduler import load_state 171 + 172 + assert load_state() == {} 173 + 174 + def test_atomic_write_no_partial(self, journal_path): 175 + """State file shouldn't have leftover tmp files on success.""" 176 + import think.scheduler as mod 177 + 178 + mod._state = {"a": {"last_run": 1.0}} 179 + mod.save_state() 180 + 181 + tmps = list((journal_path / "health").glob(".scheduler_*")) 182 + assert tmps == [] 183 + 184 + 185 + # --------------------------------------------------------------------------- 186 + # _is_due 187 + # --------------------------------------------------------------------------- 188 + 189 + 190 + class TestIsDue: 191 + def test_no_state_is_due(self): 192 + from think.scheduler import _is_due 193 + 194 + entry = {"cmd": ["sol", "x"], "every": "hourly"} 195 + assert _is_due(entry, None, datetime(2026, 2, 17, 14, 30)) is True 196 + 197 + def test_hourly_same_hour_not_due(self): 198 + from think.scheduler import _is_due 199 + 200 + entry = {"cmd": ["sol", "x"], "every": "hourly"} 201 + # Last run at 14:05, now is 14:30 — same hour 202 + state = {"last_run": datetime(2026, 2, 17, 14, 5).timestamp()} 203 + assert _is_due(entry, state, datetime(2026, 2, 17, 14, 30)) is False 204 + 205 + def test_hourly_new_hour_is_due(self): 206 + from think.scheduler import _is_due 207 + 208 + entry = {"cmd": ["sol", "x"], "every": "hourly"} 209 + # Last run at 13:45, now is 14:01 — new hour 210 + state = {"last_run": datetime(2026, 2, 17, 13, 45).timestamp()} 211 + assert _is_due(entry, state, datetime(2026, 2, 17, 14, 1)) is True 212 + 213 + def test_daily_same_day_not_due(self): 214 + from think.scheduler import _is_due 215 + 216 + entry = {"cmd": ["sol", "x"], "every": "daily"} 217 + # Last run today at 00:05, now is 14:00 218 + state = {"last_run": datetime(2026, 2, 17, 0, 5).timestamp()} 219 + assert _is_due(entry, state, datetime(2026, 2, 17, 14, 0)) is False 220 + 221 + def test_daily_new_day_is_due(self): 222 + from think.scheduler import _is_due 223 + 224 + entry = {"cmd": ["sol", "x"], "every": "daily"} 225 + # Last run yesterday at 23:50, now is 00:01 226 + state = {"last_run": datetime(2026, 2, 16, 23, 50).timestamp()} 227 + assert _is_due(entry, state, datetime(2026, 2, 17, 0, 1)) is True 228 + 229 + 230 + # --------------------------------------------------------------------------- 231 + # init 232 + # --------------------------------------------------------------------------- 233 + 234 + 235 + class TestInit: 236 + def test_loads_config_and_state(self, journal_path): 237 + _write_config( 238 + journal_path, 239 + { 240 + "a": {"cmd": ["sol", "x"], "every": "hourly"}, 241 + }, 242 + ) 243 + _write_state(journal_path, {"a": {"last_run": 1700000000.0}}) 244 + 245 + import think.scheduler as mod 246 + 247 + callosum = Mock() 248 + mod.init(callosum) 249 + 250 + assert "a" in mod._entries 251 + assert mod._state["a"]["last_run"] == 1700000000.0 252 + assert mod._callosum is callosum 253 + assert mod._last_hour is not None 254 + assert mod._last_day is not None 255 + 256 + def test_no_config_file(self, journal_path): 257 + import think.scheduler as mod 258 + 259 + mod.init(Mock()) 260 + assert mod._entries == {} 261 + 262 + 263 + # --------------------------------------------------------------------------- 264 + # check 265 + # --------------------------------------------------------------------------- 266 + 267 + 268 + class TestCheck: 269 + def test_pre_init_returns_immediately(self, journal_path): 270 + """check() does nothing when init() hasn't been called.""" 271 + import think.scheduler as mod 272 + 273 + callosum = Mock() 274 + callosum.emit = Mock(return_value=True) 275 + mod._callosum = callosum 276 + 277 + mod.check() 278 + callosum.emit.assert_not_called() 279 + 280 + def test_no_boundary_no_io(self, journal_path): 281 + """When no boundary has crossed, check() does nothing.""" 282 + import think.scheduler as mod 283 + 284 + callosum = Mock() 285 + callosum.emit = Mock(return_value=True) 286 + now = datetime(2026, 2, 17, 14, 30) 287 + 288 + _write_config( 289 + journal_path, 290 + { 291 + "a": {"cmd": ["sol", "x"], "every": "hourly"}, 292 + }, 293 + ) 294 + 295 + mod.init(callosum) 296 + # Set boundaries to current — no crossing 297 + mod._last_hour = mod._hour_mark(now) 298 + mod._last_day = now.date() 299 + 300 + with _fake_now(now): 301 + mod.check() 302 + 303 + callosum.emit.assert_not_called() 304 + 305 + def test_hourly_boundary_submits(self, journal_path): 306 + """Crossing an hour boundary submits due hourly tasks.""" 307 + import think.scheduler as mod 308 + 309 + callosum = Mock() 310 + callosum.emit = Mock(return_value=True) 311 + 312 + _write_config( 313 + journal_path, 314 + { 315 + "a": {"cmd": ["sol", "test-task", "-v"], "every": "hourly"}, 316 + }, 317 + ) 318 + 319 + mod.init(callosum) 320 + 321 + # Simulate: last check was at 13:59, now it's 14:01 322 + mod._last_hour = datetime(2026, 2, 17, 13, 0) 323 + mod._last_day = datetime(2026, 2, 17).date() 324 + # No prior state → task is due 325 + 326 + with _fake_now(datetime(2026, 2, 17, 14, 1)): 327 + mod.check() 328 + 329 + callosum.emit.assert_called_once() 330 + call_kwargs = callosum.emit.call_args 331 + assert call_kwargs[0][0] == "supervisor" 332 + assert call_kwargs[0][1] == "request" 333 + assert call_kwargs[1]["cmd"] == ["sol", "test-task", "-v"] 334 + assert call_kwargs[1]["ref"].startswith("sched:a:") 335 + 336 + # State should be updated 337 + assert "a" in mod._state 338 + assert mod._state["a"]["last_run"] > 0 339 + 340 + # State file should be written 341 + saved = _read_state(journal_path) 342 + assert "a" in saved 343 + 344 + def test_daily_boundary_submits(self, journal_path): 345 + """Crossing a day boundary submits due daily tasks.""" 346 + import think.scheduler as mod 347 + 348 + callosum = Mock() 349 + callosum.emit = Mock(return_value=True) 350 + 351 + _write_config( 352 + journal_path, 353 + { 354 + "d": {"cmd": ["sol", "daily-thing"], "every": "daily"}, 355 + }, 356 + ) 357 + 358 + mod.init(callosum) 359 + 360 + # Simulate: last check was yesterday 23:59, now it's 00:01 361 + mod._last_hour = datetime(2026, 2, 16, 23, 0) 362 + mod._last_day = datetime(2026, 2, 16).date() 363 + 364 + with _fake_now(datetime(2026, 2, 17, 0, 1)): 365 + mod.check() 366 + 367 + callosum.emit.assert_called_once() 368 + assert callosum.emit.call_args[1]["cmd"] == ["sol", "daily-thing"] 369 + 370 + def test_submits_on_new_hour_after_previous_run(self, journal_path): 371 + """Task ran in hour 14; crossing to hour 15 triggers resubmission.""" 372 + import think.scheduler as mod 373 + 374 + callosum = Mock() 375 + callosum.emit = Mock(return_value=True) 376 + 377 + _write_config( 378 + journal_path, 379 + { 380 + "a": {"cmd": ["sol", "x"], "every": "hourly"}, 381 + }, 382 + ) 383 + # Already ran at 14:02 384 + _write_state( 385 + journal_path, 386 + { 387 + "a": {"last_run": datetime(2026, 2, 17, 14, 2).timestamp()}, 388 + }, 389 + ) 390 + 391 + mod.init(callosum) 392 + mod._last_hour = datetime(2026, 2, 17, 14, 0) 393 + mod._last_day = datetime(2026, 2, 17).date() 394 + 395 + # Cross to hour 15 396 + with _fake_now(datetime(2026, 2, 17, 15, 0, 1)): 397 + mod.check() 398 + 399 + # Should submit because we crossed to hour 15 and last_run was in hour 14 400 + callosum.emit.assert_called_once() 401 + 402 + def test_config_reloaded_on_boundary(self, journal_path): 403 + """Config file changes are picked up when a boundary is crossed.""" 404 + import think.scheduler as mod 405 + 406 + callosum = Mock() 407 + callosum.emit = Mock(return_value=True) 408 + 409 + # Start with empty config 410 + _write_config(journal_path, {}) 411 + mod.init(callosum) 412 + mod._last_hour = datetime(2026, 2, 17, 13, 0) 413 + mod._last_day = datetime(2026, 2, 17).date() 414 + 415 + # Now write a real config 416 + _write_config( 417 + journal_path, 418 + { 419 + "new": {"cmd": ["sol", "new-task"], "every": "hourly"}, 420 + }, 421 + ) 422 + 423 + with _fake_now(datetime(2026, 2, 17, 14, 1)): 424 + mod.check() 425 + 426 + callosum.emit.assert_called_once() 427 + assert callosum.emit.call_args[1]["cmd"] == ["sol", "new-task"] 428 + 429 + def test_emit_failure_no_state_update(self, journal_path): 430 + """If emit fails, last_run should not be updated.""" 431 + import think.scheduler as mod 432 + 433 + callosum = Mock() 434 + callosum.emit = Mock(return_value=False) 435 + 436 + _write_config( 437 + journal_path, 438 + { 439 + "a": {"cmd": ["sol", "x"], "every": "hourly"}, 440 + }, 441 + ) 442 + 443 + mod.init(callosum) 444 + mod._last_hour = datetime(2026, 2, 17, 13, 0) 445 + mod._last_day = datetime(2026, 2, 17).date() 446 + 447 + with _fake_now(datetime(2026, 2, 17, 14, 1)): 448 + mod.check() 449 + 450 + assert mod._state.get("a") is None 451 + 452 + 453 + # --------------------------------------------------------------------------- 454 + # collect_status 455 + # --------------------------------------------------------------------------- 456 + 457 + 458 + class TestCollectStatus: 459 + def test_returns_entries(self, journal_path): 460 + import think.scheduler as mod 461 + 462 + mod._entries = { 463 + "a": {"cmd": ["sol", "x"], "every": "hourly"}, 464 + } 465 + mod._state = {"a": {"last_run": time.time()}} 466 + 467 + status = mod.collect_status() 468 + assert len(status) == 1 469 + assert status[0]["name"] == "a" 470 + assert status[0]["every"] == "hourly" 471 + assert "last_run" in status[0] 472 + assert "due" in status[0] 473 + 474 + 475 + # --------------------------------------------------------------------------- 476 + # CLI main() 477 + # --------------------------------------------------------------------------- 478 + 479 + 480 + class TestCLI: 481 + def test_no_config_prints_message(self, journal_path, capsys, monkeypatch): 482 + monkeypatch.setattr("sys.argv", ["sol schedule"]) 483 + from think.scheduler import main 484 + 485 + main() 486 + out = capsys.readouterr().out 487 + assert "No schedules configured" in out 488 + 489 + def test_with_config_prints_table(self, journal_path, capsys, monkeypatch): 490 + monkeypatch.setattr("sys.argv", ["sol schedule"]) 491 + _write_config( 492 + journal_path, 493 + { 494 + "sync:plaud": { 495 + "cmd": ["sol", "import", "--sync", "plaud"], 496 + "every": "hourly", 497 + }, 498 + }, 499 + ) 500 + 501 + from think.scheduler import main 502 + 503 + main() 504 + out = capsys.readouterr().out 505 + assert "sync:plaud" in out 506 + assert "hourly" in out 507 + assert "NAME" in out
+1 -1
tests/test_supervisor.py
··· 298 298 monkeypatch.setenv("JOURNAL_PATH", "/test/journal") 299 299 300 300 with caplog.at_level(logging.INFO): 301 - await mod.supervise(threshold=1, interval=1, procs=[]) 301 + await mod.supervise(threshold=1, interval=1, schedule=False, procs=[]) 302 302 303 303 messages = [record.getMessage() for record in caplog.records] 304 304 assert "hear heartbeat recovered" in messages
+380
think/scheduler.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Clock-aligned task scheduler for the supervisor. 5 + 6 + Reads schedule definitions from config/schedules.json and submits tasks 7 + via Callosum at hour and day boundaries. State (last-run times) persists 8 + to health/scheduler.json across restarts. 9 + 10 + Runtime functions (init, check) are used by the supervisor. 11 + The main() function provides the ``sol schedule`` CLI. 12 + """ 13 + 14 + from __future__ import annotations 15 + 16 + import argparse 17 + import json 18 + import logging 19 + import tempfile 20 + import time 21 + from datetime import date, datetime, timedelta 22 + from pathlib import Path 23 + from typing import Any 24 + 25 + from think.utils import get_journal, now_ms, setup_cli 26 + 27 + logger = logging.getLogger(__name__) 28 + 29 + # Valid schedule intervals 30 + INTERVALS = {"hourly", "daily"} 31 + 32 + # --------------------------------------------------------------------------- 33 + # Module state (populated by init(), used by check()) 34 + # --------------------------------------------------------------------------- 35 + _entries: dict[str, dict[str, Any]] = {} 36 + _state: dict[str, dict[str, Any]] = {} 37 + _callosum: Any = None # CallosumConnection 38 + _last_hour: datetime | None = None 39 + _last_day: date | None = None 40 + 41 + 42 + # --------------------------------------------------------------------------- 43 + # Config + state I/O 44 + # --------------------------------------------------------------------------- 45 + 46 + 47 + def load_config() -> dict[str, dict[str, Any]]: 48 + """Read config/schedules.json and return validated entries.""" 49 + config_path = Path(get_journal()) / "config" / "schedules.json" 50 + if not config_path.exists(): 51 + return {} 52 + 53 + try: 54 + with open(config_path, "r", encoding="utf-8") as f: 55 + raw = json.load(f) 56 + except (json.JSONDecodeError, OSError) as exc: 57 + logger.warning("Failed to load schedules config: %s", exc) 58 + return {} 59 + 60 + if not isinstance(raw, dict): 61 + logger.warning( 62 + "schedules.json must be a JSON object, got %s", type(raw).__name__ 63 + ) 64 + return {} 65 + 66 + entries: dict[str, dict[str, Any]] = {} 67 + for name, entry in raw.items(): 68 + if not isinstance(entry, dict): 69 + logger.warning("Schedule '%s': expected object, skipping", name) 70 + continue 71 + 72 + cmd = entry.get("cmd") 73 + if not cmd or not isinstance(cmd, list): 74 + logger.warning("Schedule '%s': missing or invalid 'cmd', skipping", name) 75 + continue 76 + 77 + every = entry.get("every") 78 + if every not in INTERVALS: 79 + logger.warning( 80 + "Schedule '%s': unknown interval '%s' (expected %s), skipping", 81 + name, 82 + every, 83 + "/".join(sorted(INTERVALS)), 84 + ) 85 + continue 86 + 87 + if not entry.get("enabled", True): 88 + continue 89 + 90 + entries[name] = {"cmd": cmd, "every": every} 91 + 92 + return entries 93 + 94 + 95 + def load_state() -> dict[str, dict[str, Any]]: 96 + """Read health/scheduler.json.""" 97 + state_path = Path(get_journal()) / "health" / "scheduler.json" 98 + if not state_path.exists(): 99 + return {} 100 + 101 + try: 102 + with open(state_path, "r", encoding="utf-8") as f: 103 + return json.load(f) 104 + except (json.JSONDecodeError, OSError) as exc: 105 + logger.warning("Failed to load scheduler state: %s", exc) 106 + return {} 107 + 108 + 109 + def save_state() -> None: 110 + """Persist _state to health/scheduler.json atomically.""" 111 + health_dir = Path(get_journal()) / "health" 112 + health_dir.mkdir(parents=True, exist_ok=True) 113 + state_path = health_dir / "scheduler.json" 114 + 115 + fd, tmp_path = tempfile.mkstemp(dir=health_dir, suffix=".tmp", prefix=".scheduler_") 116 + tmp_file = Path(tmp_path) 117 + try: 118 + with open(fd, "w", encoding="utf-8") as f: 119 + json.dump(_state, f, indent=2) 120 + tmp_file.replace(state_path) 121 + except BaseException: 122 + tmp_file.unlink(missing_ok=True) 123 + raise 124 + 125 + 126 + # --------------------------------------------------------------------------- 127 + # Boundary helpers 128 + # --------------------------------------------------------------------------- 129 + 130 + 131 + def _hour_mark(dt: datetime) -> datetime: 132 + """Truncate datetime to the start of its hour.""" 133 + return dt.replace(minute=0, second=0, microsecond=0) 134 + 135 + 136 + def _is_due(entry: dict, state_entry: dict | None, now: datetime) -> bool: 137 + """Check if an entry is due based on its interval and last_run.""" 138 + last_run = (state_entry or {}).get("last_run") 139 + if last_run is None: 140 + return True 141 + 142 + try: 143 + last_dt = datetime.fromtimestamp(last_run) 144 + except (OSError, ValueError): 145 + return True 146 + 147 + every = entry["every"] 148 + if every == "hourly": 149 + return last_dt < _hour_mark(now) 150 + if every == "daily": 151 + return last_dt.date() < now.date() 152 + return False 153 + 154 + 155 + # --------------------------------------------------------------------------- 156 + # Runtime API (called by supervisor) 157 + # --------------------------------------------------------------------------- 158 + 159 + 160 + def init(callosum: Any) -> None: 161 + """Initialize scheduler with a Callosum connection. Load config and state.""" 162 + global _entries, _state, _callosum, _last_hour, _last_day 163 + 164 + _callosum = callosum 165 + _entries = load_config() 166 + _state = load_state() 167 + 168 + now = datetime.now() 169 + _last_hour = _hour_mark(now) 170 + _last_day = now.date() 171 + 172 + if _entries: 173 + logger.info( 174 + "Scheduler initialized with %d schedule(s): %s", 175 + len(_entries), 176 + ", ".join(sorted(_entries)), 177 + ) 178 + else: 179 + logger.info("Scheduler initialized (no schedules configured)") 180 + 181 + 182 + def check() -> None: 183 + """Check for clock boundaries and submit due tasks. 184 + 185 + Called each supervisor tick (~1s). Does nothing unless an hour or day 186 + boundary has been crossed since the last check. 187 + """ 188 + global _entries, _last_hour, _last_day 189 + 190 + if _last_hour is None: 191 + return 192 + 193 + now = datetime.now() 194 + current_hour = _hour_mark(now) 195 + current_day = now.date() 196 + 197 + hour_changed = current_hour != _last_hour 198 + day_changed = current_day != _last_day 199 + 200 + if not hour_changed and not day_changed: 201 + return 202 + 203 + # Boundary crossed — reload config for freshest definitions 204 + _entries = load_config() 205 + _last_hour = current_hour 206 + _last_day = current_day 207 + 208 + if not _entries: 209 + return 210 + 211 + submitted = False 212 + for name, entry in _entries.items(): 213 + every = entry["every"] 214 + 215 + # Only check entries matching the boundary that changed 216 + if every == "hourly" and not hour_changed: 217 + continue 218 + if every == "daily" and not day_changed: 219 + continue 220 + 221 + if not _is_due(entry, _state.get(name), now): 222 + continue 223 + 224 + ref = f"sched:{name}:{now_ms()}" 225 + cmd = entry["cmd"] 226 + 227 + if _callosum: 228 + ok = _callosum.emit("supervisor", "request", cmd=cmd, ref=ref) 229 + if ok: 230 + logger.info( 231 + "Scheduled task submitted: %s → %s (ref=%s)", 232 + name, 233 + " ".join(cmd), 234 + ref, 235 + ) 236 + _state.setdefault(name, {})["last_run"] = time.time() 237 + submitted = True 238 + else: 239 + logger.warning( 240 + "Failed to emit scheduled task %s (callosum not connected)", name 241 + ) 242 + else: 243 + logger.warning("No callosum connection for scheduled task: %s", name) 244 + 245 + if submitted: 246 + try: 247 + save_state() 248 + except Exception as exc: 249 + logger.warning("Failed to save scheduler state: %s", exc) 250 + 251 + 252 + def collect_status() -> list[dict[str, Any]]: 253 + """Return schedule status for supervisor.status events.""" 254 + now = datetime.now() 255 + result = [] 256 + for name, entry in _entries.items(): 257 + state_entry = _state.get(name) 258 + last_run = (state_entry or {}).get("last_run") 259 + result.append( 260 + { 261 + "name": name, 262 + "every": entry["every"], 263 + "last_run": last_run, 264 + "due": _is_due(entry, state_entry, now), 265 + } 266 + ) 267 + return result 268 + 269 + 270 + # --------------------------------------------------------------------------- 271 + # CLI: sol schedule 272 + # --------------------------------------------------------------------------- 273 + 274 + 275 + def _format_timestamp(epoch: float | None) -> str: 276 + """Format an epoch timestamp for display.""" 277 + if epoch is None: 278 + return "never" 279 + try: 280 + return datetime.fromtimestamp(epoch).strftime("%Y-%m-%d %H:%M") 281 + except (OSError, ValueError): 282 + return "invalid" 283 + 284 + 285 + def _format_next_due(entry: dict, state_entry: dict | None, now: datetime) -> str: 286 + """Format the next due time for display.""" 287 + if _is_due(entry, state_entry, now): 288 + return "now" 289 + 290 + every = entry["every"] 291 + if every == "hourly": 292 + nxt = _hour_mark(now) + timedelta(hours=1) 293 + return nxt.strftime("%H:%M") 294 + if every == "daily": 295 + return "midnight" 296 + return "?" 297 + 298 + 299 + def main() -> None: 300 + """CLI entry point for sol schedule.""" 301 + parser = argparse.ArgumentParser(description="Show scheduled tasks") 302 + setup_cli(parser) 303 + 304 + journal = Path(get_journal()) 305 + config_path = journal / "config" / "schedules.json" 306 + state_path = journal / "health" / "scheduler.json" 307 + 308 + # Load config (all entries, including disabled for display) 309 + config: dict[str, Any] = {} 310 + if config_path.exists(): 311 + try: 312 + with open(config_path, "r", encoding="utf-8") as f: 313 + config = json.load(f) 314 + except (json.JSONDecodeError, OSError) as exc: 315 + print(f"Error reading {config_path}: {exc}") 316 + return 317 + 318 + if not config: 319 + print("No schedules configured.") 320 + print(f"\nAdd schedules to: {config_path}") 321 + return 322 + 323 + # Load state 324 + state: dict[str, Any] = {} 325 + if state_path.exists(): 326 + try: 327 + with open(state_path, "r", encoding="utf-8") as f: 328 + state = json.load(f) 329 + except (json.JSONDecodeError, OSError): 330 + pass 331 + 332 + now = datetime.now() 333 + 334 + # Compute column widths 335 + names = list(config.keys()) 336 + name_width = max(max(len(n) for n in names), 4) 337 + every_width = 8 338 + last_run_width = 18 339 + next_due_width = 10 340 + 341 + # Header 342 + header = ( 343 + f" {'NAME':<{name_width}} {'EVERY':<{every_width}} " 344 + f"{'LAST RUN':<{last_run_width}} {'NEXT DUE':<{next_due_width}} CMD" 345 + ) 346 + print(header) 347 + print() 348 + 349 + for name, raw_entry in sorted(config.items()): 350 + if not isinstance(raw_entry, dict): 351 + continue 352 + 353 + every = raw_entry.get("every", "?") 354 + cmd = raw_entry.get("cmd", []) 355 + enabled = raw_entry.get("enabled", True) 356 + state_entry = state.get(name) 357 + 358 + last_run_str = _format_timestamp((state_entry or {}).get("last_run")) 359 + 360 + # Build a validated entry for _is_due / _format_next_due 361 + if every in INTERVALS and enabled: 362 + entry = {"cmd": cmd, "every": every} 363 + next_due_str = _format_next_due(entry, state_entry, now) 364 + else: 365 + next_due_str = "disabled" if not enabled else "?" 366 + 367 + cmd_str = " ".join(cmd) if isinstance(cmd, list) else str(cmd) 368 + 369 + tags = "" 370 + if not enabled: 371 + tags = " [disabled]" 372 + 373 + line = ( 374 + f" {name:<{name_width}} {every:<{every_width}} " 375 + f"{last_run_str:<{last_run_width}} {next_due_str:<{next_due_width}} {cmd_str}{tags}" 376 + ) 377 + print(line.rstrip()) 378 + 379 + print() 380 + print(f"Config: {config_path}")
+28
think/supervisor.py
··· 19 19 from desktop_notifier import DesktopNotifier, Urgency 20 20 21 21 from observe.sync import check_remote_health 22 + from think import scheduler 22 23 from think.callosum import CallosumConnection, CallosumServer 23 24 from think.runner import DailyLogWriter 24 25 from think.runner import ManagedProcess as RunnerManagedProcess ··· 776 777 # Stale heartbeats 777 778 stale = check_health() 778 779 780 + # Scheduled tasks 781 + schedules = scheduler.collect_status() 782 + 779 783 return { 780 784 "services": services, 781 785 "crashed": crashed, 782 786 "tasks": tasks, 783 787 "queues": queues, 784 788 "stale_heartbeats": stale, 789 + "schedules": schedules, 785 790 } 786 791 787 792 ··· 1272 1277 threshold: int = DEFAULT_THRESHOLD, 1273 1278 interval: int = CHECK_INTERVAL, 1274 1279 daily: bool = True, 1280 + schedule: bool = True, 1275 1281 procs: list[ManagedProcess] | None = None, 1276 1282 ) -> None: 1277 1283 """Monitor health via Callosum events and alert when stale. ··· 1331 1337 if daily: 1332 1338 handle_daily_tasks() 1333 1339 1340 + # Check periodic task schedules (non-blocking, submits via callosum) 1341 + if schedule: 1342 + scheduler.check() 1343 + 1334 1344 # Sleep 1 second before next iteration (responsive to shutdown) 1335 1345 await asyncio.sleep(1) 1336 1346 finally: ··· 1376 1386 help="Do not start the Convey web application", 1377 1387 ) 1378 1388 parser.add_argument( 1389 + "--no-schedule", 1390 + action="store_true", 1391 + help="Disable periodic task scheduler", 1392 + ) 1393 + parser.add_argument( 1379 1394 "--remote", 1380 1395 type=str, 1381 1396 help="Remote mode: sync to server URL instead of local processing", ··· 1508 1523 # Initialize daily state to today - dream only triggers at midnight when day changes 1509 1524 _daily_state["last_day"] = datetime.now().date() 1510 1525 1526 + # Initialize periodic task scheduler 1527 + schedule_enabled = not args.no_schedule and not _is_remote_mode 1528 + if schedule_enabled and _supervisor_callosum: 1529 + scheduler.init(_supervisor_callosum) 1530 + 1511 1531 # Show Convey URL if running 1512 1532 if convey_port: 1513 1533 print(f"Convey: http://localhost:{convey_port}/") ··· 1523 1543 threshold=args.threshold, 1524 1544 interval=args.interval, 1525 1545 daily=daily_enabled, 1546 + schedule=schedule_enabled, 1526 1547 procs=procs if procs else None, 1527 1548 ) 1528 1549 ) ··· 1555 1576 except Exception: 1556 1577 pass 1557 1578 managed.cleanup() 1579 + 1580 + # Save scheduler state before disconnecting 1581 + if schedule_enabled and scheduler._state: 1582 + try: 1583 + scheduler.save_state() 1584 + except Exception as exc: 1585 + logging.warning("Failed to save scheduler state on shutdown: %s", exc) 1558 1586 1559 1587 # Disconnect supervisor's Callosum connection 1560 1588 if _supervisor_callosum: