# personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""General utilities for solstone.
5
6This module provides core utilities for journal access, date/segment handling,
7configuration loading, and CLI setup. Talent-related utilities (prompt loading,
8agent configs, etc.) have been moved to think/talent.py.
9"""
10
11from __future__ import annotations
12
13import argparse
14import copy
15import json
16import logging
17import os
18import re
19import socket
20import sys
21import time
22from datetime import datetime
23from pathlib import Path
24from typing import Any, Optional
25
26from timefhuman import timefhuman
27
28from media import MIME_TYPES
29
# Eight consecutive digits — matches YYYYMMDD day folder names (use fullmatch).
DATE_RE = re.compile(r"\d{8}")
# Journal subdirectory that holds the per-day folders.
CHRONICLE_DIR = "chronicle"
# Stream name assigned to segments sitting directly in a day directory.
DEFAULT_STREAM = "_default"
# Exit code for "try again later" (sysexits EX_TEMPFAIL).
EXIT_TEMPFAIL = 75
34
35
def now_ms() -> int:
    """Return the current Unix epoch time in whole milliseconds."""
    seconds = time.time()
    return int(seconds * 1000)
39
40
# Memoized result for get_rev(). The sentinel "__unset__" marks "not computed
# yet"; after the first call this holds the short hash or None.
_rev_cache: str | None = "__unset__"
42
43
def get_rev() -> str | None:
    """Return the short git commit hash, or None if it cannot be determined.

    The value is computed at most once per process and memoized in the
    module-level ``_rev_cache`` (sentinel "__unset__" means uncomputed).
    """
    global _rev_cache
    if _rev_cache == "__unset__":
        try:
            import subprocess

            proc = subprocess.run(
                ["git", "rev-parse", "--short", "HEAD"],
                capture_output=True,
                text=True,
                timeout=5,
            )
            _rev_cache = proc.stdout.strip() if proc.returncode == 0 else None
        except Exception:
            # git missing, timeout, not a repo — all mean "unavailable".
            _rev_cache = None
    return _rev_cache
62
63
def truncated_echo(text: str, max_bytes: int = 16384) -> None:
    """Echo *text* to stdout, capping output at *max_bytes* UTF-8 bytes.

    Oversized output is clipped at a valid UTF-8 character boundary and a
    note reporting the original byte count is written to stderr. Passing
    ``max_bytes=0`` disables the limit.
    """
    raw = text.encode("utf-8")
    if max_bytes > 0 and len(raw) > max_bytes:
        # Decode with errors="ignore" so a split multi-byte char is dropped
        # cleanly rather than producing a replacement character.
        clipped = raw[:max_bytes].decode("utf-8", errors="ignore")
        sys.stdout.write(clipped + "\n")
        sys.stderr.write(
            f"[truncated: {len(raw):,} bytes total, --max {max_bytes:,}]\n"
        )
    else:
        sys.stdout.write(text + "\n")
82
83
def get_project_root() -> str:
    """Return the absolute path of the solstone repository root as a string."""
    here = Path(__file__).resolve()
    return str(here.parent.parent)
87
88
def get_journal_info() -> tuple[str, str]:
    """Return ``(path, source)`` describing where the journal lives.

    Returns
    -------
    tuple[str, str]
        ``source`` is "override" when _SOLSTONE_JOURNAL_OVERRIDE supplies
        the path, otherwise "project" for ``<project_root>/journal``.
    """
    override = os.environ.get("_SOLSTONE_JOURNAL_OVERRIDE")
    if not override:
        return str(Path(get_project_root()) / "journal"), "project"
    return override, "override"
104
105
def get_journal() -> str:
    """Return the journal path, creating the directory if needed.

    The journal always lives at ``<project_root>/journal`` unless the
    _SOLSTONE_JOURNAL_OVERRIDE environment variable points elsewhere.
    That variable is reserved for external tooling (tests, Makefile
    sandboxes); application code must always call this function rather
    than bypassing it, caching its result, or setting the override
    itself. See ``docs/environment.md``.
    """
    target = os.environ.get("_SOLSTONE_JOURNAL_OVERRIDE")
    if not target:
        project_root = Path(__file__).resolve().parent.parent
        target = str(project_root / "journal")
    os.makedirs(target, exist_ok=True)
    return target
126
127
def resolve_journal_path(journal: str | Path, rel: str) -> Path:
    """Map a chronicle-free journal-relative path to its on-disk location.

    A path whose first component is a YYYYMMDD day is rooted under the
    chronicle directory; everything else resolves directly under the
    journal. Raises ValueError for empty, absolute, backslash-separated,
    or traversal-prone (``.``/``..``) inputs.
    """
    if not rel:
        raise ValueError("rel must be non-empty")
    if os.path.isabs(rel):
        raise ValueError("rel must be journal-relative")
    if "\\" in rel:
        raise ValueError("rel must use POSIX separators")
    components = Path(rel).parts
    if not components or any(c in ("", ".", "..") for c in components):
        raise ValueError("rel must not contain empty, '.', or '..' components")
    base = Path(journal)
    if DATE_RE.fullmatch(components[0]):
        base = base / CHRONICLE_DIR
    return base / rel
143
144
def journal_relative_path(journal: str | Path, abs_path: str | Path) -> str:
    """Return the chronicle-free journal-relative POSIX form of *abs_path*.

    Paths under ``<journal>/chronicle`` are expressed relative to the
    chronicle root; other journal paths are relative to the journal
    itself. Raises ValueError if *abs_path* is not under the journal.
    """
    base = Path(journal)
    target = Path(abs_path)
    chronicle = base / CHRONICLE_DIR
    root = chronicle if target.is_relative_to(chronicle) else base
    return target.relative_to(root).as_posix()
153
154
def day_path(day: Optional[str] = None, *, create: bool = True) -> Path:
    """Return the absolute path of a day directory in the journal chronicle.

    Parameters
    ----------
    day : str, optional
        Day in YYYYMMDD format; defaults to today when omitted.
    create : bool, optional
        When True (default), make the directory if it is missing.

    Returns
    -------
    Path
        Absolute path to ``<journal>/chronicle/<day>``.

    Raises
    ------
    ValueError
        If *day* is not an eight-digit YYYYMMDD string.
    """
    journal_root = get_journal()

    if day is None:
        # No argument means "today".
        day = datetime.now().strftime("%Y%m%d")
    elif DATE_RE.fullmatch(day) is None:
        raise ValueError("day must be in YYYYMMDD format")

    target = Path(journal_root) / CHRONICLE_DIR / day
    if create:
        target.mkdir(parents=True, exist_ok=True)
    return target
188
189
def day_dirs() -> dict[str, str]:
    """Map every YYYYMMDD folder in the chronicle to its absolute path.

    Returns
    -------
    dict[str, str]
        e.g. {"20250101": "/path/to/journal/chronicle/20250101", ...};
        empty when the chronicle directory does not exist.
    """
    chronicle = Path(get_journal()) / CHRONICLE_DIR
    if not chronicle.is_dir():
        return {}

    return {
        name: os.path.join(chronicle, name)
        for name in os.listdir(chronicle)
        if DATE_RE.fullmatch(name) and os.path.isdir(os.path.join(chronicle, name))
    }
210
211
def updated_days(exclude: set[str] | None = None) -> list[str]:
    """Return days whose stream data is newer than their daily processing.

    A day qualifies when its ``health/stream.updated`` marker exists and
    either ``health/daily.updated`` is missing or older. Days without
    ``stream.updated`` are never reported.

    Parameters
    ----------
    exclude : set of str, optional
        YYYYMMDD day strings to leave out.

    Returns
    -------
    list of str
        Updated day strings in ascending order.
    """
    skip = exclude or set()
    pending: list[str] = []
    for day, folder in day_dirs().items():
        if day in skip:
            continue
        stream_marker = os.path.join(folder, "health", "stream.updated")
        if not os.path.isfile(stream_marker):
            continue
        daily_marker = os.path.join(folder, "health", "daily.updated")
        # Short-circuit keeps getmtime off a missing daily marker.
        stale = not os.path.isfile(daily_marker) or (
            os.path.getmtime(stream_marker) > os.path.getmtime(daily_marker)
        )
        if stale:
            pending.append(day)
    return sorted(pending)
245
246
def segment_path(day: str, segment: str, stream: str) -> Path:
    """Return the absolute path of a segment directory, created on demand.

    Parameters
    ----------
    day : str
        Day in YYYYMMDD format.
    segment : str
        Segment key in HHMMSS_LEN format.
    stream : str
        Stream name such as "archon" or "import.apple".

    Returns
    -------
    Path
        ``<day dir>/<stream>/<segment>``, created if it doesn't exist.
    """
    target = day_path(day) / stream / segment
    target.mkdir(parents=True, exist_ok=True)
    return target
267
268
def day_from_path(path: str | Path) -> str | None:
    """Find the YYYYMMDD day component of a journal path.

    Checks the path itself and then every ancestor, returning the first
    directory name that is an eight-digit date.

    Parameters
    ----------
    path : str or Path
        Any path within the journal directory structure.

    Returns
    -------
    str or None
        The YYYYMMDD day string, or None when no date directory appears.
    """
    current = Path(path)
    for candidate in (current, *current.parents):
        if DATE_RE.fullmatch(candidate.name):
            return candidate.name
    return None
290
291
def iter_segments(day: str | Path) -> list[tuple[str, str, Path]]:
    """Collect every segment under a day directory, in chronological order.

    Segment directories sitting directly in the day directory are
    reported under DEFAULT_STREAM; otherwise each first-level directory
    (apart from ``health``) is treated as a stream whose HHMMSS_LEN
    subdirectories are its segments.

    Parameters
    ----------
    day : str or Path
        YYYYMMDD day string, or the day directory itself.

    Returns
    -------
    list of (stream_name, segment_key, segment_path) tuples
        Sorted by segment key so streams interleave chronologically.
    """
    day_dir = day if isinstance(day, Path) else day_path(day, create=False)
    if not day_dir.exists():
        return []

    found: list[tuple[str, str, Path]] = []
    for child in day_dir.iterdir():
        if not child.is_dir():
            continue
        if segment_key(child.name) is not None:
            # Bare segment directory: belongs to the default stream.
            found.append((DEFAULT_STREAM, child.name, child))
        elif child.name != "health":
            for candidate in child.iterdir():
                if candidate.is_dir() and segment_key(candidate.name):
                    found.append((child.name, candidate.name, candidate))

    return sorted(found, key=lambda item: item[1])
332
333
334def segment_key(name_or_path: str) -> str | None:
335 """Extract segment key (HHMMSS_LEN) from any path/filename.
336
337 Parameters
338 ----------
339 name_or_path : str
340 Segment name, filename, or full path containing segment.
341
342 Returns
343 -------
344 str or None
345 Segment key in HHMMSS_LEN format if valid, None otherwise.
346
347 Examples
348 --------
349 >>> segment_key("143022_300")
350 "143022_300"
351 >>> segment_key("143022_300_summary.txt")
352 "143022_300"
353 >>> segment_key("/journal/20250109/143022_300/audio.jsonl")
354 "143022_300"
355 >>> segment_key("invalid")
356 None
357 """
358 # Match HHMMSS_LEN format: 6 digits, underscore, 1+ digits
359 pattern = r"\b(\d{6})_(\d+)(?:_|\b)"
360 match = re.search(pattern, name_or_path)
361 if match:
362 time_part = match.group(1)
363 len_part = match.group(2)
364 return f"{time_part}_{len_part}"
365 return None
366
367
def segment_parse(
    name_or_path: str,
) -> tuple[datetime.time, datetime.time] | tuple[None, None]:
    """Parse segment to extract start and end times as datetime objects.

    Parameters
    ----------
    name_or_path : str
        Segment name (e.g., "143022_300") or full path containing segment.

    Returns
    -------
    tuple of (datetime.time, datetime.time) or (None, None)
        Tuple of (start_time, end_time) where:
        - start_time: datetime.time for HHMMSS
        - end_time: datetime.time computed from start + LEN seconds,
          clamped to 23:59:59 when the span would cross midnight
        Returns (None, None) if not a valid HHMMSS_LEN segment format.

    Notes
    -----
    With ``from __future__ import annotations`` at the top of this module
    the return annotation is never evaluated; ``datetime.time`` there is
    documentary only (the actual objects are ``datetime.time`` instances
    built from the locally imported ``time`` class).

    Examples
    --------
    >>> segment_parse("143022_300")  # 14:30:22 + 300 seconds = 14:35:22
    (datetime.time(14, 30, 22), datetime.time(14, 35, 22))
    >>> segment_parse("/journal/20250109/143022_300/audio.jsonl")
    (datetime.time(14, 30, 22), datetime.time(14, 35, 22))
    >>> segment_parse("invalid")
    (None, None)
    """
    from datetime import time, timedelta

    # Extract just the segment name if it's a path
    if "/" in name_or_path or "\\" in name_or_path:
        path_parts = Path(name_or_path).parts
        # Look for segment key in path parts after a YYYYMMDD day directory.
        # Layout is YYYYMMDD/stream/HHMMSS_LEN/...
        name = None
        for i, part in enumerate(path_parts):
            if part.isdigit() and len(part) == 8:
                # Scan subsequent parts for a segment key
                for j in range(i + 1, len(path_parts)):
                    if segment_key(path_parts[j]):
                        name = path_parts[j]
                        break
                if name:
                    break
        if name is None:
            # Path had no day directory followed by a segment component.
            return (None, None)
    else:
        name = name_or_path

    # Validate and extract HHMMSS_LEN from segment name
    if "_" not in name:
        return (None, None)

    parts = name.split("_", 1)  # Split on first underscore only
    if (
        len(parts) != 2
        or not parts[0].isdigit()
        or len(parts[0]) != 6
        or not parts[1].isdigit()
    ):
        return (None, None)

    time_str = parts[0]
    length_str = parts[1]

    # Parse HHMMSS to datetime.time
    try:
        hour = int(time_str[0:2])
        minute = int(time_str[2:4])
        second = int(time_str[4:6])

        # Validate ranges
        if not (0 <= hour <= 23 and 0 <= minute <= 59 and 0 <= second <= 59):
            return (None, None)

        start_time = time(hour, minute, second)
    except (ValueError, IndexError):
        return (None, None)

    # Parse LEN and compute end time
    try:
        length_seconds = int(length_str)
        # Compute end time by adding duration; clamp to end-of-day when the
        # segment would spill past midnight (segments are day-scoped).
        start_dt = datetime.combine(datetime.today(), start_time)
        end_dt = start_dt + timedelta(seconds=length_seconds)
        if end_dt.date() > start_dt.date():
            end_time = time(23, 59, 59)
        else:
            end_time = end_dt.time()
        return (start_time, end_time)
    except ValueError:
        return (None, None)
460
461
def format_day(day: str) -> str:
    """Format a day string (YYYYMMDD) as a human-readable date.

    Parameters
    ----------
    day:
        Day in YYYYMMDD format.

    Returns
    -------
    str
        Formatted date like "Saturday, January 24, 2026".
        Returns the original string if parsing fails.

    Examples
    --------
    >>> format_day("20260124")
    "Saturday, January 24, 2026"
    """
    try:
        dt = datetime.strptime(day, "%Y%m%d")
    except ValueError:
        # Not a valid YYYYMMDD date — hand the input back unchanged.
        return day
    return dt.strftime("%A, %B %d, %Y")
486
487
def iso_date(day: str) -> str:
    """Rewrite a YYYYMMDD day string as ISO-8601 (YYYY-MM-DD).

    Parameters
    ----------
    day:
        Day in YYYYMMDD format.

    Returns
    -------
    str
        ISO formatted date like "2026-01-24". The input is sliced
        positionally; no validation is performed.
    """
    year, month, dom = day[:4], day[4:6], day[6:8]
    return f"{year}-{month}-{dom}"
502
503
def format_segment_times(segment: str) -> tuple[str, str] | tuple[None, None]:
    """Render a segment's start and end as 12-hour clock strings.

    Parameters
    ----------
    segment:
        Segment key in HHMMSS_LEN format (e.g., "143022_300").

    Returns
    -------
    tuple[str, str] | tuple[None, None]
        ("2:30 PM", "2:35 PM")-style pair, or (None, None) when the
        segment key cannot be parsed.

    Examples
    --------
    >>> format_segment_times("143022_300")
    ("2:30 PM", "2:35 PM")
    >>> format_segment_times("090000_3600")
    ("9:00 AM", "10:00 AM")
    """
    start, end = segment_parse(segment)
    if start is None or end is None:
        return (None, None)
    return (_format_time(start), _format_time(end))
530
531
def _format_time(t: datetime.time) -> str:
    """Render *t* as 12-hour clock text ("2:30 PM") without a leading zero.

    The zero is stripped manually because strftime's "%-I" flag is
    Unix-only and this must work cross-platform.
    """
    stamp = datetime.combine(datetime.today(), t)
    return stamp.strftime("%I:%M %p").lstrip("0")
538
539
def _load_default_config() -> dict[str, Any]:
    """Read journal_default.json (shipped beside this module).

    Returns
    -------
    dict
        Default configuration structure.
    """
    default_path = Path(__file__).with_name("journal_default.json")
    with open(default_path, "r", encoding="utf-8") as handle:
        return json.load(handle)
551
552
# Default configuration parsed from journal_default.json; populated lazily by
# get_config() on its first call and reused thereafter (None until then).
_default_config: dict[str, Any] | None = None
555
556
def get_config() -> dict[str, Any]:
    """Load the journal configuration from config/journal.json.

    Falls back to a deep copy of the bundled defaults
    (think/journal_default.json) when journal.json is absent or cannot
    be parsed. An existing journal.json is authoritative and is returned
    as-is — defaults are never merged into it.

    Returns
    -------
    dict
        Journal configuration.
    """
    global _default_config
    if _default_config is None:
        # Parse the shipped defaults exactly once per process.
        _default_config = _load_default_config()

    config_path = Path(get_journal()) / "config" / "journal.json"
    if not config_path.exists():
        return copy.deepcopy(_default_config)

    try:
        with open(config_path, "r", encoding="utf-8") as handle:
            return json.load(handle)
    except (json.JSONDecodeError, OSError) as exc:
        # A broken config degrades to defaults instead of crashing callers.
        logging.getLogger(__name__).warning(
            "Failed to load config from %s: %s", config_path, exc
        )
        return copy.deepcopy(_default_config)
589
590
def _append_task_log(dir_path: str | Path, message: str) -> None:
    """Best-effort append of a timestamped *message* to task_log.txt.

    Each line is "<unix_seconds>\\t<message>". Any failure (permissions,
    full disk, ...) is swallowed — logging must never break the caller.
    """
    log_file = Path(dir_path) / "task_log.txt"
    try:
        log_file.parent.mkdir(parents=True, exist_ok=True)
        with open(log_file, "a", encoding="utf-8") as handle:
            handle.write(f"{int(time.time())}\t{message}\n")
    except Exception:
        pass
600
601
def day_log(day: str, message: str) -> None:
    """Append *message* to the task log inside *day*'s directory."""
    target = str(day_path(day))
    _append_task_log(target, message)
