personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 423 lines 13 kB view raw
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Guard user-level sol alias ownership."""

from __future__ import annotations

import argparse
import fcntl
import os
import re
import sys
from contextlib import contextmanager
from enum import Enum
from pathlib import Path
from typing import Iterator, TypedDict

try:
    import userpath  # type: ignore[import-not-found]
except ImportError:  # system python without the venv: doctor.stale_alias_symlink path
    userpath = None  # type: ignore[assignment]


# Bash wrapper written to ~/.local/bin/sol.  It is rendered with str.format,
# so literal braces are doubled; {journal} and {sol_bin} are the only fields.
WRAPPER_TEMPLATE = """\
#!/bin/bash
# sol — managed by 'sol config'. Edits will be overwritten.
# managed-version: 4
: "${{SOLSTONE_JOURNAL:={journal}}}"
export SOLSTONE_JOURNAL
SOL_BIN='{sol_bin}'
if [ ! -x "$SOL_BIN" ]; then
    printf 'sol: venv binary missing or not executable: %s\\n' "$SOL_BIN" >&2
    exit 127
fi
if [ "$1" = "supervisor" ]; then
    mkdir -p "$SOLSTONE_JOURNAL/health"
    exec > >(tee -a "$SOLSTONE_JOURNAL/health/service.log") 2>&1
    export PYTHONUNBUFFERED=1
