Real-time index of opencode sessions

doc: expand Option C two-tier design with event-driven hot index

Replace directory-listing approach with targeted updates from watchman
subscription metadata. Hot tier fed directly from watchman mtime/size
fields or fallback stat calls. Cold tier uses mtime comparison for
staleness. Includes watcher integration sketch.

rektide ea8229f2 fab31de2

+228 -17
doc/discovery/watchman.md
···

 ### C. Two-Tier Index: Hot Metadata + Cold Content

-Split the index into two layers. The **hot tier** holds lightweight metadata (ids, timestamps, counts) and is always eagerly rebuilt on change. The **cold tier** holds deserialized content (`SessionInfo`, `Message`, `Part`) and is lazily loaded.
+Split the index into two layers. The **hot tier** holds lightweight metadata (ids, timestamps, counts) and is always kept current. The **cold tier** holds deserialized content (`SessionInfo`, `Message`, `Part`) and is lazily loaded on access.
+
+The key insight: watchman already tells us *exactly which files* changed, along with their metadata (`mtime`, `size`, `exists`). We don't need directory listings at all — we apply targeted updates to the hot index directly from watchman's subscription events.
+
+#### Hot tier: event-driven stat updates (no directory listing)
+
+Watchman's `SessionFile` subscription already provides `MTimeField` and `SizeField`. For new or modified files, we upsert a lightweight metadata entry. For deletions (`exists == false`), we remove the entry. No `readdir`, no `stat` — watchman has already done both.

 ```rust
-/// Always current — rebuilt eagerly from directory listings
+/// Lightweight metadata extracted from watchman events or stat calls.
+/// No file content is read — just identity + filesystem metadata.
+pub struct HotSessionMeta {
+    pub id: SessionId,
+    pub project_id: String,
+    pub mtime: i64,
+    pub size: u64,
+}
+
+pub struct HotMessageMeta {
+    pub id: MessageId,
+    pub session_id: SessionId,
+    pub mtime: i64,
+    pub size: u64,
+}
+
+pub struct HotPartMeta {
+    pub id: PartId,
+    pub message_id: MessageId,
+    pub path: PathBuf,
+    pub mtime: i64,
+    pub size: u64,
+}
+
 pub struct HotIndex {
-    sessions: HashMap<SessionId, HotSessionMeta>, // id, project, mtime only
-    messages: HashMap<MessageId, HotMessageMeta>, // id, session_id, role only
-    parts: HashMap<PartId, HotPartMeta>, // id, message_id, path only
+    sessions: HashMap<SessionId, HotSessionMeta>,
+    messages: HashMap<MessageId, HotMessageMeta>,
+    parts: HashMap<PartId, HotPartMeta>,
+    /// Reverse indexes for structural queries
+    sessions_by_project: HashMap<String, Vec<SessionId>>,
+    messages_by_session: HashMap<SessionId, Vec<MessageId>>,
+    parts_by_message: HashMap<MessageId, Vec<PartId>>,
+}
+
+impl HotIndex {
+    /// Apply a single watchman file event. No I/O — metadata comes from
+    /// the subscription fields or a prior stat call.
+    fn apply_event(&mut self, name: &Path, exists: bool, mtime: i64, size: u64) {
+        let components: Vec<_> = name.iter().collect();
+
+        // session/<projectID>/<sessionID>.json
+        if components.len() >= 3 && components[0] == "session" {
+            let project_id = components[1].to_string_lossy().into_owned();
+            if let Some(session_id) = SessionId::from_filename(
+                &components[2].to_string_lossy()
+            ) {
+                if exists {
+                    self.upsert_session(session_id, project_id, mtime, size);
+                } else {
+                    self.remove_session(&session_id);
+                }
+            }
+        }
+
+        // message/<sessionID>/<messageID>.json
+        if components.len() >= 3 && components[0] == "message" {
+            // ... same pattern: parse IDs from path, upsert or remove
+        }
+
+        // part/<messageID>/<partID>.json
+        if components.len() >= 3 && components[0] == "part" {
+            // ...
+        }
+    }
+
+    fn upsert_session(&mut self, id: SessionId, project_id: String, mtime: i64, size: u64) {
+        self.sessions_by_project
+            .entry(project_id.clone())
+            .or_default()
+            .retain(|s| s != &id); // deduplicate
+        self.sessions_by_project
+            .entry(project_id.clone())
+            .or_default()
+            .push(id.clone());
+
+        self.sessions.insert(id.clone(), HotSessionMeta {
+            id, // the clone above is the map key; the original moves in here
+            project_id,
+            mtime,
+            size,
+        });
+    }
+
+    fn remove_session(&mut self, id: &SessionId) {
+        if let Some(meta) = self.sessions.remove(id) {
+            if let Some(list) = self.sessions_by_project.get_mut(&meta.project_id) {
+                list.retain(|s| s != id);
+            }
+        }
+    }
+
+    pub fn session_count(&self) -> usize { self.sessions.len() }
+    pub fn sessions_for_project(&self, project_id: &str) -> &[SessionId] {
+        self.sessions_by_project.get(project_id).map(|v| v.as_slice()).unwrap_or_default()
+    }
+}
+```
+
+#### Where the metadata comes from
+
+Two sources, both avoiding directory listings:
+
+1. **Watchman subscription fields** — the `SessionFile` type already requests `MTimeField` and `SizeField`. These arrive with every change event, zero additional I/O.
+
+2. **Targeted stat calls** — if we ever need to verify or refresh a single entry outside of a watchman event (e.g., on `is_fresh_instance` recovery), we stat the specific file rather than listing the directory:
+
+```rust
+impl HotIndex {
+    /// Refresh a single entry via stat. `relative` is the path under `storage_root`.
+    /// Used when watchman metadata isn't available (e.g., initial load or manual refresh).
+    fn stat_refresh(&mut self, storage_root: &Path, relative: &Path) -> std::io::Result<()> {
+        let metadata = std::fs::metadata(storage_root.join(relative))?;
+        let mtime = metadata.modified()?
+            .duration_since(std::time::UNIX_EPOCH)
+            .unwrap_or_default()
+            .as_secs() as i64;
+        let size = metadata.len();
+        self.apply_event(relative, true, mtime, size);
+        Ok(())
+    }
+
+    /// Bulk stat for initial load or fresh-instance recovery.
+    /// Walks the known directory structure but only stats files,
+    /// never reads content.
+    fn stat_all(&mut self, storage_root: &Path) -> std::io::Result<()> {
+        for entry in walkdir::WalkDir::new(storage_root)
+            .into_iter()
+            .filter_map(|e| e.ok())
+            .filter(|e| e.path().extension().map(|x| x == "json").unwrap_or(false))
+        {
+            let relative = entry.path().strip_prefix(storage_root).unwrap();
+            let meta = entry.metadata()?;
+            let mtime = meta.modified()?
+                .duration_since(std::time::UNIX_EPOCH)
+                .unwrap_or_default()
+                .as_secs() as i64;
+            self.apply_event(relative, true, mtime, meta.len());
+        }
+        Ok(())
+    }
+}
+```
+
+#### Cold tier: mtime-gated lazy content
+
+The cold cache stores deserialized content stamped with the `mtime` it was loaded at. On access, it compares against the hot index's `mtime` — if the hot tier has a newer mtime, the cold entry is stale and gets reloaded.
+
+```rust
+pub struct ColdEntry<T> {
+    data: T,
+    loaded_mtime: i64,
 }

-/// Loaded on demand — may be stale, checked via mtime
 pub struct ColdCache {
-    sessions: HashMap<SessionId, (i64, SessionInfo)>, // (mtime, data)
-    messages: HashMap<MessageId, (i64, Message)>,
-    parts: HashMap<PartId, (i64, Part)>,
+    sessions: HashMap<SessionId, ColdEntry<SessionInfo>>,
+    messages: HashMap<MessageId, ColdEntry<Message>>,
+    parts: HashMap<PartId, ColdEntry<Part>>,
 }

 impl ColdCache {
-    fn get_session(&self, id: &SessionId, hot: &HotIndex) -> Option<&SessionInfo> {
+    fn get_session(
+        &self,
+        id: &SessionId,
+        hot: &HotIndex,
+    ) -> Option<&SessionInfo> {
         let hot_meta = hot.sessions.get(id)?;
-        let (cached_mtime, data) = self.sessions.get(id)?;
-        if *cached_mtime >= hot_meta.mtime {
-            Some(data)
+        let entry = self.sessions.get(id)?;
+        if entry.loaded_mtime >= hot_meta.mtime {
+            Some(&entry.data)
         } else {
-            None // caller reloads
+            None // stale — caller reloads and re-inserts
         }
     }
+
+    fn put_session(&mut self, id: SessionId, data: SessionInfo, mtime: i64) {
+        self.sessions.insert(id, ColdEntry { data, loaded_mtime: mtime });
+    }
 }
 ```

-**Pros:** Structural queries ("how many sessions?", "list messages for session X") are always instant and never stale. Only content access triggers I/O. Natural separation of concerns.
+#### Watcher integration
+
+The watcher task feeds the hot index directly from watchman events — no intermediate dirty map needed:
+
+```rust
+pub async fn watcher_task(
+    storage_root: PathBuf,
+    hot: Arc<RwLock<HotIndex>>,
+    mut shutdown: tokio::sync::watch::Receiver<bool>,
+) -> Result<(), Error> {
+    let client = Connector::new().connect().await?;
+    let resolved = client
+        .resolve_root(CanonicalPath::canonicalize(&storage_root)?)
+        .await?;
+
+    let (mut sub, _) = client
+        .subscribe::<SessionFile>(&resolved, SubscribeRequest {
+            expression: Some(Expr::Suffix(vec!["json".into()])),
+            ..Default::default()
+        })
+        .await?;
+
+    loop {
+        tokio::select! {
+            event = sub.next() => {
+                match event? {
+                    SubscriptionData::FilesChanged(result) => {
+                        if result.is_fresh_instance {
+                            // Full recovery: watchman re-reports every matching file with metadata
+                            let mut idx = hot.write();
+                            // Could clear and rebuild, or merge (merging keeps entries for files deleted meanwhile)
+                            for f in result.files.unwrap_or_default() {
+                                idx.apply_event(&f.name, *f.exists, *f.mtime, *f.size);
+                            }
+                        } else if let Some(files) = result.files {
+                            let mut idx = hot.write();
+                            for f in files {
+                                idx.apply_event(&f.name, *f.exists, *f.mtime, *f.size);
+                            }
+                        }
+                    }
+                    SubscriptionData::Canceled => break,
+                    _ => {}
+                }
+            }
+            _ = shutdown.changed() => break,
+        }
+    }
+    Ok(())
+}
+```

-**Cons:** Hot tier rebuild still requires directory listing I/O on every change batch. Two data structures to maintain. Mtime comparison is racy — a file could be written twice within the same second.
+**Pros:** Structural queries ("how many sessions?", "list messages for session X") are always instant and never stale. Only content access triggers I/O. No directory listings — watchman provides file-level metadata, or we stat individual files as fallback. Natural separation of concerns.
+
+**Cons:** Two data structures to maintain. Mtime has 1-second granularity on some filesystems — two writes within the same second could be missed. The hot index write lock is held during event processing (though the work is trivial: HashMap inserts, no I/O).

 ---

···
 | **Generation (chosen)** | Medium | 1 atomic load | Hash insert + atomic tick | Until next access |
 | **A. Eager channel** | Medium | None (always fresh) | Full re-parse per change | None |
 | **B. Content hash** | Medium | Hash compare | Watchman hashes server-side | Until next access |
-| **C. Two-tier** | High | Hot=none, Cold=mtime check | Dir listing + hot rebuild | Hot=none, Cold=until access |
+| **C. Two-tier** | Medium-High | Hot=none, Cold=mtime check | HashMap upsert (no I/O) | Hot=none, Cold=until access |
 | **D. Epoch** | Low | 1 atomic load | 1 atomic tick per dir | Until next access (over-invalidates) |
 | **E. Arc eviction** | Low | Cache miss on evicted | Hash remove | Until next access |
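
The same-second write caveat listed under Option C's cons could be narrowed, though not eliminated, by stamping cold entries with the `(mtime, size)` pair the hot tier already stores and treating any mismatch as stale. The sketch below is a std-only illustration of that variation, not the design in the diff: `Stamp` is a hypothetical type, ids are plain strings instead of `SessionId`, and only sessions are modeled.

```rust
use std::collections::HashMap;

/// Hypothetical filesystem stamp: the two metadata fields the hot tier keeps.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Stamp {
    pub mtime: i64,
    pub size: u64,
}

/// Simplified hot tier: session id -> latest stamp reported for its file.
#[derive(Default)]
pub struct HotIndex {
    sessions: HashMap<String, Stamp>,
}

impl HotIndex {
    /// Upsert on change, remove on deletion. No I/O happens here.
    pub fn apply_event(&mut self, id: &str, exists: bool, mtime: i64, size: u64) {
        if exists {
            self.sessions.insert(id.to_owned(), Stamp { mtime, size });
        } else {
            self.sessions.remove(id);
        }
    }
}

/// Cached content plus the stamp observed when it was loaded.
pub struct ColdEntry<T> {
    data: T,
    loaded: Stamp,
}

/// Simplified cold tier holding session content as plain strings.
#[derive(Default)]
pub struct ColdCache {
    sessions: HashMap<String, ColdEntry<String>>,
}

impl ColdCache {
    pub fn put_session(&mut self, id: &str, data: String, loaded: Stamp) {
        self.sessions.insert(id.to_owned(), ColdEntry { data, loaded });
    }

    /// Returns cached content only when the load-time stamp equals the hot
    /// tier's current stamp. Any difference in mtime or size counts as stale.
    pub fn get_session(&self, id: &str, hot: &HotIndex) -> Option<&str> {
        let current = hot.sessions.get(id)?;
        let entry = self.sessions.get(id)?;
        (entry.loaded == *current).then(|| entry.data.as_str())
    }
}
```

With this check, a second write inside the same second is detected whenever it changes the file size. A same-second rewrite that leaves the size unchanged would still be served from the cold cache, so the caveat is reduced rather than removed.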