605
606
def journal_log(message: str) -> None:
    """Record *message* in the journal root's ``task_log.txt``."""
    journal_dir = get_journal()
    _append_task_log(journal_dir, message)
610
611
def day_input_summary(day: str) -> str:
    """Summarize how much recording data exists for *day*.

    Segments come from cluster_segments(); total duration is derived
    from the LEN portion of each HHMMSS_LEN segment key.

    Parameters
    ----------
    day:
        Day in YYYYMMDD format.

    Returns
    -------
    str
        Human-readable summary like "No recordings", "Light activity:
        2 segments, ~3 minutes", or "18 segments, ~7.5 hours".
    """
    from think.cluster import cluster_segments

    segments = cluster_segments(day)
    if not segments:
        return "No recordings"

    # Sum the LEN (seconds) suffix of every well-formed segment key.
    total_seconds = 0
    for seg in segments:
        pieces = seg.get("key", "").split("_")
        if len(pieces) >= 2 and pieces[1].isdigit():
            total_seconds += int(pieces[1])

    # Human-readable duration, coarsening with scale.
    if total_seconds < 60:
        duration_str = f"~{total_seconds} seconds"
    elif total_seconds < 3600:
        duration_str = f"~{total_seconds / 60:.0f} minutes"
    else:
        duration_str = f"~{total_seconds / 3600:.1f} hours"

    segment_count = len(segments)

    # Under 5 segments or under 30 minutes counts as light activity.
    if segment_count < 5 or total_seconds < 1800:
        plural = "s" if segment_count != 1 else ""
        return f"Light activity: {segment_count} segment{plural}, {duration_str}"
    return f"{segment_count} segments, {duration_str}"
662
663
def setup_cli(parser: argparse.ArgumentParser, *, parse_known: bool = False):
    """Parse command line arguments, configure logging, and load config env.

    Extends *parser* with ``-v``/``--verbose`` and ``-d``/``--debug`` flags.
    The journal path is resolved via get_journal(), which auto-creates the
    directory if needed. Every key in the journal config's ``env`` section
    (in ``journal.json``) is then written into ``os.environ`` — the config
    is the strict source for these values and overrides anything already
    set in the process environment.

    Returns the parsed arguments; when ``parse_known`` is True, returns a
    ``(args, extra)`` tuple via
    :func:`argparse.ArgumentParser.parse_known_args`.
    """
    parser.add_argument(
        "-v", "--verbose", action="store_true", help="Enable verbose output"
    )
    parser.add_argument(
        "-d", "--debug", action="store_true", help="Enable debug logging"
    )
    extra = None
    if parse_known:
        args, extra = parser.parse_known_args()
    else:
        args = parser.parse_args()

    # Debug wins over verbose; default is warnings only.
    if args.debug:
        level = logging.DEBUG
    elif args.verbose:
        level = logging.INFO
    else:
        level = logging.WARNING
    logging.basicConfig(level=level)

    # Resolve (and auto-create) the journal before touching its config.
    get_journal()

    # journal.json's "env" section is the strict source for API keys.
    for key, value in get_config().get("env", {}).items():
        os.environ[key] = str(value)

    return (args, extra) if parse_known else args
704
705
def parse_time_range(text: str) -> Optional[tuple[str, str, str]]:
    """Parse a natural-language time range into ``(day, start, end)``.

    Parameters
    ----------
    text:
        Natural language description of a time range.

    Returns
    -------
    tuple[str, str, str] | None
        ``(day, start, end)`` with ``day`` as ``YYYYMMDD`` and
        ``start``/``end`` as ``HHMMSS``, when exactly one same-day range
        was recognized. ``None`` when parsing fails.
    """

    try:
        parsed = timefhuman(text)
    except Exception as exc:  # pragma: no cover - unexpected library failure
        logging.info("timefhuman failed for %s: %s", text, exc)
        return None

    logging.debug("timefhuman(%s) -> %r", text, parsed)

    if len(parsed) != 1:
        logging.info("timefhuman did not return a single expression for %s", text)
        return None

    candidate = parsed[0]
    if not (isinstance(candidate, tuple) and len(candidate) == 2):
        logging.info("Expected a range from %s but got %r", text, candidate)
        return None

    begin, finish = candidate
    if begin.date() != finish.date():
        logging.info("Range must be within a single day: %s -> %s", begin, finish)
        return None

    return (
        begin.strftime("%Y%m%d"),
        begin.strftime("%H%M%S"),
        finish.strftime("%H%M%S"),
    )
748
749
def get_raw_file(day: str, name: str) -> tuple[str, str, Any]:
    """Locate the raw media file behind a transcript.

    Parameters
    ----------
    day:
        Day folder in ``YYYYMMDD`` format.
    name:
        Transcript filename such as ``HHMMSS/audio.jsonl``,
        ``HHMMSS/monitor_1_diff.json``, or ``HHMMSS/screen.jsonl``.

    Returns
    -------
    tuple[str, str, Any]
        ``(path, mime_type, metadata)``: *path* is the day-relative raw
        file location read from the transcript's metadata, *mime_type*
        is derived from that file's extension, and *metadata* holds the
        parsed JSON payload (empty when reading fails).

    Raises
    ------
    ValueError
        When the transcript carries no "raw" reference.
    """

    transcript = day_path(day) / name
    rel: str | None = None
    meta: Any = {}

    try:
        with open(transcript, "r", encoding="utf-8") as handle:
            if name.endswith(".jsonl"):
                # JSONL: line one is a header whose "raw" field points at
                # the media file; remaining lines are per-entry metadata.
                header_line = handle.readline().strip()
                if header_line:
                    rel = json.loads(header_line).get("raw")
                meta = [json.loads(row) for row in handle if row.strip()]
            else:
                # Plain JSON (e.g. _diff.json): "raw" sits in the document.
                meta = json.load(handle)
                rel = meta.get("raw")
    except Exception:  # pragma: no cover - optional metadata
        logging.debug("Failed to read %s", transcript)

    if not rel:
        raise ValueError(f"No 'raw' field found in metadata for {name}")

    extension = Path(rel).suffix.lower()
    mime_map = {**MIME_TYPES, ".png": "image/png"}
    return rel, mime_map.get(extension, "application/octet-stream"), meta
801
802
803# =============================================================================
804# SOL_* Environment Variable Helpers
805# =============================================================================
806
807
808def get_sol_day() -> str | None:
809 """Read SOL_DAY from the environment."""
810 return os.environ.get("SOL_DAY") or None
811
812
813def get_sol_facet() -> str | None:
814 """Read SOL_FACET from the environment."""
815 return os.environ.get("SOL_FACET") or None
816
817
818def get_sol_segment() -> str | None:
819 """Read SOL_SEGMENT from the environment."""
820 return os.environ.get("SOL_SEGMENT") or None
821
822
823def get_sol_stream() -> str | None:
824 """Read SOL_STREAM from the environment."""
825 return os.environ.get("SOL_STREAM") or None
826
827
828def get_sol_activity() -> str | None:
829 """Read SOL_ACTIVITY from the environment."""
830 return os.environ.get("SOL_ACTIVITY") or None
831
832
833def resolve_sol_day(arg: str | None) -> str:
834 """Return *arg* if provided, else SOL_DAY from env, else exit with error.
835
836 Intended for CLI commands where ``day`` is required but can be supplied
837 via the SOL_DAY environment variable as a convenience.
838 """
839 if arg:
840 return arg
841 env = get_sol_day()
842 if env:
843 return env
844 import typer
845
846 typer.echo("Error: day is required (pass as argument or set SOL_DAY).", err=True)
847 raise typer.Exit(1)
848
849
850def resolve_sol_facet(arg: str | None) -> str:
851 """Return *arg* if provided, else SOL_FACET from env, else exit with error.
852
853 Intended for CLI commands where ``facet`` is required but can be supplied
854 via the SOL_FACET environment variable as a convenience.
855 """
856 if arg:
857 return arg
858 env = get_sol_facet()
859 if env:
860 return env
861 import typer
862
863 typer.echo(
864 "Error: facet is required (pass as argument or set SOL_FACET).", err=True
865 )
866 raise typer.Exit(1)
867
868
869def resolve_sol_segment(arg: str | None) -> str | None:
870 """Return *arg* if provided, else SOL_SEGMENT from env, else None.
871
872 Unlike :func:`resolve_sol_day` this does **not** error when missing
873 because segment is typically optional.
874 """
875 if arg:
876 return arg
877 return get_sol_segment()
878
879
880# =============================================================================
881# Service Port Discovery
882# =============================================================================
883
884
def find_available_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for a currently-free TCP port.

    Binds a throwaway socket to port 0 so the kernel assigns an unused
    port, then reports it. The socket is managed with ``with`` so it is
    closed even when bind() raises (the previous version leaked it on
    error); the redundant function-local ``import socket`` is removed
    since this module already imports socket at the top.

    Args:
        host: Host address to bind to (default: 127.0.0.1)

    Returns:
        Available port number
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, 0))
        return sock.getsockname()[1]
