personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at d18a7c02359cd827d0ff15058861de5c2600a96f 636 lines 21 kB view raw
1# SPDX-License-Identifier: AGPL-3.0-only 2# Copyright (c) 2026 sol pbc 3 4"""Clock-aligned task scheduler for the supervisor. 5 6Reads schedule definitions from config/schedules.json and submits tasks 7via Callosum at hour and day boundaries. State (last-run times) persists 8to health/scheduler.json across restarts. 9 10Runtime functions (init, check) are used by the supervisor. 11The main() function provides the ``sol schedule`` CLI. 12""" 13 14from __future__ import annotations 15 16import argparse 17import json 18import logging 19import tempfile 20import time 21from datetime import datetime, timedelta 22from pathlib import Path 23from typing import Any 24 25from think.utils import get_journal, now_ms, require_solstone, setup_cli 26 27logger = logging.getLogger(__name__) 28 29# Valid schedule intervals 30INTERVALS = {"hourly", "daily", "weekly"} 31 32# --------------------------------------------------------------------------- 33# Module state (populated by init(), used by check()) 34# --------------------------------------------------------------------------- 35_entries: dict[str, dict[str, Any]] = {} 36_state: dict[str, dict[str, Any]] = {} 37_callosum: Any = None # CallosumConnection 38_last_hour: datetime | None = None 39_daily_time: str | None = None 40_last_daily_mark: datetime | None = None 41_weekly_day: str | None = None 42_weekly_time: str | None = None 43_last_weekly_mark: datetime | None = None 44 45 46# --------------------------------------------------------------------------- 47# Config + state I/O 48# --------------------------------------------------------------------------- 49 50 51def load_config() -> dict[str, dict[str, Any]]: 52 """Read config/schedules.json and return validated entries.""" 53 global _daily_time, _weekly_day, _weekly_time 54 55 config_path = Path(get_journal()) / "config" / "schedules.json" 56 if not config_path.exists(): 57 _daily_time = None 58 _weekly_day = None 59 _weekly_time = None 60 return {} 61 62 try: 63 with open(config_path, "r", encoding="utf-8") as f: 64 raw = json.load(f) 65 except (json.JSONDecodeError, OSError) as exc: 66 logger.warning("Failed to load schedules config: %s", exc) 67 _daily_time = None 68 _weekly_day = None 69 _weekly_time = None 70 return {} 71 72 if not isinstance(raw, dict): 73 logger.warning( 74 "schedules.json must be a JSON object, got %s", type(raw).__name__ 75 ) 76 _daily_time = None 77 _weekly_day = None 78 _weekly_time = None 79 return {} 80 81 # Extract daily_time metadata (not a schedule entry) 82 _daily_time = raw.pop("daily_time", None) 83 if _daily_time is not None and not isinstance(_daily_time, str): 84 logger.warning("schedules.json: daily_time must be a string, ignoring") 85 _daily_time = None 86 87 # Extract weekly_day metadata 88 _weekly_day = raw.pop("weekly_day", None) 89 if _weekly_day is not None and not isinstance(_weekly_day, str): 90 logger.warning("schedules.json: weekly_day must be a string, ignoring") 91 _weekly_day = None 92 elif _weekly_day is not None and _parse_weekly_day(_weekly_day) is None: 93 logger.warning( 94 "schedules.json: unrecognized weekly_day '%s', ignoring", _weekly_day 95 ) 96 _weekly_day = None 97 98 # Extract weekly_time metadata 99 _weekly_time = raw.pop("weekly_time", None) 100 if _weekly_time is not None and not isinstance(_weekly_time, str): 101 logger.warning("schedules.json: weekly_time must be a string, ignoring") 102 _weekly_time = None 103 104 entries: dict[str, dict[str, Any]] = {} 105 for name, entry in raw.items(): 106 if not isinstance(entry, dict): 107 logger.warning("Schedule '%s': expected object, skipping", name) 108 continue 109 110 cmd = entry.get("cmd") 111 if not cmd or not isinstance(cmd, list): 112 logger.warning("Schedule '%s': missing or invalid 'cmd', skipping", name) 113 continue 114 115 every = entry.get("every") 116 if every not in INTERVALS: 117 logger.warning( 118 "Schedule '%s': unknown interval '%s' (expected %s), skipping", 119 name, 120 every, 121 "/".join(sorted(INTERVALS)), 122 ) 123 continue 124 125 if not entry.get("enabled", True): 126 continue 127 128 entries[name] = {"cmd": cmd, "every": every} 129 130 return entries 131 132 133def load_state() -> dict[str, dict[str, Any]]: 134 """Read health/scheduler.json.""" 135 state_path = Path(get_journal()) / "health" / "scheduler.json" 136 if not state_path.exists(): 137 return {} 138 139 try: 140 with open(state_path, "r", encoding="utf-8") as f: 141 return json.load(f) 142 except (json.JSONDecodeError, OSError) as exc: 143 logger.warning("Failed to load scheduler state: %s", exc) 144 return {} 145 146 147def save_state() -> None: 148 """Persist _state to health/scheduler.json atomically.""" 149 health_dir = Path(get_journal()) / "health" 150 health_dir.mkdir(parents=True, exist_ok=True) 151 state_path = health_dir / "scheduler.json" 152 153 fd, tmp_path = tempfile.mkstemp(dir=health_dir, suffix=".tmp", prefix=".scheduler_") 154 tmp_file = Path(tmp_path) 155 try: 156 with open(fd, "w", encoding="utf-8") as f: 157 json.dump(_state, f, indent=2) 158 tmp_file.replace(state_path) 159 except BaseException: 160 tmp_file.unlink(missing_ok=True) 161 raise 162 163 164# --------------------------------------------------------------------------- 165# Boundary helpers 166# --------------------------------------------------------------------------- 167 168 169def _hour_mark(dt: datetime) -> datetime: 170 """Truncate datetime to the start of its hour.""" 171 return dt.replace(minute=0, second=0, microsecond=0) 172 173 174def _parse_daily_time(raw: str | None) -> tuple[int, int] | None: 175 """Parse HH:MM daily time string. Returns (hour, minute) or None.""" 176 if not raw or not isinstance(raw, str): 177 return None 178 parts = raw.split(":") 179 if len(parts) != 2: 180 return None 181 try: 182 h, m = int(parts[0]), int(parts[1]) 183 if 0 <= h <= 23 and 0 <= m <= 59: 184 return (h, m) 185 except ValueError: 186 return None 187 return None 188 189 190DAY_NAMES: dict[str, int] = { 191 "monday": 0, 192 "mon": 0, 193 "tuesday": 1, 194 "tue": 1, 195 "wednesday": 2, 196 "wed": 2, 197 "thursday": 3, 198 "thu": 3, 199 "friday": 4, 200 "fri": 4, 201 "saturday": 5, 202 "sat": 5, 203 "sunday": 6, 204 "sun": 6, 205} 206 207 208def _parse_weekly_day(raw: str | None) -> int | None: 209 """Parse day-of-week name. Returns weekday int (0=Monday, 6=Sunday) or None.""" 210 if not raw or not isinstance(raw, str): 211 return None 212 return DAY_NAMES.get(raw.strip().lower()) 213 214 215def _compute_daily_mark(now: datetime, daily_time_str: str | None) -> datetime: 216 """Compute the most recent daily boundary datetime. 217 218 With a configured daily_time (e.g. "03:00"), the boundary is that time 219 today if already passed, otherwise that time yesterday. Without a 220 configured time, falls back to midnight (start of today). 221 """ 222 parsed = _parse_daily_time(daily_time_str) 223 if parsed is None: 224 return now.replace(hour=0, minute=0, second=0, microsecond=0) 225 h, m = parsed 226 today_mark = now.replace(hour=h, minute=m, second=0, microsecond=0) 227 if now >= today_mark: 228 return today_mark 229 return today_mark - timedelta(days=1) 230 231 232def _compute_weekly_mark( 233 now: datetime, weekly_day: int, weekly_time_str: str | None 234) -> datetime: 235 """Compute the most recent weekly boundary datetime. 236 237 Returns the most recent occurrence of the target weekday at the target time. 238 If now is past this week's boundary, returns this week's. Otherwise last week's. 239 """ 240 parsed = _parse_daily_time(weekly_time_str) 241 if parsed is None: 242 h, m = 3, 0 # default 03:00 243 else: 244 h, m = parsed 245 days_since = (now.weekday() - weekly_day) % 7 246 target_date = now - timedelta(days=days_since) 247 target_mark = target_date.replace(hour=h, minute=m, second=0, microsecond=0) 248 if now >= target_mark: 249 return target_mark 250 return target_mark - timedelta(weeks=1) 251 252 253def _is_due(entry: dict, state_entry: dict | None, now: datetime) -> bool: 254 """Check if an entry is due based on its interval and last_run.""" 255 last_run = (state_entry or {}).get("last_run") 256 if last_run is None: 257 return True 258 259 try: 260 last_dt = datetime.fromtimestamp(last_run) 261 except (OSError, ValueError): 262 return True 263 264 every = entry["every"] 265 if every == "hourly": 266 return last_dt < _hour_mark(now) 267 if every == "daily": 268 return last_dt < _compute_daily_mark(now, _daily_time) 269 if every == "weekly": 270 weekly_day_val = _parse_weekly_day(_weekly_day) 271 if weekly_day_val is None: 272 weekly_day_val = 6 # default Sunday 273 return last_dt < _compute_weekly_mark(now, weekly_day_val, _weekly_time) 274 return False 275 276 277# --------------------------------------------------------------------------- 278# Runtime API (called by supervisor) 279# --------------------------------------------------------------------------- 280 281 282def init(callosum: Any) -> None: 283 """Initialize scheduler with a Callosum connection. Load config and state.""" 284 global _entries, _state, _callosum, _last_hour, _last_daily_mark, _last_weekly_mark 285 286 _callosum = callosum 287 _entries = load_config() 288 _state = load_state() 289 290 now = datetime.now() 291 _last_hour = _hour_mark(now) 292 _last_daily_mark = _compute_daily_mark(now, _daily_time) 293 weekly_day_val = _parse_weekly_day(_weekly_day) 294 if weekly_day_val is None: 295 weekly_day_val = 6 296 _last_weekly_mark = _compute_weekly_mark(now, weekly_day_val, _weekly_time) 297 298 if _entries: 299 logger.info( 300 "Scheduler initialized with %d schedule(s): %s", 301 len(_entries), 302 ", ".join(sorted(_entries)), 303 ) 304 else: 305 logger.info("Scheduler initialized (no schedules configured)") 306 307 308def register_defaults() -> None: 309 """Ensure built-in default schedules exist in the config file. 310 311 Called by the supervisor after init(). Writes missing defaults to 312 config/schedules.json and reloads entries. 313 """ 314 global _entries 315 316 need_heartbeat = "heartbeat" not in _entries 317 need_weekly = "weekly-agents" not in _entries 318 319 if not need_heartbeat and not need_weekly: 320 return 321 322 # Read raw config (preserving daily_time and other entries) 323 config_dir = Path(get_journal()) / "config" 324 config_dir.mkdir(parents=True, exist_ok=True) 325 config_path = config_dir / "schedules.json" 326 327 raw: dict[str, Any] = {} 328 if config_path.exists(): 329 try: 330 with open(config_path, "r", encoding="utf-8") as f: 331 raw = json.load(f) 332 except (json.JSONDecodeError, OSError): 333 pass 334 335 if not isinstance(raw, dict): 336 raw = {} 337 338 changed = False 339 340 if need_heartbeat and "heartbeat" not in raw: 341 raw["heartbeat"] = { 342 "cmd": ["sol", "heartbeat"], 343 "every": "daily", 344 "enabled": True, 345 } 346 changed = True 347 348 if need_weekly and "weekly-agents" not in raw: 349 raw["weekly-agents"] = { 350 "cmd": ["sol", "think", "--weekly", "-v"], 351 "every": "weekly", 352 "enabled": True, 353 } 354 changed = True 355 356 if not changed: 357 return 358 359 # Atomic write 360 fd, tmp_path = tempfile.mkstemp(dir=config_dir, suffix=".tmp", prefix=".schedules_") 361 tmp_file = Path(tmp_path) 362 try: 363 with open(fd, "w", encoding="utf-8") as f: 364 json.dump(raw, f, indent=2) 365 tmp_file.replace(config_path) 366 logger.info("Auto-registered default schedule(s) in config/schedules.json") 367 except BaseException: 368 tmp_file.unlink(missing_ok=True) 369 raise 370 371 # Reload to pick up the new entry 372 _entries = load_config() 373 374 375def check() -> None: 376 """Check for clock boundaries and submit due tasks. 377 378 Called each supervisor tick (~1s). Does nothing unless an hour or day 379 boundary has been crossed since the last check. 380 """ 381 global _entries, _last_hour, _last_daily_mark, _last_weekly_mark 382 383 if _last_hour is None: 384 return 385 386 now = datetime.now() 387 current_hour = _hour_mark(now) 388 current_daily_mark = _compute_daily_mark(now, _daily_time) 389 weekly_day_val = _parse_weekly_day(_weekly_day) 390 if weekly_day_val is None: 391 weekly_day_val = 6 392 current_weekly_mark = _compute_weekly_mark(now, weekly_day_val, _weekly_time) 393 394 hour_changed = current_hour != _last_hour 395 daily_mark_changed = current_daily_mark != _last_daily_mark 396 weekly_mark_changed = current_weekly_mark != _last_weekly_mark 397 398 if not hour_changed and not daily_mark_changed and not weekly_mark_changed: 399 return 400 401 # Boundary crossed — reload config for freshest definitions 402 _entries = load_config() 403 _last_hour = current_hour 404 # Recompute with potentially updated _daily_time from config reload 405 new_daily_mark = _compute_daily_mark(now, _daily_time) 406 if new_daily_mark != _last_daily_mark: 407 daily_mark_changed = True 408 _last_daily_mark = new_daily_mark 409 new_weekly_day_val = _parse_weekly_day(_weekly_day) 410 if new_weekly_day_val is None: 411 new_weekly_day_val = 6 412 new_weekly_mark = _compute_weekly_mark(now, new_weekly_day_val, _weekly_time) 413 if new_weekly_mark != _last_weekly_mark: 414 weekly_mark_changed = True 415 _last_weekly_mark = new_weekly_mark 416 417 if not _entries: 418 return 419 420 submitted = False 421 for name, entry in _entries.items(): 422 every = entry["every"] 423 424 # Only check entries matching the boundary that changed 425 if every == "hourly" and not hour_changed: 426 continue 427 if every == "daily" and not daily_mark_changed: 428 continue 429 if every == "weekly" and not weekly_mark_changed: 430 continue 431 432 if not _is_due(entry, _state.get(name), now): 433 continue 434 435 ref = f"sched:{name}:{now_ms()}" 436 cmd = entry["cmd"] 437 438 if _callosum: 439 ok = _callosum.emit("supervisor", "request", cmd=cmd, ref=ref) 440 if ok: 441 logger.info( 442 "Scheduled task submitted: %s%s (ref=%s)", 443 name, 444 " ".join(cmd), 445 ref, 446 ) 447 _state.setdefault(name, {})["last_run"] = time.time() 448 submitted = True 449 else: 450 logger.warning( 451 "Failed to emit scheduled task %s (callosum not connected)", name 452 ) 453 else: 454 logger.warning("No callosum connection for scheduled task: %s", name) 455 456 if submitted: 457 try: 458 save_state() 459 except Exception as exc: 460 logger.warning("Failed to save scheduler state: %s", exc) 461 462 463def collect_status() -> list[dict[str, Any]]: 464 """Return schedule status for supervisor.status events.""" 465 now = datetime.now() 466 result = [] 467 for name, entry in _entries.items(): 468 state_entry = _state.get(name) 469 last_run = (state_entry or {}).get("last_run") 470 entry_status = { 471 "name": name, 472 "every": entry["every"], 473 "last_run": last_run, 474 "due": _is_due(entry, state_entry, now), 475 } 476 entry_status["next_run"] = _compute_next_run(entry, state_entry, now) 477 if entry["every"] == "daily" and _daily_time: 478 entry_status["daily_time"] = _daily_time 479 if entry["every"] == "weekly": 480 if _weekly_day: 481 entry_status["weekly_day"] = _weekly_day 482 if _weekly_time: 483 entry_status["weekly_time"] = _weekly_time 484 result.append(entry_status) 485 return result 486 487 488def _compute_next_run(entry: dict, state_entry: dict | None, now: datetime) -> int: 489 """Compute next run time as epoch milliseconds.""" 490 every = entry["every"] 491 if every == "hourly": 492 mark = _hour_mark(now) 493 nxt = mark if _is_due(entry, state_entry, now) else mark + timedelta(hours=1) 494 elif every == "daily": 495 mark = _compute_daily_mark(now, _daily_time) 496 nxt = mark if _is_due(entry, state_entry, now) else mark + timedelta(days=1) 497 elif every == "weekly": 498 weekly_day_val = _parse_weekly_day(_weekly_day) 499 if weekly_day_val is None: 500 weekly_day_val = 6 501 mark = _compute_weekly_mark(now, weekly_day_val, _weekly_time) 502 nxt = mark if _is_due(entry, state_entry, now) else mark + timedelta(weeks=1) 503 else: 504 return int(now.timestamp() * 1000) 505 return int(nxt.timestamp() * 1000) 506 507 508# --------------------------------------------------------------------------- 509# CLI: sol schedule 510# --------------------------------------------------------------------------- 511 512 513def _format_timestamp(epoch: float | None) -> str: 514 """Format an epoch timestamp for display.""" 515 if epoch is None: 516 return "never" 517 try: 518 return datetime.fromtimestamp(epoch).strftime("%Y-%m-%d %H:%M") 519 except (OSError, ValueError): 520 return "invalid" 521 522 523def _format_next_due(entry: dict, state_entry: dict | None, now: datetime) -> str: 524 """Format the next due time for display.""" 525 if _is_due(entry, state_entry, now): 526 return "now" 527 528 every = entry["every"] 529 if every == "hourly": 530 nxt = _hour_mark(now) + timedelta(hours=1) 531 return nxt.strftime("%H:%M") 532 if every == "daily": 533 parsed = _parse_daily_time(_daily_time) 534 return f"{parsed[0]:02d}:{parsed[1]:02d}" if parsed else "midnight" 535 if every == "weekly": 536 weekly_day_val = _parse_weekly_day(_weekly_day) 537 if weekly_day_val is None: 538 weekly_day_val = 6 539 weekly_mark = _compute_weekly_mark(now, weekly_day_val, _weekly_time) 540 nxt = weekly_mark + timedelta(weeks=1) 541 return f"{nxt.strftime('%A')} {nxt.strftime('%H:%M')}" 542 return "?" 543 544 545def main() -> None: 546 """CLI entry point for sol schedule.""" 547 parser = argparse.ArgumentParser(description="Show scheduled tasks") 548 setup_cli(parser) 549 require_solstone() 550 551 journal = Path(get_journal()) 552 config_path = journal / "config" / "schedules.json" 553 state_path = journal / "health" / "scheduler.json" 554 555 # Load config (all entries, including disabled for display) 556 config: dict[str, Any] = {} 557 if config_path.exists(): 558 try: 559 with open(config_path, "r", encoding="utf-8") as f: 560 config = json.load(f) 561 except (json.JSONDecodeError, OSError) as exc: 562 print(f"Error reading {config_path}: {exc}") 563 return 564 565 # Extract daily_time metadata before processing entries 566 global _daily_time, _weekly_day, _weekly_time 567 raw_daily_time = config.pop("daily_time", None) 568 _daily_time = raw_daily_time if isinstance(raw_daily_time, str) else None 569 raw_weekly_day = config.pop("weekly_day", None) 570 raw_weekly_time = config.pop("weekly_time", None) 571 _weekly_day = raw_weekly_day if isinstance(raw_weekly_day, str) else None 572 _weekly_time = raw_weekly_time if isinstance(raw_weekly_time, str) else None 573 574 if not config: 575 print("No schedules configured.") 576 print(f"\nAdd schedules to: {config_path}") 577 return 578 579 # Load state 580 state: dict[str, Any] = {} 581 if state_path.exists(): 582 try: 583 with open(state_path, "r", encoding="utf-8") as f: 584 state = json.load(f) 585 except (json.JSONDecodeError, OSError): 586 pass 587 588 now = datetime.now() 589 590 # Compute column widths 591 names = list(config.keys()) 592 name_width = max(max(len(n) for n in names), 4) 593 every_width = 8 594 last_run_width = 18 595 next_due_width = 10 596 597 # Header 598 header = ( 599 f" {'NAME':<{name_width}} {'EVERY':<{every_width}} " 600 f"{'LAST RUN':<{last_run_width}} {'NEXT DUE':<{next_due_width}} CMD" 601 ) 602 print(header) 603 print() 604 605 for name, raw_entry in sorted(config.items()): 606 if not isinstance(raw_entry, dict): 607 continue 608 609 every = raw_entry.get("every", "?") 610 cmd = raw_entry.get("cmd", []) 611 enabled = raw_entry.get("enabled", True) 612 state_entry = state.get(name) 613 614 last_run_str = _format_timestamp((state_entry or {}).get("last_run")) 615 616 # Build a validated entry for _is_due / _format_next_due 617 if every in INTERVALS and enabled: 618 entry = {"cmd": cmd, "every": every} 619 next_due_str = _format_next_due(entry, state_entry, now) 620 else: 621 next_due_str = "disabled" if not enabled else "?" 622 623 cmd_str = " ".join(cmd) if isinstance(cmd, list) else str(cmd) 624 625 tags = "" 626 if not enabled: 627 tags = " [disabled]" 628 629 line = ( 630 f" {name:<{name_width}} {every:<{every_width}} " 631 f"{last_run_str:<{last_run_width}} {next_due_str:<{next_due_width}} {cmd_str}{tags}" 632 ) 633 print(line.rstrip()) 634 635 print() 636 print(f"Config: {config_path}")