···11+ Apache License
22+ Version 2.0, January 2004
33+ http://www.apache.org/licenses/
44+55+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
66+77+1. Definitions.
88+99+ "License" shall mean the terms and conditions for use, reproduction,
1010+ and distribution as defined by Sections 1 through 9 of this document.
1111+1212+ "Licensor" shall mean the copyright owner or entity authorized by
1313+ the copyright owner that is granting the License.
1414+1515+ "Legal Entity" shall mean the union of the acting entity and all
1616+ other entities that control, are controlled by, or are under common
1717+ control with that entity. For the purposes of this definition,
1818+ "control" means (i) the power, direct or indirect, to cause the
1919+ direction or management of such entity, whether by contract or
2020+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
2121+ outstanding shares, or (iii) beneficial ownership of such entity.
2222+2323+ "You" (or "Your") shall mean an individual or Legal Entity
2424+ exercising permissions granted by this License.
2525+2626+ "Source" form shall mean the preferred form for making modifications,
2727+ including but not limited to software source code, documentation
2828+ source, and configuration files.
2929+3030+ "Object" form shall mean any form resulting from mechanical
3131+ transformation or translation of a Source form, including but
3232+ not limited to compiled object code, generated documentation,
3333+ and conversions to other media types.
3434+3535+ "Work" shall mean the work of authorship, whether in Source or
3636+ Object form, made available under the License, as indicated by a
3737+ copyright notice that is included in or attached to the work
3838+ (an example is provided in the Appendix below).
3939+4040+ "Derivative Works" shall mean any work, whether in Source or Object
4141+ form, that is based on (or derived from) the Work and for which the
4242+ editorial revisions, annotations, elaborations, or other modifications
4343+ represent, as a whole, an original work of authorship. For the purposes
4444+ of this License, Derivative Works shall not include works that remain
4545+ separable from, or merely link (or bind by name) to the interfaces of,
4646+ the Work and Derivative Works thereof.
4747+4848+ "Contribution" shall mean any work of authorship, including
4949+ the original version of the Work and any modifications or additions
5050+ to that Work or Derivative Works thereof, that is intentionally
5151+ submitted to Licensor for inclusion in the Work by the copyright owner
5252+ or by an individual or Legal Entity authorized to submit on behalf of
5353+ the copyright owner. For the purposes of this definition, "submitted"
5454+ means any form of electronic, verbal, or written communication sent
5555+ to the Licensor or its representatives, including but not limited to
5656+ communication on electronic mailing lists, source code control systems,
5757+ and issue tracking systems that are managed by, or on behalf of, the
5858+ Licensor for the purpose of discussing and improving the Work, but
5959+ excluding communication that is conspicuously marked or otherwise
6060+ designated in writing by the copyright owner as "Not a Contribution."
6161+6262+ "Contributor" shall mean Licensor and any individual or Legal Entity
6363+ on behalf of whom a Contribution has been received by Licensor and
6464+ subsequently incorporated within the Work.
6565+6666+2. Grant of Copyright License. Subject to the terms and conditions of
6767+ this License, each Contributor hereby grants to You a perpetual,
6868+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
6969+ copyright license to reproduce, prepare Derivative Works of,
7070+ publicly display, publicly perform, sublicense, and distribute the
7171+ Work and such Derivative Works in Source or Object form.
7272+7373+3. Grant of Patent License. Subject to the terms and conditions of
7474+ this License, each Contributor hereby grants to You a perpetual,
7575+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
7676+ (except as stated in this section) patent license to make, have made,
7777+ use, offer to sell, sell, import, and otherwise transfer the Work,
7878+ where such license applies only to those patent claims licensable
7979+ by such Contributor that are necessarily infringed by their
8080+ Contribution(s) alone or by combination of their Contribution(s)
8181+ with the Work to which such Contribution(s) was submitted. If You
8282+ institute patent litigation against any entity (including a
8383+ cross-claim or counterclaim in a lawsuit) alleging that the Work
8484+ or a Contribution incorporated within the Work constitutes direct
8585+ or contributory patent infringement, then any patent licenses
8686+ granted to You under this License for that Work shall terminate
8787+ as of the date such litigation is filed.
8888+8989+4. Redistribution. You may reproduce and distribute copies of the
9090+ Work or Derivative Works thereof in any medium, with or without
9191+ modifications, and in Source or Object form, provided that You
9292+ meet the following conditions:
9393+9494+ (a) You must give any other recipients of the Work or
9595+ Derivative Works a copy of this License; and
9696+9797+ (b) You must cause any modified files to carry prominent notices
9898+ stating that You changed the files; and
9999+100100+ (c) You must retain, in the Source form of any Derivative Works
101101+ that You distribute, all copyright, patent, trademark, and
102102+ attribution notices from the Source form of the Work,
103103+ excluding those notices that do not pertain to any part of
104104+ the Derivative Works; and
105105+106106+ (d) If the Work includes a "NOTICE" text file as part of its
107107+ distribution, then any Derivative Works that You distribute must
108108+ include a readable copy of the attribution notices contained
109109+ within such NOTICE file, excluding those notices that do not
110110+ pertain to any part of the Derivative Works, in at least one
111111+ of the following places: within a NOTICE text file distributed
112112+ as part of the Derivative Works; within the Source form or
113113+ documentation, if provided along with the Derivative Works; or,
114114+ within a display generated by the Derivative Works, if and
115115+ wherever such third-party notices normally appear. The contents
116116+ of the NOTICE file are for informational purposes only and
117117+ do not modify the License. You may add Your own attribution
118118+ notices within Derivative Works that You distribute, alongside
119119+ or as an addendum to the NOTICE text from the Work, provided
120120+ that such additional attribution notices cannot be construed
121121+ as modifying the License.
122122+123123+ You may add Your own copyright statement to Your modifications and
124124+ may provide additional or different license terms and conditions
125125+ for use, reproduction, or distribution of Your modifications, or
126126+ for any such Derivative Works as a whole, provided Your use,
127127+ reproduction, and distribution of the Work otherwise complies with
128128+ the conditions stated in this License.
129129+130130+5. Submission of Contributions. Unless You explicitly state otherwise,
131131+ any Contribution intentionally submitted for inclusion in the Work
132132+ by You to the Licensor shall be under the terms and conditions of
133133+ this License, without any additional terms or conditions.
134134+ Notwithstanding the above, nothing herein shall supersede or modify
135135+ the terms of any separate license agreement you may have executed
136136+ with Licensor regarding such Contributions.
137137+138138+6. Trademarks. This License does not grant permission to use the trade
139139+ names, trademarks, service marks, or product names of the Licensor,
140140+ except as required for reasonable and customary use in describing the
141141+ origin of the Work and reproducing the content of the NOTICE file.
142142+143143+7. Disclaimer of Warranty. Unless required by applicable law or
144144+ agreed to in writing, Licensor provides the Work (and each
145145+ Contributor provides its Contributions) on an "AS IS" BASIS,
146146+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
147147+ implied, including, without limitation, any warranties or conditions
148148+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
149149+ PARTICULAR PURPOSE. You are solely responsible for determining the
150150+ appropriateness of using or redistributing the Work and assume any
151151+ risks associated with Your exercise of permissions under this License.
152152+153153+8. Limitation of Liability. In no event and under no legal theory,
154154+ whether in tort (including negligence), contract, or otherwise,
155155+ unless required by applicable law (such as deliberate and grossly
156156+ negligent acts) or agreed to in writing, shall any Contributor be
157157+ liable to You for damages, including any direct, indirect, special,
158158+ incidental, or consequential damages of any character arising as a
159159+ result of this License or out of the use or inability to use the
160160+ Work (including but not limited to damages for loss of goodwill,
161161+ work stoppage, computer failure or malfunction, or any and all
162162+ other commercial damages or losses), even if such Contributor
163163+ has been advised of the possibility of such damages.
164164+165165+9. Accepting Warranty or Additional Liability. While redistributing
166166+ the Work or Derivative Works thereof, You may choose to offer,
167167+ and charge a fee for, acceptance of support, warranty, indemnity,
168168+ or other liability obligations and/or rights consistent with this
169169+ License. However, in accepting such obligations, You may act only
170170+ on Your own behalf and on Your sole responsibility, not on behalf
171171+ of any other Contributor, and only if You agree to indemnify,
172172+ defend, and hold each Contributor harmless for any liability
173173+ incurred by, or claims asserted against, such Contributor by reason
174174+ of your accepting any such warranty or additional liability.
175175+176176+END OF TERMS AND CONDITIONS
177177+178178+Copyright 2025 microcosm
179179+180180+Licensed under the Apache License, Version 2.0 (the "License");
181181+you may not use this file except in compliance with the License.
182182+You may obtain a copy of the License at
183183+184184+ http://www.apache.org/licenses/LICENSE-2.0
185185+186186+Unless required by applicable law or agreed to in writing, software
187187+distributed under the License is distributed on an "AS IS" BASIS,
188188+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
189189+See the License for the specific language governing permissions and
190190+limitations under the License.
+21
LICENSE.MIT
···11+MIT License
22+33+Copyright (c) 2025 microcosm
44+55+Permission is hereby granted, free of charge, to any person obtaining a copy
66+of this software and associated documentation files (the "Software"), to deal
77+in the Software without restriction, including without limitation the rights
88+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
99+copies of the Software, and to permit persons to whom the Software is
1010+furnished to do so, subject to the following conditions:
1111+1212+The above copyright notice and this permission notice shall be included in all
1313+copies or substantial portions of the Software.
1414+1515+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
1616+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
1717+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
1818+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
1919+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
2020+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
2121+SOFTWARE.
···11+#![no_main]
22+33+use libfuzzer_sys::fuzz_target;
44+55+fuzz_target!(|data: &str| {
66+ // parser must never panic (ok or err both fine)
77+ let _ = eat_rocks::BackupMeta::parse(data);
88+});
+99
hacking.md
···11+implementation, format stuff, etc.
22+33+in general when there's a question of how to handle something: try to match the rocksdb behaviour.
44+55+66+## backup files
77+88+see the rocksdb [wiki](https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB) and the authoritative rocks backup implementation: [`utilities/backup/backup_engine.cc`](https://github.com/facebook/rocksdb/blob/main/utilities/backup/backup_engine.cc).
99+1010+The format is line-oriented text. yay.
1111+1212+The incremental backup files are stored like this:
1313+1414+```
1515+meta/
1616+ 1 # meta file for backup 1
1717+ 2 # meta file for backup 2
1818+private/
1919+ 1/ # per-backup files (CURRENT, MANIFEST, OPTIONS, WALs)
2020+ 2/
2121+shared_checksum/
2222+ 000007_2894567812_590.sst # SSTs shared across backups, funny names
2323+```
2424+2525+The `meta` files contain everything you need to restore one backup: namely, a list of files to copy.
2626+It's all not too complicated.
2727+2828+Schema v2 added a `schema_version 2.N` on the first line but is otherwise backwards compatible with v1 (which starts with the timestamp line), so both are supported:
2929+3030+```
3131+[schema_version 2.n] # absent for v1
3232+<timestamp>
3333+<sequence_number>
3434+[metadata <hex>] # optional app metadata
3535+[<field> <value>]* # unknown fields, skippable unless ni:: prefixed
3636+<file_count>
3737+<path> [field value]* # many lines like this (<file_count> of them)
3838+```
3939+4040+File fields include `crc32` (actually crc32c), `size`, `temp`, and `ni::excluded`.
4141+Fields starting with `ni::` (non-ignorable) are meant to fail to parse unless they are specifically recognized by the parser. yay forward compat.
4242+4343+To avoid file name collisions, rocks puts `_<checksum>_<size>` suffixes on files in the `shared_checksum/` folder.
4444+These are just for uniqueness.
4545+During restore you pop them out and just write to `<name before underscore>.<ext>`, without any interpretation.
4646+4747+4848+### exclusion zone
4949+5050+rocks has fancy weird multi-backup advanced stuff where you can reference files living in one backup from another, to avoid redundant copies, and these get marked `excluded`.
5151+5252+these are outside the scope of `eat-rocks` and we error if any files in the meta are `excluded`, since we wouldn't know where to look for them.
5353+5454+5555+### get `CURRENT`
5656+5757+rocks itself uses a file called `CURRENT` as its entrypoint to the db.
5858+when restoring, we write all other files first, then atomically rename the new `CURRENT` into place, so a partial restore won't corrupt things. (just following rocks here)
5959+6060+6161+## with integrity
6262+6363+rocksdb (accidentally?) doesn't emit a `size` value in the meta for files, for... some reason.
6464+so despite implementing an unconditional validation check for it, it's really not actually checked.
6565+6666+we do reliably get the crc32c from rocks though, and `eat-rocks` will check it by default.
6767+passing `--no-verify` (or setting `RestoreOptions::verify` to false) disables the check.
6868+i'm not sure why you'd want to though -- unlike a restore from the local filesystem, downloading from object storage means the full contents get streamed through us, so checking the crc is basically free.
6969+7070+The meta file field is called `crc32` but the values are actually crc32c.
7171+grep for "WART" in rocksdb source about it. `crc32c` with `crc32c_append` for streaming works for us.
7272+7373+7474+## all together (concurrency)
7575+7676+the concurrency limit is applied in two places -- idk if this was a great idea but hey.
7777+`futures::stream::buffer_unordered` limits how many files we're asking `object_store` to work on at any given time, and `object_store::limit::LimitStore` wraps the actual `ObjectStore` backend to apply the same limit at a lower level.
7878+7979+8080+## plz don't ignore
8181+8282+non-ignorable fields (`ni::` prefix) cause hard failures in both header and file parsing if they're not recognized.
8383+Unknown ignorable fields are silently skipped.
8484+`ni::excluded` is the only recognized `ni::` field currently.
8585+8686+8787+## don't test me
8888+8989+unit tests go in the modules of things they test (normal rust)
9090+9191+`object_store::memory::InMemory` is great for stubbing out object storage space with arbitrary contents.
9292+9393+there are some neat end-to-end tests in `tests/e2e.rs` which hopefully validate the whole thing going on here, down to actually generating real backups from rocksdb (via the `rocksdb` rust crate) and restoring them with both our implementation and rocks'.
9494+9595+the meta file parser is pretty simple/small but hey why not fuzz it -- try:
9696+9797+```bash
9898+rustup run nightly cargo fuzz run parse_meta
9999+```
+80
reame.md
···11+# eat-rocks
22+33+[](https://docs.rs/eat-rocks)
44+55+restore a rocks backup from s3-compatible object storage
66+77+rocks has built-in backup/restore, but its restore function expects a local filesystem.
88+bridging object storage to a filesystem *works*, but it's really annoying.
99+1010+eat-rocks talks to object storage directly (and with high default concurrency) so it can be pretty fast at getting your database back.
1111+1212+1313+## cli
1414+1515+```bash
1616+# restore latest from a public bucket (subdomain style)
1717+eat-rocks --endpoint https://constellation.t3.storage.dev restore /data/rocksdb
1818+1919+# list available backups
2020+eat-rocks --endpoint https://constellation.t3.storage.dev list
2121+2222+# restore a specific backup
2323+eat-rocks --endpoint https://constellation.t3.storage.dev restore \
2424+ --backup-id 3 /data/rocksdb
2525+2626+# authenticated access (or set AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY)
2727+eat-rocks \
2828+ --endpoint https://constellation.t3.storage.dev \
2929+ --access-key-id AKIA... \
3030+ --secret-access-key wJal... \
3131+ restore /data/rocksdb
3232+3333+# path-style (minio, localstack)
3434+eat-rocks \
3535+ --endpoint http://localhost:9000 \
3636+ --bucket mybucket \
3737+ restore /data/rocksdb
3838+3939+# limit concurrency (poor connection, etc)
4040+eat-rocks --endpoint https://constellation.t3.storage.dev \
4141+ restore --concurrency 8 /data/rocksdb
4242+```
4343+4444+## lib
4545+4646+```rust
4747+use eat_rocks::{public_bucket, restore};
4848+4949+let store = public_bucket("https://constellation.t3.storage.dev")?;
5050+restore(store, "", "/data/rocksdb".as_ref(), Default::default()).await?;
5151+```
5252+5353+or bring your own [`ObjectStore`](https://docs.rs/object_store/latest/object_store/) implementation (S3, GCS, Azure, local filesystem, ...):
5454+5555+```rust
5656+let store: Arc<dyn ObjectStore> = /* store: all you */;
5757+eat_rocks::restore(store, "", "/data/rocksdb".as_ref(), Default::default()).await?;
5858+```
5959+6060+6161+## featureful
6262+6363+- `cli`: enable deps to build the binary
6464+- `easy` (default): `public_bucket()` convenience function with `aws` store backend
6565+6666+6767+### cli build
6868+6969+the `cli` feature flag is required to build the cli
7070+7171+```bash
7272+cargo build --release --features cli
7373+```
7474+7575+7676+## license
7777+7878+Dual-licensed under MIT and Apache 2.0.
7979+8080+`SPDX-License-Identifier: MIT OR Apache-2.0`
+138
src/lib.rs
···11+//! Restore a [rocks backup](https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB)
22+//! from object storage
33+//!
44+//! ```rust,no_run
55+//! use eat_rocks::{public_bucket, restore};
66+//! # async fn example() -> Result<(), eat_rocks::Error> {
77+//! let store = public_bucket("https://constellation.t3.storage.dev")?;
88+//! restore(store, "", "/tmp/constellation-db", Default::default()).await?;
99+//! # Ok(())
1010+//! # }
1111+//! ```
1212+//!
1313+//! the `public_bucket` function (`easy` feature, enabled by default) works with
1414+//! s3-compatible stores (like tigris) which are configured for public read access.
1515+//!
1616+//! you can use any [`ObjectStore`](object_store::ObjectStore) to restore from pretty much any backend (S3, GCS, Azure, local filesystem, ...)
1717+1818+pub mod meta;
1919+pub mod restore;
2020+2121+pub use meta::{BackupFile, BackupMeta};
2222+pub use restore::{DEFAULT_CONCURRENCY, RestoreOptions, fetch_meta, list_backup_ids, restore};
2323+2424+use std::io;
2525+use std::path::PathBuf;
2626+use std::string::FromUtf8Error;
2727+2828+use object_store::path::Path as StorePath;
2929+3030+/// Build an unsigned s3-compatible object store
3131+///
3232+/// `endpoint` should include the bucket as a subdomain for virtual-hosted style:
3333+///
3434+/// ```text
3535+/// https://constellation.t3.storage.dev
3636+/// ```
3737+///
3838+/// you can construct your own [`object_store::ObjectStore`] implementation directly for more control.
3939+#[cfg(feature = "easy")]
4040+pub fn public_bucket(
4141+ endpoint: &str,
4242+) -> Result<std::sync::Arc<dyn object_store::ObjectStore>, Error> {
4343+ use object_store::aws::AmazonS3Builder;
4444+ use std::sync::Arc;
4545+4646+ let store = AmazonS3Builder::new()
4747+ .with_endpoint(endpoint)
4848+ .with_bucket_name("_")
4949+ .with_skip_signature(true)
5050+ .with_allow_http(true)
5151+ .with_virtual_hosted_style_request(true)
5252+ .build()
5353+ .map_err(|e| Error::ObjectStoreBuild {
5454+ endpoint: endpoint.to_string(),
5555+ source: e,
5656+ })?;
5757+5858+ Ok(Arc::new(store))
5959+}
6060+6161+/// Errors that can occur during backup discovery, metadata parsing, or restore.
6262+#[derive(Debug, thiserror::Error)]
6363+pub enum Error {
6464+ #[cfg(feature = "easy")]
6565+ #[error("failed to build object store")]
6666+ ObjectStoreBuild {
6767+ endpoint: String,
6868+ #[source]
6969+ source: object_store::Error,
7070+ },
7171+7272+ #[error("failed to list objects under {prefix:?}")]
7373+ List {
7474+ prefix: StorePath,
7575+ #[source]
7676+ source: object_store::Error,
7777+ },
7878+7979+ #[error("failed to fetch {key:?}")]
8080+ Fetch {
8181+ key: StorePath,
8282+ #[source]
8383+ source: object_store::Error,
8484+ },
8585+8686+ #[error("meta file for backup {backup_id} is not valid UTF-8")]
8787+ MetaEncoding {
8888+ backup_id: u64,
8989+ #[source]
9090+ source: FromUtf8Error,
9191+ },
9292+9393+ #[error("failed to parse meta file for backup {backup_id}")]
9494+ MetaParse {
9595+ backup_id: u64,
9696+ #[source]
9797+ source: meta::ParseError,
9898+ },
9999+100100+ #[error("no backups found")]
101101+ NoBackups,
102102+103103+ #[error("backup contains {count} excluded file(s) (unsupported)")]
104104+ ExcludedFiles { count: usize },
105105+106106+ #[error("backup path has unrecognized prefix: {0:?}")]
107107+ UnrecognizedPathPrefix(String),
108108+109109+ #[error("shared_checksum filename missing underscore: {0:?}")]
110110+ SharedChecksumNoUnderscore(String),
111111+112112+ #[error("shared_checksum filename missing extension: {0:?}")]
113113+ SharedChecksumNoExtension(String),
114114+115115+ #[error("private/ path has too few components: {0:?}")]
116116+ PrivatePathTooShort(String),
117117+118118+ #[error("size mismatch on {path}: expected {expected} bytes, got {actual}")]
119119+ SizeMismatch {
120120+ path: String,
121121+ expected: u64,
122122+ actual: u64,
123123+ },
124124+125125+ #[error("crc32c mismatch on {path}: expected {expected:#010x}, got {actual:#010x}")]
126126+ ChecksumMismatch {
127127+ path: String,
128128+ expected: u32,
129129+ actual: u32,
130130+ },
131131+132132+ #[error("{}: {source}", path.display())]
133133+ Io {
134134+ path: PathBuf,
135135+ #[source]
136136+ source: io::Error,
137137+ },
138138+}
+220
src/main.rs
···11+use std::path::PathBuf;
22+use std::sync::Arc;
33+44+use clap::{Parser, Subcommand};
55+use object_store::{ObjectStore, aws::AmazonS3Builder};
66+use tracing::warn;
77+88+#[derive(Debug, thiserror::Error)]
99+enum CliError {
1010+ #[error("failed to initialize S3 store for {endpoint:?}")]
1111+ StoreInit {
1212+ endpoint: String,
1313+ #[source]
1414+ source: object_store::Error,
1515+ },
1616+1717+ #[error("failed to list backups at prefix {prefix:?}")]
1818+ List {
1919+ prefix: String,
2020+ #[source]
2121+ source: eat_rocks::Error,
2222+ },
2323+2424+ #[error("restore to {} failed", target.display())]
2525+ Restore {
2626+ target: PathBuf,
2727+ #[source]
2828+ source: eat_rocks::Error,
2929+ },
3030+}
3131+3232+#[derive(Parser)]
3333+#[command(version, about)]
3434+struct Cli {
3535+ /// s3-compatible endpoint url
3636+ ///
3737+ /// for subdomain style (aka virtual-hosted) like tigris, include the bucket
3838+ /// in the hostname, like `--endpoint https://constellation.t3.storage.dev`.
3939+ #[arg(long)]
4040+ endpoint: String,
4141+4242+ /// bucket name
4343+ ///
4444+ /// only for path-style buckets (minio, localstack,). see `--endpoint`.
4545+ #[arg(long)]
4646+ bucket: Option<String>,
4747+4848+ /// Prefix within the bucket (path to backup root)
4949+ #[arg(long, default_value = "")]
5050+ prefix: String,
5151+5252+ /// access key ID, omit for public access
5353+ #[arg(long, env = "AWS_ACCESS_KEY_ID", requires = "secret_access_key")]
5454+ access_key_id: Option<String>,
5555+5656+ /// secret access key, omit for public access
5757+ #[arg(long, env = "AWS_SECRET_ACCESS_KEY", requires = "access_key_id")]
5858+ secret_access_key: Option<String>,
5959+6060+ #[command(subcommand)]
6161+ command: Command,
6262+}
6363+6464+#[derive(Subcommand)]
6565+enum Command {
6666+ /// show available backups
6767+ List,
6868+ /// restore a backup to a local directory
6969+ Restore {
7070+ /// backup to restore
7171+ ///
7272+ /// default: latest. use `list` to see available backups.
7373+ #[arg(long)]
7474+ backup_id: Option<u64>,
7575+7676+ /// WAL directory (default: same as target)
7777+ #[arg(long)]
7878+ wal_dir: Option<PathBuf>,
7979+8080+ /// max concurrent actions against object storage
8181+ #[arg(long, default_value_t = eat_rocks::DEFAULT_CONCURRENCY)]
8282+ concurrency: usize,
8383+8484+ /// skip verifying crc32c checksums
8585+ ///
8686+ /// these are almost free (they're computed as the file is streamed),
8787+ /// and the restore should typically be i/o-bound, so i'm not sure when/
8888+ /// why turning this off would be useful.
8989+ #[arg(long)]
9090+ no_verify: bool,
9191+9292+ /// target directory for restored database
9393+ target: PathBuf,
9494+ },
9595+}
9696+9797+impl Cli {
9898+ fn build_store(&self) -> Result<Arc<dyn ObjectStore>, Box<CliError>> {
9999+ // if `--bucket` is passed, then we get path-style buckets in the URL.
100100+ // if not, the caller is responsible for putting the bucket in the endpoint
101101+ // url, but we still need to set it, hence the `_` placeholder.
102102+ let bucket = self.bucket.as_deref().unwrap_or("_");
103103+104104+ let mut builder = AmazonS3Builder::new()
105105+ .with_endpoint(&self.endpoint)
106106+ .with_bucket_name(bucket)
107107+ .with_allow_http(true)
108108+ .with_virtual_hosted_style_request(self.bucket.is_none());
109109+110110+ builder = match (&self.access_key_id, &self.secret_access_key) {
111111+ (Some(key_id), Some(secret)) => builder
112112+ .with_access_key_id(key_id)
113113+ .with_secret_access_key(secret),
114114+ (None, None) => builder.with_skip_signature(true),
115115+ _ => unreachable!("clap `requires` ensures both or neither are present"),
116116+ };
117117+118118+ let store = builder.build().map_err(|source| CliError::StoreInit {
119119+ endpoint: self.endpoint.clone(),
120120+ source,
121121+ })?;
122122+ Ok(Arc::new(store))
123123+ }
124124+}
125125+126126+// ---------------------------------------------------------------------------
127127+128128+#[tokio::main]
129129+async fn main() {
130130+ tracing_subscriber::fmt()
131131+ .with_env_filter(
132132+ tracing_subscriber::EnvFilter::try_from_default_env()
133133+ .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
134134+ )
135135+ .with_target(false)
136136+ .init();
137137+138138+ let cli = Cli::parse();
139139+ let result = match &cli.command {
140140+ Command::List => list(&cli).await,
141141+ Command::Restore { .. } => cmd_restore(&cli).await,
142142+ };
143143+144144+ if let Err(e) = result {
145145+ eprintln!("error: {e}");
146146+ let mut err: &dyn std::error::Error = &e;
147147+ while let Some(source) = err.source() {
148148+ eprintln!(" caused by: {source}");
149149+ err = source;
150150+ }
151151+ std::process::exit(1);
152152+ }
153153+}
154154+155155+async fn list(cli: &Cli) -> Result<(), Box<CliError>> {
156156+ let store = cli.build_store()?;
157157+ let prefix = &cli.prefix;
158158+159159+ let ids = eat_rocks::list_backup_ids(&store, prefix)
160160+ .await
161161+ .map_err(|source| CliError::List {
162162+ prefix: prefix.clone(),
163163+ source,
164164+ })?;
165165+166166+ if ids.is_empty() {
167167+ warn!("no backups found");
168168+ return Ok(());
169169+ }
170170+171171+ for id in &ids {
172172+ match eat_rocks::fetch_meta(&store, prefix, *id).await {
173173+ Ok(meta) => {
174174+ println!(
175175+ "backup {id:>4} | seq {:>12} | ts {} | {} files",
176176+ meta.sequence_number,
177177+ meta.timestamp,
178178+ meta.files.len(),
179179+ );
180180+ }
181181+ Err(e) => warn!(backup_id = id, error = %e, "failed to read backup metadata"),
182182+ }
183183+ }
184184+185185+ Ok(())
186186+}
187187+188188+async fn cmd_restore(cli: &Cli) -> Result<(), Box<CliError>> {
189189+ let Command::Restore {
190190+ backup_id,
191191+ target,
192192+ wal_dir,
193193+ concurrency,
194194+ no_verify,
195195+ } = &cli.command
196196+ else {
197197+ unreachable!()
198198+ };
199199+200200+ let store = cli.build_store()?;
201201+202202+ eat_rocks::restore(
203203+ store,
204204+ &cli.prefix,
205205+ target,
206206+ eat_rocks::RestoreOptions {
207207+ backup_id: *backup_id,
208208+ concurrency: *concurrency,
209209+ verify: !no_verify,
210210+ wal_dir: wal_dir.clone(),
211211+ },
212212+ )
213213+ .await
214214+ .map_err(|source| CliError::Restore {
215215+ target: target.clone(),
216216+ source,
217217+ })?;
218218+219219+ Ok(())
220220+}
+373
src/meta.rs
···11+/// rocks backup meta file parse errors
22+#[derive(Debug, PartialEq, thiserror::Error)]
33+pub enum ParseError {
44+ #[error("empty meta file")]
55+ Empty,
66+ #[error("could not parse schema version from {0:?} (v1 and v2 supported)")]
77+ InvalidSchemaVersion(String),
88+ #[error("missing timestamp line")]
99+ MissingTimestamp,
1010+ #[error("invalid timestamp: {0:?}")]
1111+ InvalidTimestamp(String),
1212+ #[error("missing sequence number line")]
1313+ MissingSequenceNumber,
1414+ #[error("invalid sequence number: {0:?}")]
1515+ InvalidSequenceNumber(String),
1616+ #[error("unexpected end of meta file while looking for file count")]
1717+ UnexpectedEndBeforeFileCount,
1818+ #[error("invalid file count: {0:?}")]
1919+ InvalidFileCount(String),
2020+ #[error("expected {expected} file entries but file ended after {actual}")]
2121+ FileCountMismatch { expected: usize, actual: usize },
2222+ #[error("empty file entry at position {0}")]
2323+ EmptyFileEntry(usize),
2424+ #[error("field {0:?} missing its value")]
2525+ MissingFieldValue(String),
2626+ #[error("invalid crc32 value: {0:?}")]
2727+ InvalidCrc32(String),
2828+ #[error("invalid size value: {0:?}")]
2929+ InvalidSize(String),
3030+ #[error("unrecognized non-ignorable field: {0:?}")]
3131+ NonIgnorableField(String),
3232+ #[error("unrecognized value {value:?} for field {field:?}")]
3333+ UnrecognizedFieldValue { field: String, value: String },
3434+}
3535+3636+/// Parsed contents of a rocks backup meta file (`meta/<id>`).
3737+///
3838+/// See [`BackupMeta::parse`] and the
3939+/// [backup format docs](https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB).
4040+#[derive(Debug, PartialEq)]
4141+pub struct BackupMeta {
4242+ pub timestamp: u64,
4343+ pub sequence_number: u64,
4444+ pub metadata: Option<String>,
4545+ pub files: Vec<BackupFile>,
4646+}
4747+4848+/// Each file entry in a rocks backup meta file
4949+#[derive(Debug, PartialEq)]
5050+pub struct BackupFile {
5151+ /// path is relative to backup root, eg. `shared_checksum/000007_123_456.sst`
5252+ pub path: String,
5353+ pub crc32c: Option<u32>,
5454+ pub size: Option<u64>,
5555+ pub excluded: bool,
5656+}
5757+5858+impl BackupMeta {
5959+ /// Parse a RocksDB backup meta file.
6060+ ///
6161+ /// works with schema v1 (implicit) and v2.
6262+ pub fn parse(content: &str) -> Result<Self, ParseError> {
6363+ let mut lines = content.lines();
6464+6565+ let first = lines.next().ok_or(ParseError::Empty)?;
6666+6767+ // schema v1: first line is the timestamp.
6868+ // schema v2 starts with "schema_version 2.x".
6969+ let ts_line = if let Some(ver_str) = first.strip_prefix("schema_version ") {
7070+ let Some("2") = ver_str.split('.').next() else {
7171+ return Err(ParseError::InvalidSchemaVersion(ver_str.to_string()));
7272+ };
7373+ lines.next().ok_or(ParseError::MissingTimestamp)?
7474+ } else {
7575+ first
7676+ };
7777+7878+ let timestamp: u64 = ts_line
7979+ .parse()
8080+ .map_err(|_| ParseError::InvalidTimestamp(ts_line.to_string()))?;
8181+8282+ let seq_line = lines.next().ok_or(ParseError::MissingSequenceNumber)?;
8383+ let sequence_number: u64 = seq_line
8484+ .parse()
8585+ .map_err(|_| ParseError::InvalidSequenceNumber(seq_line.to_string()))?;
8686+8787+ // optional metadata lines before file count (first purely numeric line)
8888+ let mut metadata = None;
8989+ let num_files: usize = loop {
9090+ let line = lines
9191+ .next()
9292+ .ok_or(ParseError::UnexpectedEndBeforeFileCount)?;
9393+ if let Ok(n) = line.parse::<usize>() {
9494+ break n;
9595+ }
9696+ if let Some(hex) = line.strip_prefix("metadata ") {
9797+ metadata = Some(hex.to_string());
9898+ } else if line.starts_with("ni::") {
9999+ let field = line.split_whitespace().next().unwrap_or(line);
100100+ return Err(ParseError::NonIgnorableField(field.to_string()));
101101+ }
102102+ // safe to skip unknown ignorable (non-"ni::") fields
103103+ };
104104+105105+ // file entries, finally
106106+ let mut files = Vec::new();
107107+ for i in 0..num_files {
108108+ let line = lines.next().ok_or(ParseError::FileCountMismatch {
109109+ expected: num_files,
110110+ actual: i, // 0-indexed so it's last round's count (what we actually did)
111111+ })?;
112112+ files.push(parse_file_line(line, i)?);
113113+ }
114114+115115+ Ok(BackupMeta {
116116+ timestamp,
117117+ sequence_number,
118118+ metadata,
119119+ files,
120120+ })
121121+ }
122122+}
123123+124124+fn parse_file_line(line: &str, position: usize) -> Result<BackupFile, ParseError> {
125125+ let mut parts = line.split_whitespace();
126126+ let path = parts
127127+ .next()
128128+ .ok_or(ParseError::EmptyFileEntry(position))?
129129+ .to_string();
130130+131131+ let mut crc32c = None;
132132+ let mut size = None;
133133+ let mut excluded = false;
134134+135135+ while let Some(field_name) = parts.next() {
136136+ let field_value = parts
137137+ .next()
138138+ .ok_or_else(|| ParseError::MissingFieldValue(field_name.to_string()))?;
139139+140140+ match field_name {
141141+ "crc32" => {
142142+ crc32c = Some(
143143+ field_value
144144+ .parse()
145145+ .map_err(|_| ParseError::InvalidCrc32(field_value.to_string()))?,
146146+ );
147147+ }
148148+ "size" => {
149149+ size = Some(
150150+ field_value
151151+ .parse()
152152+ .map_err(|_| ParseError::InvalidSize(field_value.to_string()))?,
153153+ );
154154+ }
155155+ "ni::excluded" => match field_value {
156156+ "true" => excluded = true,
157157+ "false" => excluded = false,
158158+ _ => {
159159+ return Err(ParseError::UnrecognizedFieldValue {
160160+ field: field_name.to_string(),
161161+ value: field_value.to_string(),
162162+ });
163163+ }
164164+ },
165165+ "temp" => {}
166166+ other if other.starts_with("ni::") => {
167167+ return Err(ParseError::NonIgnorableField(other.to_string()));
168168+ }
169169+ _ => {} // unknown non-"ni" fields are safe to ignore
170170+ }
171171+ }
172172+173173+ Ok(BackupFile {
174174+ path,
175175+ crc32c,
176176+ size,
177177+ excluded,
178178+ })
179179+}
180180+181181+#[cfg(test)]
182182+mod tests {
183183+ use super::*;
184184+185185+ #[test]
186186+ fn parse_schema_v1() {
187187+ let content = "\
188188+1498774076
189189+590
190190+3
191191+private/1/CURRENT crc32 123456
192192+private/1/MANIFEST-000008 crc32 789012
193193+shared_checksum/000007_1498774076_590.sst crc32 345678";
194194+195195+ let meta = BackupMeta::parse(content).unwrap();
196196+ assert_eq!(meta.timestamp, 1498774076);
197197+ assert_eq!(meta.sequence_number, 590);
198198+ assert_eq!(meta.files.len(), 3);
199199+ assert_eq!(meta.files[0].path, "private/1/CURRENT");
200200+ assert_eq!(meta.files[0].crc32c, Some(123456));
201201+ }
202202+203203+ #[test]
204204+ fn parse_schema_v2() {
205205+ let content = "\
206206+schema_version 2.1
207207+1498774076
208208+590
209209+metadata 48656c6c6f
210210+3
211211+private/1/CURRENT crc32 123456 size 16
212212+private/1/MANIFEST-000008 crc32 789012 size 1024
213213+shared_checksum/000007_1498774076_590.sst crc32 345678 size 65536 temp kCold";
214214+215215+ let meta = BackupMeta::parse(content).unwrap();
216216+ assert_eq!(meta.timestamp, 1498774076);
217217+ assert_eq!(meta.sequence_number, 590);
218218+ assert_eq!(meta.metadata.as_deref(), Some("48656c6c6f"));
219219+ assert_eq!(meta.files.len(), 3);
220220+ assert_eq!(meta.files[2].size, Some(65536));
221221+ }
222222+223223+ #[test]
224224+ fn parse_empty() {
225225+ assert!(matches!(BackupMeta::parse(""), Err(ParseError::Empty)));
226226+ }
227227+228228+ #[test]
229229+ fn parse_unsupported_schema() {
230230+ let content = "schema_version 3.0\n0\n0\n0\n";
231231+ assert_eq!(
232232+ BackupMeta::parse(content),
233233+ Err(ParseError::InvalidSchemaVersion("3.0".to_string()))
234234+ );
235235+ }
236236+237237+ #[test]
238238+ fn non_ignorable_field_rejected() {
239239+ let content = "\
240240+1498774076
241241+590
242242+1
243243+private/1/CURRENT ni::unknown_field true";
244244+245245+ assert!(matches!(
246246+ BackupMeta::parse(content),
247247+ Err(ParseError::NonIgnorableField(_))
248248+ ));
249249+ }
250250+251251+ #[test]
252252+ fn bogus_file_count_does_not_allocate() {
253253+ // Regression: "2\n2\n64406400" claimed 64M files with 0 lines remaining,
254254+ // causing a multi-GB Vec::with_capacity before the loop could fail.
255255+ let content = "2\n2\n64406400";
256256+ assert!(matches!(
257257+ BackupMeta::parse(content),
258258+ Err(ParseError::FileCountMismatch {
259259+ expected: 64406400,
260260+ actual: 0
261261+ })
262262+ ));
263263+ }
264264+265265+ #[test]
266266+ fn explicit_schema_v1_rejected() {
267267+ let content = "schema_version 1.0\n1000\n100\n0\n";
268268+ assert_eq!(
269269+ BackupMeta::parse(content),
270270+ Err(ParseError::InvalidSchemaVersion("1.0".to_string()))
271271+ );
272272+ }
273273+274274+ #[test]
275275+ fn non_ignorable_header_field_rejected() {
276276+ let content = "\
277277+schema_version 2.1
278278+1498774076
279279+590
280280+ni::future_breaking_field something
281281+0";
282282+ assert!(matches!(
283283+ BackupMeta::parse(content),
284284+ Err(ParseError::NonIgnorableField(_))
285285+ ));
286286+ }
287287+288288+ #[test]
289289+ fn unknown_ignorable_header_field_skipped() {
290290+ let content = "\
291291+schema_version 2.1
292292+1498774076
293293+590
294294+some_future_field data
295295+0";
296296+ let meta = BackupMeta::parse(content).unwrap();
297297+ assert_eq!(meta.timestamp, 1498774076);
298298+ assert_eq!(meta.files.len(), 0);
299299+ }
300300+301301+ #[test]
302302+ fn duplicate_metadata_uses_last() {
303303+ let content = "\
304304+schema_version 2.1
305305+1498774076
306306+590
307307+metadata aaa
308308+metadata bbb
309309+0";
310310+ let meta = BackupMeta::parse(content).unwrap();
311311+ assert_eq!(meta.metadata.as_deref(), Some("bbb"));
312312+ }
313313+314314+ #[test]
315315+ fn file_fields_in_any_order() {
316316+ let content = "\
317317+1498774076
318318+590
319319+1
320320+private/1/CURRENT size 16 crc32 123456 temp kHot";
321321+ let meta = BackupMeta::parse(content).unwrap();
322322+ assert_eq!(meta.files[0].crc32c, Some(123456));
323323+ assert_eq!(meta.files[0].size, Some(16));
324324+ }
325325+326326+ #[test]
327327+ fn duplicate_file_fields_use_last() {
328328+ let content = "\
329329+1498774076
330330+590
331331+1
332332+private/1/CURRENT crc32 111 crc32 222";
333333+ let meta = BackupMeta::parse(content).unwrap();
334334+ assert_eq!(meta.files[0].crc32c, Some(222));
335335+ }
336336+337337+ #[test]
338338+ fn unknown_ignorable_file_field_skipped() {
339339+ let content = "\
340340+1498774076
341341+590
342342+1
343343+private/1/CURRENT crc32 123 future_field value123";
344344+ let meta = BackupMeta::parse(content).unwrap();
345345+ assert_eq!(meta.files[0].crc32c, Some(123));
346346+ }
347347+348348+ #[test]
349349+ fn excluded_bad_value_rejected() {
350350+ let content = "\
351351+1498774076
352352+590
353353+1
354354+private/1/CURRENT ni::excluded banana";
355355+ assert!(matches!(
356356+ BackupMeta::parse(content),
357357+ Err(ParseError::UnrecognizedFieldValue { .. })
358358+ ));
359359+ }
360360+361361+ #[test]
362362+ fn excluded_file_parsed() {
363363+ let content = "\
364364+schema_version 2.1
365365+1498774076
366366+590
367367+1
368368+shared_checksum/000007_123_456.sst crc32 999 ni::excluded true";
369369+370370+ let meta = BackupMeta::parse(content).unwrap();
371371+ assert!(meta.files[0].excluded);
372372+ }
373373+}
+728
src/restore.rs
···11+use std::path::{Path, PathBuf};
22+use std::sync::Arc;
33+use std::time::Instant;
44+55+use futures::StreamExt;
66+use object_store::limit::LimitStore;
77+use object_store::path::Path as StorePath;
88+use object_store::{ObjectStore, ObjectStoreExt};
99+use tokio::io::{AsyncWriteExt, BufWriter};
1010+use tracing::{debug, info};
1111+1212+use crate::{BackupFile, Error, meta::BackupMeta};
1313+1414+/// List available backups from the `meta/` directory under `prefix`
1515+///
1616+/// returns a sorted vec of backups ids.
1717+///
1818+/// in-progress backups (`.tmp` files) are excluded.
1919+pub async fn list_backup_ids(store: &dyn ObjectStore, prefix: &str) -> Result<Vec<u64>, Error> {
2020+ let meta_prefix = StorePath::from(prefix).join("meta");
2121+ let mut ids = Vec::new();
2222+2323+ let mut stream = store.list(Some(&meta_prefix));
2424+ while let Some(item) = stream.next().await {
2525+ let item = item.map_err(|e| Error::List {
2626+ prefix: meta_prefix.clone(),
2727+ source: e,
2828+ })?;
2929+ if let Some(name) = item.location.filename()
3030+ && !name.ends_with(".tmp")
3131+ {
3232+ if let Ok(id) = name.parse::<u64>() {
3333+ ids.push(id);
3434+ } else {
3535+ debug!(name, "ignoring invalid backup id");
3636+ }
3737+ }
3838+ }
3939+4040+ ids.sort();
4141+ Ok(ids)
4242+}
4343+4444+/// default max concurrent object store operations
4545+pub const DEFAULT_CONCURRENCY: usize = 64;
4646+4747+/// configure how a backup-restore happens
4848+#[derive(Debug)]
4949+pub struct RestoreOptions {
5050+ /// backup to restore, `None` for latest
5151+ ///
5252+ /// default: [`None`]
5353+ pub backup_id: Option<u64>,
5454+ /// max concurrent object store operations
5555+ ///
5656+ /// default: [`DEFAULT_CONCURRENCY`]
5757+ pub concurrency: usize,
5858+ /// set false to disable streaming crc32c checksum verification
5959+ ///
6060+ /// default: is true
6161+ ///
6262+ /// file sizes are always verified if present in the meta file (which rocks
6363+ /// actually does not include currently), regardless of this setting.
6464+ pub verify: bool,
6565+ /// specify to place WAL files somewhere non-default
6666+ pub wal_dir: Option<PathBuf>,
6767+}
6868+6969+impl Default for RestoreOptions {
7070+ fn default() -> Self {
7171+ Self {
7272+ backup_id: None,
7373+ concurrency: DEFAULT_CONCURRENCY,
7474+ verify: true,
7575+ wal_dir: None,
7676+ }
7777+ }
7878+}
7979+8080+async fn download_file(
8181+ store: Arc<dyn ObjectStore>,
8282+ store_prefix: StorePath,
8383+ file: &BackupFile,
8484+ target: PathBuf,
8585+ wal_dir: PathBuf,
8686+ verify: bool,
8787+) -> Result<u64, Error> {
8888+ let name = db_filename(&file.path)?;
8989+9090+ let mut key = store_prefix;
9191+ key.extend(&StorePath::from(file.path.as_str()));
9292+9393+ // write CURRENT to a temp file (we atomically rename once other files are ready)
9494+ let dest = if name.as_os_str() == "CURRENT" {
9595+ target.join("CURRENT.tmp")
9696+ } else if name.extension().is_some_and(|ext| ext == "log") {
9797+ wal_dir.join(&name)
9898+ } else {
9999+ target.join(&name)
100100+ };
101101+102102+ let result = store.get(&key).await.map_err(|e| Error::Fetch {
103103+ key: key.clone(),
104104+ source: e,
105105+ })?;
106106+107107+ let mut stream = result.into_stream();
108108+109109+ let f = tokio::fs::File::create(&dest)
110110+ .await
111111+ .map_err(|e| Error::Io {
112112+ path: dest.clone(),
113113+ source: e,
114114+ })?;
115115+ let mut out = BufWriter::new(f);
116116+117117+ let mut total_size = 0u64;
118118+ let mut crc = 0u32;
119119+ let do_crc = verify && file.crc32c.is_some();
120120+121121+ while let Some(chunk) = stream.next().await {
122122+ let chunk = chunk.map_err(|e| Error::Fetch {
123123+ key: key.clone(),
124124+ source: e,
125125+ })?;
126126+ total_size += chunk.len() as u64;
127127+ if do_crc {
128128+ crc = crc32c::crc32c_append(crc, &chunk);
129129+ }
130130+ out.write_all(&chunk).await.map_err(|e| Error::Io {
131131+ path: dest.clone(),
132132+ source: e,
133133+ })?;
134134+ }
135135+136136+ out.shutdown().await.map_err(|e| Error::Io {
137137+ path: dest.clone(),
138138+ source: e,
139139+ })?;
140140+141141+ if let Some(expected) = file.size
142142+ && total_size != expected
143143+ {
144144+ return Err(Error::SizeMismatch {
145145+ path: file.path.clone(),
146146+ expected,
147147+ actual: total_size,
148148+ });
149149+ }
150150+151151+ if let Some(expected) = file.crc32c.filter(|_| verify)
152152+ && crc != expected
153153+ {
154154+ return Err(Error::ChecksumMismatch {
155155+ path: file.path.clone(),
156156+ expected,
157157+ actual: crc,
158158+ });
159159+ }
160160+161161+ Ok(total_size)
162162+}
163163+164164+/// Restore a rocks backup to a `target` directory.
165165+///
166166+/// streams files from the backup's meta file to disk. see [`RestoreOptions`]
167167+/// for the main behaviours.
168168+///
169169+/// backups with [excluded files](https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB)
170170+/// (references to other backup directories) reject with [`Error::ExcludedFiles`].
171171+pub async fn restore(
172172+ store: Arc<dyn ObjectStore>,
173173+ prefix: &str,
174174+ target: impl AsRef<Path>,
175175+ opts: RestoreOptions,
176176+) -> Result<(), Error> {
177177+ let target = target.as_ref();
178178+ let RestoreOptions {
179179+ backup_id,
180180+ concurrency,
181181+ verify,
182182+ wal_dir,
183183+ } = opts;
184184+185185+ let wal_dir = wal_dir.as_deref().unwrap_or(target);
186186+187187+ // object_store's built-in upstream concurrency limit
188188+ let store: Arc<dyn ObjectStore> = Arc::new(LimitStore::new(store, concurrency));
189189+190190+ // resolve latest backup if unspecified
191191+ let id = match backup_id {
192192+ Some(id) => id,
193193+ None => {
194194+ let ids = list_backup_ids(&*store, prefix).await?;
195195+ *ids.last().ok_or(Error::NoBackups)?
196196+ }
197197+ };
198198+199199+ let meta = fetch_meta(&*store, prefix, id).await?;
200200+ let excluded_count = meta.files.iter().filter(|f| f.excluded).count();
201201+ if excluded_count > 0 {
202202+ return Err(Error::ExcludedFiles {
203203+ count: excluded_count,
204204+ });
205205+ }
206206+ info!(
207207+ backup_id = id,
208208+ file_count = meta.files.len(),
209209+ sequence_number = meta.sequence_number,
210210+ "restoring backup"
211211+ );
212212+213213+ tokio::fs::create_dir_all(target)
214214+ .await
215215+ .map_err(|e| Error::Io {
216216+ path: target.to_path_buf(),
217217+ source: e,
218218+ })?;
219219+ if wal_dir != target {
220220+ tokio::fs::create_dir_all(wal_dir)
221221+ .await
222222+ .map_err(|e| Error::Io {
223223+ path: wal_dir.to_path_buf(),
224224+ source: e,
225225+ })?;
226226+ }
227227+228228+ let started = Instant::now();
229229+ let store_prefix = StorePath::from(prefix);
230230+ let target = target.to_path_buf();
231231+ let wal_dir = wal_dir.to_path_buf();
232232+233233+ // build concurrent download tasks
234234+ let tasks = meta.files.iter().map(|f| {
235235+ download_file(
236236+ Arc::clone(&store),
237237+ store_prefix.clone(),
238238+ f,
239239+ target.clone(),
240240+ wal_dir.clone(),
241241+ verify,
242242+ )
243243+ });
244244+245245+ let total = tasks.len();
246246+ let mut completed = 0usize;
247247+ let mut total_bytes = 0u64;
248248+249249+ // buffered_unordered keeps a kind of window over a limited number of polling tasks
250250+ // (kind of because "unordered" means slow early tasks don't stop it advancing)
251251+ let mut stream = futures::stream::iter(tasks).buffer_unordered(concurrency);
252252+ while let Some(result) = stream.next().await {
253253+ let bytes = result?; // fail if any download fails
254254+ completed += 1;
255255+ total_bytes += bytes;
256256+ if completed.is_multiple_of(100) || completed == total {
257257+ let elapsed_secs = started.elapsed().as_secs_f64();
258258+ let mb = total_bytes as f64 / 2_f64.powf(20.);
259259+ let rate_mb_s = if elapsed_secs > 0.0 {
260260+ mb / elapsed_secs
261261+ } else {
262262+ 0.0
263263+ };
264264+ info!(
265265+ completed,
266266+ total,
267267+ downloaded_mb = format_args!("{mb:.1}"),
268268+ elapsed_secs = format_args!("{elapsed_secs:.1}"),
269269+ rate_mb_s = format_args!("{rate_mb_s:.1}"),
270270+ "progress"
271271+ );
272272+ }
273273+ }
274274+275275+ // atomically do the `CURRENT` thing, finally
276276+ let current_tmp = target.join("CURRENT.tmp");
277277+ let current_final = target.join("CURRENT");
278278+ tokio::fs::rename(¤t_tmp, ¤t_final)
279279+ .await
280280+ .map_err(|e| Error::Io {
281281+ path: current_final,
282282+ source: e,
283283+ })?;
284284+285285+ let elapsed_secs = started.elapsed().as_secs_f64();
286286+ let mb = total_bytes as f64 / 2_f64.powf(20.);
287287+ let rate_mb_s = if elapsed_secs > 0.0 {
288288+ mb / elapsed_secs
289289+ } else {
290290+ 0.0
291291+ };
292292+ info!(
293293+ total_files = total,
294294+ total_mb = format_args!("{mb:.1}"),
295295+ elapsed_secs = format_args!("{elapsed_secs:.1}"),
296296+ rate_mb_s = format_args!("{rate_mb_s:.1}"),
297297+ "restore complete"
298298+ );
299299+300300+ Ok(())
301301+}
302302+303303+/// Fetch and parse the rocks meta file for a backup
304304+pub async fn fetch_meta(
305305+ store: &dyn ObjectStore,
306306+ prefix: &str,
307307+ id: u64,
308308+) -> Result<BackupMeta, Error> {
309309+ let key = StorePath::from(prefix).join("meta").join(id.to_string());
310310+311311+ let data = store
312312+ .get(&key)
313313+ .await
314314+ .map_err(|e| Error::Fetch {
315315+ key: key.clone(),
316316+ source: e,
317317+ })?
318318+ .bytes()
319319+ .await
320320+ .map_err(|e| Error::Fetch { key, source: e })?;
321321+322322+ let text = String::from_utf8(data.to_vec()).map_err(|e| Error::MetaEncoding {
323323+ backup_id: id,
324324+ source: e,
325325+ })?;
326326+327327+ BackupMeta::parse(&text).map_err(|e| Error::MetaParse {
328328+ backup_id: id,
329329+ source: e,
330330+ })
331331+}
332332+333333+/// Convert a backup-relative path to the DB filename it restores as.
334334+///
335335+/// `shared_checksum/000007_2894567812_590.sst` -> `000007.sst`
336336+/// `private/1/MANIFEST-000008` -> `MANIFEST-000008`
337337+/// `shared/000007.sst` -> `000007.sst`
338338+pub(crate) fn db_filename(backup_path: &str) -> Result<PathBuf, Error> {
339339+ let sp = StorePath::from(backup_path);
340340+ let parts: Vec<_> = sp.parts().collect();
341341+342342+ match parts.first().map(|p| p.as_ref()) {
343343+ Some("shared_checksum") => {
344344+ let filename = parts
345345+ .last()
346346+ .ok_or_else(|| Error::SharedChecksumNoExtension(backup_path.to_string()))?;
347347+ unmangle_shared_checksum(filename.as_ref())
348348+ }
349349+ Some("private") => {
350350+ // private/<id>/<filename>
351351+ parts
352352+ .get(2)
353353+ .map(|p| PathBuf::from(p.as_ref()))
354354+ .ok_or_else(|| Error::PrivatePathTooShort(backup_path.to_string()))
355355+ }
356356+ Some("shared") => parts
357357+ .last()
358358+ .map(|p| PathBuf::from(p.as_ref()))
359359+ .ok_or_else(|| Error::UnrecognizedPathPrefix(backup_path.to_string())),
360360+ _ => Err(Error::UnrecognizedPathPrefix(backup_path.to_string())),
361361+ }
362362+}
363363+364364+/// Unmangle a `shared_checksum` filename back to its original DB name.
365365+///
366366+/// `000007_2894567812_590.sst` -> `000007.sst`
367367+fn unmangle_shared_checksum(mangled: &str) -> Result<PathBuf, Error> {
368368+ let p = Path::new(mangled);
369369+ let ext = p
370370+ .extension()
371371+ .and_then(|e| e.to_str())
372372+ .ok_or_else(|| Error::SharedChecksumNoExtension(mangled.to_string()))?;
373373+ let stem = p
374374+ .file_stem()
375375+ .and_then(|s| s.to_str())
376376+ .ok_or_else(|| Error::SharedChecksumNoExtension(mangled.to_string()))?;
377377+ let underscore = stem
378378+ .find('_')
379379+ .ok_or_else(|| Error::SharedChecksumNoUnderscore(mangled.to_string()))?;
380380+ Ok(PathBuf::from(format!("{}.{ext}", &stem[..underscore])))
381381+}
382382+383383+#[cfg(test)]
384384+mod tests {
385385+ use super::*;
386386+ use object_store::{PutPayload, memory::InMemory};
387387+388388+ async fn put(store: &InMemory, path: &str, data: &[u8]) {
389389+ use object_store::ObjectStoreExt;
390390+ store
391391+ .put(
392392+ &StorePath::from(path),
393393+ PutPayload::from_iter(data.iter().copied()),
394394+ )
395395+ .await
396396+ .unwrap();
397397+ }
398398+399399+ fn build_meta(timestamp: u64, seq: u64, files: &[(&str, &[u8])]) -> String {
400400+ let mut lines = vec![
401401+ timestamp.to_string(),
402402+ seq.to_string(),
403403+ files.len().to_string(),
404404+ ];
405405+ for (path, data) in files {
406406+ let crc = crc32c::crc32c(data);
407407+ lines.push(format!("{path} crc32 {crc}"));
408408+ }
409409+ lines.join("\n")
410410+ }
411411+412412+ async fn add_backup(
413413+ store: &InMemory,
414414+ id: u64,
415415+ timestamp: u64,
416416+ seq: u64,
417417+ files: &[(&str, &[u8])],
418418+ ) {
419419+ let meta = build_meta(timestamp, seq, files);
420420+ put(store, &format!("meta/{id}"), meta.as_bytes()).await;
421421+ for (path, data) in files {
422422+ put(store, path, data).await;
423423+ }
424424+ }
425425+426426+ #[tokio::test]
427427+ async fn list_discovers_backups() {
428428+ let store = InMemory::new();
429429+ add_backup(&store, 1, 1000, 100, &[("private/1/CURRENT", b"M-1\n")]).await;
430430+ add_backup(&store, 3, 3000, 300, &[("private/3/CURRENT", b"M-3\n")]).await;
431431+ put(&store, "meta/5.tmp", b"in progress").await;
432432+433433+ let ids = list_backup_ids(&store, "").await.unwrap();
434434+ assert_eq!(ids, vec![1, 3]);
435435+ }
436436+437437+ #[tokio::test]
438438+ async fn list_with_prefix() {
439439+ let store = InMemory::new();
440440+ let meta = build_meta(1000, 100, &[("pfx/private/1/CURRENT", b"M\n")]);
441441+ put(&store, "pfx/meta/1", meta.as_bytes()).await;
442442+ put(&store, "pfx/private/1/CURRENT", b"M\n").await;
443443+444444+ let ids = list_backup_ids(&store, "pfx").await.unwrap();
445445+ assert_eq!(ids, vec![1]);
446446+ }
447447+448448+ #[tokio::test]
449449+ async fn list_empty() {
450450+ let store = InMemory::new();
451451+ let ids = list_backup_ids(&store, "").await.unwrap();
452452+ assert!(ids.is_empty());
453453+ }
454454+455455+ #[tokio::test]
456456+ async fn places_files_correctly() {
457457+ let store = InMemory::new();
458458+ let current = b"MANIFEST-000008\n";
459459+ let manifest = b"manifest-data-here";
460460+ let options = b"options-data-here";
461461+ let sst = b"sst-file-contents!";
462462+463463+ add_backup(
464464+ &store,
465465+ 1,
466466+ 1000,
467467+ 100,
468468+ &[
469469+ ("private/1/CURRENT", current),
470470+ ("private/1/MANIFEST-000008", manifest),
471471+ ("private/1/OPTIONS-000009", options),
472472+ ("shared_checksum/000007_123_456.sst", sst),
473473+ ],
474474+ )
475475+ .await;
476476+477477+ let target = tempfile::tempdir().unwrap();
478478+ let tp = target.path();
479479+480480+ restore(
481481+ Arc::new(store),
482482+ "",
483483+ tp,
484484+ RestoreOptions {
485485+ backup_id: Some(1),
486486+ ..Default::default()
487487+ },
488488+ )
489489+ .await
490490+ .unwrap();
491491+492492+ assert_eq!(std::fs::read(tp.join("CURRENT")).unwrap(), current);
493493+ assert_eq!(std::fs::read(tp.join("MANIFEST-000008")).unwrap(), manifest);
494494+ assert_eq!(std::fs::read(tp.join("OPTIONS-000009")).unwrap(), options);
495495+ assert_eq!(std::fs::read(tp.join("000007.sst")).unwrap(), sst);
496496+ }
497497+498498+ #[tokio::test]
499499+ async fn routes_wal_to_wal_dir() {
500500+ let store = InMemory::new();
501501+ add_backup(
502502+ &store,
503503+ 1,
504504+ 1000,
505505+ 100,
506506+ &[
507507+ ("private/1/CURRENT", b"M-1\n"),
508508+ ("private/1/000003.log", b"wal-data"),
509509+ ],
510510+ )
511511+ .await;
512512+513513+ let target = tempfile::tempdir().unwrap();
514514+ let wal = tempfile::tempdir().unwrap();
515515+516516+ restore(
517517+ Arc::new(store),
518518+ "",
519519+ target.path(),
520520+ RestoreOptions {
521521+ backup_id: Some(1),
522522+ wal_dir: Some(wal.path().to_path_buf()),
523523+ ..Default::default()
524524+ },
525525+ )
526526+ .await
527527+ .unwrap();
528528+529529+ assert!(wal.path().join("000003.log").exists());
530530+ assert!(!target.path().join("000003.log").exists());
531531+ assert!(target.path().join("CURRENT").exists());
532532+ }
533533+534534+ #[tokio::test]
535535+ async fn defaults_to_latest() {
536536+ let store = InMemory::new();
537537+ add_backup(&store, 1, 1000, 100, &[("private/1/CURRENT", b"old\n")]).await;
538538+ add_backup(&store, 5, 5000, 500, &[("private/5/CURRENT", b"new\n")]).await;
539539+540540+ let target = tempfile::tempdir().unwrap();
541541+542542+ restore(Arc::new(store), "", target.path(), Default::default())
543543+ .await
544544+ .unwrap();
545545+546546+ assert_eq!(
547547+ std::fs::read(target.path().join("CURRENT")).unwrap(),
548548+ b"new\n"
549549+ );
550550+ }
551551+552552+ #[tokio::test]
553553+ async fn rejects_excluded_files() {
554554+ let store = InMemory::new();
555555+ let current = b"M-1\n";
556556+ let crc = crc32c::crc32c(current);
557557+ let meta = format!(
558558+ "schema_version 2.1\n1000\n100\n2\n\
559559+ private/1/CURRENT crc32 {crc}\n\
560560+ shared_checksum/000099_123_456.sst crc32 999 ni::excluded true"
561561+ );
562562+ put(&store, "meta/1", meta.as_bytes()).await;
563563+ put(&store, "private/1/CURRENT", current).await;
564564+565565+ let target = tempfile::tempdir().unwrap();
566566+567567+ let result = restore(
568568+ Arc::new(store),
569569+ "",
570570+ target.path(),
571571+ RestoreOptions {
572572+ backup_id: Some(1),
573573+ ..Default::default()
574574+ },
575575+ )
576576+ .await;
577577+ assert!(matches!(result, Err(Error::ExcludedFiles { count: 1 })));
578578+ }
579579+580580+ #[tokio::test]
581581+ async fn verify_passes_with_correct_crc() {
582582+ let store = InMemory::new();
583583+ add_backup(&store, 1, 1000, 100, &[("private/1/CURRENT", b"data\n")]).await;
584584+585585+ let target = tempfile::tempdir().unwrap();
586586+587587+ restore(
588588+ Arc::new(store),
589589+ "",
590590+ target.path(),
591591+ RestoreOptions {
592592+ backup_id: Some(1),
593593+ verify: true,
594594+ ..Default::default()
595595+ },
596596+ )
597597+ .await
598598+ .unwrap();
599599+ }
600600+601601+ #[tokio::test]
602602+ async fn size_mismatch_detected() {
603603+ let store = InMemory::new();
604604+ let content = b"hello";
605605+ let crc = crc32c::crc32c(content);
606606+ // Meta claims size 999 but actual content is 5 bytes
607607+ let meta = format!(
608608+ "schema_version 2.1\n1000\n100\n1\n\
609609+ private/1/CURRENT crc32 {crc} size 999"
610610+ );
611611+ put(&store, "meta/1", meta.as_bytes()).await;
612612+ put(&store, "private/1/CURRENT", content).await;
613613+614614+ let target = tempfile::tempdir().unwrap();
615615+616616+ let result = restore(
617617+ Arc::new(store),
618618+ "",
619619+ target.path(),
620620+ RestoreOptions {
621621+ backup_id: Some(1),
622622+ ..Default::default()
623623+ },
624624+ )
625625+ .await;
626626+ assert!(matches!(
627627+ result,
628628+ Err(Error::SizeMismatch {
629629+ expected: 999,
630630+ actual: 5,
631631+ ..
632632+ })
633633+ ));
634634+ }
635635+636636+ #[tokio::test]
637637+ async fn verify_catches_mismatch() {
638638+ let store = InMemory::new();
639639+ let meta = build_meta(1000, 100, &[("private/1/CURRENT", b"correct data")]);
640640+ put(&store, "meta/1", meta.as_bytes()).await;
641641+ put(&store, "private/1/CURRENT", b"tampered data").await;
642642+643643+ let target = tempfile::tempdir().unwrap();
644644+645645+ let result = restore(
646646+ Arc::new(store),
647647+ "",
648648+ target.path(),
649649+ RestoreOptions {
650650+ backup_id: Some(1),
651651+ verify: true,
652652+ ..Default::default()
653653+ },
654654+ )
655655+ .await;
656656+ assert!(matches!(result, Err(Error::ChecksumMismatch { .. })));
657657+ }
658658+659659+ #[tokio::test]
660660+ async fn no_backups() {
661661+ let store = InMemory::new();
662662+ let target = tempfile::tempdir().unwrap();
663663+664664+ let result = restore(Arc::new(store), "", target.path(), Default::default()).await;
665665+ assert!(matches!(result, Err(Error::NoBackups)));
666666+ }
667667+668668+ #[tokio::test]
669669+ async fn fetch_meta_parses() {
670670+ let store = InMemory::new();
671671+ add_backup(
672672+ &store,
673673+ 1,
674674+ 1000,
675675+ 100,
676676+ &[
677677+ ("private/1/CURRENT", b"MANIFEST-1\n"),
678678+ ("shared_checksum/000007_123_456.sst", b"sst-data"),
679679+ ],
680680+ )
681681+ .await;
682682+683683+ let meta = fetch_meta(&store, "", 1).await.unwrap();
684684+ assert_eq!(meta.timestamp, 1000);
685685+ assert_eq!(meta.sequence_number, 100);
686686+ assert_eq!(meta.files.len(), 2);
687687+ }
688688+689689+ #[test]
690690+ fn shared_checksum() {
691691+ assert_eq!(
692692+ db_filename("shared_checksum/000007_2894567812_590.sst").unwrap(),
693693+ Path::new("000007.sst")
694694+ );
695695+ }
696696+697697+ #[test]
698698+ fn private() {
699699+ assert_eq!(
700700+ db_filename("private/1/MANIFEST-000008").unwrap(),
701701+ Path::new("MANIFEST-000008")
702702+ );
703703+ }
704704+705705+ #[test]
706706+ fn shared() {
707707+ assert_eq!(
708708+ db_filename("shared/000007.sst").unwrap(),
709709+ Path::new("000007.sst")
710710+ );
711711+ }
712712+713713+ #[test]
714714+ fn unrecognized_prefix() {
715715+ assert!(matches!(
716716+ db_filename("unknown/file.sst"),
717717+ Err(Error::UnrecognizedPathPrefix(_))
718718+ ));
719719+ }
720720+721721+ #[test]
722722+ fn private_too_short() {
723723+ assert!(matches!(
724724+ db_filename("private/1"),
725725+ Err(Error::PrivatePathTooShort(_))
726726+ ));
727727+ }
728728+}
+464
tests/e2e.rs
···11+use std::collections::BTreeMap;
22+use std::path::Path;
33+use std::sync::Arc;
44+55+use object_store::memory::InMemory;
66+use object_store::path::Path as StorePath;
77+use object_store::{ObjectStoreExt, PutPayload};
88+99+use eat_rocks::Error;
1010+1111+// ---------------------------------------------------------------------------
1212+// Test helpers
1313+// ---------------------------------------------------------------------------
1414+1515+async fn put(store: &InMemory, path: &str, data: &[u8]) {
1616+ store
1717+ .put(
1818+ &StorePath::from(path),
1919+ PutPayload::from_iter(data.iter().copied()),
2020+ )
2121+ .await
2222+ .unwrap();
2323+}
2424+2525+/// Recursively collect every file under `dir`, returning (path-relative-to-base, contents).
2626+fn collect_files(base: &Path, dir: &Path) -> Vec<(String, Vec<u8>)> {
2727+ let mut out = Vec::new();
2828+ for entry in std::fs::read_dir(dir).unwrap() {
2929+ let entry = entry.unwrap();
3030+ let path = entry.path();
3131+ if path.is_dir() {
3232+ out.extend(collect_files(base, &path));
3333+ } else if path.is_file() {
3434+ let rel = path
3535+ .strip_prefix(base)
3636+ .unwrap()
3737+ .to_str()
3838+ .unwrap()
3939+ .to_string();
4040+ out.push((rel, std::fs::read(&path).unwrap()));
4141+ }
4242+ }
4343+ out
4444+}
4545+4646+/// Load every file in a directory tree into an InMemory object store.
4747+async fn load_dir_into_store(store: &InMemory, dir: &Path) {
4848+ for (rel, data) in collect_files(dir, dir) {
4949+ put(store, &rel, &data).await;
5050+ }
5151+}
5252+5353+/// Restore via RocksDB's own BackupEngine.
5454+fn rocksdb_restore(
5555+ backup_dir: &Path,
5656+ db_dir: &Path,
5757+ wal_dir: &Path,
5858+ backup_id: u32,
5959+) -> Result<(), rocksdb::Error> {
6060+ let opts = rocksdb::backup::BackupEngineOptions::new(backup_dir)?;
6161+ let env = rocksdb::Env::new()?;
6262+ let mut engine = rocksdb::backup::BackupEngine::open(&opts, &env)?;
6363+ engine.restore_from_backup(
6464+ db_dir,
6565+ wal_dir,
6666+ &rocksdb::backup::RestoreOptions::default(),
6767+ backup_id,
6868+ )
6969+}
7070+7171+/// Restore via our library.
7272+async fn our_restore(
7373+ backup_dir: &Path,
7474+ db_dir: &Path,
7575+ wal_dir: &Path,
7676+ backup_id: u64,
7777+) -> Result<(), Error> {
7878+ let store = InMemory::new();
7979+ load_dir_into_store(&store, backup_dir).await;
8080+ eat_rocks::restore(
8181+ Arc::new(store),
8282+ "",
8383+ db_dir,
8484+ eat_rocks::RestoreOptions {
8585+ backup_id: Some(backup_id),
8686+ verify: true,
8787+ wal_dir: Some(wal_dir.to_path_buf()),
8888+ ..Default::default()
8989+ },
9090+ )
9191+ .await
9292+}
9393+9494+/// Assert two directory trees contain the exact same files with identical contents.
9595+fn assert_dirs_equal(label: &str, dir_a: &Path, dir_b: &Path) {
9696+ let files_a: BTreeMap<String, Vec<u8>> = collect_files(dir_a, dir_a).into_iter().collect();
9797+ let files_b: BTreeMap<String, Vec<u8>> = collect_files(dir_b, dir_b).into_iter().collect();
9898+9999+ let keys_a: Vec<_> = files_a.keys().collect();
100100+ let keys_b: Vec<_> = files_b.keys().collect();
101101+ assert_eq!(
102102+ keys_a, keys_b,
103103+ "{label}: file sets differ\n left: {keys_a:?}\n right: {keys_b:?}"
104104+ );
105105+106106+ for (name, content_a) in &files_a {
107107+ assert_eq!(
108108+ content_a,
109109+ &files_b[name],
110110+ "{label}: contents differ for {name} ({} vs {} bytes)",
111111+ content_a.len(),
112112+ files_b[name].len(),
113113+ );
114114+ }
115115+}
116116+117117+/// Create a DB, populate it, back it up. Returns (db_dir, backup_dir) tempdirs.
118118+fn make_backup(key_range: std::ops::Range<u32>) -> (tempfile::TempDir, tempfile::TempDir) {
119119+ let db_dir = tempfile::tempdir().unwrap();
120120+ let backup_dir = tempfile::tempdir().unwrap();
121121+ {
122122+ let db = rocksdb::DB::open_default(db_dir.path()).unwrap();
123123+ for i in key_range {
124124+ db.put(format!("k{i:04}").as_bytes(), format!("v{i}").as_bytes())
125125+ .unwrap();
126126+ }
127127+ let opts = rocksdb::backup::BackupEngineOptions::new(backup_dir.path()).unwrap();
128128+ let env = rocksdb::Env::new().unwrap();
129129+ let mut engine = rocksdb::backup::BackupEngine::open(&opts, &env).unwrap();
130130+ engine.create_new_backup_flush(&db, true).unwrap();
131131+ }
132132+ (db_dir, backup_dir)
133133+}
134134+135135+// ---------------------------------------------------------------------------
136136+// E2E: create real backup -> restore via library -> verify DB
137137+// ---------------------------------------------------------------------------
138138+139139+#[tokio::test]
140140+async fn e2e_single_backup() {
141141+ let db_dir = tempfile::tempdir().unwrap();
142142+ let backup_dir = tempfile::tempdir().unwrap();
143143+144144+ {
145145+ let db = rocksdb::DB::open_default(db_dir.path()).unwrap();
146146+ for i in 0..100u32 {
147147+ db.put(
148148+ format!("key_{i:04}").as_bytes(),
149149+ format!("value_{i}").as_bytes(),
150150+ )
151151+ .unwrap();
152152+ }
153153+ let opts = rocksdb::backup::BackupEngineOptions::new(backup_dir.path()).unwrap();
154154+ let env = rocksdb::Env::new().unwrap();
155155+ let mut engine = rocksdb::backup::BackupEngine::open(&opts, &env).unwrap();
156156+ engine.create_new_backup_flush(&db, true).unwrap();
157157+ }
158158+159159+ let store = InMemory::new();
160160+ load_dir_into_store(&store, backup_dir.path()).await;
161161+162162+ let restore_dir = tempfile::tempdir().unwrap();
163163+ eat_rocks::restore(
164164+ Arc::new(store),
165165+ "",
166166+ restore_dir.path(),
167167+ eat_rocks::RestoreOptions {
168168+ backup_id: Some(1),
169169+ verify: true,
170170+ ..Default::default()
171171+ },
172172+ )
173173+ .await
174174+ .unwrap();
175175+176176+ {
177177+ let db = rocksdb::DB::open_for_read_only(
178178+ &rocksdb::Options::default(),
179179+ restore_dir.path(),
180180+ false,
181181+ )
182182+ .unwrap();
183183+ for i in 0..100u32 {
184184+ let val = db
185185+ .get(format!("key_{i:04}").as_bytes())
186186+ .unwrap()
187187+ .expect("key missing after restore");
188188+ assert_eq!(val, format!("value_{i}").as_bytes());
189189+ }
190190+ }
191191+}
192192+193193+#[tokio::test]
194194+async fn e2e_multiple_backups_shared_checksum() {
195195+ let db_dir = tempfile::tempdir().unwrap();
196196+ let backup_dir = tempfile::tempdir().unwrap();
197197+198198+ {
199199+ let db = rocksdb::DB::open_default(db_dir.path()).unwrap();
200200+ let opts = rocksdb::backup::BackupEngineOptions::new(backup_dir.path()).unwrap();
201201+ let env = rocksdb::Env::new().unwrap();
202202+ let mut engine = rocksdb::backup::BackupEngine::open(&opts, &env).unwrap();
203203+204204+ for i in 0..50u32 {
205205+ db.put(
206206+ format!("key_{i:04}").as_bytes(),
207207+ format!("v1_{i}").as_bytes(),
208208+ )
209209+ .unwrap();
210210+ }
211211+ engine.create_new_backup_flush(&db, true).unwrap();
212212+213213+ for i in 50..100u32 {
214214+ db.put(
215215+ format!("key_{i:04}").as_bytes(),
216216+ format!("v2_{i}").as_bytes(),
217217+ )
218218+ .unwrap();
219219+ }
220220+ engine.create_new_backup_flush(&db, true).unwrap();
221221+ }
222222+223223+ let store = InMemory::new();
224224+ load_dir_into_store(&store, backup_dir.path()).await;
225225+226226+ // Restore backup 1 — should only have keys 0-49.
227227+ let dir1 = tempfile::tempdir().unwrap();
228228+ eat_rocks::restore(
229229+ Arc::new(store.fork()),
230230+ "",
231231+ dir1.path(),
232232+ eat_rocks::RestoreOptions {
233233+ backup_id: Some(1),
234234+ verify: true,
235235+ ..Default::default()
236236+ },
237237+ )
238238+ .await
239239+ .unwrap();
240240+ {
241241+ let db = rocksdb::DB::open_for_read_only(&rocksdb::Options::default(), dir1.path(), false)
242242+ .unwrap();
243243+ for i in 0..50u32 {
244244+ assert!(
245245+ db.get(format!("key_{i:04}").as_bytes()).unwrap().is_some(),
246246+ "backup 1 missing key_{i:04}"
247247+ );
248248+ }
249249+ for i in 50..100u32 {
250250+ assert!(
251251+ db.get(format!("key_{i:04}").as_bytes()).unwrap().is_none(),
252252+ "backup 1 should not have key_{i:04}"
253253+ );
254254+ }
255255+ }
256256+257257+ // Restore backup 2 — should have all 100 keys.
258258+ let dir2 = tempfile::tempdir().unwrap();
259259+ eat_rocks::restore(
260260+ Arc::new(store.fork()),
261261+ "",
262262+ dir2.path(),
263263+ eat_rocks::RestoreOptions {
264264+ backup_id: Some(2),
265265+ verify: true,
266266+ ..Default::default()
267267+ },
268268+ )
269269+ .await
270270+ .unwrap();
271271+ {
272272+ let db = rocksdb::DB::open_for_read_only(&rocksdb::Options::default(), dir2.path(), false)
273273+ .unwrap();
274274+ for i in 0..100u32 {
275275+ assert!(
276276+ db.get(format!("key_{i:04}").as_bytes()).unwrap().is_some(),
277277+ "backup 2 missing key_{i:04}"
278278+ );
279279+ }
280280+ }
281281+}
282282+283283+// ---------------------------------------------------------------------------
284284+// Oracle: compare our output with rocksdb's own restore
285285+// ---------------------------------------------------------------------------
286286+287287+#[tokio::test]
288288+async fn oracle_single_backup() {
289289+ let (_db, backup_dir) = make_backup(0..200);
290290+291291+ let oracle = tempfile::tempdir().unwrap();
292292+ rocksdb_restore(backup_dir.path(), oracle.path(), oracle.path(), 1).unwrap();
293293+294294+ let ours = tempfile::tempdir().unwrap();
295295+ our_restore(backup_dir.path(), ours.path(), ours.path(), 1)
296296+ .await
297297+ .unwrap();
298298+299299+ assert_dirs_equal("single backup", oracle.path(), ours.path());
300300+}
301301+302302+#[tokio::test]
303303+async fn oracle_incremental_backups() {
304304+ let db_dir = tempfile::tempdir().unwrap();
305305+ let backup_dir = tempfile::tempdir().unwrap();
306306+307307+ {
308308+ let db = rocksdb::DB::open_default(db_dir.path()).unwrap();
309309+ let opts = rocksdb::backup::BackupEngineOptions::new(backup_dir.path()).unwrap();
310310+ let env = rocksdb::Env::new().unwrap();
311311+ let mut engine = rocksdb::backup::BackupEngine::open(&opts, &env).unwrap();
312312+313313+ for i in 0..100u32 {
314314+ db.put(format!("k{i:04}").as_bytes(), b"batch1").unwrap();
315315+ }
316316+ engine.create_new_backup_flush(&db, true).unwrap();
317317+318318+ for i in 100..250u32 {
319319+ db.put(format!("k{i:04}").as_bytes(), b"batch2").unwrap();
320320+ }
321321+ engine.create_new_backup_flush(&db, true).unwrap();
322322+ }
323323+324324+ let o1 = tempfile::tempdir().unwrap();
325325+ rocksdb_restore(backup_dir.path(), o1.path(), o1.path(), 1).unwrap();
326326+ let u1 = tempfile::tempdir().unwrap();
327327+ our_restore(backup_dir.path(), u1.path(), u1.path(), 1)
328328+ .await
329329+ .unwrap();
330330+ assert_dirs_equal("backup 1", o1.path(), u1.path());
331331+332332+ let o2 = tempfile::tempdir().unwrap();
333333+ rocksdb_restore(backup_dir.path(), o2.path(), o2.path(), 2).unwrap();
334334+ let u2 = tempfile::tempdir().unwrap();
335335+ our_restore(backup_dir.path(), u2.path(), u2.path(), 2)
336336+ .await
337337+ .unwrap();
338338+ assert_dirs_equal("backup 2", o2.path(), u2.path());
339339+}
340340+341341+#[tokio::test]
342342+async fn oracle_separate_wal_dir() {
343343+ let db_dir = tempfile::tempdir().unwrap();
344344+ let backup_dir = tempfile::tempdir().unwrap();
345345+346346+ {
347347+ let db = rocksdb::DB::open_default(db_dir.path()).unwrap();
348348+ for i in 0..100u32 {
349349+ db.put(format!("k{i:04}").as_bytes(), b"v").unwrap();
350350+ }
351351+ let opts = rocksdb::backup::BackupEngineOptions::new(backup_dir.path()).unwrap();
352352+ let env = rocksdb::Env::new().unwrap();
353353+ let mut engine = rocksdb::backup::BackupEngine::open(&opts, &env).unwrap();
354354+ // No flush — WAL should contain unflushed data.
355355+ engine.create_new_backup(&db).unwrap();
356356+ }
357357+358358+ let o_db = tempfile::tempdir().unwrap();
359359+ let o_wal = tempfile::tempdir().unwrap();
360360+ rocksdb_restore(backup_dir.path(), o_db.path(), o_wal.path(), 1).unwrap();
361361+362362+ let u_db = tempfile::tempdir().unwrap();
363363+ let u_wal = tempfile::tempdir().unwrap();
364364+ our_restore(backup_dir.path(), u_db.path(), u_wal.path(), 1)
365365+ .await
366366+ .unwrap();
367367+368368+ assert_dirs_equal("db_dir", o_db.path(), u_db.path());
369369+ assert_dirs_equal("wal_dir", o_wal.path(), u_wal.path());
370370+}
371371+372372+#[tokio::test]
373373+async fn oracle_both_reject_nonexistent_backup() {
374374+ let (_db, backup_dir) = make_backup(0..10);
375375+376376+ let o = tempfile::tempdir().unwrap();
377377+ assert!(
378378+ rocksdb_restore(backup_dir.path(), o.path(), o.path(), 99).is_err(),
379379+ "rocksdb should reject backup 99"
380380+ );
381381+382382+ let u = tempfile::tempdir().unwrap();
383383+ assert!(
384384+ our_restore(backup_dir.path(), u.path(), u.path(), 99)
385385+ .await
386386+ .is_err(),
387387+ "our restore should reject backup 99"
388388+ );
389389+}
390390+391391+// ---------------------------------------------------------------------------
392392+// Failure cases
393393+// ---------------------------------------------------------------------------
394394+395395+#[tokio::test]
396396+async fn failure_missing_data_file() {
397397+ let (_db, backup_dir) = make_backup(0..100);
398398+399399+ let store = InMemory::new();
400400+ load_dir_into_store(&store, backup_dir.path()).await;
401401+402402+ // Find and delete a data file from the store.
403403+ let meta = eat_rocks::fetch_meta(&store, "", 1).await.unwrap();
404404+ let victim = meta
405405+ .files
406406+ .iter()
407407+ .find(|f| f.path.contains("shared_checksum") || f.path.contains("MANIFEST"))
408408+ .expect("no deletable file found");
409409+ store
410410+ .delete(&StorePath::from(victim.path.as_str()))
411411+ .await
412412+ .unwrap();
413413+414414+ let target = tempfile::tempdir().unwrap();
415415+ let result = eat_rocks::restore(
416416+ Arc::new(store),
417417+ "",
418418+ target.path(),
419419+ eat_rocks::RestoreOptions {
420420+ backup_id: Some(1),
421421+ ..Default::default()
422422+ },
423423+ )
424424+ .await;
425425+ assert!(
426426+ matches!(result, Err(Error::Fetch { .. })),
427427+ "expected Fetch error, got {result:?}"
428428+ );
429429+}
430430+431431+#[tokio::test]
432432+async fn failure_corrupt_crc_detected() {
433433+ let (_db, backup_dir) = make_backup(0..50);
434434+435435+ let store = InMemory::new();
436436+ load_dir_into_store(&store, backup_dir.path()).await;
437437+438438+ // Corrupt a data file in the store by overwriting it.
439439+ let meta = eat_rocks::fetch_meta(&store, "", 1).await.unwrap();
440440+ let victim = meta
441441+ .files
442442+ .iter()
443443+ .find(|f| f.path.starts_with("shared_checksum/"))
444444+ .expect("no shared_checksum file");
445445+ put(&store, &victim.path, b"corrupted contents!").await;
446446+447447+ let target = tempfile::tempdir().unwrap();
448448+ let result = eat_rocks::restore(
449449+ Arc::new(store),
450450+ "",
451451+ target.path(),
452452+ eat_rocks::RestoreOptions {
453453+ backup_id: Some(1),
454454+ verify: true,
455455+ concurrency: 1,
456456+ ..Default::default()
457457+ },
458458+ )
459459+ .await;
460460+ assert!(
461461+ matches!(result, Err(Error::ChecksumMismatch { .. })),
462462+ "expected ChecksumMismatch, got {result:?}"
463463+ );
464464+}