Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

try candystore (ehhhhhhh.)

phil dcfc15cc 9aa7acf5

+289 -147
+228
Cargo.lock
··· 167 167 checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" 168 168 169 169 [[package]] 170 + name = "bytemuck" 171 + version = "1.24.0" 172 + source = "registry+https://github.com/rust-lang/crates.io-index" 173 + checksum = "1fbdf580320f38b612e485521afda1ee26d10cc9884efaaa750d383e13e3c5f4" 174 + dependencies = [ 175 + "bytemuck_derive", 176 + ] 177 + 178 + [[package]] 179 + name = "bytemuck_derive" 180 + version = "1.10.2" 181 + source = "registry+https://github.com/rust-lang/crates.io-index" 182 + checksum = "f9abbd1bc6865053c427f7198e6af43bfdedc55ab791faed4fbd361d789575ff" 183 + dependencies = [ 184 + "proc-macro2", 185 + "quote", 186 + "syn 2.0.106", 187 + ] 188 + 189 + [[package]] 170 190 name = "bytes" 171 191 version = "1.10.1" 172 192 source = "registry+https://github.com/rust-lang/crates.io-index" 173 193 checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" 194 + 195 + [[package]] 196 + name = "candystore" 197 + version = "0.5.6" 198 + source = "registry+https://github.com/rust-lang/crates.io-index" 199 + checksum = "e015e4215c0e855880a745ed0d9be7f2d8f49d0426006ccbc66b7b0e10a1bd1a" 200 + dependencies = [ 201 + "anyhow", 202 + "bytemuck", 203 + "crossbeam-channel", 204 + "databuf", 205 + "fslock", 206 + "libc", 207 + "memmap", 208 + "parking_lot", 209 + "rand", 210 + "simd-itertools", 211 + "siphasher", 212 + "uuid", 213 + ] 174 214 175 215 [[package]] 176 216 name = "cast" ··· 339 379 ] 340 380 341 381 [[package]] 382 + name = "crossbeam-channel" 383 + version = "0.5.15" 384 + source = "registry+https://github.com/rust-lang/crates.io-index" 385 + checksum = "82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" 386 + dependencies = [ 387 + "crossbeam-utils", 388 + ] 389 + 390 + [[package]] 342 391 name = "crossbeam-deque" 343 392 version = "0.8.6" 344 393 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 406 455 ] 407 456 408 457 [[package]] 458 + name = "databuf" 459 + version = "0.5.0" 460 + source = "registry+https://github.com/rust-lang/crates.io-index" 461 + checksum = "9e1ad1d99bee317a8dac0b7cd86896c5a5f24307009292985dabbf3e412c8b9d" 462 + dependencies = [ 463 + "databuf-derive", 464 + ] 465 + 466 + [[package]] 467 + name = "databuf-derive" 468 + version = "0.5.0" 469 + source = "registry+https://github.com/rust-lang/crates.io-index" 470 + checksum = "04040c9fc8fcb4084222a26c99faf5b3014772a6115e076b7a50fe49bf25d0ea" 471 + dependencies = [ 472 + "databuf_derive_impl", 473 + ] 474 + 475 + [[package]] 476 + name = "databuf_derive_impl" 477 + version = "0.2.3" 478 + source = "registry+https://github.com/rust-lang/crates.io-index" 479 + checksum = "daf656eb071fe87d23716f933788a35a8ad6baa6fdbf66a67a261dbd3f9dc81a" 480 + dependencies = [ 481 + "quote2", 482 + "syn 2.0.106", 483 + ] 484 + 485 + [[package]] 409 486 name = "digest" 410 487 version = "0.10.7" 411 488 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 477 554 version = "0.1.5" 478 555 source = "registry+https://github.com/rust-lang/crates.io-index" 479 556 checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" 557 + 558 + [[package]] 559 + name = "fslock" 560 + version = "0.2.1" 561 + source = "registry+https://github.com/rust-lang/crates.io-index" 562 + checksum = "04412b8935272e3a9bae6f48c7bfff74c2911f60525404edfdd28e49884c3bfb" 563 + dependencies = [ 564 + "libc", 565 + "winapi", 566 + ] 480 567 481 568 [[package]] 482 569 name = "futures" ··· 778 865 checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" 779 866 780 867 [[package]] 868 + name = "memmap" 869 + version = "0.7.0" 870 + source = "registry+https://github.com/rust-lang/crates.io-index" 871 + checksum = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b" 872 + dependencies = [ 873 + "libc", 874 + "winapi", 875 + ] 876 + 877 + [[package]] 781 878 name = "miniz_oxide" 782 879 version = "0.8.9" 783 880 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 821 918 ] 822 919 823 920 [[package]] 921 + name = "multiversion" 922 + version = "0.8.0" 923 + source = "registry+https://github.com/rust-lang/crates.io-index" 924 + checksum = "7edb7f0ff51249dfda9ab96b5823695e15a052dc15074c9dbf3d118afaf2c201" 925 + dependencies = [ 926 + "multiversion-macros", 927 + "target-features", 928 + ] 929 + 930 + [[package]] 931 + name = "multiversion-macros" 932 + version = "0.8.0" 933 + source = "registry+https://github.com/rust-lang/crates.io-index" 934 + checksum = "b093064383341eb3271f42e381cb8f10a01459478446953953c75d24bd339fc0" 935 + dependencies = [ 936 + "proc-macro2", 937 + "quote", 938 + "syn 2.0.106", 939 + "target-features", 940 + ] 941 + 942 + [[package]] 824 943 name = "num-traits" 825 944 version = "0.2.19" 826 945 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 941 1060 ] 942 1061 943 1062 [[package]] 1063 + name = "ppv-lite86" 1064 + version = "0.2.21" 1065 + source = "registry+https://github.com/rust-lang/crates.io-index" 1066 + checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" 1067 + dependencies = [ 1068 + "zerocopy", 1069 + ] 1070 + 1071 + [[package]] 944 1072 name = "proc-macro2" 945 1073 version = "1.0.101" 946 1074 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 957 1085 dependencies = [ 958 1086 "proc-macro2", 959 1087 ] 1088 + 1089 + [[package]] 1090 + name = "quote2" 1091 + version = "0.7.0" 1092 + source = "registry+https://github.com/rust-lang/crates.io-index" 1093 + checksum = "970573b86f7e5795c8c6c50c56ef602368593f0687188da27fd489a59e253630" 1094 + dependencies = [ 1095 + "proc-macro2", 1096 + "quote", 1097 + "quote2-macros", 1098 + ] 1099 + 1100 + [[package]] 1101 + name = "quote2-macros" 1102 + version = "0.7.0" 1103 + source = "registry+https://github.com/rust-lang/crates.io-index" 1104 + checksum = "5f4b89c37b2d870a28629ad20da669bb0e7d7214878d0d5111b304aa466e1977" 960 1105 961 1106 [[package]] 962 1107 name = "r-efi" ··· 965 1110 checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" 966 1111 967 1112 [[package]] 1113 + name = "rand" 1114 + version = "0.9.2" 1115 + source = "registry+https://github.com/rust-lang/crates.io-index" 1116 + checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" 1117 + dependencies = [ 1118 + "rand_chacha", 1119 + "rand_core", 1120 + ] 1121 + 1122 + [[package]] 1123 + name = "rand_chacha" 1124 + version = "0.9.0" 1125 + source = "registry+https://github.com/rust-lang/crates.io-index" 1126 + checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" 1127 + dependencies = [ 1128 + "ppv-lite86", 1129 + "rand_core", 1130 + ] 1131 + 1132 + [[package]] 1133 + name = "rand_core" 1134 + version = "0.9.3" 1135 + source = "registry+https://github.com/rust-lang/crates.io-index" 1136 + checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" 1137 + dependencies = [ 1138 + "getrandom", 1139 + ] 1140 + 1141 + [[package]] 968 1142 name = "rayon" 969 1143 version = "1.11.0" 970 1144 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1027 1201 version = "0.2.2" 1028 1202 dependencies = [ 1029 1203 "bincode", 1204 + "candystore", 1030 1205 "clap", 1031 1206 "criterion", 1032 1207 "env_logger", ··· 1192 1367 ] 1193 1368 1194 1369 [[package]] 1370 + name = "simd-itertools" 1371 + version = "0.3.0" 1372 + source = "registry+https://github.com/rust-lang/crates.io-index" 1373 + checksum = "a037ed5ba0cb7102a5b720453b642c5b2cf39960edd2ceace91af8ec3743082a" 1374 + dependencies = [ 1375 + "multiversion", 1376 + ] 1377 + 1378 + [[package]] 1379 + name = "siphasher" 1380 + version = "1.0.1" 1381 + source = "registry+https://github.com/rust-lang/crates.io-index" 1382 + checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" 1383 + 1384 + [[package]] 1195 1385 name = "slab" 1196 1386 version = "0.4.11" 1197 1387 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1240 1430 "quote", 1241 1431 "unicode-ident", 1242 1432 ] 1433 + 1434 + [[package]] 1435 + name = "target-features" 1436 + version = "0.1.6" 1437 + source = "registry+https://github.com/rust-lang/crates.io-index" 1438 + checksum = "c1bbb9f3c5c463a01705937a24fdabc5047929ac764b2d5b9cf681c1f5041ed5" 1243 1439 1244 1440 [[package]] 1245 1441 name = "tempfile" ··· 1372 1568 checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" 1373 1569 1374 1570 [[package]] 1571 + name = "uuid" 1572 + version = "1.19.0" 1573 + source = "registry+https://github.com/rust-lang/crates.io-index" 1574 + checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a" 1575 + dependencies = [ 1576 + "js-sys", 1577 + "wasm-bindgen", 1578 + ] 1579 + 1580 + [[package]] 1375 1581 name = "vcpkg" 1376 1582 version = "0.2.15" 1377 1583 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1493 1699 ] 1494 1700 1495 1701 [[package]] 1702 + name = "winapi" 1703 + version = "0.3.9" 1704 + source = "registry+https://github.com/rust-lang/crates.io-index" 1705 + checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" 1706 + dependencies = [ 1707 + "winapi-i686-pc-windows-gnu", 1708 + "winapi-x86_64-pc-windows-gnu", 1709 + ] 1710 + 1711 + [[package]] 1712 + name = "winapi-i686-pc-windows-gnu" 1713 + version = "0.4.0" 1714 + source = "registry+https://github.com/rust-lang/crates.io-index" 1715 + checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" 1716 + 1717 + [[package]] 1496 1718 name = "winapi-util" 1497 1719 version = "0.1.11" 1498 1720 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1500 1722 dependencies = [ 1501 1723 "windows-sys 0.60.2", 1502 1724 ] 1725 + 1726 + [[package]] 1727 + name = "winapi-x86_64-pc-windows-gnu" 1728 + version = "0.4.0" 1729 + source = "registry+https://github.com/rust-lang/crates.io-index" 1730 + checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" 1503 1731 1504 1732 [[package]] 1505 1733 name = "windows-link"
+1
Cargo.toml
··· 8 8 9 9 [dependencies] 10 10 bincode = { version = "2.0.1", features = ["serde"] } 11 + candystore = "0.5.6" 11 12 futures = "0.3.31" 12 13 futures-core = "0.3.31" 13 14 ipld-core = { version = "0.4.2", features = ["serde"] }
+1 -1
examples/disk-read-file/main.rs
··· 43 43 // disk to continue 44 44 45 45 // set up a disk store we can spill to 46 - let disk_store = DiskBuilder::new().open(tmpfile).await?; 46 + let disk_store = DiskBuilder::new().open(tmpfile, Some(big_stuff.keys_hint())).await?; 47 47 48 48 // do the spilling, get back a (similar) driver 49 49 let (commit, driver) = big_stuff.finish_loading(disk_store).await?;
+43 -105
src/disk.rs
··· 9 9 # #[tokio::main] 10 10 # async fn main() -> Result<(), DiskError> { 11 11 let store = DiskBuilder::new() 12 - .with_cache_size_mb(32) 13 12 .with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted 14 13 .open("/some/path.db".into()).await?; 15 14 # Ok(()) ··· 18 17 */ 19 18 20 19 use crate::drive::DriveError; 21 - use rusqlite::OptionalExtension; 20 + use candystore::{CandyError, CandyStore, Config}; 22 21 use std::path::PathBuf; 23 22 24 23 #[derive(Debug, thiserror::Error)] ··· 28 27 /// (The wrapped err should probably be obscured to remove public-facing 29 28 /// sqlite bits) 30 29 #[error(transparent)] 31 - DbError(#[from] rusqlite::Error), 30 + DbError(#[from] CandyError), 31 + /// Unfortunately candystore uses anyhow::Result for it's open call 32 + #[error("Failed on a db call, see logs")] 33 + DbGarbageError, 32 34 /// A tokio blocking task failed to join 33 35 #[error("Failed to join a tokio blocking task: {0}")] 34 36 JoinError(#[from] tokio::task::JoinError), ··· 38 40 /// limit. 39 41 #[error("Maximum disk size reached")] 40 42 MaxSizeExceeded, 41 - #[error("this error was replaced, seeing this is a bug.")] 42 - #[doc(hidden)] 43 - Stolen, 44 - } 45 - 46 - impl DiskError { 47 - /// hack for ownership challenges with the disk driver 48 - pub(crate) fn steal(&mut self) -> Self { 49 - let mut swapped = DiskError::Stolen; 50 - std::mem::swap(self, &mut swapped); 51 - swapped 52 - } 53 43 } 54 44 55 45 /// Builder-style disk store setup 56 46 #[derive(Debug, Clone)] 57 47 pub struct DiskBuilder { 58 - /// Database in-memory cache allowance 59 - /// 60 - /// Default: 32 MiB 61 - pub cache_size_mb: usize, 62 48 /// Database stored block size limit 63 49 /// 64 50 /// Default: 10 GiB ··· 71 57 impl Default for DiskBuilder { 72 58 fn default() -> Self { 73 59 Self { 74 - cache_size_mb: 32, 75 60 max_stored_mb: 10 * 1024, // 10 GiB 76 61 } 77 62 } ··· 82 67 pub fn new() -> Self { 83 68 Default::default() 84 69 } 85 - /// Set the in-memory cache allowance for the database 86 - /// 87 - /// Default: 32 MiB 88 - pub fn with_cache_size_mb(mut self, size: usize) -> Self { 89 - self.cache_size_mb = size; 90 - self 91 - } 92 70 /// Set the approximate stored block size limit 93 71 /// 94 72 /// Default: 10 GiB ··· 97 75 self 98 76 } 99 77 /// Open and initialize the actual disk storage 100 - pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> { 101 - DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await 78 + pub async fn open(&self, path: PathBuf, keys_hint: Option<usize>) -> Result<DiskStore, DiskError> { 79 + DiskStore::new(path, self.max_stored_mb, keys_hint).await 102 80 } 103 81 } 104 82 105 83 /// On-disk block storage 106 84 pub struct DiskStore { 107 - conn: rusqlite::Connection, 85 + db: CandyStore, 108 86 max_stored: usize, 109 87 stored: usize, 110 88 } 111 89 112 90 impl DiskStore { 113 91 /// Initialize a new disk store 114 - pub async fn new( 115 - path: PathBuf, 116 - cache_mb: usize, 117 - max_stored_mb: usize, 118 - ) -> Result<Self, DiskError> { 92 + pub async fn new(path: PathBuf, max_stored_mb: usize, keys_hint: Option<usize>) -> Result<Self, DiskError> { 119 93 let max_stored = max_stored_mb * 2_usize.pow(20); 120 - let conn = tokio::task::spawn_blocking(move || { 121 - let conn = rusqlite::Connection::open(path)?; 122 - 123 - let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size 124 - 125 - // conn.pragma_update(None, "journal_mode", "OFF")?; 126 - // conn.pragma_update(None, "journal_mode", "MEMORY")?; 127 - conn.pragma_update(None, "journal_mode", "WAL")?; 128 - // conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk 129 - conn.pragma_update(None, "synchronous", "OFF")?; 130 - conn.pragma_update( 131 - None, 132 - "cache_size", 133 - (cache_mb as i64 * sqlite_one_mb).to_string(), 134 - )?; 135 - Self::reset_tables(&conn)?; 94 + let db = tokio::task::spawn_blocking(move || { 95 + let mut conf = Config::default(); 96 + // conf.max_shard_size = 256 * 1024 * 1024; 97 + // conf.min_compaction_threashold = 32 * 1024 * 1024; 98 + // conf.expected_number_of_keys = 1_200_000; 99 + if let Some(hint) = keys_hint { 100 + conf.expected_number_of_keys = hint; 101 + } 102 + conf.num_compaction_threads = 1; 103 + let db = CandyStore::open(path, conf).map_err(|e| { 104 + log::error!("{e:?}"); 105 + DiskError::DbGarbageError 106 + })?; 136 107 137 - Ok::<_, DiskError>(conn) 108 + Ok::<_, DiskError>(db) 138 109 }) 139 110 .await??; 140 111 141 112 Ok(Self { 142 - conn, 113 + db, 143 114 max_stored, 144 115 stored: 0, 145 116 }) 146 117 } 147 - pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> { 148 - let tx = self.conn.transaction()?; 149 - Ok(SqliteWriter { 150 - tx, 151 - stored: &mut self.stored, 152 - max: self.max_stored, 153 - }) 154 - } 155 - pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> { 156 - let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?; 157 - Ok(SqliteReader { select_stmt }) 158 - } 118 + 159 119 /// Drop and recreate the kv table 160 120 pub async fn reset(self) -> Result<Self, DiskError> { 161 121 tokio::task::spawn_blocking(move || { 162 - Self::reset_tables(&self.conn)?; 122 + Self::reset_tables(&self.db)?; 163 123 Ok(self) 164 124 }) 165 125 .await? 166 126 } 167 - fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> { 168 - conn.execute("DROP TABLE IF EXISTS blocks", ())?; 169 - conn.execute( 170 - "CREATE TABLE blocks ( 171 - key BLOB PRIMARY KEY NOT NULL, 172 - val BLOB NOT NULL 173 - ) WITHOUT ROWID", 174 - (), 175 - )?; 127 + fn reset_tables(db: &CandyStore) -> Result<(), DiskError> { 128 + db.clear().map_err(|e| { 129 + log::error!("{e:?}"); 130 + DiskError::DbGarbageError 131 + })?; 132 + 176 133 Ok(()) 177 134 } 178 - } 179 135 180 - pub(crate) struct SqliteWriter<'conn> { 181 - tx: rusqlite::Transaction<'conn>, 182 - stored: &'conn mut usize, 183 - max: usize, 184 - } 185 - 186 - impl SqliteWriter<'_> { 187 136 pub(crate) fn put_many( 188 137 &mut self, 189 138 kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>, 190 139 ) -> Result<(), DriveError> { 191 - let mut insert_stmt = self 192 - .tx 193 - .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)") 194 - .map_err(DiskError::DbError)?; 195 140 for pair in kv { 196 141 let (k, v) = pair?; 197 - *self.stored += v.len(); 198 - if *self.stored > self.max { 142 + self.stored += v.len(); 143 + if self.stored > self.max_stored { 199 144 return Err(DiskError::MaxSizeExceeded.into()); 200 145 } 201 - insert_stmt.execute((k, v)).map_err(DiskError::DbError)?; 146 + self.db.owned_set(k, &v).map_err(|e| { 147 + log::error!("{e:?}"); 148 + DiskError::DbGarbageError 149 + })?; 202 150 } 203 151 Ok(()) 204 152 } 205 - pub fn commit(self) -> Result<(), DiskError> { 206 - self.tx.commit()?; 207 - Ok(()) 208 - } 209 - } 210 - 211 - pub(crate) struct SqliteReader<'conn> { 212 - select_stmt: rusqlite::Statement<'conn>, 213 - } 214 - 215 - impl SqliteReader<'_> { 216 - pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> { 217 - self.select_stmt 218 - .query_one((&key,), |row| row.get(0)) 219 - .optional() 153 + pub(crate) fn get(&mut self, key: Vec<u8>) -> Result<Option<Vec<u8>>, DiskError> { 154 + self.db.owned_get(key).map_err(|e| { 155 + log::error!("{e:?}"); 156 + DiskError::DbGarbageError 157 + }) 220 158 } 221 159 }
+10 -36
src/drive.rs
··· 315 315 pub commit: Option<Commit>, 316 316 } 317 317 318 + impl<R: AsyncRead + Unpin, T: Processable> NeedDisk<R, T> { 319 + pub fn keys_hint(&self) -> usize { 320 + self.mem_blocks.len() * 10 321 + } 322 + } 323 + 318 324 fn encode(v: impl Serialize) -> Result<Vec<u8>, bincode::error::EncodeError> { 319 325 bincode::serde::encode_to_vec(v, bincode::config::standard()) 320 326 } ··· 335 341 // move store in and back out so we can manage lifetimes 336 342 // dump mem blocks into the store 337 343 store = tokio::task::spawn(async move { 338 - let mut writer = store.get_writer()?; 339 - 340 344 let kvs = self 341 345 .mem_blocks 342 346 .into_iter() 343 347 .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 344 348 345 - writer.put_many(kvs)?; 346 - writer.commit()?; 349 + store.put_many(kvs)?; 347 350 Ok::<_, DriveError>(store) 348 351 }) 349 352 .await??; ··· 351 354 let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1); 352 355 353 356 let store_worker = tokio::task::spawn_blocking(move || { 354 - let mut writer = store.get_writer()?; 355 - 356 357 while let Some(chunk) = rx.blocking_recv() { 357 358 let kvs = chunk 358 359 .into_iter() 359 360 .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 360 - writer.put_many(kvs)?; 361 + store.put_many(kvs)?; 361 362 } 362 363 363 - writer.commit()?; 364 364 Ok::<_, DriveError>(store) 365 365 }); // await later 366 366 ··· 468 468 // comes out again. 469 469 let (state, res) = tokio::task::spawn_blocking( 470 470 move || -> (BigState, Result<BlockChunk<T>, DriveError>) { 471 - let mut reader_res = state.store.get_reader(); 472 - let reader: &mut _ = match reader_res { 473 - Ok(ref mut r) => r, 474 - Err(ref mut e) => { 475 - // unfortunately we can't return the error directly because 476 - // (for some reason) it's attached to the lifetime of the 477 - // reader? 478 - // hack a mem::swap so we can get it out :/ 479 - let e_swapped = e.steal(); 480 - // the pain: `state` *has to* outlive the reader 481 - drop(reader_res); 482 - return (state, Err(e_swapped.into())); 483 - } 484 - }; 485 - 486 471 let mut out = Vec::with_capacity(n); 487 472 488 473 for _ in 0..n { 489 474 // walk as far as we can until we run out of blocks or find a record 490 - let step = match state.walker.disk_step(reader, process) { 475 + let step = match state.walker.disk_step(&mut state.store, process) { 491 476 Ok(s) => s, 492 477 Err(e) => { 493 - // the pain: `state` *has to* outlive the reader 494 - drop(reader_res); 495 478 return (state, Err(e.into())); 496 479 } 497 480 }; 498 481 match step { 499 482 Step::Missing(cid) => { 500 - // the pain: `state` *has to* outlive the reader 501 - drop(reader_res); 502 483 return (state, Err(DriveError::MissingBlock(cid))); 503 484 } 504 485 Step::Finish => break, 505 486 Step::Found { rkey, data } => out.push((rkey, data)), 506 487 }; 507 488 } 508 - 509 - // `state` *has to* outlive the reader 510 - drop(reader_res); 511 489 512 490 (state, Ok::<_, DriveError>(out)) 513 491 }, ··· 532 510 tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>, 533 511 ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> { 534 512 let BigState { store, walker } = self.state.as_mut().expect("valid state"); 535 - let mut reader = match store.get_reader() { 536 - Ok(r) => r, 537 - Err(e) => return tx.blocking_send(Err(e.into())), 538 - }; 539 513 540 514 loop { 541 515 let mut out: BlockChunk<T> = Vec::with_capacity(n); ··· 543 517 for _ in 0..n { 544 518 // walk as far as we can until we run out of blocks or find a record 545 519 546 - let step = match walker.disk_step(&mut reader, self.process) { 520 + let step = match walker.disk_step(store, self.process) { 547 521 Ok(s) => s, 548 522 Err(e) => return tx.blocking_send(Err(e.into())), 549 523 };
+6 -5
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 - use crate::disk::SqliteReader; 3 + use crate::DiskError; 4 + use crate::DiskStore; 4 5 use crate::drive::{DecodeError, MaybeProcessedBlock}; 5 6 use crate::mst::Node; 6 7 use crate::process::Processable; ··· 19 20 #[error("Action node error: {0}")] 20 21 MstError(#[from] MstError), 21 22 #[error("storage error: {0}")] 22 - StorageError(#[from] rusqlite::Error), 23 + StorageError(#[from] DiskError), 23 24 #[error("Decode error: {0}")] 24 25 DecodeError(#[from] DecodeError), 25 26 } ··· 239 240 /// blocking!!!!!! 240 241 pub fn disk_step<T: Processable>( 241 242 &mut self, 242 - reader: &mut SqliteReader, 243 + db: &mut DiskStore, 243 244 process: impl Fn(Vec<u8>) -> T, 244 245 ) -> Result<Step<T>, WalkError> { 245 246 loop { ··· 252 253 &mut Need::Node { depth, cid } => { 253 254 let cid_bytes = cid.to_bytes(); 254 255 log::trace!("need node {cid:?}"); 255 - let Some(block_bytes) = reader.get(cid_bytes)? else { 256 + let Some(block_bytes) = db.get(cid_bytes)? else { 256 257 log::trace!("node not found, resting"); 257 258 return Ok(Step::Missing(cid)); 258 259 }; ··· 274 275 Need::Record { rkey, cid } => { 275 276 log::trace!("need record {cid:?}"); 276 277 let cid_bytes = cid.to_bytes(); 277 - let Some(data_bytes) = reader.get(cid_bytes)? else { 278 + let Some(data_bytes) = db.get(cid_bytes)? else { 278 279 log::trace!("record block not found, resting"); 279 280 return Ok(Step::Missing(*cid)); 280 281 };