personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Clock-aligned task scheduler for the supervisor.
5
6Reads schedule definitions from config/schedules.json and submits tasks
via Callosum at hour, day, and week boundaries. State (last-run times) persists
8to health/scheduler.json across restarts.
9
Runtime functions (init, register_defaults, check, collect_status) are used by the supervisor.
11The main() function provides the ``sol schedule`` CLI.
12"""
13
14from __future__ import annotations
15
16import argparse
17import json
18import logging
19import tempfile
20import time
21from datetime import datetime, timedelta
22from pathlib import Path
23from typing import Any
24
25from think.utils import get_journal, now_ms, require_solstone, setup_cli
26
logger = logging.getLogger(__name__)

# Valid schedule intervals; load_config() skips any entry whose "every"
# value is not one of these.
INTERVALS = {"hourly", "daily", "weekly"}

# ---------------------------------------------------------------------------
# Module state (populated by init(), used by check())
# ---------------------------------------------------------------------------
# Enabled, validated entries keyed by schedule name: {"cmd": [...], "every": ...}.
_entries: dict[str, dict[str, Any]] = {}
# Persisted per-schedule state keyed by name; check() records "last_run"
# (epoch seconds from time.time()) here.
_state: dict[str, dict[str, Any]] = {}
_callosum: Any = None  # CallosumConnection
# Boundary marks seen by the previous check(); a difference means a boundary
# was crossed. _last_hour stays None until init() runs, which makes check() a no-op.
_last_hour: datetime | None = None
# Raw "daily_time" string ("HH:MM") from the config, or None (midnight boundary).
_daily_time: str | None = None
_last_daily_mark: datetime | None = None
# Raw "weekly_day" / "weekly_time" strings from the config, or None; callers
# fall back to Sunday and 03:00 respectively.
_weekly_day: str | None = None
_weekly_time: str | None = None
_last_weekly_mark: datetime | None = None
44
45
46# ---------------------------------------------------------------------------
47# Config + state I/O
48# ---------------------------------------------------------------------------
49
50
def _clear_schedule_metadata() -> None:
    """Reset the daily/weekly timing metadata to "not configured"."""
    global _daily_time, _weekly_day, _weekly_time
    _daily_time = None
    _weekly_day = None
    _weekly_time = None


def _pop_time_field(raw: dict[str, Any], key: str) -> str | None:
    """Pop *key* from *raw* and return it only if it is a valid ``HH:MM`` string.

    Non-string or unparseable values are logged and treated as unset. An
    unparseable value selects the same fallback boundary the compute helpers
    would use anyway, but is kept out of collect_status() output.
    """
    value = raw.pop(key, None)
    if value is None:
        return None
    if not isinstance(value, str):
        logger.warning("schedules.json: %s must be a string, ignoring", key)
        return None
    if _parse_daily_time(value) is None:
        logger.warning(
            "schedules.json: invalid %s '%s' (expected HH:MM), ignoring", key, value
        )
        return None
    return value


def _validate_schedule_entry(name: str, entry: Any) -> dict[str, Any] | None:
    """Return ``{"cmd": ..., "every": ...}`` for a valid, enabled entry, else None.

    Invalid entries are logged and skipped; disabled entries are skipped silently.
    """
    if not isinstance(entry, dict):
        logger.warning("Schedule '%s': expected object, skipping", name)
        return None

    cmd = entry.get("cmd")
    # check() space-joins cmd for logging, so every element must be a string.
    if (
        not cmd
        or not isinstance(cmd, list)
        or not all(isinstance(part, str) for part in cmd)
    ):
        logger.warning("Schedule '%s': missing or invalid 'cmd', skipping", name)
        return None

    every = entry.get("every")
    # isinstance first: an unhashable value (list/dict) would make the set
    # membership test raise TypeError.
    if not isinstance(every, str) or every not in INTERVALS:
        logger.warning(
            "Schedule '%s': unknown interval '%s' (expected %s), skipping",
            name,
            every,
            "/".join(sorted(INTERVALS)),
        )
        return None

    if not entry.get("enabled", True):
        return None

    return {"cmd": cmd, "every": every}


def load_config() -> dict[str, dict[str, Any]]:
    """Read config/schedules.json and return validated entries.

    Side effect: sets the module-level ``_daily_time``, ``_weekly_day`` and
    ``_weekly_time`` from the file's top-level metadata keys, or resets them
    to None when the file is missing, unreadable, or not a JSON object.

    Returns:
        Mapping of schedule name to ``{"cmd": [...], "every": ...}`` for every
        enabled entry that passed validation. Invalid entries are logged and
        skipped. Returns ``{}`` when there is no usable config.
    """
    global _daily_time, _weekly_day, _weekly_time

    config_path = Path(get_journal()) / "config" / "schedules.json"
    if not config_path.exists():
        _clear_schedule_metadata()
        return {}

    try:
        with open(config_path, "r", encoding="utf-8") as f:
            raw = json.load(f)
    except (ValueError, OSError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Failed to load schedules config: %s", exc)
        _clear_schedule_metadata()
        return {}

    if not isinstance(raw, dict):
        logger.warning(
            "schedules.json must be a JSON object, got %s", type(raw).__name__
        )
        _clear_schedule_metadata()
        return {}

    # Top-level metadata keys are not schedule entries: pop them first.
    _daily_time = _pop_time_field(raw, "daily_time")

    _weekly_day = raw.pop("weekly_day", None)
    if _weekly_day is not None and not isinstance(_weekly_day, str):
        logger.warning("schedules.json: weekly_day must be a string, ignoring")
        _weekly_day = None
    elif _weekly_day is not None and _parse_weekly_day(_weekly_day) is None:
        logger.warning(
            "schedules.json: unrecognized weekly_day '%s', ignoring", _weekly_day
        )
        _weekly_day = None

    _weekly_time = _pop_time_field(raw, "weekly_time")

    entries: dict[str, dict[str, Any]] = {}
    for name, entry in raw.items():
        validated = _validate_schedule_entry(name, entry)
        if validated is not None:
            entries[name] = validated
    return entries
131
132
def load_state() -> dict[str, dict[str, Any]]:
    """Read health/scheduler.json.

    Returns:
        Mapping of schedule name to its persisted state dict (check() stores
        ``last_run`` there). Returns ``{}`` when the file is missing,
        unreadable, or not a JSON object. Entries whose value is not an object
        are dropped, so callers can safely ``.get`` / ``setdefault`` on them.
    """
    state_path = Path(get_journal()) / "health" / "scheduler.json"
    if not state_path.exists():
        return {}

    try:
        with open(state_path, "r", encoding="utf-8") as f:
            raw = json.load(f)
    except (ValueError, OSError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Failed to load scheduler state: %s", exc)
        return {}

    if not isinstance(raw, dict):
        logger.warning(
            "Scheduler state must be a JSON object, got %s; ignoring",
            type(raw).__name__,
        )
        return {}

    state = {name: entry for name, entry in raw.items() if isinstance(entry, dict)}
    dropped = len(raw) - len(state)
    if dropped:
        logger.warning("Ignoring %d malformed scheduler state entry(s)", dropped)
    return state
145
146
def save_state() -> None:
    """Write the in-memory ``_state`` to ``health/scheduler.json``.

    The JSON goes to a temporary file in the same directory first and is then
    moved over the target with ``Path.replace``, so the target is never left
    half-written. The temporary file is removed if anything fails.
    """
    target_dir = Path(get_journal()) / "health"
    target_dir.mkdir(parents=True, exist_ok=True)
    destination = target_dir / "scheduler.json"

    handle, scratch_name = tempfile.mkstemp(
        dir=target_dir, suffix=".tmp", prefix=".scheduler_"
    )
    scratch = Path(scratch_name)
    try:
        with open(handle, "w", encoding="utf-8") as out:
            json.dump(_state, out, indent=2)
        scratch.replace(destination)
    except BaseException:
        # Clean up the scratch file no matter what went wrong, then propagate.
        scratch.unlink(missing_ok=True)
        raise
162
163
164# ---------------------------------------------------------------------------
165# Boundary helpers
166# ---------------------------------------------------------------------------
167
168
def _hour_mark(dt: datetime) -> datetime:
    """Return *dt* with minutes, seconds and microseconds zeroed (top of its hour)."""
    top_of_hour = dt.replace(microsecond=0, second=0, minute=0)
    return top_of_hour
172
173
def _parse_daily_time(raw: str | None) -> tuple[int, int] | None:
    """Parse an ``HH:MM`` time string on a 24-hour clock.

    Surrounding whitespace is ignored. Each field must be one or two ASCII
    digits. Forms that ``int()`` would otherwise accept, such as ``"+3:00"``,
    ``"1_0:00"``, ``"3:-0"`` or ``"3: 00"``, are rejected.

    Returns:
        ``(hour, minute)`` with hour in 0..23 and minute in 0..59, or None if
        *raw* is empty, not a string, or malformed.
    """
    if not raw or not isinstance(raw, str):
        return None
    parts = raw.strip().split(":")
    if len(parts) != 2:
        return None
    # int() alone is too lenient (signs, underscores, inner whitespace).
    if not all(1 <= len(p) <= 2 and p.isascii() and p.isdigit() for p in parts):
        return None
    h, m = int(parts[0]), int(parts[1])
    if 0 <= h <= 23 and 0 <= m <= 59:
        return (h, m)
    return None
188
189
# Lookup from lower-case day name (full or three-letter) to datetime.weekday()
# index (0=Monday ... 6=Sunday). Built in order: full name, then abbreviation.
DAY_NAMES: dict[str, int] = {
    alias: index
    for index, day in enumerate(
        ("monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday")
    )
    for alias in (day, day[:3])
}
206
207
def _parse_weekly_day(raw: str | None) -> int | None:
    """Map a day name (case-insensitive, surrounding whitespace ignored) to its weekday index.

    Returns 0 for Monday through 6 for Sunday, or None when *raw* is empty,
    not a string, or not a name listed in ``DAY_NAMES``.
    """
    if isinstance(raw, str) and raw:
        key = raw.strip().lower()
        return DAY_NAMES.get(key)
    return None
213
214
def _compute_daily_mark(now: datetime, daily_time_str: str | None) -> datetime:
    """Return the latest daily boundary at or before *now*.

    When *daily_time_str* parses as ``HH:MM``, the boundary is that wall-clock
    time: today's if *now* has reached it, otherwise yesterday's. When it is
    missing or unparseable, the boundary is midnight at the start of today.
    """
    # Midnight is the fallback; since now is always >= today's 00:00, the
    # "yesterday" branch below can only apply to a configured time.
    hour, minute = _parse_daily_time(daily_time_str) or (0, 0)
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    return candidate if candidate <= now else candidate - timedelta(days=1)
230
231
def _compute_weekly_mark(
    now: datetime, weekly_day: int, weekly_time_str: str | None
) -> datetime:
    """Return the latest weekly boundary at or before *now*.

    The boundary is *weekly_day* (0=Monday ... 6=Sunday) at the ``HH:MM`` time
    from *weekly_time_str*, defaulting to 03:00 when that is missing or
    unparseable. This week's occurrence is used once reached, otherwise
    the previous week's.
    """
    hour, minute = _parse_daily_time(weekly_time_str) or (3, 0)  # default 03:00
    days_back = (now.weekday() - weekly_day) % 7
    candidate = (now - timedelta(days=days_back)).replace(
        hour=hour, minute=minute, second=0, microsecond=0
    )
    if candidate > now:
        # Target day is today but the time hasn't arrived yet.
        candidate -= timedelta(weeks=1)
    return candidate
251
252
def _is_due(entry: dict, state_entry: dict | None, now: datetime) -> bool:
    """Return True if *entry* has not run since its most recent boundary.

    Args:
        entry: Validated schedule entry; only ``entry["every"]`` is read.
        state_entry: Persisted state for the entry, or None. Its ``last_run``
            is an epoch-seconds timestamp. Non-dict values are treated as
            missing state.
        now: Reference time (callers pass ``datetime.now()``).

    Returns:
        True when there is no usable ``last_run``, meaning it is missing or
        ``datetime.fromtimestamp`` rejects it. Also True when ``last_run``
        precedes the current hourly/daily/weekly boundary. False for unknown
        intervals.
    """
    last_run = state_entry.get("last_run") if isinstance(state_entry, dict) else None
    if last_run is None:
        return True

    try:
        last_dt = datetime.fromtimestamp(last_run)
    except (OSError, OverflowError, TypeError, ValueError):
        # Corrupt timestamp (wrong type, NaN, out of platform range): treat as never run.
        return True

    every = entry["every"]
    if every == "hourly":
        return last_dt < _hour_mark(now)
    if every == "daily":
        return last_dt < _compute_daily_mark(now, _daily_time)
    if every == "weekly":
        weekly_day_val = _parse_weekly_day(_weekly_day)
        if weekly_day_val is None:
            weekly_day_val = 6  # default Sunday
        return last_dt < _compute_weekly_mark(now, weekly_day_val, _weekly_time)
    return False
275
276
277# ---------------------------------------------------------------------------
278# Runtime API (called by supervisor)
279# ---------------------------------------------------------------------------
280
281
def init(callosum: Any) -> None:
    """Prepare the scheduler for use by the supervisor.

    Stores the Callosum connection and loads schedule definitions and persisted
    state. It then records the current hourly/daily/weekly boundary marks, so
    check() only reacts to boundaries crossed after this call.
    """
    global _entries, _state, _callosum, _last_hour, _last_daily_mark, _last_weekly_mark

    _callosum = callosum
    _entries = load_config()
    _state = load_state()

    now = datetime.now()
    weekday = _parse_weekly_day(_weekly_day)
    _last_hour = _hour_mark(now)
    _last_daily_mark = _compute_daily_mark(now, _daily_time)
    # Sunday (6) when no valid weekly_day is configured.
    _last_weekly_mark = _compute_weekly_mark(
        now, 6 if weekday is None else weekday, _weekly_time
    )

    if not _entries:
        logger.info("Scheduler initialized (no schedules configured)")
        return
    logger.info(
        "Scheduler initialized with %d schedule(s): %s",
        len(_entries),
        ", ".join(sorted(_entries)),
    )
306
307
def register_defaults() -> None:
    """Ensure built-in default schedules exist in the config file.

    Called by the supervisor after init(). Adds any missing default entry
    ("heartbeat", daily; "weekly-agents", weekly) to config/schedules.json,
    preserving every other key, then reloads ``_entries``.

    A key already present in the file under a default name is left untouched,
    even if it is disabled or invalid. If the file exists but cannot be read,
    or is not a JSON object, nothing is written. This ensures the user's
    config is never replaced by the defaults.
    """
    global _entries

    # Built fresh on every call so nothing shared can be mutated.
    defaults: dict[str, dict[str, Any]] = {
        "heartbeat": {
            "cmd": ["sol", "heartbeat"],
            "every": "daily",
            "enabled": True,
        },
        "weekly-agents": {
            "cmd": ["sol", "think", "--weekly", "-v"],
            "every": "weekly",
            "enabled": True,
        },
    }

    missing = [name for name in defaults if name not in _entries]
    if not missing:
        return

    config_dir = Path(get_journal()) / "config"
    config_dir.mkdir(parents=True, exist_ok=True)
    config_path = config_dir / "schedules.json"

    # Read the raw config so metadata (daily_time, ...) and all other entries survive.
    raw: Any = {}
    if config_path.exists():
        try:
            with open(config_path, "r", encoding="utf-8") as f:
                raw = json.load(f)
        except (ValueError, OSError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            logger.warning(
                "Not registering default schedules: cannot read %s: %s",
                config_path,
                exc,
            )
            return
        if not isinstance(raw, dict):
            logger.warning(
                "Not registering default schedules: %s is not a JSON object",
                config_path,
            )
            return

    # An entry _entries lacks may still exist in the file (disabled/invalid).
    added = [name for name in missing if name not in raw]
    if not added:
        return
    for name in added:
        raw[name] = defaults[name]

    # Atomic write: temp file in the same directory, then replace.
    fd, tmp_path = tempfile.mkstemp(dir=config_dir, suffix=".tmp", prefix=".schedules_")
    tmp_file = Path(tmp_path)
    try:
        with open(fd, "w", encoding="utf-8") as f:
            json.dump(raw, f, indent=2)
        tmp_file.replace(config_path)
        logger.info("Auto-registered default schedule(s) in config/schedules.json")
    except BaseException:
        tmp_file.unlink(missing_ok=True)
        raise

    # Reload to pick up the new entries
    _entries = load_config()
373
374
def _boundary_marks(now: datetime) -> tuple[datetime, datetime, datetime]:
    """Return the (hourly, daily, weekly) boundary marks for *now* under the current config."""
    weekly_day_val = _parse_weekly_day(_weekly_day)
    if weekly_day_val is None:
        weekly_day_val = 6  # default Sunday
    return (
        _hour_mark(now),
        _compute_daily_mark(now, _daily_time),
        _compute_weekly_mark(now, weekly_day_val, _weekly_time),
    )


def _emit_scheduled_task(name: str, cmd: list[str]) -> bool:
    """Emit a supervisor request for *cmd*; return True if Callosum accepted it."""
    if not _callosum:
        logger.warning("No callosum connection for scheduled task: %s", name)
        return False

    ref = f"sched:{name}:{now_ms()}"
    if not _callosum.emit("supervisor", "request", cmd=cmd, ref=ref):
        logger.warning(
            "Failed to emit scheduled task %s (callosum not connected)", name
        )
        return False

    logger.info(
        "Scheduled task submitted: %s → %s (ref=%s)",
        name,
        # str() each part: a non-string element must not raise after the
        # request has already been emitted.
        " ".join(str(part) for part in cmd),
        ref,
    )
    return True


def check() -> None:
    """Check for clock boundaries and submit due tasks.

    Called each supervisor tick (~1s). Does nothing unless an hourly, daily,
    or weekly boundary has been crossed since the last check. When one is
    crossed, the config is reloaded. Each entry is then considered only if
    its own interval's boundary changed, and is submitted if _is_due()
    agrees. Successful submissions record ``last_run`` and the state is
    saved once per tick.
    """
    global _entries, _last_hour, _last_daily_mark, _last_weekly_mark

    if _last_hour is None:
        return  # init() has not run yet

    now = datetime.now()
    current_hour, current_daily_mark, current_weekly_mark = _boundary_marks(now)

    hour_changed = current_hour != _last_hour
    daily_mark_changed = current_daily_mark != _last_daily_mark
    weekly_mark_changed = current_weekly_mark != _last_weekly_mark

    if not (hour_changed or daily_mark_changed or weekly_mark_changed):
        return

    # Boundary crossed — reload config for freshest definitions, then
    # recompute marks since daily/weekly settings may have changed.
    _entries = load_config()
    _, new_daily_mark, new_weekly_mark = _boundary_marks(now)
    if new_daily_mark != _last_daily_mark:
        daily_mark_changed = True
    if new_weekly_mark != _last_weekly_mark:
        weekly_mark_changed = True
    _last_hour = current_hour
    _last_daily_mark = new_daily_mark
    _last_weekly_mark = new_weekly_mark

    if not _entries:
        return

    crossed = {
        "hourly": hour_changed,
        "daily": daily_mark_changed,
        "weekly": weekly_mark_changed,
    }
    submitted = False
    for name, entry in _entries.items():
        # Only check entries matching the boundary that changed.
        if not crossed.get(entry["every"], False):
            continue
        if not _is_due(entry, _state.get(name), now):
            continue
        if _emit_scheduled_task(name, entry["cmd"]):
            _state.setdefault(name, {})["last_run"] = time.time()
            submitted = True

    if submitted:
        try:
            save_state()
        except Exception as exc:
            logger.warning("Failed to save scheduler state: %s", exc)
461
462
def collect_status() -> list[dict[str, Any]]:
    """Build one status dict per loaded schedule for supervisor.status events.

    Each dict has ``name``, ``every``, ``last_run`` (epoch seconds or None),
    ``due`` and ``next_run`` (epoch ms). Where relevant and set, it also has
    the configured ``daily_time`` or ``weekly_day`` / ``weekly_time`` strings.
    """
    now = datetime.now()
    statuses: list[dict[str, Any]] = []
    for name, entry in _entries.items():
        state_entry = _state.get(name)
        interval = entry["every"]
        status: dict[str, Any] = {
            "name": name,
            "every": interval,
            "last_run": (state_entry or {}).get("last_run"),
            "due": _is_due(entry, state_entry, now),
            "next_run": _compute_next_run(entry, state_entry, now),
        }

        extras: dict[str, str | None] = {}
        if interval == "daily":
            extras["daily_time"] = _daily_time
        elif interval == "weekly":
            extras["weekly_day"] = _weekly_day
            extras["weekly_time"] = _weekly_time
        # Only report metadata that is actually configured.
        status.update({key: value for key, value in extras.items() if value})

        statuses.append(status)
    return statuses
486
487
def _compute_next_run(entry: dict, state_entry: dict | None, now: datetime) -> int:
    """Return when *entry* is next expected to run, as epoch milliseconds.

    A due entry reports its current (already passed) boundary. A not-due
    entry reports the following boundary. Unknown intervals report *now*.
    """
    interval = entry["every"]
    if interval == "hourly":
        boundary, period = _hour_mark(now), timedelta(hours=1)
    elif interval == "daily":
        boundary, period = _compute_daily_mark(now, _daily_time), timedelta(days=1)
    elif interval == "weekly":
        day_index = _parse_weekly_day(_weekly_day)
        boundary = _compute_weekly_mark(
            now, 6 if day_index is None else day_index, _weekly_time
        )
        period = timedelta(weeks=1)
    else:
        return int(now.timestamp() * 1000)

    if not _is_due(entry, state_entry, now):
        boundary += period
    return int(boundary.timestamp() * 1000)
506
507
508# ---------------------------------------------------------------------------
509# CLI: sol schedule
510# ---------------------------------------------------------------------------
511
512
def _format_timestamp(epoch: float | None) -> str:
    """Format an epoch-seconds timestamp as local ``YYYY-MM-DD HH:MM`` for display.

    Returns "never" for None. Returns "invalid" for any value
    ``datetime.fromtimestamp`` rejects, e.g. a string, NaN, or a value
    outside the platform's range.
    """
    if epoch is None:
        return "never"
    try:
        return datetime.fromtimestamp(epoch).strftime("%Y-%m-%d %H:%M")
    except (OSError, OverflowError, TypeError, ValueError):
        # State file is user-editable; a bad value must not crash the CLI.
        return "invalid"
521
522
def _format_next_due(entry: dict, state_entry: dict | None, now: datetime) -> str:
    """Describe when *entry* will next run, for the CLI table.

    Returns "now" for a due entry. Otherwise returns "HH:MM" for hourly and
    daily entries ("midnight" when no valid daily_time is set), and
    "<Weekday> HH:MM" for weekly entries. Anything else gives "?".
    """
    if _is_due(entry, state_entry, now):
        return "now"

    interval = entry["every"]
    if interval == "hourly":
        return (_hour_mark(now) + timedelta(hours=1)).strftime("%H:%M")
    if interval == "daily":
        hour_minute = _parse_daily_time(_daily_time)
        if hour_minute is None:
            return "midnight"
        return "%02d:%02d" % hour_minute
    if interval == "weekly":
        day_index = _parse_weekly_day(_weekly_day)
        if day_index is None:
            day_index = 6  # default Sunday
        upcoming = _compute_weekly_mark(now, day_index, _weekly_time) + timedelta(weeks=1)
        return upcoming.strftime("%A %H:%M")
    return "?"
543
544
def main() -> None:
    """CLI entry point for ``sol schedule``: print a table of all schedules.

    Every entry in config/schedules.json is listed, including disabled and
    invalid ones. Each row shows the interval, the last run (from
    health/scheduler.json), the next due time, and the command. Malformed
    values in either file are shown as "?"/"invalid" instead of crashing.
    """
    # The display helpers (_is_due, _format_next_due) read these globals.
    global _daily_time, _weekly_day, _weekly_time

    parser = argparse.ArgumentParser(description="Show scheduled tasks")
    setup_cli(parser)
    require_solstone()

    journal = Path(get_journal())
    config_path = journal / "config" / "schedules.json"
    state_path = journal / "health" / "scheduler.json"

    # Load config (all entries, including disabled for display)
    config: Any = {}
    if config_path.exists():
        try:
            with open(config_path, "r", encoding="utf-8") as f:
                config = json.load(f)
        except (ValueError, OSError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            print(f"Error reading {config_path}: {exc}")
            return
    if not isinstance(config, dict):
        print(
            f"Error reading {config_path}: expected a JSON object, "
            f"got {type(config).__name__}"
        )
        return

    # Metadata keys are not schedule entries; strip them before listing.
    raw_daily_time = config.pop("daily_time", None)
    raw_weekly_day = config.pop("weekly_day", None)
    raw_weekly_time = config.pop("weekly_time", None)
    _daily_time = raw_daily_time if isinstance(raw_daily_time, str) else None
    _weekly_day = raw_weekly_day if isinstance(raw_weekly_day, str) else None
    _weekly_time = raw_weekly_time if isinstance(raw_weekly_time, str) else None

    if not config:
        print("No schedules configured.")
        print(f"\nAdd schedules to: {config_path}")
        return

    # Load state (best effort: missing or corrupt state just shows "never")
    state: Any = {}
    if state_path.exists():
        try:
            with open(state_path, "r", encoding="utf-8") as f:
                state = json.load(f)
        except (ValueError, OSError):
            state = {}
    if not isinstance(state, dict):
        state = {}

    now = datetime.now()

    # Compute column widths
    name_width = max(max(len(n) for n in config), 4)
    every_width = 8
    last_run_width = 18
    next_due_width = 10

    header = (
        f" {'NAME':<{name_width}} {'EVERY':<{every_width}} "
        f"{'LAST RUN':<{last_run_width}} {'NEXT DUE':<{next_due_width}} CMD"
    )
    print(header)
    print()

    for name, raw_entry in sorted(config.items()):
        if not isinstance(raw_entry, dict):
            continue

        every = raw_entry.get("every", "?")
        cmd = raw_entry.get("cmd", [])
        enabled = raw_entry.get("enabled", True)
        state_entry = state.get(name)
        if not isinstance(state_entry, dict):
            state_entry = None

        last_run_str = _format_timestamp((state_entry or {}).get("last_run"))

        # Mirror load_config(): only entries the scheduler would actually run
        # get a next-due time. isinstance guards the set lookup against
        # unhashable values.
        schedulable = (
            isinstance(every, str)
            and every in INTERVALS
            and isinstance(cmd, list)
            and bool(cmd)
        )
        if enabled and schedulable:
            entry = {"cmd": cmd, "every": every}
            next_due_str = _format_next_due(entry, state_entry, now)
        else:
            next_due_str = "disabled" if not enabled else "?"

        # str() each part so a non-string element can't crash the join.
        if isinstance(cmd, list):
            cmd_str = " ".join(str(part) for part in cmd)
        else:
            cmd_str = str(cmd)
        tags = "" if enabled else " [disabled]"
        # str(): a null or numeric "every" must not break the format spec.
        every_str = str(every)

        line = (
            f" {name:<{name_width}} {every_str:<{every_width}} "
            f"{last_run_str:<{last_run_width}} {next_due_str:<{next_due_width}} {cmd_str}{tags}"
        )
        print(line.rstrip())

    print()
    print(f"Config: {config_path}")
636 print(f"Config: {config_path}")