//! Restore a rocks database from object storage
1use std::path::PathBuf;
2use std::sync::Arc;
3
4use clap::{Parser, Subcommand};
5use object_store::{ObjectStore, aws::AmazonS3Builder};
6use tracing::warn;
7
/// Top-level error type for the CLI.
///
/// Each variant carries the context needed for a useful message plus the
/// underlying `#[source]` error, so `main` can render a `caused by:` chain.
#[derive(Debug, thiserror::Error)]
enum CliError {
    /// The S3 store builder rejected the configuration (e.g. bad endpoint).
    #[error("failed to initialize S3 store for {endpoint:?}")]
    StoreInit {
        endpoint: String,
        #[source]
        source: object_store::Error,
    },

    /// Listing backup ids (or fetching their metadata) failed.
    #[error("failed to list backups at prefix {prefix:?}")]
    List {
        prefix: String,
        #[source]
        source: eat_rocks::Error,
    },

    /// The restore operation itself failed partway.
    #[error("restore to {} failed", target.display())]
    Restore {
        target: PathBuf,
        #[source]
        source: eat_rocks::Error,
    },
}
31
32#[derive(Parser)]
33#[command(version, about)]
34struct Cli {
35 /// s3-compatible endpoint url
36 ///
37 /// for subdomain style (aka virtual-hosted) like tigris, include the bucket
38 /// in the hostname, like `--endpoint https://constellation.t3.storage.dev`.
39 #[arg(long)]
40 endpoint: String,
41
42 /// bucket name
43 ///
44 /// only for path-style buckets (minio, localstack,). see `--endpoint`.
45 #[arg(long)]
46 bucket: Option<String>,
47
48 /// Prefix within the bucket (path to backup root)
49 #[arg(long, default_value = "")]
50 prefix: String,
51
52 /// access key ID, omit for public access
53 #[arg(long, env = "AWS_ACCESS_KEY_ID", requires = "secret_access_key")]
54 access_key_id: Option<String>,
55
56 /// secret access key, omit for public access
57 #[arg(long, env = "AWS_SECRET_ACCESS_KEY", requires = "access_key_id")]
58 secret_access_key: Option<String>,
59
60 #[command(subcommand)]
61 command: Command,
62}
63
// Subcommands. The field/variant `///` docs below are user-visible clap help
// text, so they are left exactly as written; commentary for maintainers uses
// plain `//` comments to stay out of `--help` output.
#[derive(Subcommand)]
enum Command {
    /// show available backups
    List,
    /// restore a backup to a local directory
    Restore {
        /// backup to restore
        ///
        /// default: latest. use `list` to see available backups.
        #[arg(long)]
        backup_id: Option<u64>,

        /// WAL directory (default: same as target)
        #[arg(long)]
        wal_dir: Option<PathBuf>,

        /// max concurrent actions against object storage
        #[arg(long, default_value_t = eat_rocks::DEFAULT_CONCURRENCY)]
        concurrency: usize,

        /// skip verifying crc32c checksums
        ///
        /// these are almost free (they're computed as the file is streamed),
        /// and the restore should typically be i/o-bound, so i'm not sure when/
        /// why turning this off would be useful.
        #[arg(long)]
        no_verify: bool,

        /// target directory for restored database
        target: PathBuf,
    },
}
96
97impl Cli {
98 fn build_store(&self) -> Result<Arc<dyn ObjectStore>, Box<CliError>> {
99 // if `--bucket` is passed, then we get path-style buckets in the URL.
100 // if not, the caller is responsible for putting the bucket in the endpoint
101 // url, but we still need to set it, hence the `_` placeholder.
102 let bucket = self.bucket.as_deref().unwrap_or("_");
103
104 let mut builder = AmazonS3Builder::new()
105 .with_endpoint(&self.endpoint)
106 .with_bucket_name(bucket)
107 .with_allow_http(true)
108 .with_virtual_hosted_style_request(self.bucket.is_none());
109
110 builder = match (&self.access_key_id, &self.secret_access_key) {
111 (Some(key_id), Some(secret)) => builder
112 .with_access_key_id(key_id)
113 .with_secret_access_key(secret),
114 (None, None) => builder.with_skip_signature(true),
115 _ => unreachable!("clap `requires` ensures both or neither are present"),
116 };
117
118 let store = builder.build().map_err(|source| CliError::StoreInit {
119 endpoint: self.endpoint.clone(),
120 source,
121 })?;
122 Ok(Arc::new(store))
123 }
124}
125
126// ---------------------------------------------------------------------------
127
128#[tokio::main]
129async fn main() {
130 tracing_subscriber::fmt()
131 .with_env_filter(
132 tracing_subscriber::EnvFilter::try_from_default_env()
133 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
134 )
135 .with_target(false)
136 .init();
137
138 let cli = Cli::parse();
139 let result = match &cli.command {
140 Command::List => list(&cli).await,
141 Command::Restore { .. } => cmd_restore(&cli).await,
142 };
143
144 if let Err(e) = result {
145 eprintln!("error: {e}");
146 let mut err: &dyn std::error::Error = &e;
147 while let Some(source) = err.source() {
148 eprintln!(" caused by: {source}");
149 err = source;
150 }
151 std::process::exit(1);
152 }
153}
154
155async fn list(cli: &Cli) -> Result<(), Box<CliError>> {
156 let store = cli.build_store()?;
157 let prefix = &cli.prefix;
158
159 let ids = eat_rocks::list_backup_ids(&store, prefix)
160 .await
161 .map_err(|source| CliError::List {
162 prefix: prefix.clone(),
163 source,
164 })?;
165
166 if ids.is_empty() {
167 warn!("no backups found");
168 return Ok(());
169 }
170
171 for id in &ids {
172 match eat_rocks::fetch_meta(&store, prefix, *id).await {
173 Ok(meta) => {
174 println!(
175 "backup {id:>4} | seq {:>12} | ts {} | {} files",
176 meta.sequence_number,
177 meta.timestamp,
178 meta.files.len(),
179 );
180 }
181 Err(e) => warn!(backup_id = id, error = %e, "failed to read backup metadata"),
182 }
183 }
184
185 Ok(())
186}
187
188async fn cmd_restore(cli: &Cli) -> Result<(), Box<CliError>> {
189 let Command::Restore {
190 backup_id,
191 target,
192 wal_dir,
193 concurrency,
194 no_verify,
195 } = &cli.command
196 else {
197 unreachable!()
198 };
199
200 let store = cli.build_store()?;
201
202 eat_rocks::restore(
203 store,
204 &cli.prefix,
205 target,
206 eat_rocks::RestoreOptions {
207 backup_id: *backup_id,
208 concurrency: *concurrency,
209 verify: !no_verify,
210 wal_dir: wal_dir.clone(),
211 },
212 )
213 .await
214 .map_err(|source| CliError::Restore {
215 target: target.clone(),
216 source,
217 })?;
218
219 Ok(())
220}