personal memory agent
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at d18a7c02359cd827d0ff15058861de5c2600a96f 878 lines 30 kB view raw
1# SPDX-License-Identifier: AGPL-3.0-only 2# Copyright (c) 2026 sol pbc 3 4"""sol segment - segment inspection and management CLI.""" 5 6from __future__ import annotations 7 8import argparse 9import json 10import os 11import re 12import shutil 13import sqlite3 14import sys 15from datetime import datetime, timedelta 16from pathlib import Path 17 18from think.streams import ( 19 get_stream_state, 20 read_segment_stream, 21 rebuild_stream_state, 22 write_segment_stream, 23) 24from think.utils import ( 25 day_dirs, 26 day_path, 27 get_journal, 28 iter_segments, 29 require_solstone, 30 segment_parse, 31 setup_cli, 32) 33 34 35def _find_segment_dir_readonly( 36 day: str, segment: str, stream: str | None 37) -> Path | None: 38 """Locate a segment directory without creating anything.""" 39 day_dir = day_path(day, create=False) 40 if not day_dir.exists(): 41 return None 42 if stream: 43 candidate = day_dir / stream / segment 44 return candidate if candidate.is_dir() else None 45 for _stream_name, _seg_key, seg_path in iter_segments(day): 46 if seg_path.name == segment: 47 return seg_path 48 return None 49 50 51def _format_size(size_bytes: int) -> str: 52 """Return a simple human-readable size string.""" 53 if size_bytes >= 1_000_000: 54 return f"{size_bytes / 1_000_000:.1f}M" 55 if size_bytes >= 1_000: 56 return f"{size_bytes / 1_000:.1f}K" 57 return f"{size_bytes}B" 58 59 60def _segment_stats(seg_path: Path) -> dict[str, int]: 61 """Return recursive file, talent, and byte counts for a segment.""" 62 files = 0 63 talents = 0 64 size = 0 65 for path in seg_path.rglob("*"): 66 if path.is_file(): 67 files += 1 68 size += path.stat().st_size 69 if "talents" in path.parts: 70 talents += 1 71 return {"files": files, "talents": talents, "size": size} 72 73 74def _split_segment_path(path: str) -> tuple[str, str, str]: 75 """Parse day/stream/segment input.""" 76 parts = path.split("/") 77 if len(parts) != 3: 78 print( 79 "Segment path must be day/stream/segment (e.g. 20260304/default/090000_300)", 80 file=sys.stderr, 81 ) 82 raise SystemExit(1) 83 return parts[0], parts[1], parts[2] 84 85 86def _segment_duration(segment: str) -> int: 87 """Return the duration seconds from HHMMSS_LEN.""" 88 return int(segment.split("_", 1)[1]) 89 90 91def _segment_time_strings(seg_path: Path) -> tuple[str | None, str | None]: 92 """Return segment start/end strings if parseable.""" 93 start_time, end_time = segment_parse(str(seg_path)) 94 if start_time is None or end_time is None: 95 return None, None 96 return start_time.strftime("%H:%M:%S"), end_time.strftime("%H:%M:%S") 97 98 99def _next_day(day: str) -> str: 100 """Return the next YYYYMMDD day string.""" 101 return (datetime.strptime(day, "%Y%m%d") + timedelta(days=1)).strftime("%Y%m%d") 102 103 104def _find_next_segment( 105 day: str, stream: str, segment: str 106) -> tuple[str | None, str | None]: 107 """Find the next segment in a stream, checking same day then next day.""" 108 for scan_day in (day, _next_day(day)): 109 for stream_name, seg_key, seg_path in iter_segments(scan_day): 110 if stream_name != stream: 111 continue 112 marker = read_segment_stream(seg_path) 113 if not marker: 114 continue 115 if marker.get("stream") != stream: 116 continue 117 if marker.get("prev_day") != day: 118 continue 119 if marker.get("prev_segment") != segment: 120 continue 121 return scan_day, seg_key 122 return None, None 123 124 125def _find_successor_segment( 126 day: str, stream: str, segment: str 127) -> tuple[str | None, str | None, Path | None]: 128 """Find the segment whose stream.json points back to day/segment. 129 130 Unlike _find_next_segment which only checks 2 days, this scans 131 all days in the journal. The successor can be on any day because 132 prior cross-day moves may have reordered the chain. 133 134 Returns (day, segment_key, segment_path) or (None, None, None). 135 """ 136 for scan_day in sorted(day_dirs().keys()): 137 for stream_name, seg_key, seg_path in iter_segments(scan_day): 138 if stream_name != stream: 139 continue 140 marker = read_segment_stream(seg_path) 141 if not marker: 142 continue 143 if marker.get("stream") != stream: 144 continue 145 if marker.get("prev_day") == day and marker.get("prev_segment") == segment: 146 return scan_day, seg_key, seg_path 147 return None, None, None 148 149 150def _is_stream_tail(day: str, stream: str, segment: str) -> bool: 151 """Return True if stream state marks this segment as the current tail.""" 152 state = get_stream_state(stream) 153 if state is None: 154 return False 155 return state.get("last_day") == day and state.get("last_segment") == segment 156 157 158def _segment_files(seg_dir: Path) -> list[str]: 159 """Return top-level file names within a segment directory.""" 160 return sorted(path.name for path in seg_dir.iterdir() if path.is_file()) 161 162 163def _agent_files(seg_dir: Path) -> list[str]: 164 """Return top-level file names from talents/ if present.""" 165 talents_dir = seg_dir / "talents" 166 if not talents_dir.is_dir(): 167 return [] 168 return sorted(path.name for path in talents_dir.iterdir() if path.is_file()) 169 170 171def _events_summary(seg_dir: Path) -> dict[str, object]: 172 """Return count and unique tracts for events.jsonl.""" 173 events_path = seg_dir / "events.jsonl" 174 if not events_path.exists(): 175 return {"entries": 0, "tracts": []} 176 177 entries = 0 178 tracts: set[str] = set() 179 with open(events_path, "r", encoding="utf-8") as handle: 180 for line in handle: 181 line = line.strip() 182 if not line: 183 continue 184 entries += 1 185 try: 186 payload = json.loads(line) 187 except json.JSONDecodeError: 188 continue 189 tract = payload.get("tract") 190 if isinstance(tract, str): 191 tracts.add(tract) 192 193 return {"entries": entries, "tracts": sorted(tracts)} 194 195 196def _segment_index_info(day: str, stream: str, segment: str) -> dict[str, int | bool]: 197 """Return journal index presence for a segment.""" 198 db_path = Path(get_journal()) / "indexer" / "journal.sqlite" 199 if not db_path.exists(): 200 return {"available": False, "indexed": False, "chunks": 0} 201 202 rel_path = f"{day}/{stream}/{segment}" 203 try: 204 conn = sqlite3.connect(db_path) 205 try: 206 indexed = bool( 207 conn.execute( 208 "SELECT 1 FROM chunks WHERE path = ? LIMIT 1", 209 (rel_path,), 210 ).fetchone() 211 ) 212 chunk_count = conn.execute( 213 "SELECT count(*) FROM chunks WHERE path = ? OR path LIKE ?", 214 (rel_path, f"{rel_path}/%"), 215 ).fetchone()[0] 216 finally: 217 conn.close() 218 except sqlite3.Error: 219 return {"available": False, "indexed": False, "chunks": 0} 220 221 return {"available": True, "indexed": indexed, "chunks": int(chunk_count)} 222 223 224def _describe_prev(day: str, stream: str, marker: dict | None) -> str: 225 """Return formatted previous-chain description.""" 226 if not marker or not marker.get("prev_segment"): 227 return "(none)" 228 229 prev_day = marker.get("prev_day") or day 230 prev_segment = marker["prev_segment"] 231 prev_dir = _find_segment_dir_readonly(prev_day, prev_segment, stream) 232 prev_path = f"{prev_day}/{stream}/{prev_segment}" 233 if prev_dir is None: 234 return f"{prev_path} [MISSING]" 235 return prev_path 236 237 238def _describe_next(day: str, stream: str, segment: str) -> str: 239 """Return formatted forward-chain description.""" 240 next_day, next_segment = _find_next_segment(day, stream, segment) 241 if next_day and next_segment: 242 return f"{next_day}/{stream}/{next_segment}" 243 if _is_stream_tail(day, stream, segment): 244 return "(tail)" 245 return "(none)" 246 247 248def _check_directory(seg_dir: Path | None) -> tuple[bool, str]: 249 """Verify the segment directory exists.""" 250 if seg_dir is not None and seg_dir.is_dir(): 251 return True, "directory exists" 252 return False, "directory missing" 253 254 255def _check_stream_json(seg_dir: Path | None) -> tuple[bool, str]: 256 """Verify stream.json exists.""" 257 if seg_dir is None: 258 return False, "stream.json missing" 259 if (seg_dir / "stream.json").is_file(): 260 return True, "stream.json exists" 261 return False, "stream.json missing" 262 263 264def _check_stream_json_valid(seg_dir: Path | None) -> tuple[bool, str]: 265 """Verify stream.json is valid JSON with a stream field.""" 266 if seg_dir is None: 267 return False, "stream.json missing" 268 269 path = seg_dir / "stream.json" 270 if not path.exists(): 271 return False, "stream.json missing" 272 273 try: 274 with open(path, "r", encoding="utf-8") as handle: 275 payload = json.load(handle) 276 except (json.JSONDecodeError, OSError): 277 return False, "stream.json invalid JSON" 278 279 if payload.get("stream"): 280 return True, "stream.json valid" 281 return False, "stream.json missing stream field" 282 283 284def _check_content_files(seg_dir: Path | None) -> tuple[bool, str]: 285 """Verify transcript content files exist.""" 286 if seg_dir is None: 287 return False, "segment directory missing" 288 289 if (seg_dir / "audio.jsonl").exists() or (seg_dir / "screen.jsonl").exists(): 290 return True, "content files present" 291 return False, "no audio.jsonl or screen.jsonl" 292 293 294def _check_backward_chain( 295 day: str, stream: str, marker: dict | None 296) -> tuple[bool, str]: 297 """Verify backward chain integrity.""" 298 if not marker or not marker.get("prev_segment"): 299 return True, "no previous segment" 300 301 prev_day = marker.get("prev_day") 302 prev_segment = marker.get("prev_segment") 303 if not prev_day or not prev_segment: 304 return False, "prev_segment set without prev_day" 305 306 prev_dir = _find_segment_dir_readonly(prev_day, prev_segment, stream) 307 if prev_dir is not None: 308 return True, "previous segment found" 309 return False, f"missing previous segment {prev_day}/{stream}/{prev_segment}" 310 311 312def _check_forward_chain(day: str, stream: str, segment: str) -> tuple[bool, str]: 313 """Verify forward chain integrity.""" 314 next_day, next_segment = _find_next_segment(day, stream, segment) 315 if next_day and next_segment: 316 return True, f"next segment {next_day}/{stream}/{next_segment}" 317 if _is_stream_tail(day, stream, segment): 318 return True, "stream tail" 319 return False, "next segment not found, not stream tail" 320 321 322def _check_index_presence(day: str, stream: str, segment: str) -> tuple[bool, str]: 323 """Verify the segment has an index entry when a DB is available.""" 324 info = _segment_index_info(day, stream, segment) 325 if not info["available"]: 326 return True, "journal index not available" 327 if info["indexed"]: 328 return True, "segment indexed" 329 return False, "segment not indexed" 330 331 332def _run_checks(day: str, stream: str, segment: str) -> list[dict[str, object]]: 333 """Run all segment verification checks.""" 334 seg_dir = _find_segment_dir_readonly(day, segment, stream) 335 marker = read_segment_stream(seg_dir) if seg_dir is not None else None 336 337 checks = [ 338 ("directory exists", _check_directory(seg_dir)), 339 ("stream.json exists", _check_stream_json(seg_dir)), 340 ("stream.json valid", _check_stream_json_valid(seg_dir)), 341 ("content files present", _check_content_files(seg_dir)), 342 ("backward chain", _check_backward_chain(day, stream, marker)), 343 ("forward chain", _check_forward_chain(day, stream, segment)), 344 ("index presence", _check_index_presence(day, stream, segment)), 345 ] 346 347 return [ 348 {"check": name, "passed": passed, "detail": detail} 349 for name, (passed, detail) in checks 350 ] 351 352 353def _rewrite_events_jsonl(seg_dir: Path, new_day: str, new_segment: str) -> int: 354 """Rewrite events.jsonl to update day and segment fields. 355 356 Returns number of lines rewritten. 357 """ 358 events_path = seg_dir / "events.jsonl" 359 if not events_path.exists(): 360 return 0 361 362 lines = events_path.read_text(encoding="utf-8").splitlines() 363 rewritten = [] 364 count = 0 365 for line in lines: 366 stripped = line.strip() 367 if not stripped: 368 rewritten.append(line) 369 continue 370 try: 371 obj = json.loads(stripped) 372 obj["day"] = new_day 373 obj["segment"] = new_segment 374 rewritten.append(json.dumps(obj, ensure_ascii=False)) 375 count += 1 376 except (json.JSONDecodeError, TypeError): 377 rewritten.append(line) 378 379 tmp = events_path.with_suffix(".tmp") 380 tmp.write_text("\n".join(rewritten) + "\n" if rewritten else "", encoding="utf-8") 381 os.rename(str(tmp), str(events_path)) 382 return count 383 384 385def _touch_health_marker(day: str) -> None: 386 """Touch health/stream.updated for a day.""" 387 health_dir = day_path(day) / "health" 388 health_dir.mkdir(parents=True, exist_ok=True) 389 (health_dir / "stream.updated").touch() 390 391 392def _delete_index_rows(journal: str, rel_path: str) -> dict[str, int]: 393 """Delete all index rows referencing a segment path. 394 395 Returns counts of deleted rows per table. 396 """ 397 db_path = Path(journal) / "indexer" / "journal.sqlite" 398 if not db_path.exists(): 399 return {"chunks": 0, "files": 0, "entities": 0, "entity_signals": 0} 400 401 try: 402 conn = sqlite3.connect(db_path) 403 try: 404 cur = conn.execute( 405 "DELETE FROM chunks WHERE path = ? OR path LIKE ?", 406 (rel_path, f"{rel_path}/%"), 407 ) 408 chunks_deleted = cur.rowcount 409 410 cur = conn.execute( 411 "DELETE FROM files WHERE path LIKE ?", 412 (f"{rel_path}/%",), 413 ) 414 files_deleted = cur.rowcount 415 416 cur = conn.execute( 417 "DELETE FROM entities WHERE path LIKE ?", 418 (f"{rel_path}/%",), 419 ) 420 entities_deleted = cur.rowcount 421 422 cur = conn.execute( 423 "DELETE FROM entity_signals WHERE path LIKE ?", 424 (f"{rel_path}/%",), 425 ) 426 signals_deleted = cur.rowcount 427 428 conn.commit() 429 finally: 430 conn.close() 431 except sqlite3.Error: 432 return {"chunks": 0, "files": 0, "entities": 0, "entity_signals": 0} 433 434 return { 435 "chunks": chunks_deleted, 436 "files": files_deleted, 437 "entities": entities_deleted, 438 "entity_signals": signals_deleted, 439 } 440 441 442def _reindex_segment(journal: str, seg_dir: Path) -> int: 443 """Re-index all formattable files in a segment directory. 444 445 Returns the number of files indexed. 446 """ 447 from think.indexer.journal import index_file 448 449 count = 0 450 for path in sorted(seg_dir.rglob("*")): 451 if not path.is_file(): 452 continue 453 try: 454 if index_file(journal, str(path)): 455 count += 1 456 except (ValueError, FileNotFoundError): 457 continue 458 return count 459 460 461def cmd_move(args: argparse.Namespace) -> None: 462 """Move a segment to a different day/time.""" 463 src_day, stream, src_segment = _split_segment_path(args.path) 464 465 src_dir = _find_segment_dir_readonly(src_day, src_segment, stream) 466 if src_dir is None: 467 print(f"Segment not found: {args.path}", file=sys.stderr) 468 raise SystemExit(1) 469 470 to_day = args.to_day 471 if not re.fullmatch(r"\d{8}", to_day): 472 print(f"Invalid --to-day format: {to_day} (expected YYYYMMDD)", file=sys.stderr) 473 raise SystemExit(1) 474 475 if args.to_time: 476 if not re.fullmatch(r"\d{6}", args.to_time): 477 print( 478 f"Invalid --to-time format: {args.to_time} (expected HHMMSS)", 479 file=sys.stderr, 480 ) 481 raise SystemExit(1) 482 duration = src_segment.split("_", 1)[1] 483 new_segment = f"{args.to_time}_{duration}" 484 else: 485 new_segment = src_segment 486 487 if to_day == src_day and new_segment == src_segment: 488 print("Source and destination are the same", file=sys.stderr) 489 raise SystemExit(1) 490 491 dst_parent = day_path(to_day, create=False) / stream 492 dst_dir = dst_parent / new_segment 493 if dst_dir.exists(): 494 if args.to_time: 495 from observe.utils import find_available_segment 496 497 avail = find_available_segment(dst_parent, new_segment) 498 if avail is None: 499 print( 500 f"No available segment slot near {new_segment} on {to_day}", 501 file=sys.stderr, 502 ) 503 raise SystemExit(1) 504 new_segment = avail 505 dst_dir = dst_parent / new_segment 506 else: 507 print( 508 f"Segment {new_segment} already exists on {to_day}/{stream}. " 509 f"Use --to-time to specify an alternate time.", 510 file=sys.stderr, 511 ) 512 raise SystemExit(1) 513 514 marker = read_segment_stream(src_dir) 515 if not marker: 516 print("No stream.json in source segment", file=sys.stderr) 517 raise SystemExit(1) 518 if marker.get("stream") != stream: 519 print( 520 f"Stream mismatch: path says '{stream}' but stream.json says '{marker.get('stream')}'", 521 file=sys.stderr, 522 ) 523 raise SystemExit(1) 524 525 succ_day, succ_seg, succ_path = _find_successor_segment( 526 src_day, stream, src_segment 527 ) 528 529 events_path = src_dir / "events.jsonl" 530 events_count = 0 531 if events_path.exists(): 532 with open(events_path, encoding="utf-8") as handle: 533 events_count = sum(1 for line in handle if line.strip()) 534 535 journal = get_journal() 536 old_rel = f"{src_day}/{stream}/{src_segment}" 537 index_info = _segment_index_info(src_day, stream, src_segment) 538 539 print(f"Move: {src_day}/{stream}/{src_segment} -> {to_day}/{stream}/{new_segment}") 540 print(f" events.jsonl lines: {events_count}") 541 if succ_day: 542 print(f" successor to patch: {succ_day}/{stream}/{succ_seg}") 543 else: 544 print(" successor to patch: (none - stream tail)") 545 if index_info["available"]: 546 print(f" index chunks: {index_info['chunks']}") 547 print(f" health markers: {src_day}, {to_day}") 548 549 if args.dry_run: 550 print("\n[dry run] No changes made") 551 return 552 553 verbose = getattr(args, "verbose", False) 554 555 print("\nExecuting move...") 556 dst_parent.mkdir(parents=True, exist_ok=True) 557 if verbose: 558 print(f" created directory: {dst_parent}") 559 shutil.move(str(src_dir), str(dst_dir)) 560 print(f" moved directory: {src_dir.name} -> {dst_dir}") 561 562 rewritten = _rewrite_events_jsonl(dst_dir, to_day, new_segment) 563 if rewritten: 564 print( 565 f" rewrote {rewritten} events.jsonl lines (day: {src_day}->{to_day}, segment: {src_segment}->{new_segment})" 566 ) 567 elif verbose: 568 print(" no events.jsonl to rewrite") 569 570 if succ_path: 571 succ_marker = read_segment_stream(succ_path) 572 if succ_marker: 573 write_segment_stream( 574 succ_path, 575 succ_marker["stream"], 576 to_day, 577 new_segment, 578 succ_marker["seq"], 579 ) 580 print(f" patched successor {succ_day}/{stream}/{succ_seg}") 581 if verbose: 582 print(f" prev_day: {succ_marker.get('prev_day')} -> {to_day}") 583 print( 584 f" prev_segment: {succ_marker.get('prev_segment')} -> {new_segment}" 585 ) 586 elif verbose: 587 print(" no successor to patch (stream tail)") 588 589 summary = rebuild_stream_state(stream) 590 print(f" rebuilt stream state: {stream}") 591 if verbose: 592 print( 593 f" scanned {summary['segments_scanned']} segments, rebuilt {len(summary['rebuilt'])} stream(s)" 594 ) 595 596 if index_info["available"]: 597 deleted = _delete_index_rows(journal, old_rel) 598 if any(deleted.values()) or verbose: 599 print( 600 f" deleted index rows: chunks={deleted['chunks']}, files={deleted['files']}, entities={deleted['entities']}, signals={deleted['entity_signals']}" 601 ) 602 new_rel = f"{to_day}/{stream}/{new_segment}" 603 indexed = _reindex_segment(journal, dst_dir) 604 print(f" re-indexed: {indexed} files at {new_rel}") 605 elif verbose: 606 print(" index not available, skipping reindex") 607 608 _touch_health_marker(src_day) 609 _touch_health_marker(to_day) 610 print(f" touched health markers: {src_day}, {to_day}") 611 if verbose: 612 print(" think will re-run daily talents on both days") 613 614 # Post-move verify is informational — the move already completed. 615 print() 616 results = _run_checks(to_day, stream, new_segment) 617 _print_check_results(results) 618 passed = sum(1 for result in results if result["passed"]) 619 print(f"\n{passed}/{len(results)} checks passed") 620 621 622def cmd_list(args: argparse.Namespace) -> None: 623 """List segments for a day.""" 624 segments = iter_segments(args.day) 625 if args.stream: 626 segments = [entry for entry in segments if entry[0] == args.stream] 627 628 if not segments: 629 print(f"No segments found for {args.day}") 630 return 631 632 rows = [] 633 for stream_name, seg_key, seg_path in segments: 634 start, end = _segment_time_strings(seg_path) 635 stats = _segment_stats(seg_path) 636 rows.append( 637 { 638 "stream": stream_name, 639 "segment": seg_key, 640 "start": start, 641 "end": end, 642 "duration": _segment_duration(seg_key), 643 "files": stats["files"], 644 "talents": stats["talents"], 645 "size": stats["size"], 646 } 647 ) 648 649 if args.json_output: 650 print(json.dumps(rows, indent=2)) 651 return 652 653 print( 654 f"{'STREAM':<20} {'SEGMENT':<14} {'TIME':<15} " 655 f"{'DUR':>5} {'FILES':>5} {'TALENTS':>7} {'SIZE':>8}" 656 ) 657 print("-" * 78) 658 for row in rows: 659 time_str = ( 660 f"{row['start']}-{row['end']}" 661 if row["start"] is not None and row["end"] is not None 662 else "?" 663 ) 664 dur_str = f"{row['duration']}s" 665 print( 666 f"{row['stream']:<20} {row['segment']:<14} {time_str:<15} " 667 f"{dur_str:>5} {row['files']:>5} {row['talents']:>7} " 668 f"{_format_size(int(row['size'])):>8}" 669 ) 670 671 672def cmd_inspect(args: argparse.Namespace) -> None: 673 """Inspect one segment.""" 674 day, stream, segment = _split_segment_path(args.path) 675 seg_dir = _find_segment_dir_readonly(day, segment, stream) 676 if seg_dir is None: 677 print(f"Segment not found: {args.path}", file=sys.stderr) 678 raise SystemExit(1) 679 680 marker = read_segment_stream(seg_dir) or {} 681 stream_name = marker.get("stream") or stream 682 start, end = _segment_time_strings(seg_dir) 683 duration = _segment_duration(segment) 684 prev_desc = _describe_prev(day, stream_name, marker) 685 next_desc = _describe_next(day, stream_name, segment) 686 files = _segment_files(seg_dir) 687 talents = _agent_files(seg_dir) 688 stats = _segment_stats(seg_dir) 689 events = _events_summary(seg_dir) 690 index_info = _segment_index_info(day, stream_name, segment) 691 692 payload = { 693 "path": f"{day}/{stream}/{segment}", 694 "stream": stream_name, 695 "segment": segment, 696 "seq": marker.get("seq"), 697 "prev_day": marker.get("prev_day"), 698 "prev_segment": marker.get("prev_segment"), 699 "start": start, 700 "end": end, 701 "duration": duration, 702 "chain": {"prev": prev_desc, "next": next_desc}, 703 "files": files, 704 "talents": talents, 705 "stats": stats, 706 "events": events, 707 "index": index_info, 708 } 709 710 if args.json_output: 711 print(json.dumps(payload, indent=2)) 712 return 713 714 time_range = "?" 715 if start is not None and end is not None: 716 time_range = f"{start} - {end}" 717 718 print(f"Segment: {day}/{stream}/{segment}") 719 if marker.get("seq") is not None: 720 print(f"Stream: {stream_name} (seq {marker['seq']})") 721 else: 722 print(f"Stream: {stream_name}") 723 print(f"Time: {time_range} ({duration}s)") 724 print() 725 print("Chain:") 726 print(f" prev: {prev_desc}") 727 print(f" next: {next_desc}") 728 print() 729 print(f"Files ({len(files)}):") 730 if files: 731 print(f" {', '.join(files)}") 732 print() 733 print(f"Talents ({len(talents)}):") 734 if talents: 735 print(f" {', '.join(talents)}") 736 print() 737 print(f"Size: {_format_size(stats['size'])}") 738 if index_info["available"]: 739 if index_info["indexed"]: 740 print(f"Index: indexed ({index_info['chunks']} chunks)") 741 else: 742 print("Index: not-indexed") 743 else: 744 print("Index: unavailable") 745 tracts = ", ".join(events["tracts"]) 746 if tracts: 747 print(f"Events: {events['entries']} entries ({tracts})") 748 else: 749 print(f"Events: {events['entries']} entries") 750 751 752def _print_check_results(results: list[dict[str, object]]) -> None: 753 """Print PASS/FAIL lines for verify output.""" 754 for result in results: 755 status = "PASS" if result["passed"] else "FAIL" 756 detail = str(result["detail"]) 757 if result["passed"]: 758 print(f"{status:<5} {result['check']}") 759 else: 760 print(f"{status:<5} {result['check']}: {detail}") 761 762 763def cmd_verify(args: argparse.Namespace) -> None: 764 """Verify one segment or all segments for a day.""" 765 if args.path: 766 day, stream, segment = _split_segment_path(args.path) 767 results = _run_checks(day, stream, segment) 768 if args.json_output: 769 print(json.dumps(results, indent=2)) 770 else: 771 _print_check_results(results) 772 passed = sum(1 for result in results if result["passed"]) 773 print() 774 print(f"{passed}/{len(results)} checks passed") 775 raise SystemExit(0 if all(result["passed"] for result in results) else 1) 776 777 if not args.day: 778 print("verify requires a segment path or --day", file=sys.stderr) 779 raise SystemExit(1) 780 781 segments = iter_segments(args.day) 782 if not segments: 783 print(f"No segments found for {args.day}", file=sys.stderr) 784 raise SystemExit(1) 785 786 all_results: dict[str, list[dict[str, object]]] = {} 787 total_passed = 0 788 total_failed = 0 789 790 for stream_name, seg_key, _seg_path in segments: 791 seg_id = f"{args.day}/{stream_name}/{seg_key}" 792 results = _run_checks(args.day, stream_name, seg_key) 793 all_results[seg_id] = results 794 total_passed += sum(1 for result in results if result["passed"]) 795 total_failed += sum(1 for result in results if not result["passed"]) 796 797 if args.json_output: 798 print( 799 json.dumps( 800 { 801 "segments": all_results, 802 "summary": {"passed": total_passed, "failed": total_failed}, 803 }, 804 indent=2, 805 ) 806 ) 807 else: 808 for seg_id, results in all_results.items(): 809 print(f"--- {seg_id} ---") 810 _print_check_results(results) 811 print() 812 print(f"Summary: {total_passed}/{total_passed + total_failed} checks passed") 813 814 raise SystemExit(0 if total_failed == 0 else 1) 815 816 817def main() -> None: 818 """CLI entry point for sol segment.""" 819 parser = argparse.ArgumentParser( 820 description="Inspect and manage journal segments", 821 usage="sol segment <command> [options]", 822 ) 823 sub = parser.add_subparsers(dest="subcommand") 824 825 p_list = sub.add_parser("list", help="List segments for a day") 826 p_list.add_argument("day", help="Day in YYYYMMDD format") 827 p_list.add_argument("--stream", help="Filter to a specific stream") 828 p_list.add_argument( 829 "--json", dest="json_output", action="store_true", help="Output as JSON" 830 ) 831 832 p_inspect = sub.add_parser("inspect", help="Show segment metadata") 833 p_inspect.add_argument( 834 "path", 835 help="Segment path: day/stream/segment (e.g. 20260304/default/090000_300)", 836 ) 837 p_inspect.add_argument( 838 "--json", dest="json_output", action="store_true", help="Output as JSON" 839 ) 840 841 p_verify = sub.add_parser("verify", help="Verify segment integrity") 842 p_verify.add_argument("path", nargs="?", help="Segment path: day/stream/segment") 843 p_verify.add_argument("--day", help="Verify all segments for a day") 844 p_verify.add_argument( 845 "--json", dest="json_output", action="store_true", help="Output as JSON" 846 ) 847 848 p_move = sub.add_parser("move", help="Move segment to a different day/time") 849 p_move.add_argument( 850 "path", 851 help="Segment path: day/stream/segment (e.g. 20260304/default/090000_300)", 852 ) 853 p_move.add_argument("--to-day", required=True, help="Destination day (YYYYMMDD)") 854 p_move.add_argument( 855 "--to-time", help="New time (HHMMSS), preserving original duration" 856 ) 857 p_move.add_argument( 858 "--dry-run", action="store_true", help="Show plan without making changes" 859 ) 860 p_move.add_argument( 861 "-v", "--verbose", action="store_true", help="Show detailed progress" 862 ) 863 864 args = setup_cli(parser) 865 require_solstone() 866 867 if args.subcommand is None: 868 parser.print_help() 869 raise SystemExit(1) 870 871 if args.subcommand == "list": 872 cmd_list(args) 873 elif args.subcommand == "inspect": 874 cmd_inspect(args) 875 elif args.subcommand == "verify": 876 cmd_verify(args) 877 elif args.subcommand == "move": 878 cmd_move(args)