personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at d18a7c02359cd827d0ff15058861de5c2600a96f 244 lines 7.8 kB view raw
#!/usr/bin/env python3
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Layer-hygiene lint.

Low-bar static check for the invariants in ``docs/coding-standards.md`` §
"Layer Hygiene" (L1, L2, L3, L6, L7). Warns when code inside infrastructure
modules (``think/indexer/``, ``think/importers/``, ``think/search/``,
``think/graph/``) or inside a read-verb CLI handler (a function in
``apps/*/call.py`` whose name contains a read verb such as ``load``, ``show``,
``check``, ``validate``, ``find``, ``list``, ``scan``, ``get``) performs a
direct write (``atomic_write``, ``json.dump``, ``.write_text``,
``open(..., "w")``, ``unlink``, ``rmtree``) against a path under
``journal/entities/``, ``journal/facets/``, or ``journal/observations``.

By design this is a grep-level check with known false-positive surface. Known
audit-tracked violations are allowlisted below with a TODO and an audit
reference. An allowlist entry is expected to disappear once its bundle ships —
see ``vpe/workspace/solstone-layer-violations-audit.md`` in the sol pbc
internal extro repo for the canonical list (V1-V14).

Exit codes:
  0 — no un-tracked violations
  1 — new violations found outside the allowlist
"""

from __future__ import annotations

import ast
import re
import subprocess
import sys
from pathlib import Path

# Repository root: this file is expected to live one directory below it
# (e.g. ``scripts/``), hence two ``.parent`` hops.
ROOT = Path(__file__).resolve().parent.parent

# Module families scrutinized as "infrastructure" per L1/L6/L7.
INFRASTRUCTURE_SCOPES: tuple[str, ...] = (
    "think/indexer",
    "think/importers",
    "think/search",
    "think/graph",
)

# Direct-write operations, as (compiled pattern, human-readable label).
# Indirect writes via helper methods (e.g. ``checklist.save()``) are out of
# scope by design — the audit notes that indirect writes are not reachable
# by grep.
WRITE_PATTERNS: tuple[tuple[re.Pattern[str], str], ...] = (
    (re.compile(r"\batomic_write\s*\("), "atomic_write"),
    (re.compile(r"\bjson\.dump\s*\("), "json.dump"),
    (re.compile(r"\.write_text\s*\("), ".write_text"),
    # Any truncating open mode: "w", "wb", "w+", and — previously missed by
    # the single-character ``[+b]?`` suffix — "wb+" / "w+b".
    (re.compile(r"""\bopen\s*\([^)]*["']w[+b]{0,2}["']"""), 'open(..., "w")'),
    (re.compile(r"\bos\.unlink\s*\("), "os.unlink"),
    # ``path.unlink()`` / ``path.unlink(missing_ok=...)`` only; requiring the
    # immediate ``)`` or ``missing_ok`` avoids matching unrelated ``.unlink``
    # methods that take positional arguments.
    (re.compile(r"\.unlink\s*\(\s*(?:missing_ok|\))"), ".unlink()"),
    (re.compile(r"\b(?:shutil\.)?rmtree\s*\("), "rmtree"),
)

# Strings / identifiers that indicate the write target sits under one of the
# protected domains. The window-based proximity check below uses these to
# decide whether a flagged write is on a domain path.
TARGET_PATH_PATTERNS: tuple[re.Pattern[str], ...] = (
    re.compile(r"journal/entities\b"),
    re.compile(r"journal/facets\b"),
    re.compile(r"journal/observations"),
    re.compile(r'["\']entities["\']'),
    re.compile(r'["\']facets["\']'),
    re.compile(r'["\']observations'),
    # ``entity_path``, ``facet_dir``, ``observations_json``, ... —
    # ``observations?`` covers both singular and plural forms.
    re.compile(r"\b(?:entity|facet|observations?)_(?:path|dir|file|json)\b"),
)

# Read verbs per docs/coding-standards.md § L3. Match against any
# underscore-split segment of the function name.
READ_VERBS: frozenset[str] = frozenset(
    {
        "load",
        "get",
        "read",
        "scan",
        "list",
        "show",
        "find",
        "match",
        "resolve",
        "query",
        "lookup",
        "status",
        "check",
        "validate",
        "discover",
        "format",
        "render",
        "extract",
        "parse",
        "view",
        "inspect",
        "info",
        "describe",
        "search",
    }
)

# Temporary, file-scoped exceptions for known layer-hygiene violations.
# Keep this empty by default; add entries only with a tracking identifier
# and remove them in the same bundle that fixes the violation.
ALLOWLIST: dict[str, str] = {}

CONTEXT_WINDOW = 8  # lines above and below each write to search for paths


def tracked_python_files() -> list[Path]:
    """Return every git-tracked ``*.py`` path, relative to ``ROOT``."""
    proc = subprocess.run(
        ["git", "ls-files", "*.py"],
        cwd=ROOT,
        check=True,
        capture_output=True,
        text=True,
    )
    return [Path(name) for name in proc.stdout.splitlines() if name]


def in_infrastructure_scope(rel: Path) -> bool:
    """True when ``rel`` lives under one of ``INFRASTRUCTURE_SCOPES``."""
    posix = rel.as_posix()
    for scope in INFRASTRUCTURE_SCOPES:
        if posix.startswith(scope + "/"):
            return True
    return False


def is_call_py(rel: Path) -> bool:
    """True for ``apps/<app>/.../call.py`` paths (read-verb CLI handlers)."""
    parts = rel.parts
    if len(parts) < 3:
        return False
    return parts[0] == "apps" and parts[-1] == "call.py"


def has_target_path_nearby(lines: list[str], idx: int) -> bool:
    """True if a protected-domain path marker appears within the context
    window (``CONTEXT_WINDOW`` lines either side) of line ``idx``."""
    lo = max(0, idx - CONTEXT_WINDOW)
    hi = min(len(lines), idx + CONTEXT_WINDOW + 1)
    context = "\n".join(lines[lo:hi])
    for pattern in TARGET_PATH_PATTERNS:
        if pattern.search(context):
            return True
    return False


def scan_lines(lines: list[str]) -> list[tuple[int, str]]:
    """Scan ``lines`` for domain-path writes.

    Returns ``(1-based line number, write label)`` pairs; at most one
    finding per line (the first matching write pattern wins).
    """
    hits: list[tuple[int, str]] = []
    for offset, text in enumerate(lines):
        for pattern, label in WRITE_PATTERNS:
            if pattern.search(text) and has_target_path_nearby(lines, offset):
                hits.append((offset + 1, label))
                break
    return hits


def has_read_verb(name: str) -> bool:
    """True when any underscore-separated segment of ``name`` (leading
    underscores stripped) is a read verb."""
    segments = (s for s in name.lstrip("_").split("_") if s)
    return not READ_VERBS.isdisjoint(segments)


def check_call_py(rel: Path, source: str) -> list[tuple[int, str, str]]:
    """Flag writes inside read-verb function bodies.

    Returns a list of ``(line_no, write_label, function_name)`` tuples.
    Unparseable files are skipped (the lint is best-effort by design).
    """
    try:
        tree = ast.parse(source, filename=str(rel))
    except SyntaxError:
        return []

    all_lines = source.splitlines()
    results: list[tuple[int, str, str]] = []

    for node in ast.walk(tree):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        if not has_read_verb(node.name):
            continue
        first = node.lineno - 1
        last = (node.end_lineno or node.lineno) - 1
        # scan_lines returns offsets that are 1-based within the body slice,
        # so ``first + offset`` lands back on the absolute 1-based line.
        for offset, label in scan_lines(all_lines[first : last + 1]):
            results.append((first + offset, label, node.name))
    return results


def main() -> int:
    """Run both checks over every tracked file; return the process exit code
    (0 = clean or tracked-only, 1 = new violations)."""
    fresh: list[str] = []
    known: list[str] = []

    for rel in sorted(tracked_python_files()):
        full = ROOT / rel
        if not full.is_file():
            continue
        try:
            text = full.read_text(encoding="utf-8")
        except UnicodeDecodeError:
            continue

        rel_str = rel.as_posix()
        found: list[str] = []

        if in_infrastructure_scope(rel):
            found.extend(
                f"{rel_str}:{line_no}: {label} "
                f"on journal-domain path (infrastructure scope)"
                for line_no, label in scan_lines(text.splitlines())
            )

        if is_call_py(rel):
            found.extend(
                f"{rel_str}:{line_no}: {label} in read-verb handler '{func}()'"
                for line_no, label, func in check_call_py(rel, text)
            )

        if not found:
            continue

        audit_ref = ALLOWLIST.get(rel_str)
        if audit_ref:
            known.extend(f"{issue} [tracked: {audit_ref}]" for issue in found)
        else:
            fresh.extend(found)

    if known:
        print("layer-hygiene: known violations (tracked, expected to disappear):")
        for entry in known:
            print(f"  {entry}")
        print()

    if fresh:
        print("layer-hygiene: NEW violations:", file=sys.stderr)
        for entry in fresh:
            print(f"  {entry}", file=sys.stderr)
        print(file=sys.stderr)
        print(
            "See docs/coding-standards.md § Layer Hygiene (L1/L2/L3/L6/L7).",
            file=sys.stderr,
        )
        return 1

    print("layer-hygiene: pass")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())