jj workspaces over the network
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(cli): add jjf clone command - clone from forge

+175
+175
crates/tandem-cli/src/clone.rs
··· 1 + use std::path::{Path, PathBuf}; 2 + use crate::link::LinkError; 3 + use tokio_tungstenite::{connect_async, tungstenite::Message}; 4 + use futures_util::{SinkExt, StreamExt}; 5 + use tandem_core::sync::ForgeDoc; 6 + use yrs::{Transact, StateVector, ReadTxn}; 7 + 8 + #[derive(Debug, thiserror::Error)] 9 + pub enum CloneError { 10 + #[error("Directory already exists: {0}")] 11 + DirectoryExists(PathBuf), 12 + #[error("Failed to create directory: {0}")] 13 + CreateDir(#[from] std::io::Error), 14 + #[error("Failed to initialize jj: {0}")] 15 + JjInit(String), 16 + #[error("Link error: {0}")] 17 + Link(#[from] LinkError), 18 + #[error("Sync error: {0}")] 19 + Sync(String), 20 + #[error("HTTP error: {0}")] 21 + Http(String), 22 + } 23 + 24 + /// Clone a repository from forge 25 + pub async fn clone_repo( 26 + forge_url: &str, 27 + target_dir: Option<&Path>, 28 + token: Option<&str>, 29 + ) -> Result<PathBuf, CloneError> { 30 + // Parse repo name from URL 31 + let repo_name = parse_repo_name(forge_url)?; 32 + 33 + // Determine target directory 34 + let target = match target_dir { 35 + Some(p) => p.to_path_buf(), 36 + None => std::env::current_dir()?.join(&repo_name), 37 + }; 38 + 39 + // Check if directory exists 40 + if target.exists() { 41 + return Err(CloneError::DirectoryExists(target)); 42 + } 43 + 44 + println!("Cloning into '{}'...", target.display()); 45 + 46 + // Create directory 47 + std::fs::create_dir_all(&target)?; 48 + 49 + // Initialize jj repo 50 + init_jj_repo(&target)?; 51 + 52 + // Link to forge 53 + crate::link::link_repo(&target, forge_url, token).await?; 54 + 55 + // Pull initial state 56 + pull_initial_state(&target, forge_url, token).await?; 57 + 58 + println!("✓ Cloned repository to {}", target.display()); 59 + 60 + Ok(target) 61 + } 62 + 63 + /// Parse repository name from forge URL 64 + fn parse_repo_name(url: &str) -> Result<String, CloneError> { 65 + // URL format: https://forge.example.com/org/repo 66 + let url = url.trim_end_matches('/'); 67 + let name = url.rsplit('/').next() 68 + .ok_or_else(|| CloneError::Sync("Invalid URL format".to_string()))?; 69 + Ok(name.to_string()) 70 + } 71 + 72 + /// Initialize a new jj repository 73 + fn init_jj_repo(path: &Path) -> Result<(), CloneError> { 74 + let output = std::process::Command::new("jj") 75 + .arg("init") 76 + .current_dir(path) 77 + .output() 78 + .map_err(|e| CloneError::JjInit(e.to_string()))?; 79 + 80 + if !output.status.success() { 81 + let stderr = String::from_utf8_lossy(&output.stderr); 82 + return Err(CloneError::JjInit(stderr.to_string())); 83 + } 84 + 85 + Ok(()) 86 + } 87 + 88 + /// Pull initial state from forge 89 + async fn pull_initial_state( 90 + path: &Path, 91 + forge_url: &str, 92 + _token: Option<&str>, 93 + ) -> Result<(), CloneError> { 94 + println!(" Syncing initial state..."); 95 + 96 + // Extract repo ID from URL 97 + let repo_id = forge_url 98 + .trim_end_matches('/') 99 + .rsplit('/') 100 + .next() 101 + .ok_or_else(|| CloneError::Sync("Invalid URL".to_string()))?; 102 + 103 + // Build WebSocket URL 104 + let ws_url = format!("{}/sync/{}", 105 + forge_url.replace("https://", "wss://").replace("http://", "ws://"), 106 + repo_id 107 + ); 108 + 109 + // Connect to forge 110 + let (ws_stream, _) = connect_async(&ws_url).await 111 + .map_err(|e| CloneError::Sync(format!("Connection failed: {}", e)))?; 112 + 113 + let (mut write, mut read) = ws_stream.split(); 114 + 115 + // Create empty ForgeDoc 116 + let doc = ForgeDoc::new(); 117 + 118 + // Send empty state vector to get full state 119 + let sv = doc.encode_state_vector(); 120 + write.send(Message::Binary(sv.into())).await 121 + .map_err(|e| CloneError::Sync(format!("Send failed: {}", e)))?; 122 + 123 + // Receive initial state 124 + let mut received_update = false; 125 + let timeout = tokio::time::timeout( 126 + tokio::time::Duration::from_secs(30), 127 + async { 128 + while let Some(msg) = read.next().await { 129 + match msg { 130 + Ok(Message::Binary(data)) => { 131 + doc.apply_update(&data) 132 + .map_err(|e| CloneError::Sync(format!("Apply failed: {:?}", e)))?; 133 + received_update = true; 134 + 135 + // After receiving update, we have the initial state 136 + // In a more sophisticated impl, we'd wait for multiple updates 137 + break; 138 + } 139 + Ok(Message::Close(_)) => break, 140 + Err(e) => return Err(CloneError::Sync(format!("Receive error: {}", e))), 141 + _ => continue, 142 + } 143 + } 144 + Ok::<(), CloneError>(()) 145 + } 146 + ).await; 147 + 148 + match timeout { 149 + Ok(Ok(())) if received_update => { 150 + println!(" ✓ Received {} changes, {} bookmarks", 151 + doc.get_all_change_records().len(), 152 + doc.get_all_bookmarks().len() 153 + ); 154 + } 155 + Ok(Ok(())) => { 156 + println!(" ⚠ No data received (empty repository?)"); 157 + } 158 + Ok(Err(e)) => return Err(e), 159 + Err(_) => { 160 + return Err(CloneError::Sync("Timeout waiting for initial state".to_string())); 161 + } 162 + } 163 + 164 + // Save the ForgeDoc state to a local file for the daemon to use 165 + let doc_path = path.join(".jj").join("forge-doc.bin"); 166 + let txn = doc.doc().transact(); 167 + let state = txn.encode_diff_v1(&StateVector::default()); 168 + std::fs::write(&doc_path, state) 169 + .map_err(|e| CloneError::Sync(format!("Failed to save state: {}", e)))?; 170 + 171 + // Close connection 172 + let _ = write.send(Message::Close(None)).await; 173 + 174 + Ok(()) 175 + }