Restore a rocks database from object storage
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 220 lines 6.2 kB view raw
1use std::path::PathBuf; 2use std::sync::Arc; 3 4use clap::{Parser, Subcommand}; 5use object_store::{ObjectStore, aws::AmazonS3Builder}; 6use tracing::warn; 7 8#[derive(Debug, thiserror::Error)] 9enum CliError { 10 #[error("failed to initialize S3 store for {endpoint:?}")] 11 StoreInit { 12 endpoint: String, 13 #[source] 14 source: object_store::Error, 15 }, 16 17 #[error("failed to list backups at prefix {prefix:?}")] 18 List { 19 prefix: String, 20 #[source] 21 source: eat_rocks::Error, 22 }, 23 24 #[error("restore to {} failed", target.display())] 25 Restore { 26 target: PathBuf, 27 #[source] 28 source: eat_rocks::Error, 29 }, 30} 31 32#[derive(Parser)] 33#[command(version, about)] 34struct Cli { 35 /// s3-compatible endpoint url 36 /// 37 /// for subdomain style (aka virtual-hosted) like tigris, include the bucket 38 /// in the hostname, like `--endpoint https://constellation.t3.storage.dev`. 39 #[arg(long)] 40 endpoint: String, 41 42 /// bucket name 43 /// 44 /// only for path-style buckets (minio, localstack,). see `--endpoint`. 45 #[arg(long)] 46 bucket: Option<String>, 47 48 /// Prefix within the bucket (path to backup root) 49 #[arg(long, default_value = "")] 50 prefix: String, 51 52 /// access key ID, omit for public access 53 #[arg(long, env = "AWS_ACCESS_KEY_ID", requires = "secret_access_key")] 54 access_key_id: Option<String>, 55 56 /// secret access key, omit for public access 57 #[arg(long, env = "AWS_SECRET_ACCESS_KEY", requires = "access_key_id")] 58 secret_access_key: Option<String>, 59 60 #[command(subcommand)] 61 command: Command, 62} 63 64#[derive(Subcommand)] 65enum Command { 66 /// show available backups 67 List, 68 /// restore a backup to a local directory 69 Restore { 70 /// backup to restore 71 /// 72 /// default: latest. use `list` to see available backups. 73 #[arg(long)] 74 backup_id: Option<u64>, 75 76 /// WAL directory (default: same as target) 77 #[arg(long)] 78 wal_dir: Option<PathBuf>, 79 80 /// max concurrent actions against object storage 81 #[arg(long, default_value_t = eat_rocks::DEFAULT_CONCURRENCY)] 82 concurrency: usize, 83 84 /// skip verifying crc32c checksums 85 /// 86 /// these are almost free (they're computed as the file is streamed), 87 /// and the restore should typically be i/o-bound, so i'm not sure when/ 88 /// why turning this off would be useful. 89 #[arg(long)] 90 no_verify: bool, 91 92 /// target directory for restored database 93 target: PathBuf, 94 }, 95} 96 97impl Cli { 98 fn build_store(&self) -> Result<Arc<dyn ObjectStore>, Box<CliError>> { 99 // if `--bucket` is passed, then we get path-style buckets in the URL. 100 // if not, the caller is responsible for putting the bucket in the endpoint 101 // url, but we still need to set it, hence the `_` placeholder. 102 let bucket = self.bucket.as_deref().unwrap_or("_"); 103 104 let mut builder = AmazonS3Builder::new() 105 .with_endpoint(&self.endpoint) 106 .with_bucket_name(bucket) 107 .with_allow_http(true) 108 .with_virtual_hosted_style_request(self.bucket.is_none()); 109 110 builder = match (&self.access_key_id, &self.secret_access_key) { 111 (Some(key_id), Some(secret)) => builder 112 .with_access_key_id(key_id) 113 .with_secret_access_key(secret), 114 (None, None) => builder.with_skip_signature(true), 115 _ => unreachable!("clap `requires` ensures both or neither are present"), 116 }; 117 118 let store = builder.build().map_err(|source| CliError::StoreInit { 119 endpoint: self.endpoint.clone(), 120 source, 121 })?; 122 Ok(Arc::new(store)) 123 } 124} 125 126// --------------------------------------------------------------------------- 127 128#[tokio::main] 129async fn main() { 130 tracing_subscriber::fmt() 131 .with_env_filter( 132 tracing_subscriber::EnvFilter::try_from_default_env() 133 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), 134 ) 135 .with_target(false) 136 .init(); 137 138 let cli = Cli::parse(); 139 let result = match &cli.command { 140 Command::List => list(&cli).await, 141 Command::Restore { .. } => cmd_restore(&cli).await, 142 }; 143 144 if let Err(e) = result { 145 eprintln!("error: {e}"); 146 let mut err: &dyn std::error::Error = &e; 147 while let Some(source) = err.source() { 148 eprintln!(" caused by: {source}"); 149 err = source; 150 } 151 std::process::exit(1); 152 } 153} 154 155async fn list(cli: &Cli) -> Result<(), Box<CliError>> { 156 let store = cli.build_store()?; 157 let prefix = &cli.prefix; 158 159 let ids = eat_rocks::list_backup_ids(&store, prefix) 160 .await 161 .map_err(|source| CliError::List { 162 prefix: prefix.clone(), 163 source, 164 })?; 165 166 if ids.is_empty() { 167 warn!("no backups found"); 168 return Ok(()); 169 } 170 171 for id in &ids { 172 match eat_rocks::fetch_meta(&store, prefix, *id).await { 173 Ok(meta) => { 174 println!( 175 "backup {id:>4} | seq {:>12} | ts {} | {} files", 176 meta.sequence_number, 177 meta.timestamp, 178 meta.files.len(), 179 ); 180 } 181 Err(e) => warn!(backup_id = id, error = %e, "failed to read backup metadata"), 182 } 183 } 184 185 Ok(()) 186} 187 188async fn cmd_restore(cli: &Cli) -> Result<(), Box<CliError>> { 189 let Command::Restore { 190 backup_id, 191 target, 192 wal_dir, 193 concurrency, 194 no_verify, 195 } = &cli.command 196 else { 197 unreachable!() 198 }; 199 200 let store = cli.build_store()?; 201 202 eat_rocks::restore( 203 store, 204 &cli.prefix, 205 target, 206 eat_rocks::RestoreOptions { 207 backup_id: *backup_id, 208 concurrency: *concurrency, 209 verify: !no_verify, 210 wal_dir: wal_dir.clone(), 211 }, 212 ) 213 .await 214 .map_err(|source| CliError::Restore { 215 target: target.clone(), 216 source, 217 })?; 218 219 Ok(()) 220}