Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

fjall 3.0.0 buggy

authored by

phil and committed by tangled.org 62433e68 9aa7acf5

+246 -66
+200 -1
Cargo.lock
··· 167 167 checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" 168 168 169 169 [[package]] 170 + name = "byteorder-lite" 171 + version = "0.1.0" 172 + source = "registry+https://github.com/rust-lang/crates.io-index" 173 + checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495" 174 + 175 + [[package]] 170 176 name = "bytes" 171 177 version = "1.10.1" 172 178 source = "registry+https://github.com/rust-lang/crates.io-index" 173 179 checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" 180 + 181 + [[package]] 182 + name = "byteview" 183 + version = "0.10.0" 184 + source = "registry+https://github.com/rust-lang/crates.io-index" 185 + checksum = "dda4398f387cc6395a3e93b3867cd9abda914c97a0b344d1eefb2e5c51785fca" 174 186 175 187 [[package]] 176 188 name = "cast" ··· 281 293 checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" 282 294 283 295 [[package]] 296 + name = "compare" 297 + version = "0.0.6" 298 + source = "registry+https://github.com/rust-lang/crates.io-index" 299 + checksum = "ea0095f6103c2a8b44acd6fd15960c801dafebf02e21940360833e0673f48ba7" 300 + 301 + [[package]] 284 302 name = "const-str" 285 303 version = "0.4.3" 286 304 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 358 376 ] 359 377 360 378 [[package]] 379 + name = "crossbeam-skiplist" 380 + version = "0.1.3" 381 + source = "registry+https://github.com/rust-lang/crates.io-index" 382 + checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" 383 + dependencies = [ 384 + "crossbeam-epoch", 385 + "crossbeam-utils", 386 + ] 387 + 388 + [[package]] 361 389 name = "crossbeam-utils" 362 390 version = "0.8.21" 363 391 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 377 405 dependencies = [ 378 406 "generic-array", 379 407 "typenum", 408 + ] 409 + 410 + [[package]] 411 + name = "dashmap" 412 + version = "6.1.0" 413 + source = "registry+https://github.com/rust-lang/crates.io-index" 414 + checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" 415 + dependencies = [ 416 + "cfg-if", 417 + "crossbeam-utils", 418 + "hashbrown 0.14.5", 419 + "lock_api", 420 + "once_cell", 421 + "parking_lot_core", 380 422 ] 381 423 382 424 [[package]] ··· 422 464 checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" 423 465 424 466 [[package]] 467 + name = "enum_dispatch" 468 + version = "0.3.13" 469 + source = "registry+https://github.com/rust-lang/crates.io-index" 470 + checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd" 471 + dependencies = [ 472 + "once_cell", 473 + "proc-macro2", 474 + "quote", 475 + "syn 2.0.106", 476 + ] 477 + 478 + [[package]] 425 479 name = "env_filter" 426 480 version = "0.1.3" 427 481 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 443 497 "jiff", 444 498 "log", 445 499 ] 500 + 501 + [[package]] 502 + name = "equivalent" 503 + version = "1.0.2" 504 + source = "registry+https://github.com/rust-lang/crates.io-index" 505 + checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" 446 506 447 507 [[package]] 448 508 name = "errno" ··· 471 531 version = "2.3.0" 472 532 source = "registry+https://github.com/rust-lang/crates.io-index" 473 533 checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" 534 + 535 + [[package]] 536 + name = "fjall" 537 + version = "3.0.0" 538 + source = "registry+https://github.com/rust-lang/crates.io-index" 539 + checksum = "4986f550347ed1666561f36e8bf1be3c97df72850ecef0140129da6e2d0aa911" 540 + dependencies = [ 541 + "byteorder-lite", 542 + "byteview", 543 + "dashmap", 544 + "flume", 545 + "log", 546 + "lsm-tree", 547 + "lz4_flex", 548 + "tempfile", 549 + "xxhash-rust", 550 + ] 551 + 552 + [[package]] 553 + name = "flume" 554 + version = "0.12.0" 555 + source = "registry+https://github.com/rust-lang/crates.io-index" 556 + checksum = "5e139bc46ca777eb5efaf62df0ab8cc5fd400866427e56c68b22e414e53bd3be" 557 + dependencies = [ 558 + "spin", 559 + ] 474 560 475 561 [[package]] 476 562 name = "foldhash" ··· 608 694 609 695 [[package]] 610 696 name = "hashbrown" 697 + version = "0.14.5" 698 + source = "registry+https://github.com/rust-lang/crates.io-index" 699 + checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" 700 + 701 + [[package]] 702 + name = "hashbrown" 611 703 version = "0.15.5" 612 704 source = "registry+https://github.com/rust-lang/crates.io-index" 613 705 checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" ··· 616 708 ] 617 709 618 710 [[package]] 711 + name = "hashbrown" 712 + version = "0.16.1" 713 + source = "registry+https://github.com/rust-lang/crates.io-index" 714 + checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" 715 + 716 + [[package]] 619 717 name = "hashlink" 620 718 version = "0.10.0" 621 719 source = "registry+https://github.com/rust-lang/crates.io-index" 622 720 checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" 623 721 dependencies = [ 624 - "hashbrown", 722 + "hashbrown 0.15.5", 625 723 ] 626 724 627 725 [[package]] ··· 631 729 checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" 632 730 633 731 [[package]] 732 + name = "interval-heap" 733 + version = "0.0.5" 734 + source = "registry+https://github.com/rust-lang/crates.io-index" 735 + checksum = "11274e5e8e89b8607cfedc2910b6626e998779b48a019151c7604d0adcb86ac6" 736 + dependencies = [ 737 + "compare", 738 + ] 739 + 740 + [[package]] 634 741 name = "io-uring" 635 742 version = "0.7.10" 636 743 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 761 868 checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" 762 869 763 870 [[package]] 871 + name = "lsm-tree" 872 + version = "3.0.0" 873 + source = "registry+https://github.com/rust-lang/crates.io-index" 874 + checksum = "3a206e87e8bc38114045060ec1fc6bc4e4559748a37e9622b910d80e48863e87" 875 + dependencies = [ 876 + "byteorder-lite", 877 + "byteview", 878 + "crossbeam-skiplist", 879 + "enum_dispatch", 880 + "interval-heap", 881 + "log", 882 + "lz4_flex", 883 + "quick_cache", 884 + "rustc-hash", 885 + "self_cell", 886 + "sfa", 887 + "tempfile", 888 + "varint-rs", 889 + "xxhash-rust", 890 + ] 891 + 892 + [[package]] 893 + name = "lz4_flex" 894 + version = "0.11.5" 895 + source = "registry+https://github.com/rust-lang/crates.io-index" 896 + checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a" 897 + dependencies = [ 898 + "twox-hash", 899 + ] 900 + 901 + [[package]] 764 902 name = "match-lookup" 765 903 version = "0.1.1" 766 904 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 950 1088 ] 951 1089 952 1090 [[package]] 1091 + name = "quick_cache" 1092 + version = "0.6.18" 1093 + source = "registry+https://github.com/rust-lang/crates.io-index" 1094 + checksum = "7ada44a88ef953a3294f6eb55d2007ba44646015e18613d2f213016379203ef3" 1095 + dependencies = [ 1096 + "equivalent", 1097 + "hashbrown 0.16.1", 1098 + ] 1099 + 1100 + [[package]] 953 1101 name = "quote" 954 1102 version = "1.0.41" 955 1103 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1030 1178 "clap", 1031 1179 "criterion", 1032 1180 "env_logger", 1181 + "fjall", 1033 1182 "futures", 1034 1183 "futures-core", 1035 1184 "ipld-core", ··· 1067 1216 checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" 1068 1217 1069 1218 [[package]] 1219 + name = "rustc-hash" 1220 + version = "2.1.1" 1221 + source = "registry+https://github.com/rust-lang/crates.io-index" 1222 + checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" 1223 + 1224 + [[package]] 1070 1225 name = "rustix" 1071 1226 version = "1.1.2" 1072 1227 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1105 1260 version = "1.2.0" 1106 1261 source = "registry+https://github.com/rust-lang/crates.io-index" 1107 1262 checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" 1263 + 1264 + [[package]] 1265 + name = "self_cell" 1266 + version = "1.2.2" 1267 + source = "registry+https://github.com/rust-lang/crates.io-index" 1268 + checksum = "b12e76d157a900eb52e81bc6e9f3069344290341720e9178cde2407113ac8d89" 1108 1269 1109 1270 [[package]] 1110 1271 name = "serde" ··· 1172 1333 ] 1173 1334 1174 1335 [[package]] 1336 + name = "sfa" 1337 + version = "1.0.0" 1338 + source = "registry+https://github.com/rust-lang/crates.io-index" 1339 + checksum = "a1296838937cab56cd6c4eeeb8718ec777383700c33f060e2869867bd01d1175" 1340 + dependencies = [ 1341 + "byteorder-lite", 1342 + "log", 1343 + "xxhash-rust", 1344 + ] 1345 + 1346 + [[package]] 1175 1347 name = "sha2" 1176 1348 version = "0.10.9" 1177 1349 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1214 1386 ] 1215 1387 1216 1388 [[package]] 1389 + name = "spin" 1390 + version = "0.9.8" 1391 + source = "registry+https://github.com/rust-lang/crates.io-index" 1392 + checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" 1393 + dependencies = [ 1394 + "lock_api", 1395 + ] 1396 + 1397 + [[package]] 1217 1398 name = "strsim" 1218 1399 version = "0.11.1" 1219 1400 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1336 1517 ] 1337 1518 1338 1519 [[package]] 1520 + name = "twox-hash" 1521 + version = "2.1.2" 1522 + source = "registry+https://github.com/rust-lang/crates.io-index" 1523 + checksum = "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c" 1524 + 1525 + [[package]] 1339 1526 name = "typenum" 1340 1527 version = "1.19.0" 1341 1528 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1370 1557 version = "0.2.2" 1371 1558 source = "registry+https://github.com/rust-lang/crates.io-index" 1372 1559 checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" 1560 + 1561 + [[package]] 1562 + name = "varint-rs" 1563 + version = "2.2.0" 1564 + source = "registry+https://github.com/rust-lang/crates.io-index" 1565 + checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23" 1373 1566 1374 1567 [[package]] 1375 1568 name = "vcpkg" ··· 1659 1852 version = "0.46.0" 1660 1853 source = "registry+https://github.com/rust-lang/crates.io-index" 1661 1854 checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" 1855 + 1856 + [[package]] 1857 + name = "xxhash-rust" 1858 + version = "0.8.15" 1859 + source = "registry+https://github.com/rust-lang/crates.io-index" 1860 + checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" 1662 1861 1663 1862 [[package]] 1664 1863 name = "zerocopy"
+1
Cargo.toml
··· 8 8 9 9 [dependencies] 10 10 bincode = { version = "2.0.1", features = ["serde"] } 11 + fjall = "3.0.0" 11 12 futures = "0.3.31" 12 13 futures-core = "0.3.31" 13 14 ipld-core = { version = "0.4.2", features = ["serde"] }
-5
examples/disk-read-file/main.rs
··· 82 82 83 83 log::info!("arrived! ({:?}) joining rx...", t0.elapsed()); 84 84 85 - // clean up the database. would be nice to do this in drop so it happens 86 - // automatically, but some blocking work happens, so that's not allowed in 87 - // async rust. 🤷‍♀️ 88 - join.await?.reset_store().await?; 89 - 90 85 log::info!("done. n={n} zeros={zeros}"); 91 86 92 87 Ok(())
+44 -56
src/disk.rs
··· 18 18 */ 19 19 20 20 use crate::drive::DriveError; 21 - use rusqlite::OptionalExtension; 21 + use fjall::{Database, Keyspace, KeyspaceCreateOptions, Error as FjallError}; 22 22 use std::path::PathBuf; 23 23 24 24 #[derive(Debug, thiserror::Error)] ··· 28 28 /// (The wrapped err should probably be obscured to remove public-facing 29 29 /// sqlite bits) 30 30 #[error(transparent)] 31 - DbError(#[from] rusqlite::Error), 31 + DbError(#[from] FjallError), 32 32 /// A tokio blocking task failed to join 33 33 #[error("Failed to join a tokio blocking task: {0}")] 34 34 JoinError(#[from] tokio::task::JoinError), ··· 71 71 impl Default for DiskBuilder { 72 72 fn default() -> Self { 73 73 Self { 74 - cache_size_mb: 32, 74 + cache_size_mb: 64, 75 75 max_stored_mb: 10 * 1024, // 10 GiB 76 76 } 77 77 } ··· 84 84 } 85 85 /// Set the in-memory cache allowance for the database 86 86 /// 87 - /// Default: 32 MiB 87 + /// Default: 64 MiB 88 88 pub fn with_cache_size_mb(mut self, size: usize) -> Self { 89 89 self.cache_size_mb = size; 90 90 self ··· 104 104 105 105 /// On-disk block storage 106 106 pub struct DiskStore { 107 - conn: rusqlite::Connection, 107 + #[allow(unused)] 108 + db: Database, 109 + ks: Keyspace, 108 110 max_stored: usize, 109 111 stored: usize, 110 112 } ··· 117 119 max_stored_mb: usize, 118 120 ) -> Result<Self, DiskError> { 119 121 let max_stored = max_stored_mb * 2_usize.pow(20); 120 - let conn = tokio::task::spawn_blocking(move || { 121 - let conn = rusqlite::Connection::open(path)?; 122 - 123 - let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size 124 - 125 - // conn.pragma_update(None, "journal_mode", "OFF")?; 126 - // conn.pragma_update(None, "journal_mode", "MEMORY")?; 127 - conn.pragma_update(None, "journal_mode", "WAL")?; 128 - // conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk 129 - conn.pragma_update(None, "synchronous", "OFF")?; 130 - conn.pragma_update( 131 - None, 132 - "cache_size", 133 - (cache_mb as i64 * sqlite_one_mb).to_string(), 122 + let (db, ks) = tokio::task::spawn_blocking(move || { 123 + let db = Database::builder(path) 124 + // .manual_journal_persist(true) 125 + // .worker_threads(1) 126 + // .cache_size(cache_mb as u64 * 2_u64.pow(20)) 127 + // .temporary(true) 128 + .open()?; 129 + let ks = db.keyspace("z", || 130 + KeyspaceCreateOptions::default() 131 + // .expect_point_read_hits(true) 132 + // .manual_journal_persist(true) 134 133 )?; 135 - Self::reset_tables(&conn)?; 134 + 135 + // Self::reset_tables(&ks)?; 136 136 137 - Ok::<_, DiskError>(conn) 137 + Ok::<_, DiskError>((db, ks)) 138 138 }) 139 139 .await??; 140 140 141 141 Ok(Self { 142 - conn, 142 + db, 143 + ks, 143 144 max_stored, 144 145 stored: 0, 145 146 }) 146 147 } 147 148 pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> { 148 - let tx = self.conn.transaction()?; 149 149 Ok(SqliteWriter { 150 - tx, 150 + ks: self.ks.clone(), 151 151 stored: &mut self.stored, 152 152 max: self.max_stored, 153 153 }) 154 154 } 155 - pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> { 156 - let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?; 157 - Ok(SqliteReader { select_stmt }) 155 + pub(crate) fn get_reader(&self) -> Result<SqliteReader, DiskError> { 156 + Ok(SqliteReader { 157 + ks: self.ks.clone(), 158 + }) 158 159 } 159 160 /// Drop and recreate the kv table 160 161 pub async fn reset(self) -> Result<Self, DiskError> { 161 162 tokio::task::spawn_blocking(move || { 162 - Self::reset_tables(&self.conn)?; 163 + Self::reset_tables(&self.ks)?; 163 164 Ok(self) 164 165 }) 165 166 .await? 166 167 } 167 - fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> { 168 - conn.execute("DROP TABLE IF EXISTS blocks", ())?; 169 - conn.execute( 170 - "CREATE TABLE blocks ( 171 - key BLOB PRIMARY KEY NOT NULL, 172 - val BLOB NOT NULL 173 - ) WITHOUT ROWID", 174 - (), 175 - )?; 168 + fn reset_tables(ks: &Keyspace) -> Result<(), DiskError> { 169 + ks.clear()?; 176 170 Ok(()) 177 171 } 178 172 } 179 173 180 - pub(crate) struct SqliteWriter<'conn> { 181 - tx: rusqlite::Transaction<'conn>, 182 - stored: &'conn mut usize, 174 + pub(crate) struct SqliteWriter<'a> { 175 + ks: Keyspace, 176 + stored: &'a mut usize, 183 177 max: usize, 184 178 } 185 179 ··· 188 182 &mut self, 189 183 kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>, 190 184 ) -> Result<(), DriveError> { 191 - let mut insert_stmt = self 192 - .tx 193 - .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)") 194 - .map_err(DiskError::DbError)?; 195 185 for pair in kv { 196 186 let (k, v) = pair?; 197 187 *self.stored += v.len(); 198 188 if *self.stored > self.max { 199 189 return Err(DiskError::MaxSizeExceeded.into()); 200 190 } 201 - insert_stmt.execute((k, v)).map_err(DiskError::DbError)?; 191 + self.ks.insert(k, v).map_err(DiskError::DbError)?; 202 192 } 203 193 Ok(()) 204 194 } 205 - pub fn commit(self) -> Result<(), DiskError> { 206 - self.tx.commit()?; 207 - Ok(()) 208 - } 209 195 } 210 196 211 - pub(crate) struct SqliteReader<'conn> { 212 - select_stmt: rusqlite::Statement<'conn>, 197 + pub(crate) struct SqliteReader { 198 + ks: Keyspace, 213 199 } 214 200 215 - impl SqliteReader<'_> { 216 - pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> { 217 - self.select_stmt 218 - .query_one((&key,), |row| row.get(0)) 219 - .optional() 201 + impl SqliteReader { 202 + pub(crate) fn get(&mut self, key: Vec<u8>) -> Result<Option<Vec<u8>>, FjallError> { 203 + let rv = self 204 + .ks 205 + .get(&key)? 206 + .map(|v| v.as_ref().into()); 207 + Ok(rv) 220 208 } 221 209 }
-3
src/drive.rs
··· 343 343 .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 344 344 345 345 writer.put_many(kvs)?; 346 - writer.commit()?; 347 346 Ok::<_, DriveError>(store) 348 347 }) 349 348 .await??; ··· 359 358 .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 360 359 writer.put_many(kvs)?; 361 360 } 362 - 363 - writer.commit()?; 364 361 Ok::<_, DriveError>(store) 365 362 }); // await later 366 363
+1 -1
src/walk.rs
··· 19 19 #[error("Action node error: {0}")] 20 20 MstError(#[from] MstError), 21 21 #[error("storage error: {0}")] 22 - StorageError(#[from] rusqlite::Error), 22 + StorageError(#[from] fjall::Error), 23 23 #[error("Decode error: {0}")] 24 24 DecodeError(#[from] DecodeError), 25 25 }