personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(plaud,supervisor): timeout stalled S3 streams; cap scheduled task runtime

Two coupled fixes for the 2026-04-28 wedge where a half-closed S3 socket
left download_to_file blocked inside iter_content for 7+ hours, coalescing
seven sched:sync:plaud refs onto one frozen task.

think/importers/plaud.py:
- download_to_file: switch session.get(stream=True) to (connect=30s,
read=45s) tuple timeout so per-chunk stalls fail fast. Wrap streaming
in RequestException handling that returns False with a clear log.
- Add throttled progress_cb (~5s) into iter_content; PlaudBackend.sync
passes a closure that refreshes last_completed mid-download so the
outer inactivity timer trips on per-step progress, not just on
per-file completion.

think/supervisor.py + think/scheduler.py + think/utils.py:
- New TaskQueue.set_cap / enforce_deadlines: cmd_name-keyed runtime
caps. supervise() ticks enforce_deadlines once per second; on cap
breach it sends non-blocking SIGTERM (raw send_signal, not
RunnerManagedProcess.terminate which blocks on wait(timeout=15)),
records the termination start, then escalates to SIGKILL after 15s
via a sidecar dict. Mirrors the existing _restart_requests pattern.
- scheduler.load_config parses optional max_runtime per entry via
new shared parse_duration_seconds helper (accepts int, Ns, Nm, Nh).
Invalid values warn and drop the cap; the entry still loads.
- scheduler.collect_runtime_caps feeds caps into the TaskQueue at
supervisor boot, keyed on TaskQueue.get_command_name(cmd).

Caps are off by default; absent max_runtime means no cap. Note: no
historical sync:plaud import logs were reachable from this worktree to
tune the schedule value. Operators can set "max_runtime": "30m" on the
sync:plaud entry in journal/config/schedules.json — comfortably above
normal sync duration, well below the 1h hourly cadence.

