personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Guard user-level sol alias ownership."""
5
6from __future__ import annotations
7
8import argparse
9import fcntl
10import os
11import re
12import sys
13from contextlib import contextmanager
14from enum import Enum
15from pathlib import Path
16from typing import Iterator, TypedDict
17
18try:
19 import userpath # type: ignore[import-not-found]
20except ImportError: # system python without the venv: doctor.stale_alias_symlink path
21 userpath = None # type: ignore[assignment]
22
23
24WRAPPER_TEMPLATE = """\
25#!/bin/bash
26# sol — managed by 'sol config'. Edits will be overwritten.
27# managed-version: 4
28: "${{SOLSTONE_JOURNAL:={journal}}}"
29export SOLSTONE_JOURNAL
30SOL_BIN='{sol_bin}'
31if [ ! -x "$SOL_BIN" ]; then
32 printf 'sol: venv binary missing or not executable: %s\\n' "$SOL_BIN" >&2
33 exit 127
34fi
35if [ "$1" = "supervisor" ]; then
36 mkdir -p "$SOLSTONE_JOURNAL/health"
37 exec > >(tee -a "$SOLSTONE_JOURNAL/health/service.log") 2>&1
38 export PYTHONUNBUFFERED=1
39fi
40exec "$SOL_BIN" "$@"
41"""
42
# Marker line and numeric version of the wrapper this module currently renders;
# both must stay in step with the "# managed-version:" line in WRAPPER_TEMPLATE.
WRAPPER_MARKER = "# managed-version: 4"
WRAPPER_VERSION = 4

# Matches the marker line of a managed wrapper, versions 1 through 4.
_RE_MARKER = re.compile(r"(?m)^# managed-version: (?P<version>[1234])$")
# Captures the default journal path from the `: "${SOLSTONE_JOURNAL:=...}"` line.
_RE_JOURNAL = re.compile(r'(?m)^: "\$\{SOLSTONE_JOURNAL:=(?P<journal>[^\n]*)\}"$')
# Captures the single-quoted SOL_BIN value; an embedded quote appears as '\''.
_RE_SOL_BIN = re.compile(r"(?m)^SOL_BIN='(?P<sol_bin>(?:[^']|'\\'')*)'$")

# Characters that validate_journal_path_for_wrapper refuses in a journal path.
_INVALID_JOURNAL_CHARS = ("$", "`", '"', "\\")
51
52
class AliasState(Enum):
    """Classification of ``~/.local/bin/sol`` as reported by ``check_alias``."""

    # The current directory has a ``.git`` *file* (not a directory).
    WORKTREE = "worktree"
    # Nothing exists at the alias path, not even a broken symlink.
    ABSENT = "absent"
    # Symlink or managed wrapper whose target resolves to this repo's venv binary.
    OWNED = "owned"
    # Symlink resolving to an existing path other than this repo's venv binary.
    CROSS_REPO = "cross_repo"
    # Symlink whose resolved target does not exist.
    DANGLING = "dangling"
    # Non-symlink that is unreadable, not a managed wrapper, or a wrapper
    # pointing at a different binary.
    FOREIGN = "foreign"
60
61
class ParsedWrapper(TypedDict):
    """Fields recovered from a managed wrapper script by ``parse_wrapper``."""

    # Default journal path taken from the ``${SOLSTONE_JOURNAL:=...}`` line.
    journal: str
    # Value of ``SOL_BIN`` with the shell single-quote escaping undone.
    sol_bin: str
    # Number found on the ``# managed-version:`` marker line.
    version: int
66
67
def alias_path() -> Path:
    """Return the user-level alias location, ``~/.local/bin/sol``."""
    return Path.home().joinpath(".local", "bin", "sol")
70
71
def expected_target(curdir: Path) -> Path:
    """Return the venv ``sol`` executable belonging to the repo at *curdir*."""
    return curdir.joinpath(".venv", "bin", "sol")
74
75
def render_wrapper(journal: str, sol_bin: str) -> str:
    """Fill WRAPPER_TEMPLATE with the journal default and the venv binary path.

    *sol_bin* lands inside a single-quoted shell string, so every ``'`` in it
    is rewritten to ``'\\''`` (close quote, escaped quote, reopen quote).
    *journal* is inserted verbatim.
    """
    quoted_bin = "'\\''".join(sol_bin.split("'"))
    return WRAPPER_TEMPLATE.format(sol_bin=quoted_bin, journal=journal)
80
81
def parse_wrapper(content: str) -> ParsedWrapper | None:
    """Extract journal, binary path and version from a managed wrapper.

    Returns ``None`` when the marker line, the journal line or the ``SOL_BIN``
    line cannot be found in *content*.
    """
    version_hit = _RE_MARKER.search(content)
    if version_hit is None:
        return None

    journal_hit = _RE_JOURNAL.search(content)
    bin_hit = _RE_SOL_BIN.search(content)
    if journal_hit is None or bin_hit is None:
        return None

    # Undo the '\'' escaping that render_wrapper applies to single quotes.
    unquoted_bin = bin_hit["sol_bin"].replace("'\\''", "'")
    return {
        "journal": journal_hit["journal"],
        "sol_bin": unquoted_bin,
        "version": int(version_hit["version"]),
    }
96
97
def write_wrapper_atomic(path: Path, content: str) -> None:
    """Write *content* to *path* via ``atomic_write`` and mark it executable.

    The mode is set to ``0o755`` after the write so the wrapper can be run
    directly.
    """
    # Imported lazily so importing this module does not pull in think.entities.
    from think.entities.core import atomic_write

    atomic_write(path, content)
    Path(path).chmod(0o755)
104
105
@contextmanager
def wrapper_lock(lock_path: Path | None = None) -> Iterator[None]:
    """Serialize wrapper rewrites with an exclusive ``flock`` on a lock file.

    Args:
        lock_path: Lock file to use; defaults to ``~/.local/bin/.sol.lock``.
            Its parent directory is created when missing.
    """
    target = (
        Path.home() / ".local" / "bin" / ".sol.lock"
        if lock_path is None
        else lock_path
    )
    target.parent.mkdir(parents=True, exist_ok=True)
    with open(target, "w", encoding="utf-8") as handle:
        fd = handle.fileno()
        try:
            fcntl.flock(fd, fcntl.LOCK_EX)
            yield
        finally:
            # Explicit unlock; the descriptor is closed right afterwards.
            fcntl.flock(fd, fcntl.LOCK_UN)
118
119
def validate_journal_path_for_wrapper(path: str) -> None:
    """Reject journal paths that cannot be embedded verbatim in the wrapper.

    The wrapper places the path unescaped inside the double-quoted expansion
    ``"${SOLSTONE_JOURNAL:=<path>}"`` on a single line, so some characters
    would change what the shell sees.

    Args:
        path: Journal path about to be rendered into the wrapper.

    Raises:
        ValueError: If *path* contains any character from
            ``_INVALID_JOURNAL_CHARS``, a ``}`` or a newline.
    """
    # ``}`` ends the ${...:=...} expansion early, silently truncating the
    # default; a newline splits the assignment line in two.
    rejected = (*_INVALID_JOURNAL_CHARS, "}", "\n")
    for char in rejected:
        if char in path:
            raise ValueError(
                f"journal path contains shell-active character {char!r}: {path!r}"
            )
127
128
def check_alias(curdir: Path) -> tuple[AliasState, Path | None]:
    """Classify ``~/.local/bin/sol`` relative to the repo at *curdir*.

    Args:
        curdir: Repository directory the command is running from.

    Returns:
        ``(state, target)``. *target* is the resolved symlink target for
        CROSS_REPO and DANGLING, the resolved venv binary for OWNED, and
        ``None`` for every other state.
    """
    # A ``.git`` file (rather than a directory) is treated as a worktree.
    if (curdir / ".git").is_file():
        return AliasState.WORKTREE, None

    alias = alias_path()
    # exists() follows symlinks, so a broken symlink needs the extra check.
    if not alias.exists() and not alias.is_symlink():
        return AliasState.ABSENT, None

    if alias.is_symlink():
        target = Path(os.readlink(alias))
        if not target.is_absolute():
            # Relative link targets are relative to the link's directory.
            target = alias.parent / target
        target = target.resolve()
        if not target.exists():
            return AliasState.DANGLING, target
        if target == expected_target(curdir).resolve():
            return AliasState.OWNED, target
        return AliasState.CROSS_REPO, target

    try:
        content = alias.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        # Unreadable, a directory, or not UTF-8 text (for example an unrelated
        # binary named ``sol``): not something this module manages.
        return AliasState.FOREIGN, None

    parsed = parse_wrapper(content)
    if parsed is None:
        return AliasState.FOREIGN, None

    target = Path(parsed["sol_bin"])
    if target.resolve() == expected_target(curdir).resolve():
        return AliasState.OWNED, target.resolve()
    # A managed wrapper that points at a different binary is reported FOREIGN.
    return AliasState.FOREIGN, None
161
162
def _current_journal_for_alias() -> str:
    """Return the journal path that a wrapper installed now would embed.

    Uses the first element of ``think.utils.get_journal_info()``; when that
    call signals "not configured", falls back to ``~/Documents/journal``.
    """
    from think import utils as think_utils

    # If think.utils has no SolstoneNotConfigured, RuntimeError is caught instead.
    not_configured = getattr(think_utils, "SolstoneNotConfigured", RuntimeError)
    try:
        journal, _ignored = think_utils.get_journal_info()
    except not_configured:
        return str(Path.home() / "Documents" / "journal")
    return journal
172
173
def check_alias_detail(curdir: Path) -> tuple[AliasState, str]:
    """Return the alias state together with the token ``cmd_check`` prints.

    For every state except OWNED the token is the state's value. For OWNED it
    is ``"current"`` when the alias is an executable managed wrapper matching
    the current journal, venv binary and wrapper version, else ``"upgrade"``.
    """
    state, _ = check_alias(curdir)
    if state is not AliasState.OWNED:
        return state, state.value

    needs_upgrade = (state, "upgrade")

    alias = alias_path()
    if alias.is_symlink():
        # Symlink installs are always replaced by a wrapper.
        return needs_upgrade

    try:
        text = alias.read_text(encoding="utf-8")
    except OSError:
        return needs_upgrade

    wrapper = parse_wrapper(text)
    if wrapper is None:
        return needs_upgrade

    # Checks run in this order and stop at the first mismatch.
    if wrapper["journal"] != _current_journal_for_alias():
        return needs_upgrade
    if wrapper["sol_bin"] != str(expected_target(curdir)):
        return needs_upgrade
    if wrapper["version"] != WRAPPER_VERSION:
        return needs_upgrade
    # All three execute bits (user, group, other) must be set.
    if (alias.stat().st_mode & 0o111) != 0o111:
        return needs_upgrade

    return state, "current"
202
203
def format_error(
    state: AliasState,
    curdir: Path,
    _alias: Path,
    other_target: Path | None,
    *,
    allow_force: bool = False,
) -> str:
    """Build the multi-line error text for a state that blocks the command.

    Args:
        state: Alias state being reported.
        curdir: Repository the command was run from.
        _alias: Unused; kept for signature compatibility.
        other_target: Target shown for CROSS_REPO and DANGLING states.
        allow_force: Append the ``--force`` hint when true.

    Returns:
        The message without a trailing newline.
    """
    if state is AliasState.WORKTREE:
        return (
            f"ERROR: refusing to run from a git worktree ({curdir}). "
            "Run from the primary clone."
        )

    if state is AliasState.DANGLING:
        installed = f" installed: dangling: {other_target} does not exist"
    elif state is AliasState.CROSS_REPO:
        installed = f" installed: {other_target}"
    else:
        installed = " installed: not a symlink"

    report = [
        "ERROR: Another solstone install owns ~/.local/bin/sol.",
        f" this repo: {curdir}",
        installed,
        "Run 'sol setup' from the installed repo first,",
        "or remove ~/.local/bin/sol manually if that repo is gone.",
    ]
    if allow_force:
        report += [
            "Rerun 'python -m think.install_guard install --force' only if you intend to replace it from this repo."
        ]
    return "\n".join(report)
237
238
def _print_error(
    state: AliasState,
    curdir: Path,
    alias: Path,
    other_target: Path | None,
    *,
    allow_force: bool = False,
) -> None:
    """Write the ``format_error`` message, newline-terminated, to stderr."""
    message = format_error(
        state,
        curdir,
        alias,
        other_target,
        allow_force=allow_force,
    )
    sys.stderr.write(message + "\n")
257
258
def _ensure_user_bin_on_path(user_bin: Path) -> None:
    """Make sure *user_bin* is on the shell PATH, reporting the outcome on stdout.

    Any exception raised while talking to ``userpath`` is reported as a
    "could not auto-add" message rather than propagated.

    Raises:
        RuntimeError: If the optional ``userpath`` module is not importable.
    """
    # The module-level import of `userpath` is guarded so this file stays
    # importable from system python (where doctor runs). This function is only
    # reached through `cmd_install`, which runs inside the venv where
    # `userpath` is present; reaching it without `userpath` is a hard failure.
    if userpath is None:
        raise RuntimeError("userpath is not available; run `make install` first")

    bin_dir = str(user_bin)
    try:
        if userpath.in_current_path(bin_dir):
            print("path: ~/.local/bin already on PATH")
        elif not userpath.append(bin_dir, app_name="solstone", all_shells=True):
            print(
                'path: could not auto-add ~/.local/bin to PATH — add this line to your shell rc manually: export PATH="$HOME/.local/bin:$PATH"'
            )
        elif userpath.need_shell_restart(bin_dir):
            print(
                "path: added ~/.local/bin to shell PATH — restart your shell or run 'exec $SHELL -l' to pick it up"
            )
        else:
            print("path: added ~/.local/bin to shell PATH")
    except Exception as exc:
        # Best effort: PATH setup must never fail the install itself.
        print(
            f'path: could not auto-add ~/.local/bin to PATH ({type(exc).__name__}: {exc}) — add this line to your shell rc manually: export PATH="$HOME/.local/bin:$PATH"'
        )
287
288
def cmd_check(curdir: Path) -> int:
    """Print a one-word status token on stdout and return an exit code.

    Tokens: ``fresh`` (no alias), ``current`` / ``upgrade`` (owned alias),
    ``worktree``, ``cross_repo``, ``dangling`` and ``not_symlink``. The last
    four also write an explanation to stderr and return 1; the others return 0.
    """
    alias = alias_path()
    state, token = check_alias_detail(curdir)

    # Healthy outcomes: token only, exit 0.
    if state is AliasState.ABSENT:
        print("fresh")
        return 0
    if state is AliasState.OWNED:
        print(token)
        return 0

    # Everything below is a refusal: token, explanation on stderr, exit 1.
    if state is AliasState.WORKTREE:
        print("worktree")
        _print_error(state, curdir, alias, None)
    elif state is AliasState.CROSS_REPO:
        print("cross_repo")
        _print_error(state, curdir, alias, check_alias(curdir)[1], allow_force=True)
    elif state is AliasState.DANGLING:
        print("dangling")
        _print_error(state, curdir, alias, check_alias(curdir)[1], allow_force=True)
    elif state is AliasState.FOREIGN:
        print("not_symlink")
        _print_error(state, curdir, alias, None, allow_force=True)
    return 1
317
318
def cmd_install(curdir: Path, *, force: bool = False) -> int:
    """Install or refresh the managed wrapper at ``~/.local/bin/sol``.

    Args:
        curdir: Repository the wrapper should point at.
        force: Replace an alias in the CROSS_REPO, DANGLING or FOREIGN state
            instead of refusing. A worktree is refused regardless.

    Returns:
        0 after writing the wrapper, 1 when the install was refused.
    """
    alias = alias_path()

    def _refused(state: AliasState, other_target: Path | None) -> bool:
        """Report the blocking state on stderr; True means stop the install."""
        if state is AliasState.WORKTREE:
            _print_error(state, curdir, alias, other_target)
            return True
        conflicting = state in {
            AliasState.CROSS_REPO,
            AliasState.DANGLING,
            AliasState.FOREIGN,
        }
        if conflicting and not force:
            _print_error(state, curdir, alias, other_target, allow_force=True)
            return True
        return False

    if _refused(*check_alias(curdir)):
        return 1

    journal = _current_journal_for_alias()
    try:
        validate_journal_path_for_wrapper(journal)
    except ValueError as exc:
        print(f"refused: {exc}", file=sys.stderr)
        return 1

    content = render_wrapper(journal, str(expected_target(curdir)))
    alias.parent.mkdir(parents=True, exist_ok=True)
    with wrapper_lock():
        # Re-check under the lock: the alias may have changed since the
        # unlocked check above.
        if _refused(*check_alias(curdir)):
            return 1
        if alias.is_symlink():
            # Remove the link itself before the wrapper file is written.
            alias.unlink()
        write_wrapper_atomic(alias, content)

    print("installed")
    _ensure_user_bin_on_path(alias.parent)
    return 0
376
377
def cmd_uninstall(curdir: Path) -> int:
    """Remove ``~/.local/bin/sol`` when it is owned by the repo at *curdir*.

    Returns:
        0 when the alias was removed or was already absent; 1 (with an
        explanation on stderr) for any other state.
    """
    alias = alias_path()

    def _settle(state: AliasState, other_target: Path | None) -> int | None:
        """Return an exit code when *state* ends the command, else None."""
        if state is AliasState.ABSENT:
            print("absent")
            return 0
        if state is not AliasState.OWNED:
            # Covers WORKTREE as well as every foreign-ownership state.
            _print_error(state, curdir, alias, other_target)
            return 1
        return None

    outcome = _settle(*check_alias(curdir))
    if outcome is not None:
        return outcome

    with wrapper_lock():
        # Re-check under the lock before deleting anything.
        outcome = _settle(*check_alias(curdir))
        if outcome is not None:
            return outcome
        alias.unlink()

    print("uninstalled")
    return 0
404
405
def main(argv: list[str] | None = None) -> int:
    """Parse the command line and run ``check``, ``install`` or ``uninstall``.

    Args:
        argv: Arguments to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        The exit code of the selected sub-command, which operates on the
        resolved current working directory.
    """
    parser = argparse.ArgumentParser(prog="python -m think.install_guard")
    commands = parser.add_subparsers(dest="cmd", required=True)
    commands.add_parser("check")
    commands.add_parser("install").add_argument("--force", action="store_true")
    commands.add_parser("uninstall")
    args = parser.parse_args(argv)

    curdir = Path.cwd().resolve()
    if args.cmd == "install":
        # Only the install sub-parser defines --force.
        return cmd_install(curdir, force=args.force)
    handler = cmd_check if args.cmd == "check" else cmd_uninstall
    return handler(curdir)
420
421
# Script entry point: exit the process with main()'s return code.
if __name__ == "__main__":
    sys.exit(main())