Real-time index of opencode sessions
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Migrate core registries to papaya concurrent hash maps

Replace std/locking map usage in SessionIndex and MappedFileCache with papaya HashMap-backed registries. This aligns registry internals with the recombination direction and prepares for watch-driven concurrent mutation patterns.

Key updates:
- SessionIndex registries now use papaya for session/message/part maps and reverse-index maps.
- Index read APIs were made clone-based to avoid leaking guard lifetimes across module boundaries.
- MappedFileCache now uses papaya pin-based operations for get/insert/remove/clear/prune.
- Materializer signatures were adjusted to consume owned index outputs (Vec/clone) cleanly.

Behavior remains equivalent from a consumer perspective while reducing lock management and unifying registry infrastructure for upcoming ref-tree flow work.

rektide 63eb2b2b 53cb8fef

+108 -87
+62 -49
src/index.rs
··· 2 2 use crate::storage::{FileReader, StoragePaths}; 3 3 use crate::types::{Message, Part}; 4 4 use crate::Result; 5 - use std::collections::HashMap; 5 + use papaya::HashMap; 6 6 use std::path::PathBuf; 7 7 8 8 #[derive(Debug, Clone)] ··· 104 104 } 105 105 } 106 106 107 - #[derive(Debug, Default)] 107 + #[derive(Debug)] 108 108 pub struct SessionIndex { 109 109 session_metas: HashMap<SessionId, SessionMeta>, 110 110 message_metas: HashMap<MessageId, MessageMeta>, ··· 114 114 part_ids_by_message: HashMap<MessageId, Vec<PartId>>, 115 115 } 116 116 117 + impl Default for SessionIndex { 118 + fn default() -> Self { 119 + Self { 120 + session_metas: HashMap::new(), 121 + message_metas: HashMap::new(), 122 + part_refs: HashMap::new(), 123 + session_ids_by_project: HashMap::new(), 124 + message_ids_by_session: HashMap::new(), 125 + part_ids_by_message: HashMap::new(), 126 + } 127 + } 128 + } 129 + 117 130 impl SessionIndex { 118 131 pub fn new() -> Self { 119 132 Self::default() ··· 129 142 Ok(builder.finish()) 130 143 } 131 144 132 - pub fn session_meta(&self, id: &SessionId) -> Option<&SessionMeta> { 133 - self.session_metas.get(id) 145 + pub fn session_meta(&self, id: &SessionId) -> Option<SessionMeta> { 146 + let metas = self.session_metas.pin(); 147 + metas.get(id).cloned() 134 148 } 135 149 136 - pub fn message_meta(&self, id: &MessageId) -> Option<&MessageMeta> { 137 - self.message_metas.get(id) 150 + pub fn message_meta(&self, id: &MessageId) -> Option<MessageMeta> { 151 + let metas = self.message_metas.pin(); 152 + metas.get(id).cloned() 138 153 } 139 154 140 - pub fn part_ref(&self, id: &PartId) -> Option<&PartRef> { 141 - self.part_refs.get(id) 155 + pub fn part_ref(&self, id: &PartId) -> Option<PartRef> { 156 + let refs = self.part_refs.pin(); 157 + refs.get(id).cloned() 142 158 } 143 159 144 - pub fn projects(&self) -> Vec<&str> { 145 - let mut projects: Vec<&str> = self 146 - .session_ids_by_project 147 - .keys() 148 - .map(String::as_str) 149 - .collect(); 150 - projects.sort_unstable(); 151 - projects 160 + pub fn projects(&self) -> Vec<String> { 161 + let projects = self.session_ids_by_project.pin(); 162 + let mut names: Vec<String> = projects.iter().map(|(name, _)| name.clone()).collect(); 163 + names.sort_unstable(); 164 + names 152 165 } 153 166 154 - pub fn session_metas(&self) -> impl Iterator<Item = &SessionMeta> { 155 - self.session_metas.values() 167 + pub fn session_metas(&self) -> Vec<SessionMeta> { 168 + let metas = self.session_metas.pin(); 169 + metas.iter().map(|(_, meta)| meta.clone()).collect() 156 170 } 157 171 158 - pub fn message_metas(&self) -> impl Iterator<Item = &MessageMeta> { 159 - self.message_metas.values() 172 + pub fn message_metas(&self) -> Vec<MessageMeta> { 173 + let metas = self.message_metas.pin(); 174 + metas.iter().map(|(_, meta)| meta.clone()).collect() 160 175 } 161 176 162 - pub fn part_refs(&self) -> impl Iterator<Item = &PartRef> { 163 - self.part_refs.values() 177 + pub fn part_refs(&self) -> Vec<PartRef> { 178 + let refs = self.part_refs.pin(); 179 + refs.iter().map(|(_, part_ref)| part_ref.clone()).collect() 164 180 } 165 181 166 - pub fn session_ids_for_project(&self, project_id: &str) -> &[SessionId] { 167 - self.session_ids_by_project 168 - .get(project_id) 169 - .map(Vec::as_slice) 170 - .unwrap_or(&[]) 182 + pub fn session_ids_for_project(&self, project_id: &str) -> Vec<SessionId> { 183 + let by_project = self.session_ids_by_project.pin(); 184 + by_project.get(project_id).cloned().unwrap_or_default() 171 185 } 172 186 173 - pub fn message_ids_for_session(&self, session_id: &SessionId) -> &[MessageId] { 174 - self.message_ids_by_session 175 - .get(session_id) 176 - .map(Vec::as_slice) 177 - .unwrap_or(&[]) 187 + pub fn message_ids_for_session(&self, session_id: &SessionId) -> Vec<MessageId> { 188 + let by_session = self.message_ids_by_session.pin(); 189 + by_session.get(session_id).cloned().unwrap_or_default() 178 190 } 179 191 180 - pub fn part_ids_for_message(&self, message_id: &MessageId) -> &[PartId] { 181 - self.part_ids_by_message 182 - .get(message_id) 183 - .map(Vec::as_slice) 184 - .unwrap_or(&[]) 192 + pub fn part_ids_for_message(&self, message_id: &MessageId) -> Vec<PartId> { 193 + let by_message = self.part_ids_by_message.pin(); 194 + by_message.get(message_id).cloned().unwrap_or_default() 185 195 } 186 196 187 - pub fn session_metas_for_project(&self, project_id: &str) -> Vec<&SessionMeta> { 197 + pub fn session_metas_for_project(&self, project_id: &str) -> Vec<SessionMeta> { 188 198 self.session_ids_for_project(project_id) 189 - .iter() 190 - .filter_map(|id| self.session_meta(id)) 199 + .into_iter() 200 + .filter_map(|id| self.session_meta(&id)) 191 201 .collect() 192 202 } 193 203 194 - pub fn message_metas_for_session(&self, session_id: &SessionId) -> Vec<&MessageMeta> { 204 + pub fn message_metas_for_session(&self, session_id: &SessionId) -> Vec<MessageMeta> { 195 205 self.message_ids_for_session(session_id) 196 - .iter() 197 - .filter_map(|id| self.message_meta(id)) 206 + .into_iter() 207 + .filter_map(|id| self.message_meta(&id)) 198 208 .collect() 199 209 } 200 210 201 - pub fn part_refs_for_message(&self, message_id: &MessageId) -> Vec<&PartRef> { 211 + pub fn part_refs_for_message(&self, message_id: &MessageId) -> Vec<PartRef> { 202 212 self.part_ids_for_message(message_id) 203 - .iter() 204 - .filter_map(|id| self.part_ref(id)) 213 + .into_iter() 214 + .filter_map(|id| self.part_ref(&id)) 205 215 .collect() 206 216 } 207 217 ··· 274 284 275 285 self.index 276 286 .session_ids_by_project 287 + .pin() 277 288 .insert(project_id.to_string(), indexed); 278 289 Ok(self) 279 290 } ··· 296 307 let message_count = indexed_message_ids.len(); 297 308 self.index 298 309 .message_ids_by_session 310 + .pin() 299 311 .insert(session_id.clone(), indexed_message_ids); 300 312 301 - self.index.session_metas.insert( 313 + self.index.session_metas.pin().insert( 302 314 session_id.clone(), 303 315 SessionMeta { 304 316 id: session_id.clone(), ··· 336 348 let part_count = indexed_part_ids.len(); 337 349 self.index 338 350 .part_ids_by_message 351 + .pin() 339 352 .insert(message_id.clone(), indexed_part_ids); 340 353 341 - self.index.message_metas.insert( 354 + self.index.message_metas.pin().insert( 342 355 message_id.clone(), 343 356 MessageMeta { 344 357 id: message_id.clone(), ··· 364 377 365 378 let kind = PartKind::from_part(&part); 366 379 let path = self.paths.part_file(message_id, part_id); 367 - self.index.part_refs.insert( 380 + self.index.part_refs.pin().insert( 368 381 part_id.clone(), 369 382 PartRef::new( 370 383 part_id.clone(),
+23 -21
src/materializer.rs
··· 82 82 &self.reader 83 83 } 84 84 85 - pub fn project_ids(&self) -> Vec<&str> { 85 + pub fn project_ids(&self) -> Vec<String> { 86 86 self.index.projects() 87 87 } 88 88 89 - pub fn session_meta(&self, id: &SessionId) -> Option<&SessionMeta> { 89 + pub fn session_meta(&self, id: &SessionId) -> Option<SessionMeta> { 90 90 self.index.session_meta(id) 91 91 } 92 92 93 - pub fn message_meta(&self, id: &MessageId) -> Option<&MessageMeta> { 93 + pub fn message_meta(&self, id: &MessageId) -> Option<MessageMeta> { 94 94 self.index.message_meta(id) 95 95 } 96 96 97 - pub fn part_ref(&self, id: &PartId) -> Option<&PartRef> { 97 + pub fn part_ref(&self, id: &PartId) -> Option<PartRef> { 98 98 self.index.part_ref(id) 99 99 } 100 100 101 - pub fn session_ids_for_project(&self, project_id: &str) -> &[SessionId] { 101 + pub fn session_ids_for_project(&self, project_id: &str) -> Vec<SessionId> { 102 102 self.index.session_ids_for_project(project_id) 103 103 } 104 104 105 - pub fn message_ids_for_session(&self, session_id: &SessionId) -> &[MessageId] { 105 + pub fn message_ids_for_session(&self, session_id: &SessionId) -> Vec<MessageId> { 106 106 self.index.message_ids_for_session(session_id) 107 107 } 108 108 109 - pub fn part_ids_for_message(&self, message_id: &MessageId) -> &[PartId] { 109 + pub fn part_ids_for_message(&self, message_id: &MessageId) -> Vec<PartId> { 110 110 self.index.part_ids_for_message(message_id) 111 111 } 112 112 113 - pub fn session_metas_for_project(&self, project_id: &str) -> Vec<&SessionMeta> { 113 + pub fn session_metas_for_project(&self, project_id: &str) -> Vec<SessionMeta> { 114 114 self.index.session_metas_for_project(project_id) 115 115 } 116 116 117 - pub fn message_metas_for_session(&self, session_id: &SessionId) -> Vec<&MessageMeta> { 117 + pub fn message_metas_for_session(&self, session_id: &SessionId) -> Vec<MessageMeta> { 118 118 self.index.message_metas_for_session(session_id) 119 119 } 120 120 121 - pub fn part_refs_for_message(&self, message_id: &MessageId) -> Vec<&PartRef> { 121 + pub fn part_refs_for_message(&self, message_id: &MessageId) -> Vec<PartRef> { 122 122 self.index.part_refs_for_message(message_id) 123 123 } 124 124 ··· 157 157 158 158 pub fn load_parts_for_message(&self, message_id: &MessageId) -> Result<Vec<Part>> { 159 159 let part_ids = self.part_ids_for_message(message_id); 160 - self.load_parts_by_ids(part_ids) 160 + self.load_parts_by_ids(&part_ids) 161 161 } 162 162 163 163 pub fn load_parts_by_ids(&self, part_ids: &[PartId]) -> Result<Vec<Part>> { ··· 170 170 171 171 pub fn load_message_with_parts(&self, message_id: &MessageId) -> Result<MessageWithParts> { 172 172 let part_ids = self.part_ids_for_message(message_id); 173 - self.load_message_with_part_ids(message_id, part_ids) 173 + self.load_message_with_part_ids(message_id, &part_ids) 174 174 } 175 175 176 176 pub fn load_message_with_part_ids( ··· 209 209 let message_ids = self.message_ids_for_session(session_id); 210 210 let mut message_scopes = Vec::with_capacity(message_ids.len()); 211 211 212 - for message_id in message_ids { 212 + for message_id in &message_ids { 213 213 message_scopes.push(MessageFlowScope { 214 214 session_id: session_id.clone(), 215 215 message_id: message_id.clone(), 216 - part_ids: self.part_ids_for_message(message_id).to_vec(), 216 + part_ids: self.part_ids_for_message(message_id), 217 217 }); 218 218 } 219 219 ··· 235 235 pub fn plan_session_flow(&self, options: &SessionFlowOptions) -> Result<SessionFlowScope> { 236 236 let meta = self.require_session_meta(&options.session_id)?; 237 237 238 - let mut message_ids = self.message_ids_for_session(&options.session_id).to_vec(); 238 + let mut message_ids = self.message_ids_for_session(&options.session_id); 239 239 if let Some(limit) = options.message_limit { 240 240 message_ids.truncate(limit); 241 241 } ··· 260 260 for message_id in &scope.message_ids { 261 261 self.require_message_meta(message_id)?; 262 262 let mut part_ids = if options.include_parts { 263 - self.part_ids_for_message(message_id).to_vec() 263 + self.part_ids_for_message(message_id) 264 264 } else { 265 265 Vec::new() 266 266 }; ··· 294 294 }) 295 295 } 296 296 297 - pub fn session_metas_created_since(&self, since: i64) -> Vec<&SessionMeta> { 297 + pub fn session_metas_created_since(&self, since: i64) -> Vec<SessionMeta> { 298 298 self.index 299 299 .session_metas() 300 + .into_iter() 300 301 .filter(|meta| meta.created >= since) 301 302 .collect() 302 303 } 303 304 304 - pub fn session_metas_updated_since(&self, since: i64) -> Vec<&SessionMeta> { 305 + pub fn session_metas_updated_since(&self, since: i64) -> Vec<SessionMeta> { 305 306 self.index 306 307 .session_metas() 308 + .into_iter() 307 309 .filter(|meta| meta.updated >= since) 308 310 .collect() 309 311 } ··· 322 324 } 323 325 } 324 326 325 - fn require_session_meta(&self, session_id: &SessionId) -> Result<&SessionMeta> { 327 + fn require_session_meta(&self, session_id: &SessionId) -> Result<SessionMeta> { 326 328 self.session_meta(session_id) 327 329 .ok_or_else(|| Error::NotFound { 328 330 entity: "session", ··· 330 332 }) 331 333 } 332 334 333 - fn require_message_meta(&self, message_id: &MessageId) -> Result<&MessageMeta> { 335 + fn require_message_meta(&self, message_id: &MessageId) -> Result<MessageMeta> { 334 336 self.message_meta(message_id) 335 337 .ok_or_else(|| Error::NotFound { 336 338 entity: "message", ··· 338 340 }) 339 341 } 340 342 341 - fn require_part_ref(&self, part_id: &PartId) -> Result<&PartRef> { 343 + fn require_part_ref(&self, part_id: &PartId) -> Result<PartRef> { 342 344 self.part_ref(part_id).ok_or_else(|| Error::NotFound { 343 345 entity: "part", 344 346 id: part_id.to_string(),
+23 -17
src/storage/mmap.rs
··· 1 1 use crate::{Error, Result}; 2 2 use memmap2::Mmap; 3 - use parking_lot::RwLock; 4 - use std::collections::HashMap; 3 + use papaya::HashMap; 5 4 use std::fs::File; 6 5 use std::path::Path; 7 6 use std::sync::Arc; ··· 43 42 44 43 #[derive(Debug, Default)] 45 44 pub struct MappedFileCache { 46 - files: RwLock<HashMap<std::path::PathBuf, Arc<MappedFile>>>, 45 + files: HashMap<std::path::PathBuf, Arc<MappedFile>>, 47 46 } 48 47 49 48 impl MappedFileCache { ··· 62 61 } 63 62 64 63 fn find_cached(&self, path: &Path) -> Option<Arc<MappedFile>> { 65 - let files = self.files.read(); 64 + let files = self.files.pin(); 66 65 files.get(path).map(Arc::clone) 67 66 } 68 67 69 68 fn insert_if_absent(&self, path: &Path, mapped: &Arc<MappedFile>) { 70 - let mut files = self.files.write(); 71 - files 72 - .entry(path.to_path_buf()) 73 - .or_insert_with(|| Arc::clone(mapped)); 69 + let files = self.files.pin(); 70 + files.get_or_insert_with(path.to_path_buf(), || Arc::clone(mapped)); 74 71 } 75 72 76 73 pub fn contains(&self, path: &Path) -> bool { 77 - let files = self.files.read(); 74 + let files = self.files.pin(); 78 75 files.contains_key(path) 79 76 } 80 77 81 78 pub fn paths(&self) -> Vec<std::path::PathBuf> { 82 - let files = self.files.read(); 83 - files.keys().cloned().collect() 79 + let files = self.files.pin(); 80 + files.iter().map(|(path, _)| path.clone()).collect() 84 81 } 85 82 86 83 pub fn remove(&self, path: &Path) { 87 - let mut files = self.files.write(); 84 + let files = self.files.pin(); 88 85 files.remove(path); 89 86 } 90 87 91 88 pub fn clear(&self) { 92 - let mut files = self.files.write(); 89 + let files = self.files.pin(); 93 90 files.clear(); 94 91 } 95 92 96 93 pub fn len(&self) -> usize { 97 - self.files.read().len() 94 + self.files.len() 98 95 } 99 96 100 97 pub fn is_empty(&self) -> bool { 101 - self.files.read().is_empty() 98 + self.files.is_empty() 102 99 } 103 100 104 101 pub fn prune_unused(&self) { 105 - let mut files = self.files.write(); 106 - files.retain(|_, mapped| Arc::strong_count(mapped) > 1); 102 + let files = self.files.pin(); 103 + let mut stale = Vec::new(); 104 + for (path, mapped) in files.iter() { 105 + if Arc::strong_count(mapped) <= 1 { 106 + stale.push(path.clone()); 107 + } 108 + } 109 + 110 + for path in stale { 111 + files.remove(&path); 112 + } 107 113 } 108 114 109 115 pub fn with_file<T>(&self, path: &Path, f: impl FnOnce(&MappedFile) -> T) -> Result<T> {