jj workspaces over the network
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(cli): add jjf daemon - background sync process

+327
+327
crates/tandem-cli/src/daemon.rs
··· 1 + use std::path::PathBuf; 2 + use std::sync::Arc; 3 + use tokio::sync::{RwLock, mpsc}; 4 + use tandem_core::sync::ForgeDoc; 5 + use crate::presence::PresenceManager; 6 + use crate::content::ContentManager; 7 + use crate::offline::{self, OperationQueue, QueuedOperation}; 8 + use crate::repo::JjRepo; 9 + use tokio_tungstenite::{connect_async, tungstenite::Message}; 10 + use futures_util::{SinkExt, StreamExt}; 11 + 12 + #[derive(Debug, thiserror::Error)] 13 + pub enum DaemonError { 14 + #[error("Not connected to forge")] 15 + NotConnected, 16 + #[error("Connection failed: {0}")] 17 + ConnectionFailed(String), 18 + #[error("Repo error: {0}")] 19 + Repo(#[from] crate::repo::RepoError), 20 + #[error("WebSocket error: {0}")] 21 + WebSocket(String), 22 + #[error("Sync error: {0}")] 23 + Sync(String), 24 + #[error("Offline error: {0}")] 25 + Offline(#[from] crate::offline::OfflineError), 26 + } 27 + 28 + #[derive(Debug)] 29 + pub enum DaemonCommand { 30 + SyncNow, 31 + UpdatePresence { change_id: String }, 32 + Shutdown, 33 + } 34 + 35 + #[derive(Debug, Clone)] 36 + pub enum DaemonEvent { 37 + Connected, 38 + Disconnected, 39 + SyncCompleted, 40 + PresenceWarning { change_id: String, user: String }, 41 + Error(String), 42 + } 43 + 44 + pub struct Daemon { 45 + repo_path: PathBuf, 46 + _forge_url: String, 47 + doc: Arc<RwLock<ForgeDoc>>, 48 + presence_manager: PresenceManager, 49 + content_manager: Option<ContentManager>, 50 + command_rx: mpsc::Receiver<DaemonCommand>, 51 + event_tx: mpsc::Sender<DaemonEvent>, 52 + is_connected: bool, 53 + operation_queue: OperationQueue, 54 + } 55 + 56 + impl Daemon { 57 + pub fn new( 58 + repo_path: PathBuf, 59 + forge_url: String, 60 + command_rx: mpsc::Receiver<DaemonCommand>, 61 + event_tx: mpsc::Sender<DaemonEvent>, 62 + ) -> Self { 63 + let doc = Arc::new(RwLock::new(ForgeDoc::new())); 64 + let user_id = whoami::username().unwrap_or_else(|_| "unknown".to_string()); 65 + let device = whoami::devicename().unwrap_or_else(|_| "unknown".to_string()); 66 + let presence_manager = PresenceManager::new(doc.clone(), user_id, device); 67 + let operation_queue = OperationQueue::load(&repo_path).unwrap_or_default(); 68 + 69 + Self { 70 + repo_path, 71 + _forge_url: forge_url, 72 + doc, 73 + presence_manager, 74 + content_manager: None, 75 + command_rx, 76 + event_tx, 77 + is_connected: false, 78 + operation_queue, 79 + } 80 + } 81 + 82 + pub async fn run(&mut self) -> Result<(), DaemonError> { 83 + // Get repo ID from config 84 + let repo = JjRepo::open(&self.repo_path)?; 85 + let config = repo.forge_config()? 86 + .ok_or_else(|| DaemonError::ConnectionFailed("No forge config".to_string()))?; 87 + 88 + // Extract repo ID from URL (last path segment) 89 + let repo_id = config.forge.url 90 + .rsplit('/') 91 + .next() 92 + .unwrap_or("unknown") 93 + .to_string(); 94 + 95 + // Initialize content manager with repo_id 96 + self.content_manager = Some(ContentManager::new( 97 + self.doc.clone(), 98 + config.forge.url.clone(), 99 + repo_id.clone() 100 + )); 101 + 102 + // Build WebSocket URL 103 + let ws_url = format!("{}/sync/{}", 104 + config.forge.url.replace("https://", "wss://").replace("http://", "ws://"), 105 + repo_id 106 + ); 107 + 108 + tracing::info!("Connecting to forge: {}", ws_url); 109 + 110 + self.preload_content_on_startup().await; 111 + 112 + // Connect with retry loop 113 + loop { 114 + match self.connect_and_sync(&ws_url).await { 115 + Ok(()) => { 116 + tracing::info!("Disconnected from forge, reconnecting..."); 117 + } 118 + Err(e) => { 119 + tracing::error!("Connection error: {}, retrying in 5s...", e); 120 + let _ = self.on_disconnected().await; 121 + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; 122 + } 123 + } 124 + 125 + // Check for shutdown 126 + if let Ok(cmd) = self.command_rx.try_recv() { 127 + if matches!(cmd, DaemonCommand::Shutdown) { 128 + tracing::info!("Daemon shutting down"); 129 + self.presence_manager.clear_presence().await; 130 + break; 131 + } 132 + } 133 + } 134 + 135 + Ok(()) 136 + } 137 + 138 + pub fn is_connected(&self) -> bool { 139 + self.is_connected 140 + } 141 + 142 + async fn connect_and_sync(&mut self, ws_url: &str) -> Result<(), DaemonError> { 143 + let (ws_stream, _) = connect_async(ws_url).await 144 + .map_err(|e| DaemonError::ConnectionFailed(e.to_string()))?; 145 + 146 + let (mut write, mut read) = ws_stream.split(); 147 + 148 + // Mark as connected 149 + self.on_connected().await?; 150 + 151 + // Send our state vector to request initial sync 152 + { 153 + let doc = self.doc.read().await; 154 + let sv = doc.encode_state_vector(); 155 + write.send(Message::Binary(sv.into())).await 156 + .map_err(|e| DaemonError::WebSocket(e.to_string()))?; 157 + } 158 + 159 + loop { 160 + tokio::select! { 161 + // Handle commands from the application 162 + Some(cmd) = self.command_rx.recv() => { 163 + match cmd { 164 + DaemonCommand::Shutdown => { 165 + tracing::info!("Daemon shutting down"); 166 + return Ok(()); 167 + } 168 + DaemonCommand::SyncNow => { 169 + // Send current state 170 + let doc = self.doc.read().await; 171 + let sv = doc.encode_state_vector(); 172 + drop(doc); 173 + write.send(Message::Binary(sv.into())).await 174 + .map_err(|e| DaemonError::WebSocket(e.to_string()))?; 175 + } 176 + DaemonCommand::UpdatePresence { change_id } => { 177 + // Update presence in local doc 178 + if let Ok(cid) = change_id.parse() { 179 + self.presence_manager.update_presence(&cid).await; 180 + 181 + let conflicts = self.presence_manager.check_conflict(&cid).await; 182 + if !conflicts.is_empty() { 183 + for conflict in conflicts { 184 + let _ = self.event_tx.send(DaemonEvent::PresenceWarning { 185 + change_id: cid.to_string(), 186 + user: format!("{}@{}", conflict.user_id, conflict.device), 187 + }).await; 188 + } 189 + } 190 + } 191 + } 192 + } 193 + } 194 + 195 + // Handle messages from the server 196 + Some(msg) = read.next() => { 197 + match msg { 198 + Ok(Message::Binary(data)) => { 199 + // Apply update from server 200 + let doc = self.doc.read().await; 201 + if let Err(e) = doc.apply_update(&data) { 202 + tracing::warn!("Failed to apply update: {:?}", e); 203 + } 204 + } 205 + Ok(Message::Close(_)) => { 206 + tracing::info!("Server closed connection"); 207 + return Ok(()); 208 + } 209 + Ok(Message::Ping(data)) => { 210 + write.send(Message::Pong(data)).await 211 + .map_err(|e| DaemonError::WebSocket(e.to_string()))?; 212 + } 213 + Err(e) => { 214 + tracing::error!("WebSocket error: {}", e); 215 + return Err(DaemonError::WebSocket(e.to_string())); 216 + } 217 + _ => {} 218 + } 219 + } 220 + } 221 + } 222 + } 223 + 224 + /// Handle connection established 225 + async fn on_connected(&mut self) -> Result<(), DaemonError> { 226 + tracing::info!("Connected to forge"); 227 + self.is_connected = true; 228 + offline::set_offline(&self.repo_path, false)?; 229 + 230 + let _ = self.event_tx.send(DaemonEvent::Connected).await; 231 + 232 + // Replay any queued operations 233 + self.replay_queued_operations().await?; 234 + 235 + Ok(()) 236 + } 237 + 238 + /// Handle connection lost 239 + async fn on_disconnected(&mut self) -> Result<(), DaemonError> { 240 + tracing::warn!("Disconnected from forge"); 241 + self.is_connected = false; 242 + offline::set_offline(&self.repo_path, true)?; 243 + 244 + let _ = self.event_tx.send(DaemonEvent::Disconnected).await; 245 + 246 + Ok(()) 247 + } 248 + 249 + /// Queue an operation for offline replay 250 + fn queue_operation(&mut self, op: QueuedOperation) -> Result<(), DaemonError> { 251 + self.operation_queue.enqueue(op); 252 + self.operation_queue.save(&self.repo_path)?; 253 + Ok(()) 254 + } 255 + 256 + /// Replay all queued operations to the forge 257 + async fn replay_queued_operations(&mut self) -> Result<(), DaemonError> { 258 + if self.operation_queue.is_empty() { 259 + return Ok(()); 260 + } 261 + 262 + tracing::info!("Replaying {} queued operations", self.operation_queue.len()); 263 + 264 + let doc = self.doc.read().await; 265 + let count = offline::replay_queue(&self.repo_path, &doc).await?; 266 + drop(doc); 267 + 268 + // Reload queue (should be empty now) 269 + self.operation_queue = OperationQueue::load(&self.repo_path)?; 270 + 271 + tracing::info!("Replayed {} operations", count); 272 + Ok(()) 273 + } 274 + 275 + async fn preload_content_on_startup(&self) { 276 + if let Some(content_manager) = &self.content_manager { 277 + if let Ok(count_loaded) = content_manager.preload_bookmarks().await { 278 + tracing::info!("Preloaded {} bookmarked changes", count_loaded); 279 + } 280 + 281 + if let Ok(count_loaded) = content_manager.preload_recent(10).await { 282 + tracing::info!("Preloaded {} recent changes", count_loaded); 283 + } 284 + } 285 + } 286 + } 287 + 288 + #[derive(Clone)] 289 + pub struct DaemonHandle { 290 + command_tx: mpsc::Sender<DaemonCommand>, 291 + _event_rx: Arc<RwLock<mpsc::Receiver<DaemonEvent>>>, 292 + } 293 + 294 + impl DaemonHandle { 295 + pub async fn sync_now(&self) -> Result<(), DaemonError> { 296 + self.command_tx.send(DaemonCommand::SyncNow).await 297 + .map_err(|_| DaemonError::NotConnected) 298 + } 299 + 300 + pub async fn update_presence(&self, change_id: String) -> Result<(), DaemonError> { 301 + self.command_tx.send(DaemonCommand::UpdatePresence { change_id }).await 302 + .map_err(|_| DaemonError::NotConnected) 303 + } 304 + 305 + pub async fn shutdown(&self) -> Result<(), DaemonError> { 306 + self.command_tx.send(DaemonCommand::Shutdown).await 307 + .map_err(|_| DaemonError::NotConnected) 308 + } 309 + } 310 + 311 + pub fn spawn_daemon(repo_path: PathBuf, forge_url: String) -> DaemonHandle { 312 + let (command_tx, command_rx) = mpsc::channel(32); 313 + let (event_tx, event_rx) = mpsc::channel(32); 314 + 315 + let mut daemon = Daemon::new(repo_path, forge_url, command_rx, event_tx); 316 + 317 + tokio::spawn(async move { 318 + if let Err(e) = daemon.run().await { 319 + tracing::error!("Daemon error: {}", e); 320 + } 321 + }); 322 + 323 + DaemonHandle { 324 + command_tx, 325 + _event_rx: Arc::new(RwLock::new(event_rx)), 326 + } 327 + }