Tests: new tests/test_plaud_download.py covers read-timeout failure,
throttled progress callback, and the inactivity-timer trip when sync
makes no progress. tests/test_utils.py covers parse_duration_seconds.
tests/test_supervisor.py covers set_cap, SIGTERM on deadline, SIGKILL
escalation after 15s, termination-state cleanup on ref exit, and the
no-cap noop. tests/test_scheduler.py covers max_runtime parsing
(valid string, valid int, invalid negative/garbage/wrong-type), and
collect_runtime_caps filtering.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+522 -36
+108
tests/test_plaud_download.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + import logging 5 + from unittest.mock import Mock 6 + 7 + import pytest 8 + import requests 9 + 10 + 11 + class _Response: 12 + status_code = 200 13 + headers = {"Content-Length": "0"} 14 + text = "" 15 + 16 + def __init__(self, chunks=None, exc=None): 17 + self._chunks = chunks or [] 18 + self._exc = exc 19 + 20 + def __enter__(self): 21 + return self 22 + 23 + def __exit__(self, exc_type, exc, tb): 24 + return False 25 + 26 + def iter_content(self, chunk_size): 27 + if self._exc: 28 + raise self._exc 29 + yield from self._chunks 30 + 31 + 32 + @pytest.mark.timeout(5) 33 + def test_download_to_file_returns_false_on_read_timeout(tmp_path, caplog): 34 + from think.importers.plaud import download_to_file 35 + 36 + session = Mock() 37 + session.get.return_value = _Response(exc=requests.exceptions.ReadTimeout("stalled")) 38 + dest_path = tmp_path / "recording.opus" 39 + 40 + caplog.set_level(logging.WARNING) 41 + 42 + assert download_to_file(session, "https://example.test/file", dest_path) is False 43 + assert not dest_path.exists() 44 + assert "Plaud download for recording.opus failed: stalled" in caplog.text 45 + 46 + 47 + def test_download_to_file_calls_progress_cb_throttled(tmp_path, monkeypatch): 48 + from think.importers import plaud 49 + 50 + session = Mock() 51 + session.get.return_value = _Response(chunks=[b"x"] * 12) 52 + progress_cb = Mock() 53 + ticks = iter(range(13)) 54 + monkeypatch.setattr(plaud.time, "monotonic", lambda: next(ticks)) 55 + 56 + dest_path = tmp_path / "recording.opus" 57 + 58 + assert ( 59 + plaud.download_to_file( 60 + session, 61 + "https://example.test/file", 62 + dest_path, 63 + progress_cb=progress_cb, 64 + ) 65 + is True 66 + ) 67 + assert dest_path.read_bytes() == b"x" * 12 68 + assert progress_cb.call_count == 2 69 + 70 + 71 + def test_sync_inactivity_timer_trips_when_progress_stops(tmp_path, monkeypatch, caplog): 72 + from think.importers import plaud 73 + 74 + files = [ 75 + { 76 + "id": "file1", 77 + "filename": "One", 78 + "fullname": "one.opus", 79 + "filesize": 10, 80 + "start_time": 1737000000000, 81 + "duration": 60000, 82 + }, 83 + { 84 + "id": "file2", 85 + "filename": "Two", 86 + "fullname": "two.opus", 87 + "filesize": 10, 88 + "start_time": 1737000300000, 89 + "duration": 60000, 90 + }, 91 + ] 92 + 93 + monkeypatch.setenv("PLAUD_ACCESS_TOKEN", "test-token") 94 + monkeypatch.setattr(plaud, "SYNC_INACTIVITY_TIMEOUT", 1) 95 + monkeypatch.setattr(plaud, "list_files", lambda _session, _token: files) 96 + monkeypatch.setattr( 97 + plaud, "get_temp_url", lambda *_args: "https://example.test/file" 98 + ) 99 + monkeypatch.setattr(plaud, "download_to_file", lambda *_args, **_kwargs: False) 100 + ticks = iter([0.0, 0.5, 2.0]) 101 + monkeypatch.setattr(plaud.time, "monotonic", lambda: next(ticks)) 102 + 103 + caplog.set_level(logging.WARNING) 104 + 105 + result = plaud.PlaudBackend().sync(tmp_path, dry_run=False) 106 + 107 + assert any("Sync stalled" in error for error in result["errors"]) 108 + assert "Sync stalled" in caplog.text
+110
tests/test_scheduler.py
··· 158 158 159 159 assert load_config() == {} 160 160 161 + def test_max_runtime_valid_string_round_trips(self, journal_path): 162 + # D-E/D-F: assert the accepted Plaud cap via test-local config, 163 + # leaving the synthetic fixture schedule minimal. 164 + _write_config( 165 + journal_path, 166 + { 167 + "sync:plaud": { 168 + "cmd": ["sol", "import", "--sync", "plaud"], 169 + "every": "hourly", 170 + "max_runtime": "30m", 171 + }, 172 + }, 173 + ) 174 + from think.scheduler import load_config 175 + 176 + entries = load_config() 177 + assert entries["sync:plaud"]["max_runtime"] == 1800 178 + 179 + def test_max_runtime_valid_int_round_trips(self, journal_path): 180 + _write_config( 181 + journal_path, 182 + { 183 + "sync:plaud": { 184 + "cmd": ["sol", "import", "--sync", "plaud"], 185 + "every": "hourly", 186 + "max_runtime": 1800, 187 + }, 188 + }, 189 + ) 190 + from think.scheduler import load_config 191 + 192 + entries = load_config() 193 + assert entries["sync:plaud"]["max_runtime"] == 1800 194 + 195 + def test_max_runtime_invalid_negative_logged_and_dropped( 196 + self, journal_path, caplog 197 + ): 198 + _write_config( 199 + journal_path, 200 + { 201 + "sync:plaud": { 202 + "cmd": ["sol", "import", "--sync", "plaud"], 203 + "every": "hourly", 204 + "max_runtime": -5, 205 + }, 206 + }, 207 + ) 208 + from think.scheduler import load_config 209 + 210 + entries = load_config() 211 + assert "max_runtime" not in entries["sync:plaud"] 212 + assert "Schedule 'sync:plaud': invalid max_runtime -5" in caplog.text 213 + 214 + def test_max_runtime_invalid_garbage_logged_and_dropped(self, journal_path, caplog): 215 + _write_config( 216 + journal_path, 217 + { 218 + "sync:plaud": { 219 + "cmd": ["sol", "import", "--sync", "plaud"], 220 + "every": "hourly", 221 + "max_runtime": "garbage", 222 + }, 223 + }, 224 + ) 225 + from think.scheduler import load_config 226 + 227 + entries = load_config() 228 + assert "max_runtime" not in entries["sync:plaud"] 229 + assert "Schedule 'sync:plaud': invalid max_runtime 'garbage'" in caplog.text 230 + 231 + def test_max_runtime_invalid_type_logged_and_dropped(self, journal_path, caplog): 232 + _write_config( 233 + journal_path, 234 + { 235 + "sync:plaud": { 236 + "cmd": ["sol", "import", "--sync", "plaud"], 237 + "every": "hourly", 238 + "max_runtime": [1, 2], 239 + }, 240 + }, 241 + ) 242 + from think.scheduler import load_config 243 + 244 + entries = load_config() 245 + assert "max_runtime" not in entries["sync:plaud"] 246 + assert "Schedule 'sync:plaud': invalid max_runtime [1, 2]" in caplog.text 247 + 248 + def test_collect_runtime_caps_returns_only_capped_entries(self, journal_path): 249 + _write_config( 250 + journal_path, 251 + { 252 + "sync:plaud": { 253 + "cmd": ["sol", "import", "--sync", "plaud"], 254 + "every": "hourly", 255 + "max_runtime": "30m", 256 + }, 257 + "heartbeat": { 258 + "cmd": ["sol", "heartbeat"], 259 + "every": "daily", 260 + }, 261 + }, 262 + ) 263 + import think.scheduler as mod 264 + 265 + mod.init(Mock()) 266 + 267 + assert mod.collect_runtime_caps() == [ 268 + (["sol", "import", "--sync", "plaud"], 1800) 269 + ] 270 + 161 271 162 272 # --------------------------------------------------------------------------- 163 273 # load_state / save_state
+84
tests/test_supervisor.py
··· 4 4 import importlib 5 5 import io 6 6 import os 7 + import signal 7 8 import subprocess 8 9 import sys 9 10 from unittest.mock import MagicMock ··· 549 550 550 551 # Old queued entries should still be in queue (stale clear only removes _running) 551 552 assert len(mod._task_queue._queues["indexer"]) == 1 553 + 554 + 555 + class _TaskProcessStub: 556 + def __init__(self): 557 + self.send_signal = MagicMock() 558 + self.kill = MagicMock() 559 + 560 + 561 + class _TaskManagedStub: 562 + def __init__(self, *, cmd, start_time=100.0): 563 + self.cmd = cmd 564 + self.start_time = start_time 565 + self.process = _TaskProcessStub() 566 + 567 + 568 + def test_taskqueue_set_cap_records_cap(): 569 + mod = importlib.import_module("think.supervisor") 570 + queue = mod.TaskQueue(on_queue_change=None) 571 + 572 + queue.set_cap("import", 1800) 573 + 574 + assert queue._caps["import"] == 1800 575 + 576 + 577 + def test_enforce_deadlines_sends_sigterm_when_elapsed_exceeds_cap(caplog): 578 + mod = importlib.import_module("think.supervisor") 579 + queue = mod.TaskQueue(on_queue_change=None) 580 + managed = _TaskManagedStub( 581 + cmd=["sol", "import", "--sync", "plaud", "--save"], 582 + start_time=100.0, 583 + ) 584 + queue._active["ref-1"] = managed 585 + queue.set_cap("import", 50) 586 + 587 + caplog.set_level("WARNING") 588 + queue.enforce_deadlines(200.0) 589 + 590 + managed.process.send_signal.assert_called_once_with(signal.SIGTERM) 591 + assert queue._terminations["ref-1"] == 200.0 592 + assert ( 593 + "Task import (cmd=sol import --sync plaud --save, ref=ref-1) exceeded " 594 + "max_runtime of 50s (elapsed=100s); sending SIGTERM" 595 + ) in caplog.text 596 + 597 + 598 + def test_enforce_deadlines_escalates_to_sigkill_after_15s(caplog): 599 + mod = importlib.import_module("think.supervisor") 600 + queue = mod.TaskQueue(on_queue_change=None) 601 + managed = _TaskManagedStub(cmd=["sol", "import"], start_time=100.0) 602 + queue._active["ref-1"] = managed 603 + queue._terminations["ref-1"] = 200.0 604 + 605 + caplog.set_level("WARNING") 606 + queue.enforce_deadlines(216.0) 607 + 608 + managed.process.kill.assert_called_once_with() 609 + assert queue._terminations["ref-1"] == 0.0 610 + assert ( 611 + "Task import (ref=ref-1) did not exit 15s after SIGTERM; sending SIGKILL" 612 + ) in caplog.text 613 + 614 + 615 + def test_enforce_deadlines_clears_termination_state_when_ref_exits(): 616 + mod = importlib.import_module("think.supervisor") 617 + queue = mod.TaskQueue(on_queue_change=None) 618 + queue._terminations["ref-1"] = 200.0 619 + 620 + queue.enforce_deadlines(216.0) 621 + 622 + assert "ref-1" not in queue._terminations 623 + 624 + 625 + def test_enforce_deadlines_noop_when_no_cap(): 626 + mod = importlib.import_module("think.supervisor") 627 + queue = mod.TaskQueue(on_queue_change=None) 628 + managed = _TaskManagedStub(cmd=["sol", "import"], start_time=100.0) 629 + queue._active["ref-1"] = managed 630 + 631 + queue.enforce_deadlines(10_000.0) 632 + 633 + managed.process.send_signal.assert_not_called() 634 + managed.process.kill.assert_not_called() 635 + assert queue._terminations == {} 552 636 553 637 554 638 def test_supervisor_singleton_lock_acquired(tmp_path, monkeypatch):
+36
tests/test_utils.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + import pytest 5 + 6 + from think.utils import parse_duration_seconds 7 + 8 + 9 + @pytest.mark.parametrize( 10 + ("spec", "expected"), 11 + [ 12 + (30, 30), 13 + ("45s", 45), 14 + ("30m", 1800), 15 + ("1h", 3600), 16 + ], 17 + ) 18 + def test_parse_duration_seconds_valid(spec, expected): 19 + assert parse_duration_seconds(spec) == expected 20 + 21 + 22 + @pytest.mark.parametrize( 23 + "spec", 24 + [ 25 + 0, 26 + -5, 27 + "garbage", 28 + "5x", 29 + "30 m", 30 + None, 31 + [], 32 + ], 33 + ) 34 + def test_parse_duration_seconds_invalid(spec): 35 + with pytest.raises(ValueError, match="invalid duration"): 36 + parse_duration_seconds(spec)
+59 -32
think/importers/plaud.py
··· 13 13 import tempfile 14 14 import time 15 15 from pathlib import Path 16 - from typing import Any, Dict, List, Optional 16 + from typing import Any, Callable, Dict, List, Optional 17 17 18 18 import requests 19 19 from requests.adapters import HTTPAdapter ··· 25 25 26 26 # Skip recordings shorter than this (milliseconds) 27 27 MIN_DURATION_MS = 30_000 28 + SYNC_INACTIVITY_TIMEOUT = 3600 28 29 29 30 30 31 def make_session() -> requests.Session: ··· 163 164 164 165 165 166 def download_to_file( 166 - session: requests.Session, url: str, dest_path: pathlib.Path 167 + session: requests.Session, 168 + url: str, 169 + dest_path: pathlib.Path, 170 + progress_cb: Callable[[], None] | None = None, 167 171 ) -> bool: 168 172 """Stream-download URL to dest_path atomically.""" 169 173 dest_path.parent.mkdir(parents=True, exist_ok=True) 170 - with session.get(url, stream=True, timeout=60) as r: 171 - if r.status_code != 200: 172 - logger.warning( 173 - "[%s] Download error %s: %s", 174 - dest_path.stem, 175 - r.status_code, 176 - r.text[:200], 177 - ) 178 - return False 179 - total = int(r.headers.get("Content-Length", "0")) or None 180 - # Write to a temp file then atomically move 181 - with tempfile.NamedTemporaryFile( 182 - dir=str(dest_path.parent), delete=False 183 - ) as tmp: 184 - tmp_path = pathlib.Path(tmp.name) 185 - try: 186 - downloaded = 0 187 - for chunk in r.iter_content(chunk_size=1024 * 256): 188 - if not chunk: 189 - continue 190 - tmp.write(chunk) 191 - downloaded += len(chunk) 192 - tmp.flush() 193 - os.fsync(tmp.fileno()) 194 - except Exception as e: 195 - tmp.close() 196 - tmp_path.unlink(missing_ok=True) 197 - logger.warning("[%s] Error while writing file: %s", dest_path.stem, e) 174 + tmp_path: pathlib.Path | None = None 175 + try: 176 + # Design §4-§5: fail stalled streaming reads and refresh outer progress. 177 + with session.get(url, stream=True, timeout=(30, 45)) as r: 178 + if r.status_code != 200: 179 + logger.warning( 180 + "[%s] Download error %s: %s", 181 + dest_path.stem, 182 + r.status_code, 183 + r.text[:200], 184 + ) 198 185 return False 186 + total = int(r.headers.get("Content-Length", "0")) or None 187 + # Write to a temp file then atomically move 188 + with tempfile.NamedTemporaryFile( 189 + dir=str(dest_path.parent), delete=False 190 + ) as tmp: 191 + tmp_path = pathlib.Path(tmp.name) 192 + try: 193 + downloaded = 0 194 + last_refresh = time.monotonic() 195 + for chunk in r.iter_content(chunk_size=1024 * 256): 196 + if not chunk: 197 + continue 198 + tmp.write(chunk) 199 + downloaded += len(chunk) 200 + now = time.monotonic() 201 + if progress_cb is not None and now - last_refresh >= 5: 202 + progress_cb() 203 + last_refresh = now 204 + tmp.flush() 205 + os.fsync(tmp.fileno()) 206 + except requests.exceptions.RequestException: 207 + raise 208 + except (IOError, OSError) as e: 209 + tmp.close() 210 + tmp_path.unlink(missing_ok=True) 211 + logger.warning( 212 + "[%s] Error while writing file: %s", dest_path.stem, e 213 + ) 214 + return False 215 + except requests.exceptions.RequestException as exc: 216 + if tmp_path is not None: 217 + tmp_path.unlink(missing_ok=True) 218 + logger.warning("Plaud download for %s failed: %s", dest_path.name, exc) 219 + return False 199 220 200 221 tmp_path.replace(dest_path) 201 222 size_info = f" ({total} bytes)" if total else "" ··· 433 454 # within this window. The inner import process has its own 434 455 # 600s inactivity timeout for stall detection, so this is a 435 456 # generous outer safety net. 436 - sync_timeout = 3600 457 + sync_timeout = SYNC_INACTIVITY_TIMEOUT 437 458 last_completed = time.monotonic() 459 + 460 + def _refresh() -> None: 461 + nonlocal last_completed 462 + last_completed = time.monotonic() 438 463 439 464 for idx, (file_id, info) in enumerate(to_process, 1): 440 465 # Check inactivity timeout (time since last file completed) ··· 496 521 errors.append(msg) 497 522 continue 498 523 499 - if not download_to_file(session, temp_url, dest_path): 524 + if not download_to_file( 525 + session, temp_url, dest_path, progress_cb=_refresh 526 + ): 500 527 msg = f"{filename}: download failed" 501 528 logger.warning(" FAILED — %s", msg) 502 529 errors.append(msg)
+32 -2
think/scheduler.py
··· 22 22 from pathlib import Path 23 23 from typing import Any 24 24 25 - from think.utils import get_journal, now_ms, require_solstone, setup_cli 25 + from think.utils import ( 26 + get_journal, 27 + now_ms, 28 + parse_duration_seconds, 29 + require_solstone, 30 + setup_cli, 31 + ) 26 32 27 33 logger = logging.getLogger(__name__) 28 34 ··· 125 131 if not entry.get("enabled", True): 126 132 continue 127 133 128 - entries[name] = {"cmd": cmd, "every": every} 134 + validated = {"cmd": cmd, "every": every} 135 + max_runtime = entry.get("max_runtime") 136 + if max_runtime is not None: 137 + # D-C / design §2: preserve caps for TaskQueue registration, 138 + # not as extra supervisor.request payload fields. 139 + try: 140 + validated["max_runtime"] = parse_duration_seconds(max_runtime) 141 + except ValueError: 142 + logger.warning( 143 + "Schedule '%s': invalid max_runtime %r, dropping cap", 144 + name, 145 + max_runtime, 146 + ) 147 + 148 + entries[name] = validated 129 149 130 150 return entries 131 151 ··· 303 323 ) 304 324 else: 305 325 logger.info("Scheduler initialized (no schedules configured)") 326 + 327 + 328 + def collect_runtime_caps() -> list[tuple[list[str], int]]: 329 + """Return configured task runtime caps from loaded schedule entries.""" 330 + caps: list[tuple[list[str], int]] = [] 331 + for entry in _entries.values(): 332 + max_runtime = entry.get("max_runtime") 333 + if max_runtime is not None: 334 + caps.append((list(entry["cmd"]), max_runtime)) 335 + return caps 306 336 307 337 308 338 def register_defaults() -> None:
+75 -2
think/supervisor.py
··· 197 197 ] = {} # command_name -> {"ref": str, "thread": Thread} 198 198 self._queues: dict[str, list] = {} # command_name -> list of {refs, cmd} dicts 199 199 self._active: dict[str, RunnerManagedProcess] = {} # ref -> process 200 + self._caps: dict[str, int] = {} 201 + self._terminations: dict[str, float] = {} 200 202 self._pending: list[dict] = [] 201 203 self._ready = ready 202 204 self._lock = threading.Lock() ··· 319 321 320 322 return None 321 323 324 + def set_cap(self, cmd_name: str, seconds: int) -> None: 325 + """Set a max runtime cap in seconds for a queued command name.""" 326 + with self._lock: 327 + self._caps[cmd_name] = seconds 328 + 329 + def enforce_deadlines(self, now: float) -> None: 330 + """Enforce configured task runtime caps without blocking the supervisor tick.""" 331 + with self._lock: 332 + active_refs = set(self._active) 333 + for ref in list(self._terminations): 334 + if ref not in active_refs: 335 + self._terminations.pop(ref, None) 336 + 337 + for ref, managed in list(self._active.items()): 338 + cmd_name = self.get_command_name(managed.cmd) 339 + termination_started = self._terminations.get(ref) 340 + if termination_started is not None: 341 + if termination_started <= 0: 342 + continue 343 + if now - termination_started >= 15: 344 + logging.warning( 345 + "Task %s (ref=%s) did not exit 15s after SIGTERM; " 346 + "sending SIGKILL", 347 + cmd_name, 348 + ref, 349 + ) 350 + try: 351 + managed.process.kill() 352 + except (ProcessLookupError, OSError): 353 + pass 354 + self._terminations[ref] = 0.0 355 + continue 356 + 357 + cap = self._caps.get(cmd_name) 358 + if not cap: 359 + continue 360 + 361 + elapsed = now - managed.start_time 362 + if elapsed <= cap: 363 + continue 364 + 365 + elapsed_seconds = int(elapsed) 366 + logging.warning( 367 + "Task %s (cmd=%s, ref=%s) exceeded max_runtime of %ds " 368 + "(elapsed=%ds); sending SIGTERM", 369 + cmd_name, 370 + " ".join(managed.cmd), 371 + ref, 372 + cap, 373 + elapsed_seconds, 374 + ) 375 + try: 376 + managed.process.send_signal(signal.SIGTERM) 377 + self._terminations[ref] = now 378 + except (ProcessLookupError, OSError): 379 + pass 380 + 322 381 def set_ready(self) -> None: 323 382 """Allow buffered tasks to start dispatching through the normal queue path.""" 324 383 with self._lock: ··· 365 424 managed = RunnerManagedProcess.spawn( 366 425 cmd, ref=primary_ref, callosum=callosum, day=day 367 426 ) 368 - self._active[primary_ref] = managed 427 + with self._lock: 428 + self._active[primary_ref] = managed 369 429 370 430 callosum.emit( 371 431 "supervisor", ··· 415 475 managed.cleanup() 416 476 except Exception: 417 477 logging.exception(f"Task {cmd_name} ({primary_ref}): cleanup failed") 418 - self._active.pop(primary_ref, None) 478 + with self._lock: 479 + self._active.pop(primary_ref, None) 419 480 try: 420 481 callosum.stop() 421 482 except Exception: ··· 1383 1444 logging.error(f"Failed to kill {service}: {e}") 1384 1445 # Don't delete here - let handle_runner_exits clean up 1385 1446 1447 + if _task_queue: 1448 + _task_queue.enforce_deadlines(time.time()) 1449 + 1386 1450 # Check for runner exits first (immediate alert) 1387 1451 if procs: 1388 1452 await handle_runner_exits(procs) ··· 1660 1724 if schedule_enabled and _supervisor_callosum: 1661 1725 scheduler.init(_supervisor_callosum) 1662 1726 scheduler.register_defaults() 1727 + if _task_queue: 1728 + for cmd, seconds in scheduler.collect_runtime_caps(): 1729 + cmd_name = TaskQueue.get_command_name(cmd) 1730 + _task_queue.set_cap(cmd_name, seconds) 1731 + logging.info( 1732 + "Registered max_runtime cap for %s: %ss", 1733 + cmd_name, 1734 + seconds, 1735 + ) 1663 1736 routines.init(_supervisor_callosum) 1664 1737 1665 1738 if _task_queue:
+18
think/utils.py
··· 153 153 return path 154 154 155 155 156 + def parse_duration_seconds(spec) -> int: 157 + # D-D: shared parser for scheduler max_runtime values. 158 + if isinstance(spec, int) and not isinstance(spec, bool): 159 + if spec > 0: 160 + return spec 161 + raise ValueError(f"invalid duration: {spec!r}") 162 + 163 + if isinstance(spec, str): 164 + match = re.fullmatch(r"(\d+)([smh])", spec) 165 + if match: 166 + amount = int(match.group(1)) 167 + if amount <= 0: 168 + raise ValueError(f"invalid duration: {spec!r}") 169 + return amount * {"s": 1, "m": 60, "h": 3600}[match.group(2)] 170 + 171 + raise ValueError(f"invalid duration: {spec!r}") 172 + 173 + 156 174 def resolve_journal_path(journal: str | Path, rel: str) -> Path: 157 175 """Resolve a chronicle-free journal-relative path to its on-disk location.""" 158 176 if not rel: