# personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""sol segment - segment inspection and management CLI."""
5
6from __future__ import annotations
7
8import argparse
9import json
10import os
11import re
12import shutil
13import sqlite3
14import sys
15from datetime import datetime, timedelta
16from pathlib import Path
17
18from think.streams import (
19 get_stream_state,
20 read_segment_stream,
21 rebuild_stream_state,
22 write_segment_stream,
23)
24from think.utils import (
25 day_dirs,
26 day_path,
27 get_journal,
28 iter_segments,
29 require_solstone,
30 segment_parse,
31 setup_cli,
32)
33
34
def _find_segment_dir_readonly(
    day: str, segment: str, stream: str | None
) -> Path | None:
    """Locate an existing segment directory without creating anything.

    With a stream name, probe day/stream/segment directly; otherwise scan
    every stream on the day for a directory with a matching segment name.
    """
    base = day_path(day, create=False)
    if not base.exists():
        return None
    if stream:
        target = base / stream / segment
        if target.is_dir():
            return target
        return None
    matches = (
        seg_path
        for _stream_name, _seg_key, seg_path in iter_segments(day)
        if seg_path.name == segment
    )
    return next(matches, None)
49
50
51def _format_size(size_bytes: int) -> str:
52 """Return a simple human-readable size string."""
53 if size_bytes >= 1_000_000:
54 return f"{size_bytes / 1_000_000:.1f}M"
55 if size_bytes >= 1_000:
56 return f"{size_bytes / 1_000:.1f}K"
57 return f"{size_bytes}B"
58
59
60def _segment_stats(seg_path: Path) -> dict[str, int]:
61 """Return recursive file, talent, and byte counts for a segment."""
62 files = 0
63 talents = 0
64 size = 0
65 for path in seg_path.rglob("*"):
66 if path.is_file():
67 files += 1
68 size += path.stat().st_size
69 if "talents" in path.parts:
70 talents += 1
71 return {"files": files, "talents": talents, "size": size}
72
73
74def _split_segment_path(path: str) -> tuple[str, str, str]:
75 """Parse day/stream/segment input."""
76 parts = path.split("/")
77 if len(parts) != 3:
78 print(
79 "Segment path must be day/stream/segment (e.g. 20260304/default/090000_300)",
80 file=sys.stderr,
81 )
82 raise SystemExit(1)
83 return parts[0], parts[1], parts[2]
84
85
86def _segment_duration(segment: str) -> int:
87 """Return the duration seconds from HHMMSS_LEN."""
88 return int(segment.split("_", 1)[1])
89
90
def _segment_time_strings(seg_path: Path) -> tuple[str | None, str | None]:
    """Format segment start/end as HH:MM:SS, or (None, None) if unparseable."""
    start, end = segment_parse(str(seg_path))
    if start is not None and end is not None:
        return start.strftime("%H:%M:%S"), end.strftime("%H:%M:%S")
    return None, None
97
98
99def _next_day(day: str) -> str:
100 """Return the next YYYYMMDD day string."""
101 return (datetime.strptime(day, "%Y%m%d") + timedelta(days=1)).strftime("%Y%m%d")
102
103
def _find_next_segment(
    day: str, stream: str, segment: str
) -> tuple[str | None, str | None]:
    """Find the segment chained directly after day/segment in a stream.

    Only the same day and the following day are scanned; returns
    (day, segment_key) or (None, None) if no successor is found there.
    """
    for scan_day in (day, _next_day(day)):
        for stream_name, seg_key, seg_path in iter_segments(scan_day):
            if stream_name != stream:
                continue
            marker = read_segment_stream(seg_path)
            if not marker or marker.get("stream") != stream:
                continue
            # A successor's marker points back at the segment we started from.
            if (marker.get("prev_day"), marker.get("prev_segment")) == (day, segment):
                return scan_day, seg_key
    return None, None
123
124
def _find_successor_segment(
    day: str, stream: str, segment: str
) -> tuple[str | None, str | None, Path | None]:
    """Locate the segment whose stream.json points back at day/segment.

    Unlike _find_next_segment (which only checks 2 days), every day in the
    journal is scanned, because earlier cross-day moves may have reordered
    the chain.

    Returns (day, segment_key, segment_path) or (None, None, None).
    """
    for scan_day in sorted(day_dirs()):
        for stream_name, seg_key, seg_path in iter_segments(scan_day):
            if stream_name != stream:
                continue
            marker = read_segment_stream(seg_path)
            if not marker or marker.get("stream") != stream:
                continue
            if marker.get("prev_day") == day and marker.get("prev_segment") == segment:
                return scan_day, seg_key, seg_path
    return None, None, None
148
149
def _is_stream_tail(day: str, stream: str, segment: str) -> bool:
    """Check whether stream state records day/segment as the current tail."""
    state = get_stream_state(stream)
    return bool(
        state is not None
        and state.get("last_day") == day
        and state.get("last_segment") == segment
    )
156
157
158def _segment_files(seg_dir: Path) -> list[str]:
159 """Return top-level file names within a segment directory."""
160 return sorted(path.name for path in seg_dir.iterdir() if path.is_file())
161
162
163def _agent_files(seg_dir: Path) -> list[str]:
164 """Return top-level file names from talents/ if present."""
165 talents_dir = seg_dir / "talents"
166 if not talents_dir.is_dir():
167 return []
168 return sorted(path.name for path in talents_dir.iterdir() if path.is_file())
169
170
171def _events_summary(seg_dir: Path) -> dict[str, object]:
172 """Return count and unique tracts for events.jsonl."""
173 events_path = seg_dir / "events.jsonl"
174 if not events_path.exists():
175 return {"entries": 0, "tracts": []}
176
177 entries = 0
178 tracts: set[str] = set()
179 with open(events_path, "r", encoding="utf-8") as handle:
180 for line in handle:
181 line = line.strip()
182 if not line:
183 continue
184 entries += 1
185 try:
186 payload = json.loads(line)
187 except json.JSONDecodeError:
188 continue
189 tract = payload.get("tract")
190 if isinstance(tract, str):
191 tracts.add(tract)
192
193 return {"entries": entries, "tracts": sorted(tracts)}
194
195
def _segment_index_info(day: str, stream: str, segment: str) -> dict[str, int | bool]:
    """Report whether the journal index knows this segment and how many chunks.

    Returns {"available": False, ...} when the index DB is missing or the
    query fails, so callers can distinguish "no index" from "not indexed".
    """
    db_path = Path(get_journal()) / "indexer" / "journal.sqlite"
    if not db_path.exists():
        return {"available": False, "indexed": False, "chunks": 0}

    rel_path = f"{day}/{stream}/{segment}"
    try:
        conn = sqlite3.connect(db_path)
        try:
            row = conn.execute(
                "SELECT 1 FROM chunks WHERE path = ? LIMIT 1",
                (rel_path,),
            ).fetchone()
            indexed = row is not None
            # Count both the segment row itself and any per-file rows below it.
            chunk_count = conn.execute(
                "SELECT count(*) FROM chunks WHERE path = ? OR path LIKE ?",
                (rel_path, f"{rel_path}/%"),
            ).fetchone()[0]
        finally:
            conn.close()
    except sqlite3.Error:
        return {"available": False, "indexed": False, "chunks": 0}

    return {"available": True, "indexed": indexed, "chunks": int(chunk_count)}
222
223
224def _describe_prev(day: str, stream: str, marker: dict | None) -> str:
225 """Return formatted previous-chain description."""
226 if not marker or not marker.get("prev_segment"):
227 return "(none)"
228
229 prev_day = marker.get("prev_day") or day
230 prev_segment = marker["prev_segment"]
231 prev_dir = _find_segment_dir_readonly(prev_day, prev_segment, stream)
232 prev_path = f"{prev_day}/{stream}/{prev_segment}"
233 if prev_dir is None:
234 return f"{prev_path} [MISSING]"
235 return prev_path
236
237
def _describe_next(day: str, stream: str, segment: str) -> str:
    """Describe the next segment in the chain, or tail/none status."""
    next_day, next_segment = _find_next_segment(day, stream, segment)
    if next_day and next_segment:
        return f"{next_day}/{stream}/{next_segment}"
    return "(tail)" if _is_stream_tail(day, stream, segment) else "(none)"
246
247
248def _check_directory(seg_dir: Path | None) -> tuple[bool, str]:
249 """Verify the segment directory exists."""
250 if seg_dir is not None and seg_dir.is_dir():
251 return True, "directory exists"
252 return False, "directory missing"
253
254
255def _check_stream_json(seg_dir: Path | None) -> tuple[bool, str]:
256 """Verify stream.json exists."""
257 if seg_dir is None:
258 return False, "stream.json missing"
259 if (seg_dir / "stream.json").is_file():
260 return True, "stream.json exists"
261 return False, "stream.json missing"
262
263
264def _check_stream_json_valid(seg_dir: Path | None) -> tuple[bool, str]:
265 """Verify stream.json is valid JSON with a stream field."""
266 if seg_dir is None:
267 return False, "stream.json missing"
268
269 path = seg_dir / "stream.json"
270 if not path.exists():
271 return False, "stream.json missing"
272
273 try:
274 with open(path, "r", encoding="utf-8") as handle:
275 payload = json.load(handle)
276 except (json.JSONDecodeError, OSError):
277 return False, "stream.json invalid JSON"
278
279 if payload.get("stream"):
280 return True, "stream.json valid"
281 return False, "stream.json missing stream field"
282
283
284def _check_content_files(seg_dir: Path | None) -> tuple[bool, str]:
285 """Verify transcript content files exist."""
286 if seg_dir is None:
287 return False, "segment directory missing"
288
289 if (seg_dir / "audio.jsonl").exists() or (seg_dir / "screen.jsonl").exists():
290 return True, "content files present"
291 return False, "no audio.jsonl or screen.jsonl"
292
293
294def _check_backward_chain(
295 day: str, stream: str, marker: dict | None
296) -> tuple[bool, str]:
297 """Verify backward chain integrity."""
298 if not marker or not marker.get("prev_segment"):
299 return True, "no previous segment"
300
301 prev_day = marker.get("prev_day")
302 prev_segment = marker.get("prev_segment")
303 if not prev_day or not prev_segment:
304 return False, "prev_segment set without prev_day"
305
306 prev_dir = _find_segment_dir_readonly(prev_day, prev_segment, stream)
307 if prev_dir is not None:
308 return True, "previous segment found"
309 return False, f"missing previous segment {prev_day}/{stream}/{prev_segment}"
310
311
def _check_forward_chain(day: str, stream: str, segment: str) -> tuple[bool, str]:
    """Check that the segment has a successor or is the recorded stream tail."""
    successor = _find_next_segment(day, stream, segment)
    if all(successor):
        next_day, next_segment = successor
        return True, f"next segment {next_day}/{stream}/{next_segment}"
    if _is_stream_tail(day, stream, segment):
        return True, "stream tail"
    return False, "next segment not found, not stream tail"
320
321
def _check_index_presence(day: str, stream: str, segment: str) -> tuple[bool, str]:
    """Check index membership; passes trivially when no index DB exists."""
    info = _segment_index_info(day, stream, segment)
    if not info["available"]:
        return True, "journal index not available"
    if info["indexed"]:
        return True, "segment indexed"
    return False, "segment not indexed"
330
331
def _run_checks(day: str, stream: str, segment: str) -> list[dict[str, object]]:
    """Execute every verification check and collect structured results."""
    seg_dir = _find_segment_dir_readonly(day, segment, stream)
    marker = None if seg_dir is None else read_segment_stream(seg_dir)

    named_results = (
        ("directory exists", _check_directory(seg_dir)),
        ("stream.json exists", _check_stream_json(seg_dir)),
        ("stream.json valid", _check_stream_json_valid(seg_dir)),
        ("content files present", _check_content_files(seg_dir)),
        ("backward chain", _check_backward_chain(day, stream, marker)),
        ("forward chain", _check_forward_chain(day, stream, segment)),
        ("index presence", _check_index_presence(day, stream, segment)),
    )

    results = []
    for name, (passed, detail) in named_results:
        results.append({"check": name, "passed": passed, "detail": detail})
    return results
351
352
353def _rewrite_events_jsonl(seg_dir: Path, new_day: str, new_segment: str) -> int:
354 """Rewrite events.jsonl to update day and segment fields.
355
356 Returns number of lines rewritten.
357 """
358 events_path = seg_dir / "events.jsonl"
359 if not events_path.exists():
360 return 0
361
362 lines = events_path.read_text(encoding="utf-8").splitlines()
363 rewritten = []
364 count = 0
365 for line in lines:
366 stripped = line.strip()
367 if not stripped:
368 rewritten.append(line)
369 continue
370 try:
371 obj = json.loads(stripped)
372 obj["day"] = new_day
373 obj["segment"] = new_segment
374 rewritten.append(json.dumps(obj, ensure_ascii=False))
375 count += 1
376 except (json.JSONDecodeError, TypeError):
377 rewritten.append(line)
378
379 tmp = events_path.with_suffix(".tmp")
380 tmp.write_text("\n".join(rewritten) + "\n" if rewritten else "", encoding="utf-8")
381 os.rename(str(tmp), str(events_path))
382 return count
383
384
def _touch_health_marker(day: str) -> None:
    """Create/refresh the health/stream.updated marker for *day*."""
    marker = day_path(day) / "health" / "stream.updated"
    marker.parent.mkdir(parents=True, exist_ok=True)
    marker.touch()
390
391
392def _delete_index_rows(journal: str, rel_path: str) -> dict[str, int]:
393 """Delete all index rows referencing a segment path.
394
395 Returns counts of deleted rows per table.
396 """
397 db_path = Path(journal) / "indexer" / "journal.sqlite"
398 if not db_path.exists():
399 return {"chunks": 0, "files": 0, "entities": 0, "entity_signals": 0}
400
401 try:
402 conn = sqlite3.connect(db_path)
403 try:
404 cur = conn.execute(
405 "DELETE FROM chunks WHERE path = ? OR path LIKE ?",
406 (rel_path, f"{rel_path}/%"),
407 )
408 chunks_deleted = cur.rowcount
409
410 cur = conn.execute(
411 "DELETE FROM files WHERE path LIKE ?",
412 (f"{rel_path}/%",),
413 )
414 files_deleted = cur.rowcount
415
416 cur = conn.execute(
417 "DELETE FROM entities WHERE path LIKE ?",
418 (f"{rel_path}/%",),
419 )
420 entities_deleted = cur.rowcount
421
422 cur = conn.execute(
423 "DELETE FROM entity_signals WHERE path LIKE ?",
424 (f"{rel_path}/%",),
425 )
426 signals_deleted = cur.rowcount
427
428 conn.commit()
429 finally:
430 conn.close()
431 except sqlite3.Error:
432 return {"chunks": 0, "files": 0, "entities": 0, "entity_signals": 0}
433
434 return {
435 "chunks": chunks_deleted,
436 "files": files_deleted,
437 "entities": entities_deleted,
438 "entity_signals": signals_deleted,
439 }
440
441
def _reindex_segment(journal: str, seg_dir: Path) -> int:
    """Re-index every indexable file under a segment directory.

    Files the indexer rejects (unknown format, vanished) are skipped.
    Returns how many files were successfully indexed.
    """
    # Imported lazily to avoid paying the indexer import cost on every command.
    from think.indexer.journal import index_file

    indexed = 0
    for candidate in sorted(seg_dir.rglob("*")):
        if not candidate.is_file():
            continue
        try:
            ok = index_file(journal, str(candidate))
        except (ValueError, FileNotFoundError):
            continue
        if ok:
            indexed += 1
    return indexed
459
460
def cmd_move(args: argparse.Namespace) -> None:
    """Move a segment to a different day/time.

    Validates the request, prints a plan, and (unless --dry-run) performs
    the move: relocates the directory, rewrites events.jsonl, patches the
    successor's stream.json back-pointer, rebuilds stream state, refreshes
    the journal index, touches health markers, then runs a post-move verify.
    """
    src_day, stream, src_segment = _split_segment_path(args.path)

    src_dir = _find_segment_dir_readonly(src_day, src_segment, stream)
    if src_dir is None:
        print(f"Segment not found: {args.path}", file=sys.stderr)
        raise SystemExit(1)

    # --- Validate destination day/time formats. ---
    to_day = args.to_day
    if not re.fullmatch(r"\d{8}", to_day):
        print(f"Invalid --to-day format: {to_day} (expected YYYYMMDD)", file=sys.stderr)
        raise SystemExit(1)

    if args.to_time:
        if not re.fullmatch(r"\d{6}", args.to_time):
            print(
                f"Invalid --to-time format: {args.to_time} (expected HHMMSS)",
                file=sys.stderr,
            )
            raise SystemExit(1)
        # Keep the original duration; only the HHMMSS prefix changes.
        duration = src_segment.split("_", 1)[1]
        new_segment = f"{args.to_time}_{duration}"
    else:
        new_segment = src_segment

    if to_day == src_day and new_segment == src_segment:
        print("Source and destination are the same", file=sys.stderr)
        raise SystemExit(1)

    # --- Resolve the destination, finding a nearby free slot if needed. ---
    dst_parent = day_path(to_day, create=False) / stream
    dst_dir = dst_parent / new_segment
    if dst_dir.exists():
        if args.to_time:
            from observe.utils import find_available_segment

            avail = find_available_segment(dst_parent, new_segment)
            if avail is None:
                print(
                    f"No available segment slot near {new_segment} on {to_day}",
                    file=sys.stderr,
                )
                raise SystemExit(1)
            new_segment = avail
            dst_dir = dst_parent / new_segment
        else:
            print(
                f"Segment {new_segment} already exists on {to_day}/{stream}. "
                f"Use --to-time to specify an alternate time.",
                file=sys.stderr,
            )
            raise SystemExit(1)

    # --- Sanity-check the source segment's stream marker. ---
    marker = read_segment_stream(src_dir)
    if not marker:
        print("No stream.json in source segment", file=sys.stderr)
        raise SystemExit(1)
    if marker.get("stream") != stream:
        print(
            f"Stream mismatch: path says '{stream}' but stream.json says '{marker.get('stream')}'",
            file=sys.stderr,
        )
        raise SystemExit(1)

    # Successor (if any) must be located before the move changes the chain.
    succ_day, succ_seg, succ_path = _find_successor_segment(
        src_day, stream, src_segment
    )

    # Count non-blank events.jsonl lines for the plan summary.
    events_path = src_dir / "events.jsonl"
    events_count = 0
    if events_path.exists():
        with open(events_path, encoding="utf-8") as handle:
            events_count = sum(1 for line in handle if line.strip())

    journal = get_journal()
    old_rel = f"{src_day}/{stream}/{src_segment}"
    index_info = _segment_index_info(src_day, stream, src_segment)

    # --- Print the plan. ---
    print(f"Move: {src_day}/{stream}/{src_segment} -> {to_day}/{stream}/{new_segment}")
    print(f"  events.jsonl lines: {events_count}")
    if succ_day:
        print(f"  successor to patch: {succ_day}/{stream}/{succ_seg}")
    else:
        print("  successor to patch: (none - stream tail)")
    if index_info["available"]:
        print(f"  index chunks: {index_info['chunks']}")
    print(f"  health markers: {src_day}, {to_day}")

    if args.dry_run:
        print("\n[dry run] No changes made")
        return

    verbose = getattr(args, "verbose", False)

    # --- Execute: move the directory tree. ---
    print("\nExecuting move...")
    dst_parent.mkdir(parents=True, exist_ok=True)
    if verbose:
        print(f"  created directory: {dst_parent}")
    shutil.move(str(src_dir), str(dst_dir))
    print(f"  moved directory: {src_dir.name} -> {dst_dir}")

    # --- Rewrite per-event day/segment fields at the new location. ---
    rewritten = _rewrite_events_jsonl(dst_dir, to_day, new_segment)
    if rewritten:
        print(
            f"  rewrote {rewritten} events.jsonl lines (day: {src_day}->{to_day}, segment: {src_segment}->{new_segment})"
        )
    elif verbose:
        print("  no events.jsonl to rewrite")

    # --- Point the successor's back-reference at the new location. ---
    if succ_path:
        succ_marker = read_segment_stream(succ_path)
        if succ_marker:
            write_segment_stream(
                succ_path,
                succ_marker["stream"],
                to_day,
                new_segment,
                succ_marker["seq"],
            )
            print(f"  patched successor {succ_day}/{stream}/{succ_seg}")
            if verbose:
                print(f"    prev_day: {succ_marker.get('prev_day')} -> {to_day}")
                print(
                    f"    prev_segment: {succ_marker.get('prev_segment')} -> {new_segment}"
                )
    elif verbose:
        print("  no successor to patch (stream tail)")

    # Stream head/tail state is derived; rebuild it from the segments on disk.
    summary = rebuild_stream_state(stream)
    print(f"  rebuilt stream state: {stream}")
    if verbose:
        print(
            f"    scanned {summary['segments_scanned']} segments, rebuilt {len(summary['rebuilt'])} stream(s)"
        )

    # --- Refresh the journal index: drop old rows, re-index the new path. ---
    if index_info["available"]:
        deleted = _delete_index_rows(journal, old_rel)
        if any(deleted.values()) or verbose:
            print(
                f"  deleted index rows: chunks={deleted['chunks']}, files={deleted['files']}, entities={deleted['entities']}, signals={deleted['entity_signals']}"
            )
        new_rel = f"{to_day}/{stream}/{new_segment}"
        indexed = _reindex_segment(journal, dst_dir)
        print(f"  re-indexed: {indexed} files at {new_rel}")
    elif verbose:
        print("  index not available, skipping reindex")

    # Signal downstream processing that both days changed.
    _touch_health_marker(src_day)
    _touch_health_marker(to_day)
    print(f"  touched health markers: {src_day}, {to_day}")
    if verbose:
        print("  think will re-run daily talents on both days")

    # Post-move verify is informational — the move already completed.
    print()
    results = _run_checks(to_day, stream, new_segment)
    _print_check_results(results)
    passed = sum(1 for result in results if result["passed"])
    print(f"\n{passed}/{len(results)} checks passed")
620
621
def cmd_list(args: argparse.Namespace) -> None:
    """List segments for a day.

    Prints a fixed-width table (or JSON with --json) of every segment on
    args.day, optionally filtered to one stream with --stream.
    """
    segments = iter_segments(args.day)
    if args.stream:
        # Keep only entries whose stream name matches the filter.
        segments = [entry for entry in segments if entry[0] == args.stream]

    if not segments:
        print(f"No segments found for {args.day}")
        return

    # Build one display row per segment.
    rows = []
    for stream_name, seg_key, seg_path in segments:
        start, end = _segment_time_strings(seg_path)  # None when unparseable
        stats = _segment_stats(seg_path)
        rows.append(
            {
                "stream": stream_name,
                "segment": seg_key,
                "start": start,
                "end": end,
                "duration": _segment_duration(seg_key),
                "files": stats["files"],
                "talents": stats["talents"],
                "size": stats["size"],
            }
        )

    if args.json_output:
        print(json.dumps(rows, indent=2))
        return

    # Fixed-width table: header, separator, then one line per segment.
    print(
        f"{'STREAM':<20} {'SEGMENT':<14} {'TIME':<15} "
        f"{'DUR':>5} {'FILES':>5} {'TALENTS':>7} {'SIZE':>8}"
    )
    print("-" * 78)
    for row in rows:
        time_str = (
            f"{row['start']}-{row['end']}"
            if row["start"] is not None and row["end"] is not None
            else "?"
        )
        dur_str = f"{row['duration']}s"
        print(
            f"{row['stream']:<20} {row['segment']:<14} {time_str:<15} "
            f"{dur_str:>5} {row['files']:>5} {row['talents']:>7} "
            f"{_format_size(int(row['size'])):>8}"
        )
670
671
def cmd_inspect(args: argparse.Namespace) -> None:
    """Inspect one segment.

    Collects chain, file, size, event, and index metadata for a single
    segment and prints it as text or JSON (--json).
    """
    day, stream, segment = _split_segment_path(args.path)
    seg_dir = _find_segment_dir_readonly(day, segment, stream)
    if seg_dir is None:
        print(f"Segment not found: {args.path}", file=sys.stderr)
        raise SystemExit(1)

    marker = read_segment_stream(seg_dir) or {}
    # Prefer the stream recorded in stream.json over the path component.
    stream_name = marker.get("stream") or stream
    start, end = _segment_time_strings(seg_dir)
    duration = _segment_duration(segment)
    prev_desc = _describe_prev(day, stream_name, marker)
    next_desc = _describe_next(day, stream_name, segment)
    files = _segment_files(seg_dir)
    talents = _agent_files(seg_dir)
    stats = _segment_stats(seg_dir)
    events = _events_summary(seg_dir)
    index_info = _segment_index_info(day, stream_name, segment)

    # Assembled once so JSON and text output stay in sync.
    payload = {
        "path": f"{day}/{stream}/{segment}",
        "stream": stream_name,
        "segment": segment,
        "seq": marker.get("seq"),
        "prev_day": marker.get("prev_day"),
        "prev_segment": marker.get("prev_segment"),
        "start": start,
        "end": end,
        "duration": duration,
        "chain": {"prev": prev_desc, "next": next_desc},
        "files": files,
        "talents": talents,
        "stats": stats,
        "events": events,
        "index": index_info,
    }

    if args.json_output:
        print(json.dumps(payload, indent=2))
        return

    time_range = "?"
    if start is not None and end is not None:
        time_range = f"{start} - {end}"

    print(f"Segment: {day}/{stream}/{segment}")
    if marker.get("seq") is not None:
        print(f"Stream: {stream_name} (seq {marker['seq']})")
    else:
        print(f"Stream: {stream_name}")
    print(f"Time: {time_range} ({duration}s)")
    print()
    print("Chain:")
    print(f"  prev: {prev_desc}")
    print(f"  next: {next_desc}")
    print()
    print(f"Files ({len(files)}):")
    if files:
        print(f"  {', '.join(files)}")
    print()
    print(f"Talents ({len(talents)}):")
    if talents:
        print(f"  {', '.join(talents)}")
    print()
    print(f"Size: {_format_size(stats['size'])}")
    if index_info["available"]:
        if index_info["indexed"]:
            print(f"Index: indexed ({index_info['chunks']} chunks)")
        else:
            print("Index: not-indexed")
    else:
        print("Index: unavailable")
    tracts = ", ".join(events["tracts"])
    if tracts:
        print(f"Events: {events['entries']} entries ({tracts})")
    else:
        print(f"Events: {events['entries']} entries")
750
751
752def _print_check_results(results: list[dict[str, object]]) -> None:
753 """Print PASS/FAIL lines for verify output."""
754 for result in results:
755 status = "PASS" if result["passed"] else "FAIL"
756 detail = str(result["detail"])
757 if result["passed"]:
758 print(f"{status:<5} {result['check']}")
759 else:
760 print(f"{status:<5} {result['check']}: {detail}")
761
762
def cmd_verify(args: argparse.Namespace) -> None:
    """Verify one segment or all segments for a day.

    With a path argument, checks a single segment; with --day, checks every
    segment on that day. Output is text or JSON (--json). Exits 0 when all
    checks pass, 1 otherwise, so the result is usable from scripts.
    """
    if args.path:
        day, stream, segment = _split_segment_path(args.path)
        results = _run_checks(day, stream, segment)
        if args.json_output:
            print(json.dumps(results, indent=2))
        else:
            _print_check_results(results)
            passed = sum(1 for result in results if result["passed"])
            print()
            print(f"{passed}/{len(results)} checks passed")
        # Exit code reflects overall pass/fail.
        raise SystemExit(0 if all(result["passed"] for result in results) else 1)

    if not args.day:
        print("verify requires a segment path or --day", file=sys.stderr)
        raise SystemExit(1)

    segments = iter_segments(args.day)
    if not segments:
        print(f"No segments found for {args.day}", file=sys.stderr)
        raise SystemExit(1)

    # Per-segment results plus day-wide pass/fail totals.
    all_results: dict[str, list[dict[str, object]]] = {}
    total_passed = 0
    total_failed = 0

    for stream_name, seg_key, _seg_path in segments:
        seg_id = f"{args.day}/{stream_name}/{seg_key}"
        results = _run_checks(args.day, stream_name, seg_key)
        all_results[seg_id] = results
        total_passed += sum(1 for result in results if result["passed"])
        total_failed += sum(1 for result in results if not result["passed"])

    if args.json_output:
        print(
            json.dumps(
                {
                    "segments": all_results,
                    "summary": {"passed": total_passed, "failed": total_failed},
                },
                indent=2,
            )
        )
    else:
        for seg_id, results in all_results.items():
            print(f"--- {seg_id} ---")
            _print_check_results(results)
            print()
        print(f"Summary: {total_passed}/{total_passed + total_failed} checks passed")

    raise SystemExit(0 if total_failed == 0 else 1)
815
816
def main() -> None:
    """CLI entry point for sol segment.

    Builds the argument parser with list/inspect/verify/move subcommands,
    runs shared CLI setup, and dispatches to the matching handler.
    """
    parser = argparse.ArgumentParser(
        description="Inspect and manage journal segments",
        usage="sol segment <command> [options]",
    )
    sub = parser.add_subparsers(dest="subcommand")

    # list: tabular/JSON overview of a day's segments.
    p_list = sub.add_parser("list", help="List segments for a day")
    p_list.add_argument("day", help="Day in YYYYMMDD format")
    p_list.add_argument("--stream", help="Filter to a specific stream")
    p_list.add_argument(
        "--json", dest="json_output", action="store_true", help="Output as JSON"
    )

    # inspect: detailed metadata for one segment.
    p_inspect = sub.add_parser("inspect", help="Show segment metadata")
    p_inspect.add_argument(
        "path",
        help="Segment path: day/stream/segment (e.g. 20260304/default/090000_300)",
    )
    p_inspect.add_argument(
        "--json", dest="json_output", action="store_true", help="Output as JSON"
    )

    # verify: integrity checks for one segment or a whole day.
    p_verify = sub.add_parser("verify", help="Verify segment integrity")
    p_verify.add_argument("path", nargs="?", help="Segment path: day/stream/segment")
    p_verify.add_argument("--day", help="Verify all segments for a day")
    p_verify.add_argument(
        "--json", dest="json_output", action="store_true", help="Output as JSON"
    )

    # move: relocate a segment to another day and/or time.
    p_move = sub.add_parser("move", help="Move segment to a different day/time")
    p_move.add_argument(
        "path",
        help="Segment path: day/stream/segment (e.g. 20260304/default/090000_300)",
    )
    p_move.add_argument("--to-day", required=True, help="Destination day (YYYYMMDD)")
    p_move.add_argument(
        "--to-time", help="New time (HHMMSS), preserving original duration"
    )
    p_move.add_argument(
        "--dry-run", action="store_true", help="Show plan without making changes"
    )
    p_move.add_argument(
        "-v", "--verbose", action="store_true", help="Show detailed progress"
    )

    args = setup_cli(parser)
    require_solstone()

    if args.subcommand is None:
        parser.print_help()
        raise SystemExit(1)

    # Dispatch to the chosen subcommand handler.
    if args.subcommand == "list":
        cmd_list(args)
    elif args.subcommand == "inspect":
        cmd_inspect(args)
    elif args.subcommand == "verify":
        cmd_verify(args)
    elif args.subcommand == "move":
        cmd_move(args)