jj workspaces over the network
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(server): add Y.Doc persistence and per-repo management

+131
+131
crates/tandem-server/src/docs.rs
··· 1 + use std::collections::HashMap; 2 + use std::path::{Path, PathBuf}; 3 + use std::sync::Arc; 4 + use tandem_core::sync::ForgeDoc; 5 + use tokio::sync::RwLock; 6 + use yrs::{ReadTxn, StateVector, Transact}; 7 + 8 + #[derive(Debug, thiserror::Error)] 9 + pub enum DocError { 10 + #[error("Repository not found: {0}")] 11 + NotFound(String), 12 + #[error("IO error: {0}")] 13 + Io(#[from] std::io::Error), 14 + #[error("Serialization error: {0}")] 15 + Serialization(String), 16 + } 17 + 18 + /// Manages Y.Doc instances for all repositories 19 + pub struct DocManager { 20 + /// Directory where doc files are stored 21 + data_dir: PathBuf, 22 + /// In-memory cache of loaded docs 23 + docs: RwLock<HashMap<String, Arc<RwLock<ForgeDoc>>>>, 24 + } 25 + 26 + impl DocManager { 27 + pub fn new(data_dir: impl AsRef<Path>) -> Self { 28 + Self { 29 + data_dir: data_dir.as_ref().to_path_buf(), 30 + docs: RwLock::new(HashMap::new()), 31 + } 32 + } 33 + 34 + /// Get or load a ForgeDoc for a repository 35 + pub async fn get_or_load(&self, repo_id: &str) -> Result<Arc<RwLock<ForgeDoc>>, DocError> { 36 + // Check cache first 37 + { 38 + let docs = self.docs.read().await; 39 + if let Some(doc) = docs.get(repo_id) { 40 + return Ok(Arc::clone(doc)); 41 + } 42 + } 43 + 44 + // Load from disk or create new 45 + let doc = self.load_or_create(repo_id).await?; 46 + let doc = Arc::new(RwLock::new(doc)); 47 + 48 + // Cache it 49 + { 50 + let mut docs = self.docs.write().await; 51 + docs.insert(repo_id.to_string(), Arc::clone(&doc)); 52 + } 53 + 54 + Ok(doc) 55 + } 56 + 57 + /// Create a new doc for a repository 58 + pub async fn create(&self, repo_id: &str) -> Result<Arc<RwLock<ForgeDoc>>, DocError> { 59 + let doc = ForgeDoc::new(); 60 + let doc = Arc::new(RwLock::new(doc)); 61 + 62 + // Cache it 63 + { 64 + let mut docs = self.docs.write().await; 65 + docs.insert(repo_id.to_string(), Arc::clone(&doc)); 66 + } 67 + 68 + // Save to disk 69 + self.save(repo_id).await?; 70 + 71 + Ok(doc) 72 + } 73 + 74 + /// Save a doc to disk 75 + pub async fn save(&self, repo_id: &str) -> Result<(), DocError> { 76 + let docs = self.docs.read().await; 77 + let doc = docs 78 + .get(repo_id) 79 + .ok_or_else(|| DocError::NotFound(repo_id.to_string()))?; 80 + 81 + let doc = doc.read().await; 82 + 83 + // Encode the full document state 84 + let state = { 85 + let txn = doc.doc().transact(); 86 + txn.encode_diff_v1(&StateVector::default()) 87 + }; 88 + 89 + // Write to file 90 + let path = self.doc_path(repo_id); 91 + if let Some(parent) = path.parent() { 92 + tokio::fs::create_dir_all(parent).await?; 93 + } 94 + tokio::fs::write(&path, state).await?; 95 + 96 + Ok(()) 97 + } 98 + 99 + /// Save all docs to disk 100 + pub async fn save_all(&self) -> Result<(), DocError> { 101 + let repo_ids: Vec<String> = { 102 + let docs = self.docs.read().await; 103 + docs.keys().cloned().collect() 104 + }; 105 + 106 + for repo_id in repo_ids { 107 + self.save(&repo_id).await?; 108 + } 109 + 110 + Ok(()) 111 + } 112 + 113 + /// Load doc from disk or create new 114 + async fn load_or_create(&self, repo_id: &str) -> Result<ForgeDoc, DocError> { 115 + let path = self.doc_path(repo_id); 116 + 117 + if path.exists() { 118 + let data = tokio::fs::read(&path).await?; 119 + let doc = ForgeDoc::new(); 120 + doc.apply_update(&data) 121 + .map_err(|e| DocError::Serialization(e.to_string()))?; 122 + Ok(doc) 123 + } else { 124 + Ok(ForgeDoc::new()) 125 + } 126 + } 127 + 128 + fn doc_path(&self, repo_id: &str) -> PathBuf { 129 + self.data_dir.join(format!("{}.yrs", repo_id)) 130 + } 131 + }