905
906
def write_service_port(service: str, port: int) -> None:
    """Record *service*'s listening port as journal/health/{service}.port.

    Args:
        service: Service name (e.g., "convey", "cortex")
        port: Port number to write
    """
    health = Path(get_journal()) / "health"
    health.mkdir(parents=True, exist_ok=True)
    (health / f"{service}.port").write_text(str(port))
920
921
def read_service_port(service: str) -> int | None:
    """Fetch a service's recorded port from the health directory.

    Args:
        service: Service name (e.g., "convey", "cortex")

    Returns:
        The port number, or None when the file is missing or malformed.
    """
    port_file = Path(get_journal()) / "health" / f"{service}.port"
    try:
        text = port_file.read_text()
        return int(text.strip())
    except (FileNotFoundError, ValueError):
        return None
936
937
def is_solstone_up(timeout: float = 0.2) -> bool:
    """Check whether convey accepts TCP connections on its recorded port."""
    port = read_service_port("convey")
    if port is None:
        return False
    try:
        conn = socket.create_connection(("127.0.0.1", port), timeout=timeout)
    except OSError:
        return False
    with conn:
        return True
948
949
def require_solstone() -> None:
    """Exit(1) with a clear message if solstone's stack isn't running."""
    if os.environ.get("SOL_SKIP_SUPERVISOR_CHECK") == "1":
        # Explicit escape hatch for tooling that manages its own stack.
        return
    if is_solstone_up():
        return
    if os.environ.get("SOL_SUPERVISOR_SPAWNED") == "1":
        # Spawned by the supervisor: signal a retryable failure instead of
        # printing user guidance.
        sys.exit(EXIT_TEMPFAIL)
    print(
        "sol: solstone isn't running. Start it with 'sol up' and retry.",
        file=sys.stderr,
    )
    sys.exit(1)