fi
exec "$SOL_BIN" "$@"
"""

# Marker line and version number of the wrapper format rendered above.
WRAPPER_MARKER = "# managed-version: 4"
WRAPPER_VERSION = 4

# Line-anchored patterns that recover the embedded values from a wrapper.
# _RE_MARKER accepts versions 1-4 so older managed wrappers are recognised.
_RE_MARKER = re.compile(r"(?m)^# managed-version: (?P<version>[1234])$")
_RE_JOURNAL = re.compile(r'(?m)^: "\$\{SOLSTONE_JOURNAL:=(?P<journal>[^\n]*)\}"$')
_RE_SOL_BIN = re.compile(r"(?m)^SOL_BIN='(?P<sol_bin>(?:[^']|'\\'')*)'$")

# Characters that must not appear in a journal path, because the path is
# embedded in the double-quoted `${SOLSTONE_JOURNAL:=...}` default:
#   $ ` " \   are expanded or act as escapes inside bash double quotes;
#   }         closes the parameter expansion early, truncating the default;
#   newline   cannot round-trip through _RE_JOURNAL, which matches one line.
_INVALID_JOURNAL_CHARS = ("$", "`", '"', "\\", "}", "\n")


class AliasState(Enum):
    """Classification of ~/.local/bin/sol relative to one repo checkout."""

    # <curdir>/.git is a file rather than a directory.
    WORKTREE = "worktree"
    # Nothing exists at the alias path (not even a broken symlink).
    ABSENT = "absent"
    # Symlink or managed wrapper that resolves to this repo's venv binary.
    OWNED = "owned"
    # Symlink to an existing target that is not this repo's venv binary.
    CROSS_REPO = "cross_repo"
    # Symlink whose resolved target does not exist.
    DANGLING = "dangling"
    # Regular file that is unreadable, is not a managed wrapper, or is a
    # managed wrapper whose SOL_BIN is not this repo's venv binary.
    FOREIGN = "foreign"


class ParsedWrapper(TypedDict):
    """Values recovered from a managed wrapper by ``parse_wrapper``."""

    journal: str  # default journal path embedded in the wrapper
    sol_bin: str  # SOL_BIN value with shell single-quote escaping undone
    version: int  # number taken from the "# managed-version:" line


def alias_path() -> Path:
    """Return the user-level alias location, ``~/.local/bin/sol``."""
    return Path.home() / ".local" / "bin" / "sol"


def expected_target(curdir: Path) -> Path:
    """Return the venv ``sol`` binary path for the repo rooted at *curdir*."""
    return curdir / ".venv" / "bin" / "sol"


def render_wrapper(journal: str, sol_bin: str) -> str:
    """Render the managed wrapper for ~/.local/bin/sol.

    *sol_bin* is embedded in single quotes, so each ``'`` is rewritten as
    ``'\\''``.  *journal* is embedded verbatim inside double quotes; callers
    are expected to run ``validate_journal_path_for_wrapper`` on it first.
    """
    escaped_sol_bin = sol_bin.replace("'", "'\\''")
    return WRAPPER_TEMPLATE.format(journal=journal, sol_bin=escaped_sol_bin)


def parse_wrapper(content: str) -> ParsedWrapper | None:
    """Return embedded paths if the content is a managed wrapper.

    Returns ``None`` unless the version marker, the journal default line and
    the ``SOL_BIN=`` line are all present.
    """
    marker_match = _RE_MARKER.search(content)
    if not marker_match:
        return None
    journal_match = _RE_JOURNAL.search(content)
    sol_bin_match = _RE_SOL_BIN.search(content)
    if not journal_match or not sol_bin_match:
        return None
    return {
        "journal": journal_match.group("journal"),
        # Undo the single-quote escaping applied by render_wrapper.
        "sol_bin": sol_bin_match.group("sol_bin").replace("'\\''", "'"),
        "version": int(marker_match.group("version")),
    }


def write_wrapper_atomic(path: Path, content: str) -> None:
    """Atomically rewrite the managed wrapper and restore exec mode."""
    # Imported lazily so this module stays importable without the project
    # package on sys.path.
    from think.entities.core import atomic_write

    atomic_write(path, content)
    os.chmod(path, 0o755)


@contextmanager
def wrapper_lock(lock_path: Path | None = None) -> Iterator[None]:
    """Hold an exclusive advisory lock while rewriting the wrapper.

    Args:
        lock_path: Lock file to use; defaults to ``~/.local/bin/.sol.lock``.
            Its parent directory is created if missing.
    """
    if lock_path is None:
        lock_path = Path.home() / ".local" / "bin" / ".sol.lock"
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    with open(lock_path, "w", encoding="utf-8") as lock_fd:
        try:
            fcntl.flock(lock_fd.fileno(), fcntl.LOCK_EX)
            yield
        finally:
            fcntl.flock(lock_fd.fileno(), fcntl.LOCK_UN)


def validate_journal_path_for_wrapper(path: str) -> None:
    """Reject characters that would corrupt wrapper embedding.

    Raises:
        ValueError: if *path* contains any character listed in
            ``_INVALID_JOURNAL_CHARS``; the message names the first one found.
    """
    for char in _INVALID_JOURNAL_CHARS:
        if char in path:
            raise ValueError(
                f"journal path contains shell-active character {char!r}: {path!r}"
            )


def check_alias(curdir: Path) -> tuple[AliasState, Path | None]:
    """Classify ~/.local/bin/sol relative to the repo at *curdir*.

    Returns:
        ``(state, target)``.  *target* is the resolved symlink target for
        DANGLING, CROSS_REPO and symlink-OWNED, the resolved ``SOL_BIN`` for
        wrapper-OWNED, and ``None`` for every other state.
    """
    if (curdir / ".git").is_file():
        return AliasState.WORKTREE, None

    alias = alias_path()
    # exists() follows symlinks, so a broken symlink needs the second test.
    if not alias.exists() and not alias.is_symlink():
        return AliasState.ABSENT, None

    if alias.is_symlink():
        target = Path(os.readlink(alias))
        if not target.is_absolute():
            target = alias.parent / target
        target = target.resolve()
        if not target.exists():
            return AliasState.DANGLING, target
        if target == expected_target(curdir).resolve():
            return AliasState.OWNED, target
        return AliasState.CROSS_REPO, target

    try:
        content = alias.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        # Unreadable, or not UTF-8 text (e.g. an unrelated binary named
        # `sol`): either way it is not a wrapper this tool manages.
        return AliasState.FOREIGN, None

    parsed = parse_wrapper(content)
    if parsed is None:
        return AliasState.FOREIGN, None

    target = Path(parsed["sol_bin"])
    if target.resolve() == expected_target(curdir).resolve():
        return AliasState.OWNED, target.resolve()
    return AliasState.FOREIGN, None


def _current_journal_for_alias() -> str:
    """Return the journal path a wrapper install would embed right now."""
    from think import utils as think_utils

    try:
        path, _ = think_utils.get_journal_info()
    # Falls back to RuntimeError when think.utils has no SolstoneNotConfigured.
    except getattr(think_utils, "SolstoneNotConfigured", RuntimeError):
        path = str(Path.home() / "Documents" / "journal")
    return path


def check_alias_detail(curdir: Path) -> tuple[AliasState, str]:
    """Return alias state plus the cmd_check token for owned aliases.

    For any state other than OWNED the token is ``state.value``.  For OWNED
    it is ``"current"`` when the alias is a managed wrapper that embeds the
    current journal, this repo's venv binary and ``WRAPPER_VERSION`` and has
    all three execute bits set; otherwise it is ``"upgrade"``.
    """
    state, _other_target = check_alias(curdir)
    if state is not AliasState.OWNED:
        return state, state.value

    alias = alias_path()
    if alias.is_symlink():
        return state, "upgrade"

    try:
        content = alias.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        return state, "upgrade"

    parsed = parse_wrapper(content)
    if parsed is None:
        return state, "upgrade"

    embeds_current_values = (
        parsed["journal"] == _current_journal_for_alias()
        and parsed["sol_bin"] == str(expected_target(curdir))
        and parsed["version"] == WRAPPER_VERSION
    )
    if not embeds_current_values:
        return state, "upgrade"

    try:
        mode = alias.stat().st_mode
    except OSError:
        # The alias vanished or became unreadable after it was read; treat
        # it like the unreadable case above instead of crashing.
        return state, "upgrade"
    if (mode & 0o111) == 0o111:
        return state, "current"
    return state, "upgrade"
def format_error(
    state: AliasState,
    curdir: Path,
    _alias: Path,
    other_target: Path | None,
    *,
    allow_force: bool = False,
) -> str:
    """Build the multi-line refusal message for a non-owned alias state.

    Args:
        state: Alias classification being reported.
        curdir: Repo directory the command was run from.
        _alias: Accepted for signature compatibility; not used.
        other_target: Target shown for CROSS_REPO and DANGLING states.
        allow_force: When true, append the ``--force`` hint.

    Returns:
        The message without a trailing newline.
    """
    if state is AliasState.WORKTREE:
        return (
            f"ERROR: refusing to run from a git worktree ({curdir}). "
            "Run from the primary clone."
        )

    # Anything that is neither CROSS_REPO nor DANGLING is reported with the
    # generic "not a symlink" description.
    installed = " installed: not a symlink"
    if state is AliasState.CROSS_REPO:
        installed = f" installed: {other_target}"
    elif state is AliasState.DANGLING:
        installed = f" installed: dangling: {other_target} does not exist"

    message = [
        "ERROR: Another solstone install owns ~/.local/bin/sol.",
        f" this repo: {curdir}",
        installed,
        "Run 'sol setup' from the installed repo first,",
        "or remove ~/.local/bin/sol manually if that repo is gone.",
    ]
    if allow_force:
        message += [
            "Rerun 'python -m think.install_guard install --force' only if you intend to replace it from this repo."
        ]
    return "\n".join(message)


def _print_error(
    state: AliasState,
    curdir: Path,
    alias: Path,
    other_target: Path | None,
    *,
    allow_force: bool = False,
) -> None:
    """Write the ``format_error`` message plus a newline to stderr."""
    message = format_error(
        state,
        curdir,
        alias,
        other_target,
        allow_force=allow_force,
    )
    sys.stderr.write(message + "\n")


def _ensure_user_bin_on_path(user_bin: Path) -> None:
    """Try to put *user_bin* on the shell PATH and print what happened.

    ``userpath`` is imported at module top behind an ImportError guard so the
    module stays importable without it; reaching this function without it is
    treated as a hard failure.

    Raises:
        RuntimeError: if ``userpath`` is not available.
    """
    if userpath is None:
        raise RuntimeError("userpath is not available; run `make install` first")

    location = str(user_bin)
    try:
        # The print stays inside the try so that any failure, including one
        # while reporting, falls through to the manual-instructions message.
        if userpath.in_current_path(location):
            outcome = "path: ~/.local/bin already on PATH"
        elif userpath.append(location, app_name="solstone", all_shells=True):
            if userpath.need_shell_restart(location):
                outcome = "path: added ~/.local/bin to shell PATH — restart your shell or run 'exec $SHELL -l' to pick it up"
            else:
                outcome = "path: added ~/.local/bin to shell PATH"
        else:
            outcome = 'path: could not auto-add ~/.local/bin to PATH — add this line to your shell rc manually: export PATH="$HOME/.local/bin:$PATH"'
        print(outcome)
    except Exception as exc:
        print(
            f'path: could not auto-add ~/.local/bin to PATH ({type(exc).__name__}: {exc}) — add this line to your shell rc manually: export PATH="$HOME/.local/bin:$PATH"'
        )


def cmd_check(curdir: Path) -> int:
    """Print a one-word status token for the alias and return an exit code.

    Returns 0 for an absent alias ("fresh") or an owned one (the token from
    ``check_alias_detail``); every other state prints its token, writes the
    explanation to stderr and returns 1.
    """
    alias = alias_path()
    state, token = check_alias_detail(curdir)

    # Success paths first.
    if state is AliasState.ABSENT:
        print("fresh")
        return 0
    if state is AliasState.OWNED:
        print(token)
        return 0

    # Everything below is a refusal and exits 1.
    if state is AliasState.WORKTREE:
        print("worktree")
        _print_error(state, curdir, alias, None)
    elif state is AliasState.CROSS_REPO or state is AliasState.DANGLING:
        print("cross_repo" if state is AliasState.CROSS_REPO else "dangling")
        # check_alias_detail does not return the target, so look it up again.
        _print_error(state, curdir, alias, check_alias(curdir)[1], allow_force=True)
    elif state is AliasState.FOREIGN:
        print("not_symlink")
        _print_error(state, curdir, alias, None, allow_force=True)
    return 1


def cmd_install(curdir: Path, *, force: bool = False) -> int:
    """Install or refresh the managed wrapper for the repo at *curdir*.

    Refuses from a worktree, and refuses to replace a cross-repo, dangling or
    foreign alias unless *force* is set.  The alias is classified once before
    and once after taking the wrapper lock.

    Returns:
        0 after writing the wrapper, 1 on any refusal.
    """
    alias = alias_path()
    force_only = {
        AliasState.CROSS_REPO,
        AliasState.DANGLING,
        AliasState.FOREIGN,
    }

    def _refused(current: AliasState, target: Path | None) -> bool:
        # Report and return True when *current* blocks the install.
        if current is AliasState.WORKTREE:
            _print_error(current, curdir, alias, target)
            return True
        if not force and current in force_only:
            _print_error(current, curdir, alias, target, allow_force=True)
            return True
        return False

    state, other_target = check_alias(curdir)
    if _refused(state, other_target):
        return 1

    journal = _current_journal_for_alias()
    try:
        validate_journal_path_for_wrapper(journal)
    except ValueError as exc:
        print(f"refused: {exc}", file=sys.stderr)
        return 1

    content = render_wrapper(journal, str(expected_target(curdir)))
    alias.parent.mkdir(parents=True, exist_ok=True)
    with wrapper_lock():
        # Classify again now that the lock is held.
        locked_state, locked_other_target = check_alias(curdir)
        if _refused(locked_state, locked_other_target):
            return 1
        if alias.is_symlink():
            alias.unlink()
        write_wrapper_atomic(alias, content)

    print("installed")
    _ensure_user_bin_on_path(alias.parent)
    return 0


def cmd_uninstall(curdir: Path) -> int:
    """Remove the alias if, and only if, it is owned by the repo at *curdir*.

    Returns:
        0 when the alias was removed or was already absent, 1 otherwise.
    """
    alias = alias_path()

    def _settled(current: AliasState, target: Path | None) -> int | None:
        # Exit code when *current* ends the command, None when it is OWNED.
        if current is AliasState.ABSENT:
            print("absent")
            return 0
        if current is not AliasState.OWNED:
            _print_error(current, curdir, alias, target)
            return 1
        return None

    state, other_target = check_alias(curdir)
    if state is AliasState.WORKTREE:
        _print_error(state, curdir, alias, other_target)
        return 1
    exit_code = _settled(state, other_target)
    if exit_code is not None:
        return exit_code

    with wrapper_lock():
        # Classify again now that the lock is held.
        locked_state, locked_other_target = check_alias(curdir)
        exit_code = _settled(locked_state, locked_other_target)
        if exit_code is not None:
            return exit_code
        alias.unlink()

    print("uninstalled")
    return 0


def main(argv: list[str] | None = None) -> int:
    """Parse ``check`` / ``install [--force]`` / ``uninstall`` and run it.

    Args:
        argv: Argument list; ``None`` lets argparse read ``sys.argv``.

    Returns:
        The exit code of the selected command, run against the resolved
        current working directory.
    """
    parser = argparse.ArgumentParser(prog="python -m think.install_guard")
    commands = parser.add_subparsers(dest="cmd", required=True)
    commands.add_parser("check")
    commands.add_parser("install").add_argument("--force", action="store_true")
    commands.add_parser("uninstall")
    options = parser.parse_args(argv)

    curdir = Path.cwd().resolve()
    if options.cmd == "install":
        return cmd_install(curdir, force=options.force)
    if options.cmd == "check":
        return cmd_check(curdir)
    return cmd_uninstall(curdir)


if __name__ == "__main__":
    sys.exit(main())