Real-time index of opencode sessions

doc: rewrite watchman discovery with dirty-tracking & lazy-reload design

Replaces the previous connection/subscription-focused watchman.md with a
design centered on generation-based dirty tracking and lazy reload.

Key design elements:
- GenerationClock: monotonic atomic counter, avoids ABA flag problems
- DirtyTracker: maps paths to the generation they were dirtied at
- TrackedMappedFileCache: generation-aware cache that reloads on access
- Watcher task: background tokio task feeding DirtyTracker from watchman
- 5-phase integration plan from primitives through graceful degradation
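
As a rough illustration of the first two bullets (not the code added by this commit), the sketch below shows why a monotonic generation survives a second write that lands while a reload is in flight. The `is_stale` helper, the plain `HashMap`, and the `part/a.json` path are assumptions made for the example; the real design wraps the map in `DirtyTracker`.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};

/// Monotonic counter; each batch of change notifications ticks it once.
struct GenerationClock(AtomicU64);

impl GenerationClock {
    fn new() -> Self {
        Self(AtomicU64::new(1))
    }
    /// Advance the clock and return the new generation.
    fn tick(&self) -> u64 {
        self.0.fetch_add(1, Ordering::AcqRel) + 1
    }
    /// Read the current generation without advancing it.
    fn current(&self) -> u64 {
        self.0.load(Ordering::Acquire)
    }
}

/// Hypothetical helper: an entry is stale when it was dirtied after it was loaded.
fn is_stale(dirty: &HashMap<&str, u64>, path: &str, loaded_gen: u64) -> bool {
    dirty.get(path).map_or(false, |&g| g > loaded_gen)
}

fn main() {
    let clock = GenerationClock::new();
    let mut dirty: HashMap<&str, u64> = HashMap::new();

    // Object loaded at generation 1, nothing dirtied yet.
    let loaded_gen = clock.current();
    assert!(!is_stale(&dirty, "part/a.json", loaded_gen));

    // First write arrives (gen 2); a reader begins reloading and records gen 2.
    dirty.insert("part/a.json", clock.tick());
    let reload_gen = clock.current();

    // Second write arrives (gen 3) before that reload is published.
    dirty.insert("part/a.json", clock.tick());

    // A boolean flag cleared after the first reload would hide the second write;
    // the generation comparison still reports the entry as stale.
    assert!(is_stale(&dirty, "part/a.json", reload_gen));
    assert!(!is_stale(&dirty, "part/a.json", clock.current()));
}
```

The comparison is strict (`>`), so an object stamped with the same generation as its dirty mark counts as clean; that matches the `cached.gen >= dirty.gen` rule stated in the doc.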

rektide 279d48eb 37092096

+370 -520
doc/discovery/watchman.md
-# Watchman Integration Design
+# Watchman Integration: Dirty-Tracking & Lazy Reload
 
-> Design for using Facebook's Watchman to watch opencode session directories and emit real-time change events
+> Design for using [`watchman_client`](https://github.com/facebook/watchman/tree/main/watchman/rust/watchman_client) to detect file changes, mark in-memory objects dirty, and lazily reload them on next access.
 
-**Repository:** facebook/watchman
-**Rust Crate:** `watchman_client` (in-tree at `watchman/rust/watchman_client/`)
+**Crate:** `watchman_client` `0.1` (feature-gated behind `watch`)
+**Docs:** [Watchman Documentation](https://facebook.github.io/watchman/), [Subscribe API](https://facebook.github.io/watchman/docs/cmd/subscribe), [Clockspec](https://facebook.github.io/watchman/docs/clockspec)
 
 ---
 
-## Overview
+## Problem
 
-Watchman is a file watching service by Facebook that provides efficient, cross-platform file system monitoring. For `opencode-session-rs`, watchman will:
+Today [`SessionMaterializer`](/src/materializer.rs) builds its entire [`SessionIndex`](/src/index.rs) eagerly at construction and the [`MappedFileCache`](/src/storage/mmap.rs) caches mmaps forever. When opencode writes a new part or updates a session file, the materializer serves stale data until it's reconstructed from scratch.
 
-1. **Watch the opencode storage directory** for changes
-2. **Emit an event stream** of file additions, modifications, and deletions
-3. **Trigger updates** to the in-memory materialized session view
+We need:
+
+1. **Change detection** — know *which* files changed without polling.
+2. **Dirty marking** — flag the affected in-memory objects (session meta, message meta, part ref, cached mmap) as stale.
+3. **Lazy reload** — on next access, reload only the dirty objects; clean objects are served immediately.
+
+---
+
+## Architecture
 
 ```
-┌─────────────────────────────────────────────────────────────────────┐
-│                        Session Materializer                         │
-├─────────────────────────────────────────────────────────────────────┤
-│                                                                     │
-│  ┌──────────────────┐     Events      ┌────────────────────┐        │
-│  │     Watchman     │ ───────────────►│   Session Index    │        │
-│  │     (daemon)     │                 │    (in-memory)     │        │
-│  └──────────────────┘                 └────────────────────┘        │
-│           │                                     │                   │
-│           │ watches                             │ references        │
-│           ▼                                     ▼                   │
-│  ┌──────────────────────────────────────────────────────────┐       │
-│  │  ~/.local/share/opencode/storage/                        │       │
-│  │  ├── session/  ├── message/  ├── part/  ...              │       │
-│  │  └───────────────────────────────────────────────────────┘       │
-│                         Memory-Mapped Files                         │
-└─────────────────────────────────────────────────────────────────────┘
+                    ┌──────────────────────────────────────────────────────┐
+                    │                 SessionMaterializer                  │
+                    │                                                      │
+  watchman daemon   │  ┌──────────┐    dirty set    ┌──────────────────┐   │
+───subscription────►│  │ Watcher  │ ──────────────► │  DirtyTracker    │   │
+ (inotify/fsevents) │  │  Task    │                 │   {path→gen}     │   │
+                    │  └──────────┘                 └────────┬─────────┘   │
+                    │                                        │             │
+                    │         ┌──────────────────────────────┘             │
+                    │         ▼                                            │
+                    │  ┌───────────────┐            ┌──────────────────┐   │
+                    │  │ SessionIndex  │            │ MappedFileCache  │   │
+                    │  │ {id→meta}     │            │ {path→(gen,mmap)}│   │
+                    │  │ each meta has │            │                  │   │
+                    │  │ a generation  │            │ returns cached   │   │
+                    │  │ number        │            │ if gen matches   │   │
+                    │  └───────────────┘            └──────────────────┘   │
+                    └──────────────────────────────────────────────────────┘
 ```
 
 ---
 
-## Tech Stack
+## Core Concept: Generation Numbers
 
-| Component | Purpose |
-|-----------|---------|
-| **watchman** | Daemon that monitors filesystem (via inotify/FSEvents) |
-| **watchman_client** | Rust crate for communicating with watchman |
-| **serde_bser** | Binary serialization used by watchman protocol |
-| **tokio** | Async runtime required by watchman_client |
+Rather than boolean dirty flags (which require careful clearing), each object carries a **generation number**. A global `AtomicU64` ticks upward whenever a batch of changes arrives. Dirty paths are stamped with the generation they were dirtied at. Cached objects also carry the generation they were loaded at. On access:
 
-### Key watchman_client Types
+- `cached.gen >= dirty.gen` → clean, serve immediately
+- `cached.gen < dirty.gen` → stale, reload from disk, stamp with current gen
 
-| Type | Purpose |
-|------|---------|
-| `Connector` | Establishes connection to watchman daemon |
-| `Client` | Main interface for queries and subscriptions |
-| `ResolvedRoot` | Represents a watched directory |
-| `Subscription` | Stream of file change events |
-| `SubscriptionData` | Enum of event types (FilesChanged, Canceled, StateEnter/Leave) |
+This is lock-free on the read path (only an atomic load) and naturally handles rapid successive changes — a second write before the first reload just bumps the dirty gen again.
+
+```rust
+use std::sync::atomic::{AtomicU64, Ordering};
+
+/// Global generation counter, shared across the materializer
+pub struct GenerationClock {
+    gen: AtomicU64,
+}
+
+impl GenerationClock {
+    pub fn new() -> Self {
+        Self { gen: AtomicU64::new(1) }
+    }
+
+    /// Tick and return the new generation
+    pub fn tick(&self) -> u64 {
+        self.gen.fetch_add(1, Ordering::AcqRel) + 1
+    }
+
+    /// Current generation (for stamping freshly loaded objects)
+    pub fn current(&self) -> u64 {
+        self.gen.load(Ordering::Acquire)
+    }
+}
+```
 
 ---
 
-## API Surface
+## Dirty Tracker
 
-### Connection
+Maps file paths to the generation they were dirtied at. The watcher task writes; the read path checks.
 
 ```rust
-use watchman_client::prelude::*;
+use parking_lot::RwLock;
+use std::collections::HashMap;
+use std::path::{Path, PathBuf};
 
-// Connect to watchman daemon (auto-starts if not running)
-let client = Connector::new().connect().await?;
+pub struct DirtyTracker {
+    clock: Arc<GenerationClock>,
+    /// path → generation it was dirtied at
+    dirty: RwLock<HashMap<PathBuf, u64>>,
+}
 
-// Resolve a path to watch
-let resolved = client
-    .resolve_root(CanonicalPath::canonicalize("/home/user/.local/share/opencode/storage")?)
-    .await?;
-```
+impl DirtyTracker {
+    pub fn new(clock: Arc<GenerationClock>) -> Self {
+        Self {
+            clock,
+            dirty: RwLock::new(HashMap::new()),
+        }
+    }
 
-### Subscription Pattern
+    /// Mark paths as dirty. Called by the watcher task.
+    pub fn mark_dirty(&self, paths: impl IntoIterator<Item = PathBuf>) {
+        let gen = self.clock.tick();
+        let mut dirty = self.dirty.write();
+        for path in paths {
+            dirty.insert(path, gen);
+        }
+    }
 
-```rust
-// Define fields to receive in change events
-query_result_type! {
-    struct FileChange {
-        name: NameField,
-        exists: ExistsField,
-        file_type: FileTypeField,
-        mtime: MTimeField,
+    /// Check if a path is dirty since `loaded_gen`.
+    /// Returns Some(dirty_gen) if stale, None if clean.
+    pub fn check(&self, path: &Path, loaded_gen: u64) -> Option<u64> {
+        let dirty = self.dirty.read();
+        dirty.get(path).copied().filter(|&g| g > loaded_gen)
     }
-}
 
-// Create subscription
-let (mut subscription, initial) = client
-    .subscribe::<FileChange>(&resolved, SubscribeRequest {
-        expression: Some(Expr::All(vec![
-            Expr::Suffix(vec!["json".into()]),  // Only .json files
-            Expr::Exists,                       // That exist
-        ])),
-        ..Default::default()
-    })
-    .await?;
-
-// Process events
-loop {
-    match subscription.next().await? {
-        SubscriptionData::FilesChanged(result) => {
-            if result.is_fresh_instance {
-                // Full rebuild needed - all files in result.files
-            } else {
-                // Incremental update - only changed files
+    /// After reloading, the caller stamps their new gen.
+    /// Optionally clear the dirty entry if gen matches (prevents unbounded growth).
+    pub fn clear_if_current(&self, path: &Path, reloaded_gen: u64) {
+        let mut dirty = self.dirty.write();
+        if let Some(&g) = dirty.get(path) {
+            if g <= reloaded_gen {
+                dirty.remove(path);
             }
-
-            if let Some(files) = result.files {
-                for file in files {
-                    println!("Changed: {:?}", file.name);
-                }
-            }
-
-            // Save clock for next query
-            let clock = result.clock;
         }
-        SubscriptionData::Canceled => {
-            // Watch was canceled (dir deleted, etc.)
-            break;
-        }
-        SubscriptionData::StateEnter { state_name, .. } => {
-            // Some state change (e.g., VCS operation)
-        }
-        SubscriptionData::StateLeave { state_name, .. } => {
-            // State exited
-        }
+    }
+
+    /// Full clear (used on FullRebuild events)
+    pub fn clear_all(&self) {
+        self.dirty.write().clear();
     }
 }
 ```
 
-### Query (Non-Streaming)
+---
+
+## Lazy-Reloading Cache
 
-For initial load or manual refresh:
+The existing `MappedFileCache` needs a generation stamp per entry. On `get()`, if the entry is dirty, it remaps the file.
 
 ```rust
-let result = client
-    .query::<FileChange>(&resolved, QueryRequestCommon {
-        expression: Some(Expr::Suffix(vec!["json".into()])),
-        ..Default::default()
-    })
-    .await?;
+pub struct CacheEntry {
+    mmap: Arc<MappedFile>,
+    loaded_gen: u64,
+}
 
-// result.is_fresh_instance is always true for queries
-// result.files contains all matching files
-// result.clock can be saved for since-queries
-```
+pub struct TrackedMappedFileCache {
+    tracker: Arc<DirtyTracker>,
+    clock: Arc<GenerationClock>,
+    files: RwLock<HashMap<PathBuf, CacheEntry>>,
+}
 
-### Since Queries (Delta)
+impl TrackedMappedFileCache {
+    pub fn get(&self, path: &Path) -> Result<Arc<MappedFile>> {
+        // Fast path: read lock, check gen
+        {
+            let files = self.files.read();
+            if let Some(entry) = files.get(path) {
+                if self.tracker.check(path, entry.loaded_gen).is_none() {
+                    return Ok(Arc::clone(&entry.mmap));
+                }
+            }
+        }
 
-Get only files changed since a clock:
+        // Slow path: reload
+        let mapped = Arc::new(MappedFile::open(path)?);
+        let gen = self.clock.current();
+        {
+            let mut files = self.files.write();
+            files.insert(path.to_path_buf(), CacheEntry {
+                mmap: Arc::clone(&mapped),
+                loaded_gen: gen,
+            });
+        }
+        self.tracker.clear_if_current(path, gen);
+        Ok(mapped)
+    }
 
-```rust
-let changes = client
-    .query::<FileChange>(&resolved, QueryRequestCommon {
-        since: Some(Clock::Spec(saved_clock)),
-        expression: Some(Expr::Suffix(vec!["json".into()])),
-        ..Default::default()
-    })
-    .await?;
+    pub fn invalidate(&self, path: &Path) {
+        self.files.write().remove(path);
+    }
 
-if changes.is_fresh_instance {
-    // State was lost, need full rebuild
-} else {
-    // Only changed files in changes.files
+    pub fn clear(&self) {
+        self.files.write().clear();
+    }
 }
 ```
 
 ---
 
-## Expression Language
+## Lazy Index Entries
 
-Watchman supports a rich expression language for filtering:
+`SessionMeta`, `MessageMeta`, and `PartRef` each gain a `loaded_gen: u64`. The materializer checks before serving:
 
 ```rust
-// All JSON files
-Expr::Suffix(vec!["json".into()])
+impl SessionMaterializer {
+    pub fn session_meta(&self, id: &SessionId) -> Option<&SessionMeta> {
+        let meta = self.index.session(id)?;
+        let path = self.reader.paths().session_file(&meta.project_id, id);
+        if self.tracker.check(&path, meta.loaded_gen).is_some() {
+            // Stale — caller should reload via load_session()
+            // Option A: return None and let caller handle
+            // Option B: interior-mutate (requires &mut self or Cell pattern)
+            // Option C: return a wrapper that reloads on deref (see below)
+        }
+        Some(meta)
+    }
+}
+```
 
-// Specific file patterns
-Expr::Match(MatchTerm {
-    glob: "**/session/**/*.json".into(),
-    wholename: true,
-    ..Default::default()
-})
+### Option C: `Lazy<T>` Wrapper
+
+A zero-cost wrapper that transparently reloads on access:
+
+```rust
+pub enum Freshness<T> {
+    /// Object is current
+    Fresh(T),
+    /// Object was stale and has been reloaded
+    Reloaded(T),
+    /// Object was removed from disk
+    Gone,
+}
 
-// Files matching multiple conditions
-Expr::All(vec![
-    Expr::Suffix(vec!["json".into()]),
-    Expr::DirName(DirNameTerm {
-        path: "session".into(),
-        depth: Some(RelOp::GreaterOrEqual(0)),
-    }),
-])
+impl<T> Freshness<T> {
+    pub fn into_inner(self) -> Option<T> {
+        match self {
+            Self::Fresh(t) | Self::Reloaded(t) => Some(t),
+            Self::Gone => None,
+        }
+    }
 
-// Exclude certain patterns
-Expr::Not(Box::new(Expr::Match(MatchTerm {
-    glob: "**/node_modules/**".into(),
-    wholename: true,
-    ..Default::default()
-})))
+    pub fn is_stale(&self) -> bool {
+        matches!(self, Self::Reloaded(_))
+    }
+}
 ```
 
 ---
 
-## Available Fields
+## Watcher Task
 
-| Field | Type | Description |
-|-------|------|-------------|
-| `name` | PathBuf | File path relative to root |
-| `exists` | bool | Whether file currently exists |
-| `type` | FileType | File type (Regular, Directory, etc.) |
-| `size` | u64 | File size in bytes |
-| `mtime` | i64 | Modification time (seconds) |
-| `mtime_f` | f32 | Modification time (floating) |
-| `ctime` | i64 | Inode change time |
-| `cclock` | ClockSpec | When file was first observed |
-| `oclock` | ClockSpec | When file was last changed |
-| `content.sha1hex` | String | SHA1 hash of contents |
+A background tokio task that receives watchman subscription events and feeds the `DirtyTracker`.
 
----
+```rust
+pub async fn watcher_task(
+    storage_root: PathBuf,
+    tracker: Arc<DirtyTracker>,
+    mut shutdown: tokio::sync::watch::Receiver<bool>,
+) -> Result<(), Error> {
+    let client = Connector::new().connect().await?;
+    let resolved = client
+        .resolve_root(CanonicalPath::canonicalize(&storage_root)?)
+        .await?;
 
-## Design for opencode-session-rs
+    let (mut sub, _initial) = client
+        .subscribe::<SessionFile>(&resolved, SubscribeRequest {
+            expression: Some(Expr::Suffix(vec!["json".into()])),
+            ..Default::default()
+        })
+        .await?;
+
+    loop {
+        tokio::select! {
+            event = sub.next() => {
+                match event? {
+                    SubscriptionData::FilesChanged(result) => {
+                        if result.is_fresh_instance {
+                            // Watchman state lost — everything is dirty
+                            tracker.mark_dirty(
+                                result.files.unwrap_or_default()
+                                    .into_iter()
+                                    .map(|f| storage_root.join(&*f.name))
+                            );
+                        } else if let Some(files) = result.files {
+                            tracker.mark_dirty(
+                                files.into_iter()
+                                    .map(|f| storage_root.join(&*f.name))
+                            );
+                        }
+                    }
+                    SubscriptionData::Canceled => break,
+                    _ => {}
+                }
+            }
+            _ = shutdown.changed() => break,
+        }
+    }
+    Ok(())
+}
+```
 
-### Watch Configuration
+### `SessionFile` field type
 
 ```rust
-/// What to watch in opencode storage
-pub struct OpenCodeWatch {
-    /// Path to storage root (XDG data dir)
-    storage_path: PathBuf,
-    /// Active watchman subscription
-    subscription: Subscription<SessionFile>,
-    /// Last observed clock (for reconnection)
-    clock: Option<Clock>,
-}
+use watchman_client::prelude::*;
 
-/// Fields we care about for session files
 query_result_type! {
     struct SessionFile {
         name: NameField,
···
 }
 ```
 
-### Event Types
+---
 
-```rust
-/// Events emitted when session data changes
-pub enum SessionEvent {
-    /// A session was created or modified
-    SessionChanged {
-        project_id: String,
-        session_id: SessionId,
-        path: PathBuf,
-    },
-    /// A session was deleted
-    SessionDeleted {
-        project_id: String,
-        session_id: SessionId,
-    },
-    /// A message was created or modified
-    MessageChanged {
-        session_id: SessionId,
-        message_id: MessageId,
-        path: PathBuf,
-    },
-    /// A message was deleted
-    MessageDeleted {
-        session_id: SessionId,
-        message_id: MessageId,
-    },
-    /// A part was created or modified
-    PartChanged {
-        message_id: MessageId,
-        part_id: PartId,
-        path: PathBuf,
-    },
-    /// A part was deleted
-    PartDeleted {
-        message_id: MessageId,
-        part_id: PartId,
-    },
-    /// A diff file changed
-    DiffChanged {
-        session_id: SessionId,
-    },
-    /// Full rebuild needed (watchman state lost)
-    FullRebuild,
-}
-```
+## Event Classification
 
-### Path Parser
+Watchman gives us raw file paths. We classify them into domain events for index operations. Paths are relative to the watch root (`storage/`):
+
+| Path Pattern | Domain | Entity |
+|---|---|---|
+| `session/<projectID>/<sessionID>.json` | SessionChanged / SessionDeleted | `SessionMeta` |
+| `message/<sessionID>/<messageID>.json` | MessageChanged / MessageDeleted | `MessageMeta` |
+| `part/<messageID>/<partID>.json` | PartChanged / PartDeleted | `PartRef` |
+| `session_diff/<sessionID>.json` | DiffChanged | diff cache |
 
 ```rust
-impl SessionEvent {
-    /// Parse a watchman file change into a session event
-    fn from_path(path: &Path, exists: bool) -> Option<Self> {
-        let parts: Vec<_> = path.iter().collect();
-
-        // storage/session/<projectID>/<sessionID>.json
-        if parts.len() == 4 && parts[1] == "session" {
-            let project_id = parts[2].to_string_lossy().into_owned();
-            let session_id = SessionId::from_filename(parts[3])?;
-            return Some(if exists {
-                Self::SessionChanged { project_id, session_id, path: path.into() }
-            } else {
-                Self::SessionDeleted { project_id, session_id }
-            });
-        }
-
-        // storage/message/<sessionID>/<messageID>.json
-        if parts.len() == 4 && parts[1] == "message" {
-            let session_id = SessionId::from_filename(parts[2])?;
-            let message_id = MessageId::from_filename(parts[3])?;
-            return Some(if exists {
-                Self::MessageChanged { session_id, message_id, path: path.into() }
-            } else {
-                Self::MessageDeleted { session_id, message_id }
-            });
-        }
-
-        // storage/part/<messageID>/<partID>.json
-        if parts.len() == 4 && parts[1] == "part" {
-            let message_id = MessageId::from_filename(parts[2])?;
-            let part_id = PartId::from_filename(parts[3])?;
-            return Some(if exists {
-                Self::PartChanged { message_id, part_id, path: path.into() }
-            } else {
-                Self::PartDeleted { message_id, part_id }
-            });
-        }
-
-        // storage/session_diff/<sessionID>.json
-        if parts.len() == 3 && parts[1] == "session_diff" {
-            let session_id = SessionId::from_filename(parts[2])?;
-            return Some(Self::DiffChanged { session_id });
-        }
-
-        None
-    }
+pub enum SessionEvent {
+    SessionChanged { project_id: String, session_id: SessionId, path: PathBuf },
+    SessionDeleted { project_id: String, session_id: SessionId },
+    MessageChanged { session_id: SessionId, message_id: MessageId, path: PathBuf },
+    MessageDeleted { session_id: SessionId, message_id: MessageId },
+    PartChanged { message_id: MessageId, part_id: PartId, path: PathBuf },
+    PartDeleted { message_id: MessageId, part_id: PartId },
+    DiffChanged { session_id: SessionId },
+    FullRebuild,
 }
 ```
 
-### Watcher Implementation
+### Deletion Handling
 
-```rust
-pub struct SessionWatcher {
-    client: Client,
-    resolved: ResolvedRoot,
-    subscription: Subscription<SessionFile>,
-    clock: Clock,
-}
+When `exists == false`, the object was deleted. The index should:
+- Remove the entry from the index maps
+- Remove the mmap from the cache
+- Cascade: deleting a session should also remove its messages and parts from the index
 
-impl SessionWatcher {
-    pub async fn new(storage_path: &Path) -> Result<Self, Error> {
-        let client = Connector::new().connect().await?;
-        let resolved = client
-            .resolve_root(CanonicalPath::canonicalize(storage_path)?)
-            .await?;
-
-        let (subscription, initial) = client
-            .subscribe::<SessionFile>(&resolved, SubscribeRequest {
-                expression: Some(Expr::All(vec![
-                    Expr::Suffix(vec!["json".into()]),
-                    Expr::Exists,
-                ])),
-                ..Default::default()
-            })
-            .await?;
-
-        let clock = initial.clock;
-
-        Ok(Self { client, resolved, subscription, clock })
-    }
-
-    /// Get the next batch of session events
-    pub async fn next_events(&mut self) -> Result<Vec<SessionEvent>, Error> {
-        match self.subscription.next().await? {
-            SubscriptionData::FilesChanged(result) => {
-                self.clock = result.clock.clone();
-
-                if result.is_fresh_instance {
-                    return Ok(vec![SessionEvent::FullRebuild]);
-                }
-
-                let events = result.files
-                    .unwrap_or_default()
-                    .into_iter()
-                    .filter_map(|f| SessionEvent::from_path(&f.name, *f.exists))
-                    .collect();
-
-                Ok(events)
-            }
-            SubscriptionData::Canceled => {
-                Err(Error::WatchCanceled)
-            }
-            _ => Ok(vec![])
-        }
-    }
-
-    /// Perform initial scan of all files
-    pub async fn initial_scan(&self) -> Result<Vec<SessionEvent>, Error> {
-        let result = self.client
-            .query::<SessionFile>(&self.resolved, QueryRequestCommon {
-                expression: Some(Expr::All(vec![
-                    Expr::Suffix(vec!["json".into()]),
-                    Expr::Exists,
-                ])),
-                ..Default::default()
-            })
-            .await?;
-
-        Ok(result.files
-            .unwrap_or_default()
-            .into_iter()
-            .filter_map(|f| SessionEvent::from_path(&f.name, *f.exists))
-            .collect())
-    }
-}
-```
+When `exists == true`, we only mark dirty — the actual reload happens lazily.
 
 ---
 
-## Integration with Session Materializer
+## Integration Plan
 
-```rust
-pub struct SessionMaterializer {
-    /// Path to storage
-    storage_path: PathBuf,
-    /// In-memory index (always resident)
-    index: SessionIndex,
-    /// Memory-mapped files (pageable)
-    files: HashMap<PathBuf, Arc<Mmap>>,
-    /// File watcher
-    watcher: SessionWatcher,
-}
+### Phase 1: Generation Infrastructure
+
+Add `GenerationClock` and `DirtyTracker` to `storage/` module. Add `loaded_gen` field to `MappedFileCache` entries. No watcher yet — just the tracking primitives.
 
-impl SessionMaterializer {
-    pub async fn run(&mut self) -> Result<(), Error> {
-        // Initial load
-        for event in self.watcher.initial_scan().await? {
-            self.handle_event(event).await?;
-        }
-
-        // Watch loop
-        loop {
-            let events = self.watcher.next_events().await?;
-            for event in events {
-                self.handle_event(event).await?;
-            }
-        }
-    }
-
-    async fn handle_event(&mut self, event: SessionEvent) -> Result<(), Error> {
-        match event {
-            SessionEvent::FullRebuild => {
-                self.index.clear();
-                self.files.clear();
-                // Re-scan everything
-            }
-            SessionEvent::SessionChanged { path, .. } => {
-                let mmap = self.mmap_file(&path).await?;
-                let data: SessionInfo = serde_json::from_slice(&mmap)?;
-                self.index.update_session(data);
-            }
-            SessionEvent::SessionDeleted { session_id, .. } => {
-                self.index.remove_session(&session_id);
-            }
-            SessionEvent::PartChanged { path, message_id, .. } => {
-                let mmap = self.mmap_file(&path).await?;
-                self.index.update_part_location(message_id, path, mmap);
-            }
-            SessionEvent::PartDeleted { part_id, .. } => {
-                self.index.remove_part(&part_id);
-            }
-            // ... other events
-        }
-        Ok(())
-    }
-
-    async fn mmap_file(&mut self, path: &Path) -> Result<Arc<Mmap>, Error> {
-        if let Some(mmap) = self.files.get(path) {
-            return Ok(Arc::clone(mmap));
-        }
-
-        let file = File::open(path)?;
-        let mmap = unsafe { Mmap::map(&file)? };
-        let mmap = Arc::new(mmap);
-        self.files.insert(path.into(), Arc::clone(&mmap));
-        Ok(mmap)
-    }
-}
-```
+**Files touched:** [`src/storage/mmap.rs`](/src/storage/mmap.rs), new `src/storage/generation.rs`
 
----
+### Phase 2: Tracked Cache
 
-## Options and Alternatives
+Replace `MappedFileCache` with `TrackedMappedFileCache` (or add generation-awareness to the existing one). The `get()` method checks dirty state and reloads transparently.
 
-### Option 1: watchman_client (Recommended)
+**Files touched:** [`src/storage/mmap.rs`](/src/storage/mmap.rs), [`src/storage/mod.rs`](/src/storage/mod.rs)
 
-**Pros:**
-- Official Facebook crate, well-maintained
-- Efficient: uses inotify/FSEvents, no polling
-- Supports subscriptions (real-time push)
-- Supports "since" queries for efficient sync
-- Handles edge cases (directory moves, etc.)
+### Phase 3: Index Dirty Tracking
 
-**Cons:**
-- Requires watchman daemon installation
-- Async-only API (requires tokio)
-- BSER serialization can be confusing
+Add `loaded_gen` to `SessionMeta`, `MessageMeta`, `PartRef`. The materializer checks dirty state on access and reloads stale entries.
 
-### Option 2: notify crate
+**Files touched:** [`src/index.rs`](/src/index.rs), [`src/materializer.rs`](/src/materializer.rs)
 
-```rust
-// Fallback if watchman not available
-use notify::{Watcher, RecursiveMode, watcher};
+### Phase 4: Watcher Task
 
-let (tx, rx) = channel();
-let mut watcher = watcher(tx, Duration::from_millis(100))?;
-watcher.watch(path, RecursiveMode::Recursive)?;
-```
+Implement the watchman subscription loop in `src/watch/`. Wire it into `SessionMaterializer` so that constructing a materializer with the `watch` feature starts the background task.
 
-**Pros:**
-- Pure Rust, no external daemon
-- Simpler API
+**Files touched:** new `src/watch/watchman.rs`, `src/watch/mod.rs`, [`src/materializer.rs`](/src/materializer.rs)
 
-**Cons:**
-- No "since" queries - must track state yourself
-- Less efficient on large directories
-- No clock-based synchronization
+### Phase 5: Graceful Degradation
 
-### Option 3: Polling
+If watchman is unavailable, fall back to:
+1. `notify` crate (feature `watch-fallback`) — same dirty-tracking interface, different event source
+2. Manual `refresh()` method — user calls it, we re-scan directories and diff against index
 
 ```rust
-// Simple fallback for environments without any file watching
-loop {
-    let changes = scan_directory()?;
-    process_changes(changes);
-    tokio::time::sleep(Duration::from_secs(1)).await;
+pub enum WatchBackend {
+    Watchman(tokio::task::JoinHandle<()>),
+    Notify(notify::RecommendedWatcher),
+    Manual, // no background watching; user calls refresh()
 }
 ```
 
-**Pros:**
-- Works everywhere
-- No dependencies
+---
 
-**Cons:**
-- Inefficient
-- High latency
-- Doesn't scale to large directories
+## Key Design Decisions
 
-### Recommended Approach
+### Why generations instead of dirty flags?
 
-1. **Primary**: `watchman_client` for real-time updates
-2. **Fallback**: `notify` crate if watchman not available
-3. **Ultimate fallback**: Polling with configurable interval
+A `bool` dirty flag has an ABA problem: if a file changes twice before the first reload completes, clearing the flag after the first reload would hide the second change. Generations are monotonic — a reload at gen 5 doesn't clear a dirty mark at gen 7.
 
-```rust
-pub enum WatchStrategy {
-    Watchman(SessionWatcher),
-    Notify(NotifyWatcher),
-    Poll(PollWatcher),
-}
+### Why lazy reload instead of eager?
 
-impl WatchStrategy {
-    pub async fn detect() -> Self {
-        if let Ok(client) = Connector::new().connect().await {
-            return Self::Watchman(/* ... */);
-        }
-        if NotifyWatcher::is_available() {
-            return Self::Notify(/* ... */);
-        }
-        Self::Poll(/* ... */)
-    }
-}
-```
+Eager reload (re-read on every watchman event) wastes work. If 50 parts change in rapid succession during an active session, we'd re-parse 50 files immediately, but the consumer might only access 2 of them. Lazy reload defers the I/O to the actual access point.
 
----
+### Why mark dirty + mmap reload instead of just invalidating the cache?
 
-## Decision Points
+Cache invalidation forces a miss on next access, but loses the ability to serve the (slightly stale) old data in error cases. With dirty tracking, if the reload fails (e.g., the file is being atomically replaced), we can still serve the previous version and retry later.
 
-### 1. Async vs Sync API
+### Where does `is_fresh_instance` lead?
 
-**Question**: Should the watcher be async-only?
-
-**Recommendation**: Yes - watchman_client requires tokio. Offer a blocking wrapper if needed.
+When watchman reports `is_fresh_instance` (e.g., after daemon restart), we can't trust incrementals. Two options:
+1. **Mark everything dirty** — conservative, forces lazy reload of everything on next access
+2. **Full rebuild** — clear and re-scan the index
 
-### 2. Event Batching
+Option 1 is better for our lazy model: we mark everything dirty but don't do any I/O until something is actually accessed.
 
-**Question**: Should events be batched or delivered individually?
+---
 
-**Recommendation**: Batch by watchman tick - watchman naturally batches events within a settling period.
+## Watchman API Reference
 
-### 3. Error Recovery
+### Connection
 
-**Question**: How to handle watchman daemon crashes/restarts?
+```rust
+let client = Connector::new().connect().await?;
+let resolved = client
+    .resolve_root(CanonicalPath::canonicalize(storage_path)?)
425 + .await?; 426 + ``` 604 427 605 - **Recommendation**: 606 - - Save clock value periodically 607 - - On reconnection, use since-query with saved clock 608 - - If `is_fresh_instance`, trigger full rebuild 428 + ### Subscription 609 429 610 - ### 4. Initial Load 430 + ```rust 431 + let (mut sub, initial) = client 432 + .subscribe::<SessionFile>(&resolved, SubscribeRequest { 433 + expression: Some(Expr::All(vec![ 434 + Expr::Suffix(vec!["json".into()]), 435 + ])), 436 + ..Default::default() 437 + }) 438 + .await?; 439 + ``` 611 440 612 - **Question**: How to handle initial state on startup? 441 + ### Event Loop 613 442 614 - **Recommendation**: 615 - - Use watchman query (not subscription) for initial scan 616 - - Subscription clock starts after initial query completes 617 - - Ensures no events missed between scan and subscribe 443 + ```rust 444 + loop { 445 + match sub.next().await? { 446 + SubscriptionData::FilesChanged(result) => { 447 + // result.is_fresh_instance — full snapshot vs incremental 448 + // result.files — changed files 449 + // result.clock — save for reconnection 450 + } 451 + SubscriptionData::Canceled => break, 452 + SubscriptionData::StateEnter { state_name, .. } => { /* VCS op */ } 453 + SubscriptionData::StateLeave { state_name, .. 
} => { /* VCS op done */ } 454 + } 455 + } 456 + ``` 618 457 619 - --- 458 + ### Since Query (for reconnection) 620 459 621 - ## Reference Materials 460 + ```rust 461 + let result = client 462 + .query::<SessionFile>(&resolved, QueryRequestCommon { 463 + since: Some(Clock::Spec(saved_clock)), 464 + expression: Some(Expr::Suffix(vec!["json".into()])), 465 + ..Default::default() 466 + }) 467 + .await?; 468 + ``` 622 469 623 - ### Code Locations 470 + ### Expression Reference 624 471 625 - | Component | Location | 626 - |-----------|----------| 627 - | watchman_client lib | `~/archive/facebook/watchman/watchman/rust/watchman_client/src/lib.rs` | 628 - | Subscription example | `~/archive/facebook/watchman/watchman/rust/watchman_client/examples/subscribe.rs` | 629 - | Since query example | `~/archive/facebook/watchman/watchman/rust/watchman_client/examples/since.rs` | 630 - | Expression types | `~/archive/facebook/watchman/watchman/rust/watchman_client/src/expr.rs` | 631 - | Field types | `~/archive/facebook/watchman/watchman/rust/watchman_client/src/fields.rs` | 632 - | Protocol types | `~/archive/facebook/watchman/watchman/rust/watchman_client/src/pdu.rs` | 472 + | Expression | Purpose | 473 + |---|---| 474 + | `Expr::Suffix(vec!["json".into()])` | Match by extension | 475 + | `Expr::Exists` | Only existing files | 476 + | `Expr::All(vec![...])` | AND | 477 + | `Expr::Any(vec![...])` | OR | 478 + | `Expr::Not(Box::new(...))` | Negate | 479 + | `Expr::DirName(DirNameTerm { path, depth })` | Match by directory | 480 + | `Expr::FileType(FileType::Regular)` | Only regular files | 633 481 634 - ### External Documentation 482 + ### Field Types 635 483 636 - - [Watchman Documentation](https://facebook.github.io/watchman/) 637 - - [Watchman Query Syntax](https://facebook.github.io/watchman/docs/file-query.html) 638 - - [Watchman Expression Syntax](https://facebook.github.io/watchman/docs/expr/empty.html) 639 - - [Clockspec 
Documentation](https://facebook.github.io/watchman/docs/clockspec.html) 484 + | Field | Type | Watchman name | 485 + |---|---|---| 486 + | `NameField` | `PathBuf` | `name` | 487 + | `ExistsField` | `bool` | `exists` | 488 + | `FileTypeField` | `FileType` | `type` | 489 + | `MTimeField` | `i64` | `mtime` | 490 + | `SizeField` | `u64` | `size` | 491 + | `ContentSha1HexField` | `ContentSha1Hex` | `content.sha1hex` | 492 + | `CreatedClockField` | `ClockSpec` | `cclock` | 493 + | `ObservedClockField` | `ClockSpec` | `oclock` | 640 494 641 495 --- 642 496 643 - ## Summary 497 + ## External References 644 498 645 - Watchman provides an efficient, event-driven way to keep the session materializer synchronized with on-disk changes. The integration pattern: 646 - 647 - 1. **Connect** to watchman daemon 648 - 2. **Subscribe** to the opencode storage directory with JSON filter 649 - 3. **Process** initial scan to build complete index 650 - 4. **Stream** change events to update index incrementally 651 - 5. **Handle** reconnection with since-queries using saved clock 652 - 653 - Combined with memory-mapped files for content access, this creates a low-overhead, always-up-to-date view of session data. 499 + - [`facebook/watchman`](https://github.com/facebook/watchman) — watchman source and Rust client 500 + - [`watchman_client` crate source](https://github.com/facebook/watchman/tree/main/watchman/rust/watchman_client/src) 501 + - [Watchman Subscribe API](https://facebook.github.io/watchman/docs/cmd/subscribe) 502 + - [Watchman Expression Syntax](https://facebook.github.io/watchman/docs/expr/allof) 503 + - [Clockspec Documentation](https://facebook.github.io/watchman/docs/clockspec)
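
The reasoning in "Why generations instead of dirty flags?", "Why mark dirty + mmap reload instead of just invalidating the cache?" and "Where does `is_fresh_instance` lead?" can be illustrated with a small, std-only sketch. The names `GenerationClock` and `DirtyTracker` come from the commit message, but every field and method below (`tick`, `now`, `mark_dirty`, `mark_all_dirty`, `is_dirty`), the `LazyCache` type, and the use of `String` in place of an mmap are assumptions made for illustration and may not match the code in the repository.

```rust
use std::collections::HashMap;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};

/// Monotonic counter: every observed change gets a fresh, larger generation.
#[derive(Default)]
pub struct GenerationClock(AtomicU64);

impl GenerationClock {
    pub fn tick(&self) -> u64 {
        self.0.fetch_add(1, Ordering::SeqCst) + 1
    }
    pub fn now(&self) -> u64 {
        self.0.load(Ordering::SeqCst)
    }
}

/// Hypothetical tracker: maps each path to the generation it was last dirtied at.
#[derive(Default)]
pub struct DirtyTracker {
    clock: GenerationClock,
    dirtied_at: HashMap<PathBuf, u64>,
    /// Raised by `mark_all_dirty`; anything loaded before it counts as dirty.
    floor: u64,
}

impl DirtyTracker {
    pub fn current(&self) -> u64 {
        self.clock.now()
    }
    /// Called for each changed file reported by the watcher.
    pub fn mark_dirty(&mut self, path: &Path) -> u64 {
        let generation = self.clock.tick();
        self.dirtied_at.insert(path.to_path_buf(), generation);
        generation
    }
    /// Called on `is_fresh_instance`: no I/O, just one generation bump.
    pub fn mark_all_dirty(&mut self) -> u64 {
        self.floor = self.clock.tick();
        self.floor
    }
    /// Dirty if the path was dirtied after the generation it was loaded at.
    pub fn is_dirty(&self, path: &Path, loaded_at: u64) -> bool {
        let dirtied = self.dirtied_at.get(path).copied().unwrap_or(0);
        dirtied.max(self.floor) > loaded_at
    }
}

/// Hypothetical cache: value is (generation loaded at, contents).
#[derive(Default)]
pub struct LazyCache {
    entries: HashMap<PathBuf, (u64, String)>,
}

impl LazyCache {
    /// Reloads only when the entry is missing or dirty. If the reload fails,
    /// the previous contents are served and the entry stays dirty for a retry.
    pub fn get(
        &mut self,
        tracker: &DirtyTracker,
        path: &Path,
        load: impl FnOnce(&Path) -> io::Result<String>,
    ) -> Option<&str> {
        let needs_load = match self.entries.get(path) {
            Some((loaded_at, _)) => tracker.is_dirty(path, *loaded_at),
            None => true,
        };
        if needs_load {
            // Snapshot before reading, so a change that lands during the
            // read gets a larger generation and is not hidden.
            let snapshot = tracker.current();
            if let Ok(data) = load(path) {
                self.entries.insert(path.to_path_buf(), (snapshot, data));
            }
        }
        self.entries.get(path).map(|(_, data)| data.as_str())
    }
}
```

Because the entry records the generation snapshotted before the read, a second change that arrives mid-reload still compares as newer, which is the case a `bool` flag loses. A shared tracker would additionally need a lock or a concurrent map around `dirtied_at`; that is left out here to keep the sketch short.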