personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(supervisor): add priority-based sequential scheduling for daily agents

Implement true priority-based scheduling where agents in the same priority group
run in parallel, but different priority groups execute sequentially. This ensures
higher priority agents (lower numbers) complete before lower priority ones start.

Key changes:
- Main loop now runs at 1-second intervals for responsiveness
- Subsystems manage their own timing internally (health checks, agent scheduling)
- Non-blocking state machine for scheduled agent execution
- 5-minute timeout per priority group prevents indefinite blocking
- Fast shutdown signal handling (1-second granularity)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

+161 -76
+3 -1
tests/test_supervisor.py
··· 170 170 mod.shutdown_requested = False 171 171 172 172 health_states = [["hear"], []] 173 + time_counter = iter([0.0, 1.0, 2.0, 3.0, 4.0]) # Incrementing time for health check timing 173 174 174 175 def fake_check_health(threshold): 175 176 state = health_states.pop(0) ··· 180 181 monkeypatch.setattr(mod, "check_runner_exits", lambda procs: []) 181 182 monkeypatch.setattr(mod, "check_health", fake_check_health) 182 183 monkeypatch.setattr(mod, "send_notification", lambda *args, **kwargs: None) 183 - monkeypatch.setattr(mod.time, "time", lambda: 0) 184 + monkeypatch.setattr(mod, "check_scheduled_agents", lambda: None) # Mock scheduled agents 185 + monkeypatch.setattr(mod.time, "time", lambda: next(time_counter)) 184 186 monkeypatch.setattr(mod.time, "sleep", lambda _: None) 185 187 186 188 monkeypatch.setenv("JOURNAL_PATH", "/test/journal")
+158 -75
think/supervisor.py
··· 20 20 # Global shutdown flag 21 21 shutdown_requested = False 22 22 23 + # State for scheduled agent execution 24 + _scheduled_state = { 25 + "pending_groups": [], # List of (priority, [(persona_id, config, yesterday)]) 26 + "active_files": [], # List of Path objects for current priority group 27 + "start_time": 0, # When current group started 28 + } 29 + 23 30 24 31 def _get_journal_path() -> Path: 25 32 journal = os.getenv("JOURNAL_PATH") ··· 208 215 209 216 210 217 def spawn_scheduled_agents() -> None: 211 - """Spawn agents that have schedule:daily in their metadata.""" 218 + """Prepare scheduled agents grouped by priority for sequential execution.""" 212 219 try: 213 220 # Calculate yesterday's date 214 221 yesterday = (datetime.now().date() - timedelta(days=1)).strftime("%Y-%m-%d") 215 222 216 223 agents = get_agents() 217 224 218 - # Filter and sort scheduled agents by priority 219 - scheduled_agents = [] 225 + # Group agents by priority 226 + priority_groups: dict[int, list[tuple[str, dict]]] = {} 220 227 for persona_id, config in agents.items(): 221 228 if config.get("schedule") == "daily": 222 - # Default priority is 50 if not specified 223 229 priority = config.get("priority", 50) 224 - scheduled_agents.append((priority, persona_id, config)) 230 + priority_groups.setdefault(priority, []).append((persona_id, config)) 231 + 232 + # Store sorted groups in state for sequential processing 233 + _scheduled_state["pending_groups"] = [ 234 + ( 235 + priority, 236 + [(persona_id, config, yesterday) for persona_id, config in agents_list], 237 + ) 238 + for priority, agents_list in sorted(priority_groups.items()) 239 + ] 240 + _scheduled_state["active_files"] = [] 241 + _scheduled_state["start_time"] = 0 242 + 243 + total_agents = sum( 244 + len(agents_list) for _, agents_list in _scheduled_state["pending_groups"] 245 + ) 246 + logging.info( 247 + f"Prepared {len(_scheduled_state['pending_groups'])} priority groups " 248 + f"with {total_agents} total agents" 249 + ) 250 + except Exception as e: 251 + logging.error(f"Failed to prepare scheduled agents: {e}") 225 252 226 - # Sort by priority (lower number = higher priority = runs first) 227 - scheduled_agents.sort(key=lambda x: x[0]) 253 + 254 + def check_scheduled_agents() -> None: 255 + """Check and advance scheduled agent execution (non-blocking). 228 256 229 - # Execute agents in priority order 230 - for priority, persona_id, config in scheduled_agents: 231 - logging.info(f"Spawning scheduled agent: {persona_id} (priority: {priority})") 257 + Called from the main supervise loop to incrementally process priority groups. 258 + Each priority group completes before the next begins. 259 + """ 260 + state = _scheduled_state 261 + 262 + # Nothing to do if no pending groups and no active agents 263 + if not state["pending_groups"] and not state["active_files"]: 264 + return 232 265 233 - # Check if this is a multi-domain agent 234 - if config.get("multi_domain"): 235 - domains = get_domains() 236 - for domain_name in domains.keys(): 237 - logging.info(f"Spawning {persona_id} for domain: {domain_name}") 266 + # Check if current priority group is done 267 + if state["active_files"]: 268 + all_done = not any(f.exists() for f in state["active_files"]) 269 + timed_out = ( 270 + time.time() - state["start_time"] 271 + ) > 300 # 5 minute timeout per group 272 + 273 + if all_done: 274 + logging.info("Priority group completed") 275 + state["active_files"] = [] 276 + elif timed_out: 277 + logging.warning( 278 + "Priority group timed out after 300s, proceeding to next group" 279 + ) 280 + state["active_files"] = [] 281 + else: 282 + return # Still running, check again next iteration 283 + 284 + # Check for shutdown before starting next group 285 + if shutdown_requested: 286 + state["pending_groups"] = [] 287 + state["active_files"] = [] 288 + return 289 + 290 + # Spawn next priority group 291 + if state["pending_groups"]: 292 + priority, agents_list = state["pending_groups"].pop(0) 293 + logging.info(f"Starting priority {priority} agents ({len(agents_list)} agents)") 294 + 295 + active_files = [] 296 + for persona_id, config, yesterday in agents_list: 297 + try: 298 + # Check if this is a multi-domain agent 299 + if config.get("multi_domain"): 300 + domains = get_domains() 301 + for domain_name in domains.keys(): 302 + logging.info(f"Spawning {persona_id} for domain: {domain_name}") 303 + request_file = cortex_request( 304 + prompt=f"You are processing domain '{domain_name}' for yesterday ({yesterday}), use get_domain('{domain_name}') to load the correct context before starting.", 305 + persona=persona_id, 306 + ) 307 + active_files.append(Path(request_file)) 308 + agent_id = Path(request_file).stem.replace("_active", "") 309 + logging.info( 310 + f"Started {persona_id} for {domain_name} (ID: {agent_id})" 311 + ) 312 + else: 313 + # Regular single-instance agent 238 314 request_file = cortex_request( 239 - prompt=f"You are processing domain '{domain_name}' for yesterday ({yesterday}), use get_domain('{domain_name}') to load the correct context before starting.", 240 - persona=persona_id 315 + prompt=f"Running daily scheduled task for {persona_id}, yesterday was {yesterday}.", 316 + persona=persona_id, 241 317 ) 242 - # Extract agent_id from the filename 318 + active_files.append(Path(request_file)) 243 319 agent_id = Path(request_file).stem.replace("_active", "") 244 - logging.info(f"Started {persona_id} agent for domain {domain_name} (ID: {agent_id})") 245 - else: 246 - # Regular single-instance agent 247 - # Spawn via Cortex - it will load and merge the persona config 248 - request_file = cortex_request( 249 - prompt=f"Running daily scheduled task for {persona_id}, yesterday was {yesterday}.", 250 - persona=persona_id, 251 - ) 252 - # Extract agent_id from the filename 253 - agent_id = Path(request_file).stem.replace("_active", "") 254 - logging.info(f"Started {persona_id} agent (ID: {agent_id})") 255 - except Exception as e: 256 - logging.error(f"Failed to spawn scheduled agents: {e}") 320 + logging.info(f"Started {persona_id} agent (ID: {agent_id})") 321 + except Exception as e: 322 + logging.error(f"Failed to spawn {persona_id}: {e}") 323 + 324 + state["active_files"] = active_files 325 + state["start_time"] = time.time() 257 326 258 327 259 328 def start_runners() -> list[ManagedProcess]: ··· 301 370 daily: bool = True, 302 371 procs: list[ManagedProcess] | None = None, 303 372 ) -> None: 304 - """Monitor heartbeat files and alert when they become stale.""" 373 + """Monitor heartbeat files and alert when they become stale. 374 + 375 + Main supervision loop runs at 1-second intervals for responsiveness. 376 + Subsystems manage their own timing (health checks every interval seconds, 377 + scheduled agents check continuously but only advance when ready). 378 + """ 305 379 global shutdown_requested 306 380 last_day = datetime.now().date() 381 + last_health_check = 0.0 # Track last health check time 307 382 alert_state = {} # Track {issue_key: (last_alert_time, backoff_seconds)} 308 383 prev_stale: set[str] = set() 309 384 initial_backoff = 60 # Start with 1 minute ··· 357 432 358 433 if managed.restart and not shutdown_requested: 359 434 policy = _get_restart_policy(managed.name) 360 - uptime = time.time() - policy.last_start if policy.last_start else 0 435 + uptime = ( 436 + time.time() - policy.last_start if policy.last_start else 0 437 + ) 361 438 if uptime >= 60: 362 439 policy.reset_attempts() 363 440 delay = policy.next_delay() ··· 380 457 log_name=managed.logger.path.stem, 381 458 ) 382 459 except Exception as exc: # pragma: no cover - defensive 383 - logging.exception("Failed to restart %s: %s", managed.name, exc) 460 + logging.exception( 461 + "Failed to restart %s: %s", managed.name, exc 462 + ) 384 463 continue 385 464 386 465 insert_at = index if index is not None else len(procs) ··· 391 470 else: 392 471 logging.info("Not restarting %s", managed.name) 393 472 394 - stale = check_health(threshold) 395 - stale_set = set(stale) 473 + # Check health periodically (interval-based timing) 474 + now = time.time() 475 + if now - last_health_check >= interval: 476 + stale = check_health(threshold) 477 + stale_set = set(stale) 396 478 397 - recovered = sorted(prev_stale - stale_set) 398 - for name in recovered: 399 - logging.info("%s heartbeat recovered", name) 479 + recovered = sorted(prev_stale - stale_set) 480 + for name in recovered: 481 + logging.info("%s heartbeat recovered", name) 400 482 401 - if stale_set: 402 - msg = f"Journaling offline: {', '.join(sorted(stale_set))}" 403 - logging.warning(msg) 483 + if stale_set: 484 + msg = f"Journaling offline: {', '.join(sorted(stale_set))}" 485 + logging.warning(msg) 404 486 405 - # Apply exponential backoff 406 - stale_key = ("stale", tuple(sorted(stale_set))) 407 - now = time.time() 487 + # Apply exponential backoff 488 + stale_key = ("stale", tuple(sorted(stale_set))) 408 489 409 - if stale_key in alert_state: 410 - last_time, backoff = alert_state[stale_key] 411 - if now - last_time >= backoff: 412 - send_notification(msg, command) 413 - # Double the backoff for next time, up to max 414 - alert_state[stale_key] = (now, min(backoff * 2, max_backoff)) 415 - logging.info( 416 - f"Alert sent, next backoff: {min(backoff * 2, max_backoff)}s" 417 - ) 490 + if stale_key in alert_state: 491 + last_time, backoff = alert_state[stale_key] 492 + if now - last_time >= backoff: 493 + send_notification(msg, command) 494 + # Double the backoff for next time, up to max 495 + alert_state[stale_key] = (now, min(backoff * 2, max_backoff)) 496 + logging.info( 497 + f"Alert sent, next backoff: {min(backoff * 2, max_backoff)}s" 498 + ) 499 + else: 500 + remaining = int(backoff - (now - last_time)) 501 + logging.info(f"Suppressing alert, next in {remaining}s") 418 502 else: 419 - remaining = int(backoff - (now - last_time)) 420 - logging.info(f"Suppressing alert, next in {remaining}s") 503 + send_notification(msg, command) 504 + alert_state[stale_key] = (now, initial_backoff) 505 + # Retain only alert state entries still relevant 506 + alert_state = { 507 + k: v 508 + for k, v in alert_state.items() 509 + if k[0] != "stale" or set(k[1]).issubset(stale_set) 510 + } 421 511 else: 422 - send_notification(msg, command) 423 - alert_state[stale_key] = (now, initial_backoff) 424 - # Retain only alert state entries still relevant 425 - alert_state = { 426 - k: v 427 - for k, v in alert_state.items() 428 - if k[0] != "stale" or set(k[1]).issubset(stale_set) 429 - } 430 - else: 431 - if prev_stale: 432 - logging.info("Heartbeat OK") 433 - # Clear alert state for stale services when they recover 434 - alert_state = {k: v for k, v in alert_state.items() if k[0] != "stale"} 512 + if prev_stale: 513 + logging.info("Heartbeat OK") 514 + # Clear alert state for stale services when they recover 515 + alert_state = {k: v for k, v in alert_state.items() if k[0] != "stale"} 435 516 436 - prev_stale = stale_set 517 + prev_stale = stale_set 518 + last_health_check = now 437 519 520 + # Check for daily processing (fast date comparison) 438 521 if daily and datetime.now().date() != last_day: 439 522 if run_process_day(): 440 523 spawn_scheduled_agents() 441 524 last_day = datetime.now().date() 442 525 443 - # Use shorter sleep intervals to check for shutdown 444 - for _ in range(interval): 445 - if shutdown_requested: 446 - break 447 - time.sleep(1) 526 + # Advance scheduled agent execution (non-blocking) 527 + check_scheduled_agents() 528 + 529 + # Sleep 1 second before next iteration (responsive to shutdown) 530 + time.sleep(1) 448 531 449 532 450 533 def parse_args() -> argparse.ArgumentParser: