jj workspaces over the network
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(cli): add lazy content fetching on demand

+166
+166
crates/tandem-cli/src/content.rs
··· 1 + use std::collections::HashSet; 2 + use tandem_core::sync::ForgeDoc; 3 + use tokio::sync::RwLock; 4 + use std::sync::Arc; 5 + 6 + #[derive(Debug, thiserror::Error)] 7 + pub enum ContentError { 8 + #[error("Content not available: {0}")] 9 + NotAvailable(String), 10 + #[error("Fetch failed: {0}")] 11 + FetchFailed(String), 12 + #[error("Network error: {0}")] 13 + Network(String), 14 + } 15 + 16 + /// Manages lazy content fetching 17 + pub struct ContentManager { 18 + doc: Arc<RwLock<ForgeDoc>>, 19 + forge_url: String, 20 + repo_id: String, 21 + /// Hashes we're currently fetching to avoid duplicate requests 22 + pending: RwLock<HashSet<String>>, 23 + } 24 + 25 + impl ContentManager { 26 + pub fn new(doc: Arc<RwLock<ForgeDoc>>, forge_url: String, repo_id: String) -> Self { 27 + Self { 28 + doc, 29 + forge_url, 30 + repo_id, 31 + pending: RwLock::new(HashSet::new()), 32 + } 33 + } 34 + 35 + /// Check if content is available locally 36 + pub async fn has_content(&self, hash: &str) -> bool { 37 + let doc = self.doc.read().await; 38 + doc.has_content(hash) 39 + } 40 + 41 + /// Get content, fetching from forge if needed 42 + pub async fn get_content(&self, hash: &str) -> Result<Vec<u8>, ContentError> { 43 + { 44 + let doc = self.doc.read().await; 45 + if let Some(content) = doc.get_content(hash) { 46 + return Ok(content); 47 + } 48 + } 49 + 50 + self.fetch_content(hash).await 51 + } 52 + 53 + /// Fetch content from forge 54 + async fn fetch_content(&self, hash: &str) -> Result<Vec<u8>, ContentError> { 55 + { 56 + let mut pending = self.pending.write().await; 57 + if pending.contains(hash) { 58 + return Err(ContentError::NotAvailable("Fetch in progress".to_string())); 59 + } 60 + pending.insert(hash.to_string()); 61 + } 62 + 63 + let _cleanup = PendingCleanup { 64 + pending: &self.pending, 65 + hash: hash.to_string(), 66 + }; 67 + 68 + let client = reqwest::Client::new(); 69 + let url = format!("{}/api/repos/{}/content/{}", 70 + self.forge_url, 71 + self.repo_id, 72 + hash 73 + ); 74 + 75 + let response = client.get(&url) 76 + .send() 77 + .await 78 + .map_err(|e| ContentError::Network(e.to_string()))?; 79 + 80 + if !response.status().is_success() { 81 + return Err(ContentError::FetchFailed( 82 + format!("HTTP {}", response.status()) 83 + )); 84 + } 85 + 86 + let content = response.bytes().await 87 + .map_err(|e| ContentError::Network(e.to_string()))?; 88 + 89 + { 90 + let doc = self.doc.read().await; 91 + doc.put_content(hash, content.to_vec()); 92 + } 93 + 94 + Ok(content.to_vec()) 95 + } 96 + 97 + /// Preload content for a set of hashes 98 + pub async fn preload(&self, hashes: &[String]) -> Vec<String> { 99 + let mut failed = Vec::new(); 100 + 101 + for hash in hashes { 102 + if !self.has_content(hash).await { 103 + if self.fetch_content(hash).await.is_err() { 104 + failed.push(hash.clone()); 105 + } 106 + } 107 + } 108 + 109 + failed 110 + } 111 + 112 + /// Preload content for recent changes 113 + pub async fn preload_recent(&self, count_limit: usize) -> Result<usize, ContentError> { 114 + let doc = self.doc.read().await; 115 + let records = doc.get_all_change_records(); 116 + 117 + let mut records: Vec<_> = records.into_iter() 118 + .filter(|r| r.visible) 119 + .collect(); 120 + records.sort_by(|a, b| b.timestamp.cmp(&a.timestamp)); 121 + 122 + let hashes: Vec<String> = records 123 + .into_iter() 124 + .take(count_limit) 125 + .map(|r| r.tree.to_string()) 126 + .collect(); 127 + 128 + drop(doc); 129 + 130 + let failed = self.preload(&hashes).await; 131 + Ok(hashes.len() - failed.len()) 132 + } 133 + 134 + /// Preload content for bookmarked changes 135 + pub async fn preload_bookmarks(&self) -> Result<usize, ContentError> { 136 + let doc = self.doc.read().await; 137 + let bookmarks = doc.get_all_bookmarks(); 138 + 139 + let mut hashes = Vec::new(); 140 + for (_name, change_id) in bookmarks { 141 + let records = doc.get_change_records(&change_id); 142 + if let Some(record) = records.first() { 143 + hashes.push(record.tree.to_string()); 144 + } 145 + } 146 + 147 + drop(doc); 148 + 149 + let failed = self.preload(&hashes).await; 150 + Ok(hashes.len() - failed.len()) 151 + } 152 + } 153 + 154 + /// Helper to clean up pending set 155 + struct PendingCleanup<'a> { 156 + pending: &'a RwLock<HashSet<String>>, 157 + hash: String, 158 + } 159 + 160 + impl Drop for PendingCleanup<'_> { 161 + fn drop(&mut self) { 162 + if let Ok(mut pending) = self.pending.try_write() { 163 + pending.remove(&self.hash); 164 + } 165 + } 166 + }