Real-time index of opencode sessions

Refactor API around composable index and materializer flows

rektide b7cbef06 ea8229f2

+1030 -343
+104 -1
README.md
···
  # opencode-session-rs

- > Reading and watching opencode session data in rust
+ Read opencode session storage with typed Rust APIs, an indexed metadata layer, and lazy file loading.
+
+ ## What this crate does
+
+ - Detects opencode storage paths (`$XDG_DATA_HOME/opencode/storage` by default).
+ - Builds an in-memory index of projects, sessions, messages, and parts.
+ - Loads session/message/part JSON lazily through a memory-mapped file cache.
+ - Lets you run workflows at different levels: index-only, per-entity load, or full session tree materialization.
+
+ The primary high-level entrypoint is [`SessionMaterializer`](/src/materializer.rs).
+
+ ## Install
+
+ ```toml
+ [dependencies]
+ opencode-session = "0.1"
+ ```
+
+ ## Quick start
+
+ ```rust
+ use opencode_session::{SessionId, SessionMaterializer};
+ use std::str::FromStr;
+
+ fn main() -> opencode_session::Result<()> {
+     let materializer = SessionMaterializer::new()?;
+
+     for project_id in materializer.project_ids() {
+         let session_ids = materializer.session_ids_for_project(project_id);
+         println!("project={project_id} sessions={}", session_ids.len());
+     }
+
+     let session_id = SessionId::from_str("ses_3975b29b7ffeDyjus9LjxKUoeX")?;
+     let tree = materializer.load_session_tree(&session_id)?;
+     println!("loaded {} messages", tree.messages.len());
+
+     Ok(())
+ }
+ ```
+
+ ## API surfaces
+
+ ### 1. Structural index access
+
+ Use this when you want fast metadata queries without loading full JSON payloads.
+
+ ```rust
+ let materializer = SessionMaterializer::new()?;
+
+ let session_meta = materializer.session_meta(&session_id);
+ let message_ids = materializer.message_ids_for_session(&session_id);
+ ```
+
+ ### 2. Piece-by-piece loading
+
+ Use this when you want to deconstruct control flow and run explicit stages.
+
+ ```rust
+ let message_ids = materializer.message_ids_for_session(&session_id);
+
+ for message_id in message_ids {
+     let message = materializer.load_message(message_id)?;
+     let parts = materializer.load_parts_for_message(message_id)?;
+     println!("message={} parts={}", message.id(), parts.len());
+ }
+ ```
+
+ ### 3. Full tree materialization
+
+ Use this when you want one call returning session + messages + parts.
+
+ ```rust
+ let tree = materializer.load_session_tree(&session_id)?;
+ println!("session={} messages={}", tree.session.info.id, tree.messages.len());
+ ```
+
+ ### 4. Incremental index construction
+
+ Use this when you want to control indexing scope (for example, one project at a time).
+
+ ```rust
+ use opencode_session::{SessionIndex, StoragePaths};
+
+ let paths = StoragePaths::detect()?;
+ let mut builder = SessionIndex::builder(paths);
+ builder.index_project("05fc47d0307a3740bf2ac963190b7c5b029b6ab1")?;
+ let index = builder.finish();
+
+ println!("indexed sessions={}", index.session_count());
+ ```
+
+ ## Core types
+
+ - [`SessionIndex`](/src/index.rs): metadata graph (projects -> sessions -> messages -> parts)
+ - [`SessionMaterializer`](/src/materializer.rs): high-level indexed read API
+ - [`SessionLoader`](/src/loader.rs): lower-level loader over `FileReader`
+ - [`FileReader`](/src/storage/reader.rs): path-based typed JSON reads + listing helpers
+ - [`MappedFileCache`](/src/storage/mmap.rs): memory-mapped file cache
+
+ ## Notes
+
+ - This crate is pre-1.0 and API shaping is active.
+ - Backward compatibility between minor revisions is not guaranteed yet.
+ - `watch` and `watch-fallback` features are reserved for live change tracking integration.
+351
doc/discovery/watchman.md
···
  | **C. Two-tier** | Medium-High | Hot=none, Cold=mtime check | HashMap upsert (no I/O) | Hot=none, Cold=until access |
  | **D. Epoch** | Low | 1 atomic load | 1 atomic tick per dir | Until next access (over-invalidates) |
  | **E. Arc eviction** | Low | Cache miss on evicted | Hash remove | Until next access |
+
+ ---
+
+ # Proposed Hybrid Design: Event-Driven Hot Index + Generation-Gated Cold Cache
+
+ This design combines the strongest parts of the current "Generation" model and alternative "Two-tier" model:
+
+ - Keep a **hot metadata index** updated directly from watch events so structural queries are always current.
+ - Keep **cold content caches** lazy, but gate freshness with **generation stamps** (not mtime alone).
+ - Preserve stale-on-error behavior by only replacing old cached content after a successful reload.
+
+ In short: watch events keep metadata accurate immediately, while actual JSON parse and mmap work remains on-demand.
+
+ ## Why this hybrid
+
+ The pure generation model has excellent correctness but does not expose "always fresh" structural metadata cheaply. The pure two-tier mtime model has good shape but risks same-second write misses on coarse filesystems.
+
+ This hybrid resolves both:
+
+ 1. **Structural correctness now**: index membership/count/list queries are updated at event time.
+ 2. **Content correctness on access**: generation stamps avoid ABA and mtime granularity issues.
+ 3. **Operational resilience**: stale cache entry remains available if reload fails.
+
+ ## Architecture (refined)
+
+ ```mermaid
+ flowchart LR
+     Watchman[Watchman/notify/manual source] --> Batcher[Event batcher]
+     Batcher --> Classifier[Path classifier]
+     Classifier --> HotIndex[HotIndex metadata tables]
+     Classifier --> GenTracker[EntityGenerationTracker]
+
+     Reader[SessionMaterializer read APIs] --> HotIndex
+     Reader --> ColdCache[ColdContentCache and mmap cache]
+     ColdCache --> GenTracker
+     ColdCache --> Disk[(JSON files)]
+ ```
+
+ ## Core data model
+
+ ### Entity identity
+
+ Use logical keys for dirtiness, not only raw paths:
+
+ ```rust
+ pub enum EntityKey {
+     Session { project_id: String, session_id: SessionId },
+     Message { session_id: SessionId, message_id: MessageId },
+     Part { message_id: MessageId, part_id: PartId },
+     SessionDiff { session_id: SessionId },
+ }
+ ```
+
+ ### Hot metadata index (event-driven)
+
+ `HotIndex` stores only identity + filesystem metadata + reverse indexes. It is updated immediately on `exists/mtime/size` event payloads.
+
+ Key invariant: if a file is deleted (`exists == false`), membership in hot indexes is removed immediately, including cascades.
+
+ ### Generation tracker (entity-scoped)
+
+ Track dirtiness by `EntityKey` and by `PathBuf` for mmap-level invalidation:
+
+ ```rust
+ pub struct EntityGenerationTracker {
+     clock: Arc<GenerationClock>,
+     // entity-level stale checks for parsed JSON objects
+     entity_dirty: RwLock<HashMap<EntityKey, u64>>,
+     // path-level stale checks for mmap entries
+     path_dirty: RwLock<HashMap<PathBuf, u64>>,
+ }
+ ```
+
+ `mark_batch_dirty()` ticks once per batch and stamps all touched keys/paths with that generation.
+
+ ### Cold caches (lazy)
+
+ Cold entries store loaded generation:
+
+ ```rust
+ pub struct ColdEntry<T> {
+     data: Arc<T>,
+     loaded_gen: u64,
+ }
+ ```
+
+ On access:
+
+ - if `tracker.entity_dirty_gen(key) <= loaded_gen`: serve cached value
+ - else: reload, parse, swap cache entry, and clear dirty if current
+
+ ## Event pipeline
+
+ Each watch batch goes through four deterministic steps:
+
+ 1. **Classify** path -> `EntityKey` + operation (`Changed` or `Deleted`)
+ 2. **Apply hot metadata update** (upsert/remove + reverse index maintenance)
+ 3. **Mark dirty generation** for entity and path
+ 4. **Process deletion side effects** (evict cold caches and mmap entries; cascade children)
+
+ This makes metadata updates eager and content updates lazy, without losing coherence.
+
+ ## Read path contract
+
+ Read APIs split into two categories:
+
+ - **Structural reads** (`list sessions/messages/parts`, counts): read from `HotIndex` only.
+ - **Content reads** (`session info`, `message body`, `part payload`): resolve key from `HotIndex`, then fetch from generation-gated `ColdCache`.
+
+ Pseudo-flow:
+
+ ```rust
+ fn get_session_info(&self, key: &EntityKey) -> Result<Option<Arc<SessionInfo>>> {
+     // 1) key existence from hot index
+     if !self.hot_index.contains(key) {
+         return Ok(None);
+     }
+
+     // 2) cold cache fast path
+     if let Some(entry) = self.cold.sessions.get(key) {
+         if self.tracker.is_entity_clean(key, entry.loaded_gen) {
+             return Ok(Some(Arc::clone(&entry.data)));
+         }
+     }
+
+     // 3) reload path (do not discard old cache until success)
+     let parsed = Arc::new(self.reader.read_session_info(key)?);
+     let gen = self.clock.current();
+     self.cold.sessions.insert(key.clone(), ColdEntry { data: Arc::clone(&parsed), loaded_gen: gen });
+     self.tracker.clear_entity_if_current(key, gen);
+     Ok(Some(parsed))
+ }
+ ```
+
+ ## `is_fresh_instance` and recovery
+
+ When watchman reports `is_fresh_instance`, treat incremental continuity as broken.
+
+ Hybrid policy:
+
+ 1. Mark a **global recovery generation** (all entities considered potentially stale).
+ 2. Rebuild `HotIndex` via targeted scan or watchman snapshot payload.
+ 3. Keep cold caches lazy; stale checks force reload on access.
+
+ This avoids eager full content parsing while restoring structural correctness immediately.
+
+ ## Concurrency and locking
+
+ - One writer task owns event application (`HotIndex` mutation + dirty stamping).
+ - Read paths use shared locks and mostly hit cache fast paths.
+ - Reload path performs I/O outside write lock; write lock is only taken to swap entry.
+
+ This prevents long lock hold times under burst writes.
+
+ ## Feature-gated backend abstraction
+
+ Keep watch source pluggable behind one trait:
+
+ ```rust
+ pub trait ChangeFeed {
+     async fn next_batch(&mut self) -> Result<Vec<FileChange>, Error>;
+ }
+ ```
+
+ Implementations:
+
+ - `WatchmanFeed` (default with `watch` feature)
+ - `NotifyFeed` (`watch-fallback` feature)
+ - `ManualRefreshFeed` (explicit refresh API)
+
+ All feeds emit the same `FileChange` model so the pipeline remains identical.
+
+ ## Refined rollout plan
+
+ ### Step 1: Introduce shared generation primitives
+
+ Add/retain `GenerationClock` and tracker APIs in [`/src/storage/generation.rs`](/src/storage/generation.rs).
+
+ ### Step 2: Add hot metadata index layer
+
+ Split structural metadata from content cache in [`/src/index.rs`](/src/index.rs) and wire structural reads in [`/src/materializer.rs`](/src/materializer.rs).
+
+ ### Step 3: Convert caches to generation-gated lazy reload
+
+ Add `loaded_gen` semantics to mmap/content caches in [`/src/storage/mmap.rs`](/src/storage/mmap.rs) and materializer-owned cold stores.
+
+ ### Step 4: Route watch events through classifier pipeline
+
+ Implement event apply pipeline in [`/src/watch/watchman.rs`](/src/watch/watchman.rs) and expose backend abstraction in [`/src/watch/mod.rs`](/src/watch/mod.rs).
+
+ ### Step 5: Recovery and fallback hardening
+
+ Add `is_fresh_instance` recovery path and `notify/manual` feeds, keeping identical dirty semantics.
+
+ ## Acceptance criteria
+
+ 1. After create/update/delete, structural queries reflect changes without materializer rebuild.
+ 2. Content reads reload only on first access after dirty event.
+ 3. Two rapid writes before a read never regress to stale content (ABA-safe).
+ 4. Reload failure keeps prior cache entry readable and retries on subsequent access.
+ 5. Watchman outage degrades to fallback/manual mode without API shape changes.
+
+ ## Observability and Change Data Capture (CDC)
+
+ This subsystem should expose changes as **structured events** carried over an **async stream**. The async stream gives transport/backpressure semantics; the event envelope gives domain structure and versioning.
+
+ ### Recommended model
+
+ Use a dual-surface API:
+
+ 1. **Low-level entity stream** for infra consumers (replication, cache invalidation, analytics).
+ 2. **High-level session stream** for product consumers (UIs, SDK state sync) that only care "which session changed".
+
+ Both are produced from the same internal event hub.
+
+ ```rust
+ pub trait ChangePublisher {
+     fn subscribe_events(&self) -> Pin<Box<dyn Stream<Item = CdcEvent> + Send>>;
+     fn subscribe_sessions(&self) -> Pin<Box<dyn Stream<Item = SessionUpdate> + Send>>;
+     fn current_generation(&self) -> u64;
+ }
+ ```
+
+ ### Event envelope (structured, versioned)
+
+ ```rust
+ pub struct EventCursor {
+     pub generation: u64,
+     pub seq_in_generation: u32,
+ }
+
+ pub enum ChangeOp {
+     Upsert,
+     Delete,
+     Reloaded,
+     ReloadFailed,
+     ResyncStarted,
+     ResyncCompleted,
+ }
+
+ pub enum ChangeEntity {
+     Session { project_id: String, session_id: SessionId },
+     Message { session_id: SessionId, message_id: MessageId },
+     Part { message_id: MessageId, part_id: PartId },
+     SessionDiff { session_id: SessionId },
+ }
+
+ pub struct CdcEvent {
+     pub cursor: EventCursor,
+     pub entity: ChangeEntity,
+     pub op: ChangeOp,
+     pub path: Option<PathBuf>,
+     pub watch_clock: Option<String>,
+     pub emitted_at_unix_ms: i64,
+ }
+ ```
+
+ Notes:
+
+ - `generation` is the coarse ordering/version boundary.
+ - `seq_in_generation` disambiguates multiple events in the same generation.
+ - `cursor` (`generation`, `seq_in_generation`) is the consumer-facing event id.
+
+ ### Primary keys to emit
+
+ Emit keys that let consumers join without reading payload bodies:
+
+ - **Session key:** `(project_id, session_id)`
+ - **Message key:** `(session_id, message_id)`
+ - **Part key:** `(message_id, part_id)`
+ - **Diff key:** `session_id`
+
+ `generation` is not an entity primary key; it is a **version/order key**. Consumers should index both:
+
+ 1. entity key for identity
+ 2. cursor for ordering + dedupe
+
+ ### What should "message ids" look like?
+
+ There are two distinct ids to keep separate:
+
+ 1. **Domain message id**: existing `MessageId` from storage paths (identity of the message entity).
+ 2. **CDC event message id**: `EventCursor` (`generation`, `seq_in_generation`) for ordering/replay.
+
+ If a single string id is needed for external systems, encode cursor as `<generation>-<seq>` (for example, `1842-7`).
+
+ ### Emitting updated sessions to consuming libraries
+
+ For most consumers, emit a compact session-level projection event:
+
+ ```rust
+ pub struct SessionUpdate {
+     pub project_id: String,
+     pub session_id: SessionId,
+     pub generation: u64,
+     pub deleted: bool,
+     pub changed_messages: Vec<MessageId>,
+     pub changed_parts: Vec<PartId>,
+ }
+ ```
+
+ Guidance:
+
+ - Emit `SessionUpdate` once per session per generation (coalesced), not once per file.
+ - Keep payload key-first; consumers fetch full content lazily from materializer APIs.
+ - Include `deleted=true` tombstones so mirrors can remove local state.
+
+ ### Delivery semantics
+
+ Recommended semantics for in-process stream subscribers:
+
+ - **At-least-once** delivery.
+ - **In-order per process** by `EventCursor`.
+ - **Idempotent consumption** required (dedupe by cursor).
+
+ Backpressure policy should be explicit:
+
+ - Small bounded channel for low latency.
+ - On overflow, emit `ResyncStarted/ResyncCompleted` and require consumers to re-sync from materializer state.
+
+ ### Observability signals
+
+ Expose metrics and traces around three points: ingest, apply, consume.
+
+ Suggested metrics:
+
+ - `watch_events_total{backend,entity,op}`
+ - `cdc_events_emitted_total{entity,op}`
+ - `cdc_events_dropped_total{reason}`
+ - `hot_index_apply_latency_seconds`
+ - `cold_reload_latency_seconds{entity,outcome}`
+ - `cold_reload_failures_total{entity,error_kind}`
+ - `subscriber_lag_generations{subscriber}`
+ - `dirty_entities{entity}` (gauge)
+
+ Suggested tracing spans:
+
+ - `watch.batch` with `generation`, `batch_size`, `is_fresh_instance`
+ - `index.apply_event` with `entity`, `op`, `path`
+ - `cache.reload` with `entity`, `result`, `generation`
+ - `cdc.emit` with `cursor`, `entity`, `op`
+
+ ### Optional durable CDC log
+
+ If downstream consumers need replay across process restarts, add an optional append-only local log keyed by `EventCursor`.
+
+ - Keep this behind a feature flag (for example, `cdc-log`).
+ - Persist only envelopes/keys, not full session payloads.
+ - On startup, stream historical events from last acknowledged cursor, then switch to live stream.
+
+ This keeps the core subsystem lightweight while enabling stronger integration modes when needed.
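Note on the design doc above: the generation-gated read path it describes can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the crate's implementation. `GenerationClock`, `ColdEntry`, `loaded_gen`, and `mark_batch_dirty` are names taken from the doc; `GenCache`, `get_or_reload`, `cached`, and the use of plain string keys in place of `EntityKey` are hypothetical. One deliberate difference from the pseudo-flow: the sketch samples the generation before the read rather than after it, so a write that lands mid-read still leaves the entry looking stale on the next access.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

/// Monotonic generation counter (name from the design doc; body is assumed).
#[derive(Default)]
pub struct GenerationClock(AtomicU64);

impl GenerationClock {
    pub fn current(&self) -> u64 {
        self.0.load(Ordering::Acquire)
    }

    /// Advance the clock and return the new generation.
    pub fn tick(&self) -> u64 {
        self.0.fetch_add(1, Ordering::AcqRel) + 1
    }
}

pub struct ColdEntry<T> {
    data: Arc<T>,
    loaded_gen: u64,
}

/// Hypothetical cache keyed by plain strings instead of `EntityKey`.
pub struct GenCache<T> {
    clock: GenerationClock,
    dirty: RwLock<HashMap<String, u64>>,
    entries: RwLock<HashMap<String, ColdEntry<T>>>,
}

impl<T> GenCache<T> {
    pub fn new() -> Self {
        Self {
            clock: GenerationClock::default(),
            dirty: RwLock::new(HashMap::new()),
            entries: RwLock::new(HashMap::new()),
        }
    }

    /// Tick once per batch and stamp every touched key with that generation.
    pub fn mark_batch_dirty(&self, keys: &[&str]) -> u64 {
        let generation = self.clock.tick();
        let mut dirty = self.dirty.write().unwrap();
        for key in keys {
            dirty.insert((*key).to_string(), generation);
        }
        generation
    }

    /// Return whatever is cached, without any freshness check.
    pub fn cached(&self, key: &str) -> Option<Arc<T>> {
        self.entries.read().unwrap().get(key).map(|e| Arc::clone(&e.data))
    }

    /// Serve the cached value if it is clean; otherwise call `load` and swap
    /// the entry only on success, so a failed reload keeps the old entry.
    pub fn get_or_reload<E>(
        &self,
        key: &str,
        load: impl FnOnce() -> Result<T, E>,
    ) -> Result<Arc<T>, E> {
        let dirty_gen = self.dirty.read().unwrap().get(key).copied().unwrap_or(0);
        if let Some(entry) = self.entries.read().unwrap().get(key) {
            if dirty_gen <= entry.loaded_gen {
                return Ok(Arc::clone(&entry.data));
            }
        }
        // Sample before loading: a dirty stamp issued during `load` will be
        // greater than this value, which forces another reload later.
        let generation = self.clock.current();
        let data = Arc::new(load()?);
        let entry = ColdEntry { data: Arc::clone(&data), loaded_gen: generation };
        self.entries.write().unwrap().insert(key.to_string(), entry);
        Ok(data)
    }
}
```

In this sketch the loader runs without any lock held and the write lock is taken only for the final swap, which mirrors the locking note in the doc. Two dirty marks before a read collapse into a single reload because only the latest stamp is compared against `loaded_gen`.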
+253 -147
src/index.rs
···
  use crate::id::{MessageId, PartId, SessionId};
- use crate::storage::{FileReader, MappedFile, StoragePaths};
- use crate::types::Part;
+ use crate::storage::{FileReader, StoragePaths};
+ use crate::types::{Message, Part};
  use crate::Result;
  use std::collections::HashMap;
  use std::path::PathBuf;
- use std::sync::Arc;

  #[derive(Debug, Clone)]
  pub struct SessionMeta {
···
      Assistant,
  }

+ impl MessageRole {
+     fn from_message(message: &Message) -> Self {
+         match message {
+             Message::User(_) => Self::User,
+             Message::Assistant(_) => Self::Assistant,
+         }
+     }
+ }
+
  #[derive(Debug, Clone)]
  pub struct PartRef {
      pub id: PartId,
      pub message_id: MessageId,
      pub session_id: SessionId,
-     pub part_type: PartType,
+     pub kind: PartKind,
      pub path: PathBuf,
-     mmap: Option<Arc<MappedFile>>,
  }

  impl PartRef {
···
          id: PartId,
          message_id: MessageId,
          session_id: SessionId,
-         part_type: PartType,
+         kind: PartKind,
          path: PathBuf,
      ) -> Self {
          Self {
              id,
              message_id,
              session_id,
-             part_type,
+             kind,
              path,
-             mmap: None,
          }
      }

-     pub fn load(&mut self, reader: &FileReader) -> Result<Part> {
+     pub fn read(&self, reader: &FileReader) -> Result<Part> {
          reader.read_part(&self.message_id, &self.id)
      }
-
-     pub fn load_mapped(
-         &mut self,
-         cache: &crate::storage::MappedFileCache,
-     ) -> Result<Arc<MappedFile>> {
-         if self.mmap.is_none() {
-             self.mmap = Some(cache.get(&self.path)?);
-         }
-         Ok(self.mmap.as_ref().unwrap().clone())
-     }
  }

  #[derive(Debug, Clone, Copy, PartialEq, Eq)]
- pub enum PartType {
+ pub enum PartKind {
      Text,
      Reasoning,
      Tool,
···
      Retry,
  }

- impl std::str::FromStr for PartType {
-     type Err = ();
-
-     fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
-         match s {
-             "text" => Ok(Self::Text),
-             "reasoning" => Ok(Self::Reasoning),
-             "tool" => Ok(Self::Tool),
-             "file" => Ok(Self::File),
-             "step-start" => Ok(Self::StepStart),
-             "step-finish" => Ok(Self::StepFinish),
-             "snapshot" => Ok(Self::Snapshot),
-             "patch" => Ok(Self::Patch),
-             "agent" => Ok(Self::Agent),
-             "subtask" => Ok(Self::Subtask),
-             "compaction" => Ok(Self::Compaction),
-             "retry" => Ok(Self::Retry),
-             _ => Err(()),
+ impl PartKind {
+     fn from_part(part: &Part) -> Self {
+         match part {
+             Part::Text(_) => Self::Text,
+             Part::Reasoning(_) => Self::Reasoning,
+             Part::Tool(_) => Self::Tool,
+             Part::File(_) => Self::File,
+             Part::StepStart(_) => Self::StepStart,
+             Part::StepFinish(_) => Self::StepFinish,
+             Part::Snapshot(_) => Self::Snapshot,
+             Part::Patch(_) => Self::Patch,
+             Part::Agent(_) => Self::Agent,
+             Part::Subtask(_) => Self::Subtask,
+             Part::Compaction(_) => Self::Compaction,
+             Part::Retry(_) => Self::Retry,
          }
      }
  }

  #[derive(Debug, Default)]
  pub struct SessionIndex {
-     pub sessions: HashMap<SessionId, SessionMeta>,
-     pub messages: HashMap<MessageId, MessageMeta>,
-     pub parts: HashMap<PartId, PartRef>,
-     pub by_session: HashMap<SessionId, Vec<MessageId>>,
-     pub by_message: HashMap<MessageId, Vec<PartId>>,
-     pub by_project: HashMap<String, Vec<SessionId>>,
+     session_metas: HashMap<SessionId, SessionMeta>,
+     message_metas: HashMap<MessageId, MessageMeta>,
+     part_refs: HashMap<PartId, PartRef>,
+     session_ids_by_project: HashMap<String, Vec<SessionId>>,
+     message_ids_by_session: HashMap<SessionId, Vec<MessageId>>,
+     part_ids_by_message: HashMap<MessageId, Vec<PartId>>,
  }

  impl SessionIndex {
···
          Self::default()
      }

+     pub fn builder(paths: StoragePaths) -> SessionIndexBuilder {
+         SessionIndexBuilder::new(paths)
+     }
+
      pub fn build(paths: &StoragePaths) -> Result<Self> {
-         let mut index = Self::new();
+         let mut builder = Self::builder(paths.clone());
+         builder.index_all_projects()?;
+         Ok(builder.finish())
+     }
+
+     pub fn session_meta(&self, id: &SessionId) -> Option<&SessionMeta> {
+         self.session_metas.get(id)
+     }
+
+     pub fn message_meta(&self, id: &MessageId) -> Option<&MessageMeta> {
+         self.message_metas.get(id)
+     }
+
+     pub fn part_ref(&self, id: &PartId) -> Option<&PartRef> {
+         self.part_refs.get(id)
+     }
+
+     pub fn projects(&self) -> Vec<&str> {
+         let mut projects: Vec<&str> = self
+             .session_ids_by_project
+             .keys()
+             .map(String::as_str)
+             .collect();
+         projects.sort_unstable();
+         projects
+     }
+
+     pub fn session_metas(&self) -> impl Iterator<Item = &SessionMeta> {
+         self.session_metas.values()
+     }
+
+     pub fn message_metas(&self) -> impl Iterator<Item = &MessageMeta> {
+         self.message_metas.values()
+     }
+
+     pub fn part_refs(&self) -> impl Iterator<Item = &PartRef> {
+         self.part_refs.values()
+     }
+
+     pub fn session_ids_for_project(&self, project_id: &str) -> &[SessionId] {
+         self.session_ids_by_project
+             .get(project_id)
+             .map(Vec::as_slice)
+             .unwrap_or(&[])
+     }
+
+     pub fn message_ids_for_session(&self, session_id: &SessionId) -> &[MessageId] {
+         self.message_ids_by_session
+             .get(session_id)
+             .map(Vec::as_slice)
+             .unwrap_or(&[])
+     }
+
+     pub fn part_ids_for_message(&self, message_id: &MessageId) -> &[PartId] {
+         self.part_ids_by_message
+             .get(message_id)
+             .map(Vec::as_slice)
+             .unwrap_or(&[])
+     }
+
+     pub fn session_metas_for_project(&self, project_id: &str) -> Vec<&SessionMeta> {
+         self.session_ids_for_project(project_id)
+             .iter()
+             .filter_map(|id| self.session_meta(id))
+             .collect()
+     }
+
+     pub fn message_metas_for_session(&self, session_id: &SessionId) -> Vec<&MessageMeta> {
+         self.message_ids_for_session(session_id)
+             .iter()
+             .filter_map(|id| self.message_meta(id))
+             .collect()
+     }
+
+     pub fn part_refs_for_message(&self, message_id: &MessageId) -> Vec<&PartRef> {
+         self.part_ids_for_message(message_id)
+             .iter()
+             .filter_map(|id| self.part_ref(id))
+             .collect()
+     }
+
+     pub fn session_count(&self) -> usize {
+         self.session_metas.len()
+     }
+
+     pub fn message_count(&self) -> usize {
+         self.message_metas.len()
+     }
+
+     pub fn part_count(&self) -> usize {
+         self.part_refs.len()
+     }
+
+     pub fn project_count(&self) -> usize {
+         self.session_ids_by_project.len()
+     }
+ }
+
+ pub struct SessionIndexBuilder {
+     reader: FileReader,
+     paths: StoragePaths,
+     index: SessionIndex,
+ }
+
+ impl SessionIndexBuilder {
+     pub fn new(paths: StoragePaths) -> Self {
          let reader = FileReader::with_paths(paths.clone());
+         Self {
+             reader,
+             paths,
+             index: SessionIndex::new(),
+         }
+     }

-         for project_id in paths.project_dirs()? {
-             let session_ids = reader.list_sessions(&project_id)?;
-             let mut built_session_ids = Vec::with_capacity(session_ids.len());
+     pub fn with_reader(reader: FileReader) -> Self {
+         let paths = reader.paths().clone();
+         Self {
+             reader,
+             paths,
+             index: SessionIndex::new(),
+         }
+     }
+
+     pub fn reader(&self) -> &FileReader {
+         &self.reader
+     }

-             for session_id in &session_ids {
-                 if index.build_session(&reader, paths, &project_id, session_id)? {
-                     built_session_ids.push(session_id.clone());
-                 }
-             }
+     pub fn paths(&self) -> &StoragePaths {
+         &self.paths
+     }

-             index.by_project.insert(project_id, built_session_ids);
+     pub fn index_all_projects(&mut self) -> Result<&mut Self> {
+         for project_id in self.paths.project_dirs()? {
+             self.index_project(&project_id)?;
          }
+         Ok(self)
+     }

-         Ok(index)
+     pub fn index_project(&mut self, project_id: &str) -> Result<&mut Self> {
+         let session_ids = self.reader.list_sessions(project_id)?;
+         let mut indexed = Vec::with_capacity(session_ids.len());
+
+         for session_id in session_ids {
+             if self.index_session(project_id, &session_id)? {
+                 indexed.push(session_id);
+             }
+         }
+
+         self.index
+             .session_ids_by_project
+             .insert(project_id.to_string(), indexed);
+         Ok(self)
      }

-     fn build_session(
-         &mut self,
-         reader: &FileReader,
-         paths: &StoragePaths,
-         project_id: &str,
-         session_id: &SessionId,
-     ) -> Result<bool> {
-         let session = match reader.read_session(project_id, session_id) {
-             Ok(s) => s,
+     pub fn index_session(&mut self, project_id: &str, session_id: &SessionId) -> Result<bool> {
+         let session = match self.reader.read_session(project_id, session_id) {
+             Ok(session) => session,
              Err(_) => return Ok(false),
          };

-         let message_ids = reader.list_messages(session_id).unwrap_or_default();
-         let message_count = message_ids.len();
+         let message_ids = self.reader.list_messages(session_id).unwrap_or_default();
+         let mut indexed_message_ids = Vec::with_capacity(message_ids.len());
+
+         for message_id in message_ids {
+             if self.index_message(session_id, &message_id)? {
+                 indexed_message_ids.push(message_id);
+             }
+         }
+
+         let message_count = indexed_message_ids.len();
+         self.index
+             .message_ids_by_session
+             .insert(session_id.clone(), indexed_message_ids);

-         self.sessions.insert(
+         self.index.session_metas.insert(
              session_id.clone(),
              SessionMeta {
                  id: session_id.clone(),
···
              },
          );

-         self.by_session
-             .insert(session_id.clone(), message_ids.clone());
-
-         for msg_id in &message_ids {
-             self.build_message(reader, paths, session_id, msg_id)?;
-         }
-
          Ok(true)
      }

-     fn build_message(
+     pub fn index_message(
          &mut self,
-         reader: &FileReader,
-         paths: &StoragePaths,
          session_id: &SessionId,
-         msg_id: &MessageId,
-     ) -> Result<()> {
-         let msg = match reader.read_message(session_id, msg_id) {
-             Ok(m) => m,
-             Err(_) => return Ok(()),
+         message_id: &MessageId,
+     ) -> Result<bool> {
+         let message = match self.reader.read_message(session_id, message_id) {
+             Ok(message) => message,
+             Err(_) => return Ok(false),
          };

-         let role = match &msg {
-             crate::types::Message::User(_) => MessageRole::User,
-             crate::types::Message::Assistant(_) => MessageRole::Assistant,
-         };
+         let role = MessageRole::from_message(&message);
+         let part_ids = self.reader.list_parts(message_id).unwrap_or_default();
+         let mut indexed_part_ids = Vec::with_capacity(part_ids.len());

-         let part_ids = reader.list_parts(msg_id).unwrap_or_default();
-         let part_count = part_ids.len();
+         for part_id in part_ids {
+             if self.index_part(session_id, message_id, &part_id)? {
+                 indexed_part_ids.push(part_id);
+             }
+         }
+
+         let part_count = indexed_part_ids.len();
+         self.index
+             .part_ids_by_message
+             .insert(message_id.clone(), indexed_part_ids);

-         self.messages.insert(
-             msg_id.clone(),
+         self.index.message_metas.insert(
+             message_id.clone(),
              MessageMeta {
-                 id: msg_id.clone(),
+                 id: message_id.clone(),
                  session_id: session_id.clone(),
                  role,
                  part_count,
              },
          );

-         self.by_message.insert(msg_id.clone(), part_ids.clone());
-
-         for part_id in &part_ids {
-             let path = paths.part_file(msg_id, part_id);
-             self.parts.insert(
-                 part_id.clone(),
-                 PartRef::new(
-                     part_id.clone(),
-                     msg_id.clone(),
-                     session_id.clone(),
-                     PartType::Text,
-                     path,
-                 ),
-             );
-         }
-
-         Ok(())
-     }
-
-     pub fn session(&self, id: &SessionId) -> Option<&SessionMeta> {
-         self.sessions.get(id)
-     }
-
-     pub fn message(&self, id: &MessageId) -> Option<&MessageMeta> {
-         self.messages.get(id)
-     }
-
-     pub fn part(&self, id: &PartId) -> Option<&PartRef> {
-         self.parts.get(id)
-     }
-
-     pub fn sessions_for_project(&self, project_id: &str) -> Vec<&SessionMeta> {
-         self.by_project
-             .get(project_id)
-             .map(|ids| ids.iter().filter_map(|id| self.sessions.get(id)).collect())
-             .unwrap_or_default()
+         Ok(true)
      }

-     pub fn messages_for_session(&self, session_id: &SessionId) -> Vec<&MessageMeta> {
-         self.by_session
-             .get(session_id)
-             .map(|ids| ids.iter().filter_map(|id| self.messages.get(id)).collect())
-             .unwrap_or_default()
-     }
-
-     pub fn parts_for_message(&self, message_id: &MessageId) -> Vec<&PartRef> {
-         self.by_message
-             .get(message_id)
-             .map(|ids| ids.iter().filter_map(|id| self.parts.get(id)).collect())
-             .unwrap_or_default()
-     }
+     fn index_part(
+         &mut self,
+         session_id: &SessionId,
+         message_id: &MessageId,
+         part_id: &PartId,
+     ) -> Result<bool> {
+         let part = match self.reader.read_part(message_id, part_id) {
+             Ok(part) => part,
+             Err(_) => return Ok(false),
+         };

-     pub fn session_count(&self) -> usize {
-         self.sessions.len()
-     }
+         let kind = PartKind::from_part(&part);
+         let path = self.paths.part_file(message_id, part_id);
+         self.index.part_refs.insert(
+             part_id.clone(),
+             PartRef::new(
+                 part_id.clone(),
+                 message_id.clone(),
+                 session_id.clone(),
+                 kind,
+                 path,
+             ),
+         );

-     pub fn message_count(&self) -> usize {
-         self.messages.len()
+         Ok(true)
      }

-     pub fn part_count(&self) -> usize {
-         self.parts.len()
+     pub fn finish(self) -> SessionIndex {
+         self.index
      }
  }

···
          assert_eq!(index.session_count(), 0);
          assert_eq!(index.message_count(), 0);
          assert_eq!(index.part_count(), 0);
+         assert_eq!(index.project_count(), 0);
      }
  }
+8 -5
src/lib.rs
···

 pub use error::{Error, Result};
 pub use id::{MessageId, PartId, SessionId};
-pub use index::{SessionIndex, MessageMeta, PartRef, SessionMeta};
-pub use loader::SessionLoader;
-pub use materializer::SessionMaterializer;
+pub use index::{
+    MessageMeta, MessageRole, PartKind, PartRef, SessionIndex, SessionIndexBuilder, SessionMeta,
+};
+pub use loader::{LoadedSession, MessageWithParts, SessionLoader, SessionTree};
+pub use materializer::{SessionMaterializer, Stats as MaterializerStats};
+pub use storage::{FileReader, MappedFile, MappedFileCache, StoragePaths};
 pub use types::{
-    message::{AssistantMessage, Message, UserMessage},
+    message::{AssistantMessage, FileDiff, Message, UserMessage},
     part::Part,
-    session::SessionInfo,
+    session::{SessionInfo, SessionShare, SessionSummary, SessionTime},
 };
+33 -8
src/loader.rs
··· 38 38 39 39 pub fn load_messages(&self, session_id: &SessionId) -> Result<Vec<Message>> { 40 40 let message_ids = self.reader.list_messages(session_id)?; 41 + self.load_messages_by_ids(session_id, &message_ids) 42 + } 43 + 44 + pub fn load_messages_by_ids( 45 + &self, 46 + session_id: &SessionId, 47 + message_ids: &[MessageId], 48 + ) -> Result<Vec<Message>> { 41 49 let mut messages = Vec::with_capacity(message_ids.len()); 42 - for msg_id in &message_ids { 50 + for msg_id in message_ids { 43 51 messages.push(self.load_message(session_id, msg_id)?); 44 52 } 45 53 Ok(messages) ··· 47 55 48 56 pub fn load_parts(&self, message_id: &MessageId) -> Result<Vec<Part>> { 49 57 let part_ids = self.reader.list_parts(message_id)?; 58 + self.load_parts_by_ids(message_id, &part_ids) 59 + } 60 + 61 + pub fn load_parts_by_ids( 62 + &self, 63 + message_id: &MessageId, 64 + part_ids: &[PartId], 65 + ) -> Result<Vec<Part>> { 50 66 let mut parts = Vec::with_capacity(part_ids.len()); 51 - for part_id in &part_ids { 67 + for part_id in part_ids { 52 68 parts.push(self.load_part(message_id, part_id)?); 53 69 } 54 70 Ok(parts) ··· 60 76 message_id: &MessageId, 61 77 ) -> Result<MessageWithParts> { 62 78 let message = self.load_message(session_id, message_id)?; 63 - let parts = self.load_parts(message_id)?; 79 + let part_ids = self.reader.list_parts(message_id)?; 80 + let parts = self.load_parts_by_ids(message_id, &part_ids)?; 64 81 Ok(MessageWithParts { message, parts }) 65 82 } 66 83 84 + pub fn load_messages_with_parts( 85 + &self, 86 + session_id: &SessionId, 87 + message_ids: &[MessageId], 88 + ) -> Result<Vec<MessageWithParts>> { 89 + let mut messages = Vec::with_capacity(message_ids.len()); 90 + for message_id in message_ids { 91 + messages.push(self.load_message_with_parts(session_id, message_id)?); 92 + } 93 + Ok(messages) 94 + } 95 + 67 96 pub fn load_session_tree( 68 97 &self, 69 98 project_id: &str, ··· 71 100 ) -> Result<SessionTree> { 72 101 let session = 
self.load_session(project_id, session_id)?; 73 102 let message_ids = self.reader.list_messages(session_id)?; 74 - 75 - let mut messages = Vec::with_capacity(message_ids.len()); 76 - for msg_id in &message_ids { 77 - messages.push(self.load_message_with_parts(session_id, msg_id)?); 78 - } 103 + let messages = self.load_messages_with_parts(session_id, &message_ids)?; 79 104 80 105 Ok(SessionTree { session, messages }) 81 106 }
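The loader changes split "list the IDs" from "load these IDs", so a caller that already holds IDs (for example from the index) can skip the directory scan. A minimal sketch of that split, with hypothetical generic helpers `load_by_ids` and `load_all` standing in for the `load_*_by_ids` and `load_*` pairs:

```rust
/// Loads every ID in order and stops at the first error.
/// `load_one` stands in for a per-file read such as `load_message`.
fn load_by_ids<Id, T, E>(
    ids: &[Id],
    mut load_one: impl FnMut(&Id) -> Result<T, E>,
) -> Result<Vec<T>, E> {
    let mut out = Vec::with_capacity(ids.len());
    for id in ids {
        out.push(load_one(id)?);
    }
    Ok(out)
}

/// The listing variant becomes a thin wrapper: list first, then delegate.
/// `list_ids` stands in for a directory scan such as `list_messages`.
fn load_all<Id, T, E>(
    list_ids: impl FnOnce() -> Result<Vec<Id>, E>,
    load_one: impl FnMut(&Id) -> Result<T, E>,
) -> Result<Vec<T>, E> {
    let ids = list_ids()?;
    load_by_ids(&ids, load_one)
}
```

Because the loop uses `?`, one unreadable file fails the whole batch; that matches the fail-fast behaviour visible in the diff, though a caller wanting partial results would need a different loop.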
+116 -68
src/materializer.rs
··· 1 1 use crate::id::{MessageId, PartId, SessionId}; 2 2 use crate::index::{MessageMeta, PartRef, SessionIndex, SessionMeta}; 3 3 use crate::loader::{LoadedSession, MessageWithParts, SessionTree}; 4 - use crate::storage::{FileReader, MappedFileCache, StoragePaths}; 4 + use crate::storage::{FileReader, MappedFile, StoragePaths}; 5 5 use crate::types::{Message, Part, SessionInfo}; 6 - use crate::Result; 6 + use crate::{Error, Result}; 7 + use std::path::Path; 7 8 use std::sync::Arc; 8 9 9 10 pub struct SessionMaterializer { 10 11 reader: FileReader, 11 12 index: SessionIndex, 12 - cache: MappedFileCache, 13 13 } 14 14 15 15 impl SessionMaterializer { 16 16 pub fn new() -> Result<Self> { 17 + Self::detect() 18 + } 19 + 20 + pub fn detect() -> Result<Self> { 17 21 let paths = StoragePaths::detect()?; 18 22 Self::with_paths(paths) 19 23 } 20 24 21 25 pub fn with_paths(paths: StoragePaths) -> Result<Self> { 22 - let reader = FileReader::with_paths(paths); 26 + let reader = FileReader::with_paths(paths.clone()); 27 + let index = SessionIndex::build(&paths)?; 28 + Ok(Self { reader, index }) 29 + } 30 + 31 + pub fn with_reader(reader: FileReader) -> Result<Self> { 23 32 let index = SessionIndex::build(reader.paths())?; 24 - let cache = MappedFileCache::new(); 25 - Ok(Self { 26 - reader, 27 - index, 28 - cache, 29 - }) 33 + Ok(Self { reader, index }) 34 + } 35 + 36 + pub fn refresh_index(&mut self) -> Result<()> { 37 + self.index = SessionIndex::build(self.reader.paths())?; 38 + Ok(()) 30 39 } 31 40 32 41 pub fn index(&self) -> &SessionIndex { ··· 37 46 &self.reader 38 47 } 39 48 40 - pub fn sessions(&self) -> &std::collections::HashMap<SessionId, SessionMeta> { 41 - &self.index.sessions 49 + pub fn project_ids(&self) -> Vec<&str> { 50 + self.index.projects() 42 51 } 43 52 44 - pub fn session(&self, id: &SessionId) -> Option<&SessionMeta> { 45 - self.index.session(id) 53 + pub fn session_meta(&self, id: &SessionId) -> Option<&SessionMeta> { 54 + self.index.session_meta(id) 
46 55 } 47 56 48 - pub fn projects(&self) -> &std::collections::HashMap<String, Vec<SessionId>> { 49 - &self.index.by_project 57 + pub fn message_meta(&self, id: &MessageId) -> Option<&MessageMeta> { 58 + self.index.message_meta(id) 50 59 } 51 60 52 - pub fn sessions_for_project(&self, project_id: &str) -> Vec<&SessionMeta> { 53 - self.index.sessions_for_project(project_id) 61 + pub fn part_ref(&self, id: &PartId) -> Option<&PartRef> { 62 + self.index.part_ref(id) 54 63 } 55 64 56 - pub fn messages_for_session(&self, session_id: &SessionId) -> Vec<&MessageMeta> { 57 - self.index.messages_for_session(session_id) 65 + pub fn session_ids_for_project(&self, project_id: &str) -> &[SessionId] { 66 + self.index.session_ids_for_project(project_id) 58 67 } 59 68 60 - pub fn parts_for_message(&self, message_id: &MessageId) -> Vec<&PartRef> { 61 - self.index.parts_for_message(message_id) 69 + pub fn message_ids_for_session(&self, session_id: &SessionId) -> &[MessageId] { 70 + self.index.message_ids_for_session(session_id) 62 71 } 63 72 64 - pub fn load_session(&self, project_id: &str, id: &SessionId) -> Result<SessionInfo> { 65 - self.reader.read_session(project_id, id) 73 + pub fn part_ids_for_message(&self, message_id: &MessageId) -> &[PartId] { 74 + self.index.part_ids_for_message(message_id) 75 + } 76 + 77 + pub fn session_metas_for_project(&self, project_id: &str) -> Vec<&SessionMeta> { 78 + self.index.session_metas_for_project(project_id) 66 79 } 67 80 68 - pub fn load_session_with_diff( 69 - &self, 70 - project_id: &str, 71 - id: &SessionId, 72 - ) -> Result<LoadedSession> { 73 - let info = self.reader.read_session(project_id, id)?; 74 - let diff = self.reader.read_diff(id).ok(); 81 + pub fn message_metas_for_session(&self, session_id: &SessionId) -> Vec<&MessageMeta> { 82 + self.index.message_metas_for_session(session_id) 83 + } 84 + 85 + pub fn part_refs_for_message(&self, message_id: &MessageId) -> Vec<&PartRef> { 86 + self.index.part_refs_for_message(message_id) 87 
+ } 88 + 89 + pub fn load_session_info(&self, session_id: &SessionId) -> Result<SessionInfo> { 90 + let meta = self.require_session_meta(session_id)?; 91 + self.reader.read_session(&meta.project_id, session_id) 92 + } 93 + 94 + pub fn load_session(&self, session_id: &SessionId) -> Result<LoadedSession> { 95 + let info = self.load_session_info(session_id)?; 96 + let diff = self.reader.read_diff(session_id).ok(); 75 97 Ok(LoadedSession { info, diff }) 76 98 } 77 99 78 - pub fn load_message(&self, session_id: &SessionId, id: &MessageId) -> Result<Message> { 79 - self.reader.read_message(session_id, id) 100 + pub fn load_message(&self, message_id: &MessageId) -> Result<Message> { 101 + let meta = self.require_message_meta(message_id)?; 102 + self.reader.read_message(&meta.session_id, message_id) 80 103 } 81 104 82 - pub fn load_part(&self, message_id: &MessageId, id: &PartId) -> Result<Part> { 83 - self.reader.read_part(message_id, id) 105 + pub fn load_part(&self, part_id: &PartId) -> Result<Part> { 106 + let part_ref = self.require_part_ref(part_id)?; 107 + self.reader.read_part(&part_ref.message_id, part_id) 84 108 } 85 109 86 - pub fn load_message_with_parts( 87 - &self, 88 - session_id: &SessionId, 89 - message_id: &MessageId, 90 - ) -> Result<MessageWithParts> { 91 - let message = self.reader.read_message(session_id, message_id)?; 92 - let part_ids = self.reader.list_parts(message_id)?; 110 + pub fn load_parts_for_message(&self, message_id: &MessageId) -> Result<Vec<Part>> { 111 + let part_ids = self.part_ids_for_message(message_id); 93 112 let mut parts = Vec::with_capacity(part_ids.len()); 94 - for part_id in &part_ids { 95 - parts.push(self.reader.read_part(message_id, part_id)?); 113 + for part_id in part_ids { 114 + parts.push(self.load_part(part_id)?); 96 115 } 116 + Ok(parts) 117 + } 118 + 119 + pub fn load_message_with_parts(&self, message_id: &MessageId) -> Result<MessageWithParts> { 120 + let message = self.load_message(message_id)?; 121 + let parts = 
self.load_parts_for_message(message_id)?; 97 122 Ok(MessageWithParts { message, parts }) 98 123 } 99 124 100 - pub fn load_session_tree( 125 + pub fn load_messages_with_parts_for_session( 101 126 &self, 102 - project_id: &str, 103 127 session_id: &SessionId, 104 - ) -> Result<SessionTree> { 105 - let session = self.load_session_with_diff(project_id, session_id)?; 106 - let message_metas = self.index.messages_for_session(session_id); 107 - 108 - let mut messages = Vec::with_capacity(message_metas.len()); 109 - for msg_meta in message_metas { 110 - messages.push(self.load_message_with_parts(session_id, &msg_meta.id)?); 128 + ) -> Result<Vec<MessageWithParts>> { 129 + let message_ids = self.message_ids_for_session(session_id); 130 + let mut messages = Vec::with_capacity(message_ids.len()); 131 + for message_id in message_ids { 132 + messages.push(self.load_message_with_parts(message_id)?); 111 133 } 134 + Ok(messages) 135 + } 112 136 137 + pub fn load_session_tree(&self, session_id: &SessionId) -> Result<SessionTree> { 138 + let session = self.load_session(session_id)?; 139 + let messages = self.load_messages_with_parts_for_session(session_id)?; 113 140 Ok(SessionTree { session, messages }) 114 141 } 115 142 116 - pub fn sessions_by_time(&self, since: i64) -> Vec<&SessionMeta> { 143 + pub fn session_metas_created_since(&self, since: i64) -> Vec<&SessionMeta> { 117 144 self.index 118 - .sessions 119 - .values() 120 - .filter(|s| s.created >= since) 145 + .session_metas() 146 + .filter(|meta| meta.created >= since) 121 147 .collect() 122 148 } 123 149 124 - pub fn sessions_updated_since(&self, since: i64) -> Vec<&SessionMeta> { 150 + pub fn session_metas_updated_since(&self, since: i64) -> Vec<&SessionMeta> { 125 151 self.index 126 - .sessions 127 - .values() 128 - .filter(|s| s.updated >= since) 152 + .session_metas() 153 + .filter(|meta| meta.updated >= since) 129 154 .collect() 130 155 } 131 156 132 - pub fn mapped_file(&self, path: &std::path::Path) -> 
Result<Arc<crate::storage::MappedFile>> { 133 - self.cache.get(path) 157 + pub fn mapped_file(&self, path: &Path) -> Result<Arc<MappedFile>> { 158 + self.reader.read_mapped(path) 134 159 } 135 160 136 161 pub fn stats(&self) -> Stats { 137 162 Stats { 138 - sessions: self.index.sessions.len(), 139 - messages: self.index.messages.len(), 140 - parts: self.index.parts.len(), 141 - projects: self.index.by_project.len(), 142 - cached_files: self.cache.len(), 163 + sessions: self.index.session_count(), 164 + messages: self.index.message_count(), 165 + parts: self.index.part_count(), 166 + projects: self.index.project_count(), 167 + cached_files: self.reader.cache().len(), 143 168 } 169 + } 170 + 171 + fn require_session_meta(&self, session_id: &SessionId) -> Result<&SessionMeta> { 172 + self.session_meta(session_id) 173 + .ok_or_else(|| Error::NotFound { 174 + entity: "session", 175 + id: session_id.to_string(), 176 + }) 177 + } 178 + 179 + fn require_message_meta(&self, message_id: &MessageId) -> Result<&MessageMeta> { 180 + self.message_meta(message_id) 181 + .ok_or_else(|| Error::NotFound { 182 + entity: "message", 183 + id: message_id.to_string(), 184 + }) 185 + } 186 + 187 + fn require_part_ref(&self, part_id: &PartId) -> Result<&PartRef> { 188 + self.part_ref(part_id).ok_or_else(|| Error::NotFound { 189 + entity: "part", 190 + id: part_id.to_string(), 191 + }) 144 192 } 145 193 } 146 194
+56 -12
src/storage/mmap.rs
··· 1 1 use crate::{Error, Result}; 2 2 use memmap2::Mmap; 3 3 use parking_lot::RwLock; 4 + use std::collections::HashMap; 4 5 use std::fs::File; 5 6 use std::path::Path; 6 7 use std::sync::Arc; ··· 42 43 43 44 #[derive(Debug, Default)] 44 45 pub struct MappedFileCache { 45 - files: RwLock<Vec<(std::path::PathBuf, Arc<MappedFile>)>>, 46 + files: RwLock<HashMap<std::path::PathBuf, Arc<MappedFile>>>, 46 47 } 47 48 48 49 impl MappedFileCache { ··· 51 52 } 52 53 53 54 pub fn get(&self, path: &Path) -> Result<Arc<MappedFile>> { 54 - { 55 - let files = self.files.read(); 56 - if let Some((_, cached)) = files.iter().find(|(p, _)| p == path) { 57 - return Ok(Arc::clone(cached)); 58 - } 55 + if let Some(cached) = self.find_cached(path) { 56 + return Ok(cached); 59 57 } 60 58 61 59 let mapped = Arc::new(MappedFile::open(path)?); 60 + self.insert_if_absent(path, &mapped); 61 + Ok(mapped) 62 + } 63 + 64 + fn find_cached(&self, path: &Path) -> Option<Arc<MappedFile>> { 65 + let files = self.files.read(); 66 + files.get(path).map(Arc::clone) 67 + } 62 68 63 - { 64 - let mut files = self.files.write(); 65 - files.push((path.to_path_buf(), Arc::clone(&mapped))); 66 - } 69 + fn insert_if_absent(&self, path: &Path, mapped: &Arc<MappedFile>) { 70 + let mut files = self.files.write(); 71 + files 72 + .entry(path.to_path_buf()) 73 + .or_insert_with(|| Arc::clone(mapped)); 74 + } 67 75 68 - Ok(mapped) 76 + pub fn contains(&self, path: &Path) -> bool { 77 + let files = self.files.read(); 78 + files.contains_key(path) 79 + } 80 + 81 + pub fn paths(&self) -> Vec<std::path::PathBuf> { 82 + let files = self.files.read(); 83 + files.keys().cloned().collect() 69 84 } 70 85 71 86 pub fn remove(&self, path: &Path) { 72 87 let mut files = self.files.write(); 73 - files.retain(|(p, _)| p != path); 88 + files.remove(path); 74 89 } 75 90 76 91 pub fn clear(&self) { ··· 84 99 85 100 pub fn is_empty(&self) -> bool { 86 101 self.files.read().is_empty() 102 + } 103 + 104 + pub fn prune_unused(&self) { 
105 + let mut files = self.files.write(); 106 + files.retain(|_, mapped| Arc::strong_count(mapped) > 1); 107 + } 108 + 109 + pub fn with_file<T>(&self, path: &Path, f: impl FnOnce(&MappedFile) -> T) -> Result<T> { 110 + let mapped = self.get(path)?; 111 + Ok(f(&mapped)) 87 112 } 88 113 } 89 114 ··· 121 146 122 147 assert_eq!(cache.len(), 1); 123 148 assert!(Arc::ptr_eq(&file1, &file2)); 149 + 150 + Ok(()) 151 + } 152 + 153 + #[test] 154 + fn test_prune_unused() -> Result<()> { 155 + let mut temp = NamedTempFile::new()?; 156 + write!(temp, "prune content")?; 157 + 158 + let cache = MappedFileCache::new(); 159 + let path = temp.path(); 160 + 161 + { 162 + let _file = cache.get(path)?; 163 + } 164 + 165 + assert_eq!(cache.len(), 1); 166 + cache.prune_unused(); 167 + assert_eq!(cache.len(), 0); 124 168 125 169 Ok(()) 126 170 }
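The cache now keys entries by path in a `HashMap` and can drop entries that nobody else holds, using `Arc::strong_count`. The sketch below shows the same idea with `std::sync::RwLock` instead of `parking_lot`, and a generic `T` instead of `MappedFile`; `SharedCache` and `get_or_load` are hypothetical names. It also differs from the diff in one deliberate way: after inserting, it returns the entry that is actually stored, so two racing callers end up sharing one `Arc`.

```rust
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};

/// Generic stand-in for a path-keyed cache of shared values.
struct SharedCache<T> {
    entries: RwLock<HashMap<PathBuf, Arc<T>>>,
}

impl<T> SharedCache<T> {
    fn new() -> Self {
        Self { entries: RwLock::new(HashMap::new()) }
    }

    /// Returns the cached value, or loads and stores it on a miss.
    /// Lock poisoning is treated as fatal here for brevity.
    fn get_or_load<E>(
        &self,
        path: &Path,
        load: impl FnOnce(&Path) -> Result<T, E>,
    ) -> Result<Arc<T>, E> {
        {
            // Fast path: shared read lock, released before loading.
            let entries = self.entries.read().unwrap();
            if let Some(hit) = entries.get(path) {
                return Ok(Arc::clone(hit));
            }
        }
        let loaded = Arc::new(load(path)?);
        let mut entries = self.entries.write().unwrap();
        // If another caller inserted first, keep and return their entry.
        let winner = entries.entry(path.to_path_buf()).or_insert(loaded);
        Ok(Arc::clone(winner))
    }

    /// Drops entries whose only owner is the cache itself.
    fn prune_unused(&self) {
        self.entries
            .write()
            .unwrap()
            .retain(|_, value| Arc::strong_count(value) > 1);
    }

    fn len(&self) -> usize {
        self.entries.read().unwrap().len()
    }
}
```

`Arc::strong_count` is only a snapshot, but pruning holds the write lock and `get_or_load` hands out new clones only under a lock, so an entry cannot gain a new owner while it is being evaluated for removal.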
+19 -6
src/storage/paths.rs
···

         for entry in entries {
             let entry = entry.map_err(Error::Io)?;
-            if entry.file_type().map_err(Error::Io)?.is_dir() {
-                if let Some(name) = entry.file_name().to_str() {
-                    if !name.starts_with('.') {
-                        projects.push(name.to_string());
-                    }
-                }
+            if let Some(name) = Self::visible_project_dir(&entry)? {
+                projects.push(name);
             }
         }

+        projects.sort();
         Ok(projects)
+    }
+
+    fn visible_project_dir(entry: &std::fs::DirEntry) -> Result<Option<String>> {
+        if !entry.file_type().map_err(Error::Io)?.is_dir() {
+            return Ok(None);
+        }
+
+        let Some(name) = entry.file_name().to_str().map(str::to_owned) else {
+            return Ok(None);
+        };
+
+        if name.starts_with('.') {
+            return Ok(None);
+        }
+
+        Ok(Some(name))
     }

     pub fn migration_version(&self) -> Result<u32> {
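The project listing now filters through a helper built from early returns and `let ... else`, then sorts the result. Sorting matters because `std::fs::read_dir` does not guarantee any entry order. A filesystem-free sketch of the same filter follows; `visible_dir_name` and `visible_project_names` are hypothetical names, and the `(is_dir, name)` pairs stand in for `DirEntry` values.

```rust
use std::ffi::OsStr;

/// Returns the name only for directories with a UTF-8 name that does
/// not start with a dot; everything else is filtered out.
fn visible_dir_name(is_dir: bool, raw_name: &OsStr) -> Option<String> {
    if !is_dir {
        return None;
    }

    // Non-UTF-8 names are skipped rather than treated as errors.
    let Some(name) = raw_name.to_str() else {
        return None;
    };

    if name.starts_with('.') {
        return None;
    }

    Some(name.to_owned())
}

/// Applies the filter and sorts, so output is stable across platforms.
fn visible_project_names<'a>(
    entries: impl IntoIterator<Item = (bool, &'a OsStr)>,
) -> Vec<String> {
    let mut names: Vec<String> = entries
        .into_iter()
        .filter_map(|(is_dir, name)| visible_dir_name(is_dir, name))
        .collect();
    names.sort();
    names
}
```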
+45 -64
src/storage/reader.rs
··· 4 4 use crate::types::message::FileDiff; 5 5 use crate::types::{Message, Part, SessionInfo}; 6 6 use crate::{Error, Result}; 7 + use serde::de::DeserializeOwned; 7 8 use std::path::Path; 8 9 use std::sync::Arc; 9 10 ··· 37 38 self.cache.get(path) 38 39 } 39 40 41 + pub fn read_json<T: DeserializeOwned>(&self, path: &Path) -> Result<T> { 42 + let mapped = self.read_mapped(path)?; 43 + serde_json::from_slice(mapped.as_bytes()).map_err(Error::Json) 44 + } 45 + 40 46 pub fn read_session(&self, project_id: &str, id: &SessionId) -> Result<SessionInfo> { 41 47 let path = self.paths.session_file(project_id, id); 42 - let mapped = self.read_mapped(&path)?; 43 - let session: SessionInfo = 44 - serde_json::from_slice(mapped.as_bytes()).map_err(Error::Json)?; 45 - Ok(session) 48 + self.read_json(&path) 46 49 } 47 50 48 51 pub fn read_message(&self, session_id: &SessionId, id: &MessageId) -> Result<Message> { 49 52 let path = self.paths.message_file(session_id, id); 50 - let mapped = self.read_mapped(&path)?; 51 - let message: Message = serde_json::from_slice(mapped.as_bytes()).map_err(Error::Json)?; 52 - Ok(message) 53 + self.read_json(&path) 53 54 } 54 55 55 56 pub fn read_part(&self, message_id: &MessageId, id: &PartId) -> Result<Part> { 56 57 let path = self.paths.part_file(message_id, id); 57 - let mapped = self.read_mapped(&path)?; 58 - let part: Part = serde_json::from_slice(mapped.as_bytes()).map_err(Error::Json)?; 59 - Ok(part) 58 + self.read_json(&path) 60 59 } 61 60 62 61 pub fn read_diff(&self, session_id: &SessionId) -> Result<Vec<FileDiff>> { ··· 64 63 if !path.exists() { 65 64 return Ok(Vec::new()); 66 65 } 67 - let mapped = self.read_mapped(&path)?; 68 - let diffs: Vec<FileDiff> = 69 - serde_json::from_slice(mapped.as_bytes()).map_err(Error::Json)?; 70 - Ok(diffs) 66 + self.read_json(&path) 71 67 } 72 68 73 - pub fn list_sessions(&self, project_id: &str) -> Result<Vec<SessionId>> { 74 - let dir = self.paths.session_dir(project_id); 69 + fn list_ids_in_dir<T, 
F>(&self, dir: &Path, parse_id: F, descending: bool) -> Result<Vec<T>> 70 + where 71 + T: AsRef<str>, 72 + F: Fn(&str) -> Option<T>, 73 + { 75 74 if !dir.exists() { 76 75 return Ok(Vec::new()); 77 76 } 78 77 79 - let mut sessions = Vec::new(); 80 - for entry in std::fs::read_dir(&dir)? { 78 + let mut ids = Vec::new(); 79 + for entry in std::fs::read_dir(dir)? { 81 80 let entry = entry?; 81 + if !entry.file_type()?.is_file() { 82 + continue; 83 + } 84 + 82 85 let path = entry.path(); 83 - if path.extension().map(|e| e == "json").unwrap_or(false) { 84 - if let Some(filename) = path.file_name().and_then(|n| n.to_str()) { 85 - if let Some(id) = SessionId::from_filename(filename) { 86 - sessions.push(id); 87 - } 88 - } 86 + if path.extension().and_then(|ext| ext.to_str()) != Some("json") { 87 + continue; 88 + } 89 + 90 + let Some(filename) = path.file_name().and_then(|name| name.to_str()) else { 91 + continue; 92 + }; 93 + 94 + if let Some(id) = parse_id(filename) { 95 + ids.push(id); 89 96 } 90 97 } 91 98 92 - sessions.sort_by(|a, b| b.as_str().cmp(a.as_str())); 93 - Ok(sessions) 99 + if descending { 100 + ids.sort_by(|a, b| b.as_ref().cmp(a.as_ref())); 101 + } else { 102 + ids.sort_by(|a, b| a.as_ref().cmp(b.as_ref())); 103 + } 104 + 105 + Ok(ids) 106 + } 107 + 108 + pub fn list_sessions(&self, project_id: &str) -> Result<Vec<SessionId>> { 109 + let dir = self.paths.session_dir(project_id); 110 + self.list_ids_in_dir(&dir, SessionId::from_filename, true) 94 111 } 95 112 96 113 pub fn list_messages(&self, session_id: &SessionId) -> Result<Vec<MessageId>> { 97 114 let dir = self.paths.message_dir(session_id); 98 - if !dir.exists() { 99 - return Ok(Vec::new()); 100 - } 101 - 102 - let mut messages = Vec::new(); 103 - for entry in std::fs::read_dir(&dir)? 
{ 104 - let entry = entry?; 105 - let path = entry.path(); 106 - if path.extension().map(|e| e == "json").unwrap_or(false) { 107 - if let Some(filename) = path.file_name().and_then(|n| n.to_str()) { 108 - if let Some(id) = MessageId::from_filename(filename) { 109 - messages.push(id); 110 - } 111 - } 112 - } 113 - } 114 - 115 - messages.sort_by(|a, b| a.as_str().cmp(b.as_str())); 116 - Ok(messages) 115 + self.list_ids_in_dir(&dir, MessageId::from_filename, false) 117 116 } 118 117 119 118 pub fn list_parts(&self, message_id: &MessageId) -> Result<Vec<PartId>> { 120 119 let dir = self.paths.part_dir(message_id); 121 - if !dir.exists() { 122 - return Ok(Vec::new()); 123 - } 124 - 125 - let mut parts = Vec::new(); 126 - for entry in std::fs::read_dir(&dir)? { 127 - let entry = entry?; 128 - let path = entry.path(); 129 - if path.extension().map(|e| e == "json").unwrap_or(false) { 130 - if let Some(filename) = path.file_name().and_then(|n| n.to_str()) { 131 - if let Some(id) = PartId::from_filename(filename) { 132 - parts.push(id); 133 - } 134 - } 135 - } 136 - } 137 - 138 - parts.sort_by(|a, b| a.as_str().cmp(b.as_str())); 139 - Ok(parts) 120 + self.list_ids_in_dir(&dir, PartId::from_filename, false) 140 121 } 141 122 } 142 123
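The three near-identical `list_*` bodies collapse into one generic helper that takes a filename parser and a sort direction: sessions are listed descending, messages and parts ascending. The sketch below reproduces that shape over a slice of file names instead of a real directory; `collect_ids`, the `SessionId` newtype, and its `ses_` prefix check are assumptions made for illustration.

```rust
use std::path::Path;

/// Keeps `.json` files whose names parse as an ID, then sorts by the
/// ID's string form in the requested direction.
fn collect_ids<T, F>(file_names: &[&str], parse_id: F, descending: bool) -> Vec<T>
where
    T: AsRef<str>,
    F: Fn(&str) -> Option<T>,
{
    let mut ids: Vec<T> = file_names
        .iter()
        .filter(|name| Path::new(name).extension().and_then(|ext| ext.to_str()) == Some("json"))
        .filter_map(|name| parse_id(*name))
        .collect();

    if descending {
        ids.sort_by(|a, b| b.as_ref().cmp(a.as_ref()));
    } else {
        ids.sort_by(|a, b| a.as_ref().cmp(b.as_ref()));
    }

    ids
}

/// Hypothetical ID newtype; the real `SessionId` lives in the crate.
struct SessionId(String);

impl AsRef<str> for SessionId {
    fn as_ref(&self) -> &str {
        &self.0
    }
}

impl SessionId {
    /// Assumed parsing rule: strip `.json` and require a `ses_` prefix.
    fn from_filename(name: &str) -> Option<Self> {
        let stem = name.strip_suffix(".json")?;
        stem.starts_with("ses_").then(|| SessionId(stem.to_owned()))
    }
}
```

Passing the parser as a function value keeps each public `list_*` method to two lines, and the `AsRef<str>` bound is the only thing the helper needs to know about an ID type.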
+4 -4
src/types/mod.rs
-pub mod session;
 pub mod message;
 pub mod part;
+pub mod session;

-pub use session::SessionInfo;
-pub use message::{Message, UserMessage, AssistantMessage};
-pub use part::Part;
+pub use message::{AssistantMessage, FileDiff, Message, MessageError, UserMessage};
+pub use part::{Part, PartBase};
+pub use session::{SessionInfo, SessionShare, SessionSummary, SessionTime};
+25 -28
src/types/part.rs
··· 20 20 Retry(RetryPart), 21 21 } 22 22 23 + macro_rules! part_base_field { 24 + ($part:expr, $field:ident) => { 25 + match $part { 26 + Part::Text(p) => &p.base.$field, 27 + Part::Reasoning(p) => &p.base.$field, 28 + Part::Tool(p) => &p.base.$field, 29 + Part::File(p) => &p.base.$field, 30 + Part::StepStart(p) => &p.base.$field, 31 + Part::StepFinish(p) => &p.base.$field, 32 + Part::Snapshot(p) => &p.base.$field, 33 + Part::Patch(p) => &p.base.$field, 34 + Part::Agent(p) => &p.base.$field, 35 + Part::Subtask(p) => &p.base.$field, 36 + Part::Compaction(p) => &p.base.$field, 37 + Part::Retry(p) => &p.base.$field, 38 + } 39 + }; 40 + } 41 + 23 42 impl Part { 24 43 pub fn id(&self) -> &PartId { 25 - match self { 26 - Part::Text(p) => &p.base.id, 27 - Part::Reasoning(p) => &p.base.id, 28 - Part::Tool(p) => &p.base.id, 29 - Part::File(p) => &p.base.id, 30 - Part::StepStart(p) => &p.base.id, 31 - Part::StepFinish(p) => &p.base.id, 32 - Part::Snapshot(p) => &p.base.id, 33 - Part::Patch(p) => &p.base.id, 34 - Part::Agent(p) => &p.base.id, 35 - Part::Subtask(p) => &p.base.id, 36 - Part::Compaction(p) => &p.base.id, 37 - Part::Retry(p) => &p.base.id, 38 - } 44 + part_base_field!(self, id) 39 45 } 40 46 41 47 pub fn message_id(&self) -> &MessageId { 42 - match self { 43 - Part::Text(p) => &p.base.message_id, 44 - Part::Reasoning(p) => &p.base.message_id, 45 - Part::Tool(p) => &p.base.message_id, 46 - Part::File(p) => &p.base.message_id, 47 - Part::StepStart(p) => &p.base.message_id, 48 - Part::StepFinish(p) => &p.base.message_id, 49 - Part::Snapshot(p) => &p.base.message_id, 50 - Part::Patch(p) => &p.base.message_id, 51 - Part::Agent(p) => &p.base.message_id, 52 - Part::Subtask(p) => &p.base.message_id, 53 - Part::Compaction(p) => &p.base.message_id, 54 - Part::Retry(p) => &p.base.message_id, 55 - } 48 + part_base_field!(self, message_id) 49 + } 50 + 51 + pub fn session_id(&self) -> &SessionId { 52 + part_base_field!(self, session_id) 56 53 } 57 54 } 58 55
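The `part_base_field!` macro replaces one hand-written `match` per accessor with a single match template, so adding an accessor such as `session_id()` is a one-line change. A reduced sketch with two variants follows; `Base`, `TextPart`, and `ToolPart` are simplified, hypothetical versions of the crate's types.

```rust
/// Fields shared by every part variant.
struct Base {
    id: String,
    message_id: String,
}

struct TextPart {
    base: Base,
    text: String,
}

struct ToolPart {
    base: Base,
    tool: String,
}

enum Part {
    Text(TextPart),
    Tool(ToolPart),
}

/// Expands to a match that borrows `base.<field>` from whichever
/// variant is present. Every variant must be listed here once.
macro_rules! part_base_field {
    ($part:expr, $field:ident) => {
        match $part {
            Part::Text(p) => &p.base.$field,
            Part::Tool(p) => &p.base.$field,
        }
    };
}

impl Part {
    fn id(&self) -> &str {
        part_base_field!(self, id)
    }

    fn message_id(&self) -> &str {
        part_base_field!(self, message_id)
    }
}
```

The match inside the macro stays exhaustive, so adding an enum variant without updating the macro is still a compile error rather than a silent gap.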
+16
src/watch.rs
+//! Watch integration placeholders.
+//!
+//! The concrete watch backends are intentionally introduced behind feature
+//! flags while the core read/index API stabilizes.
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum WatchBackend {
+    Watchman,
+    Notify,
+    Manual,
+}
+
+#[derive(Debug, Clone)]
+pub enum SessionEvent {
+    FullRebuild,
+}