Fast and robust atproto CAR file processing in Rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

fjall v3

pretty good speedup!

authored by

phil and committed by tangled.org 96d3481d e369f8f1

+47 -116
+40 -67
Cargo.lock
··· 167 167 checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" 168 168 169 169 [[package]] 170 - name = "byteorder" 171 - version = "1.5.0" 170 + name = "byteorder-lite" 171 + version = "0.1.0" 172 172 source = "registry+https://github.com/rust-lang/crates.io-index" 173 - checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" 173 + checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495" 174 174 175 175 [[package]] 176 176 name = "bytes" ··· 180 180 181 181 [[package]] 182 182 name = "byteview" 183 - version = "0.6.1" 183 + version = "0.10.0" 184 184 source = "registry+https://github.com/rust-lang/crates.io-index" 185 - checksum = "6236364b88b9b6d0bc181ba374cf1ab55ba3ef97a1cb6f8cddad48a273767fb5" 185 + checksum = "dda4398f387cc6395a3e93b3867cd9abda914c97a0b344d1eefb2e5c51785fca" 186 186 187 187 [[package]] 188 188 name = "cast" ··· 458 458 ] 459 459 460 460 [[package]] 461 - name = "double-ended-peekable" 462 - version = "0.1.0" 463 - source = "registry+https://github.com/rust-lang/crates.io-index" 464 - checksum = "c0d05e1c0dbad51b52c38bda7adceef61b9efc2baf04acfe8726a8c4630a6f57" 465 - 466 - [[package]] 467 461 name = "either" 468 462 version = "1.15.0" 469 463 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 528 522 529 523 [[package]] 530 524 name = "fjall" 531 - version = "2.11.2" 525 + version = "3.0.0" 532 526 source = "registry+https://github.com/rust-lang/crates.io-index" 533 - checksum = "0b25ad44cd4360a0448a9b5a0a6f1c7a621101cca4578706d43c9a821418aebc" 527 + checksum = "4986f550347ed1666561f36e8bf1be3c97df72850ecef0140129da6e2d0aa911" 534 528 dependencies = [ 535 - "byteorder", 529 + "byteorder-lite", 536 530 "byteview", 537 531 "dashmap", 532 + "flume", 538 533 "log", 539 534 "lsm-tree", 540 - "path-absolutize", 541 - "std-semaphore", 542 535 "tempfile", 543 536 "xxhash-rust", 544 537 ] 545 538 546 539 [[package]] 540 + name = "flume" 541 + version = "0.12.0" 542 + 
source = "registry+https://github.com/rust-lang/crates.io-index" 543 + checksum = "5e139bc46ca777eb5efaf62df0ab8cc5fd400866427e56c68b22e414e53bd3be" 544 + dependencies = [ 545 + "spin", 546 + ] 547 + 548 + [[package]] 547 549 name = "futures" 548 550 version = "0.3.31" 549 551 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 659 661 version = "0.32.3" 660 662 source = "registry+https://github.com/rust-lang/crates.io-index" 661 663 checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7" 662 - 663 - [[package]] 664 - name = "guardian" 665 - version = "1.3.0" 666 - source = "registry+https://github.com/rust-lang/crates.io-index" 667 - checksum = "17e2ac29387b1aa07a1e448f7bb4f35b500787971e965b02842b900afa5c8f6f" 668 664 669 665 [[package]] 670 666 name = "half" ··· 826 822 827 823 [[package]] 828 824 name = "lsm-tree" 829 - version = "2.10.4" 825 + version = "3.0.0" 830 826 source = "registry+https://github.com/rust-lang/crates.io-index" 831 - checksum = "799399117a2bfb37660e08be33f470958babb98386b04185288d829df362ea15" 827 + checksum = "3a206e87e8bc38114045060ec1fc6bc4e4559748a37e9622b910d80e48863e87" 832 828 dependencies = [ 833 - "byteorder", 829 + "byteorder-lite", 830 + "byteview", 834 831 "crossbeam-skiplist", 835 - "double-ended-peekable", 836 832 "enum_dispatch", 837 - "guardian", 838 833 "interval-heap", 839 834 "log", 840 - "path-absolutize", 841 835 "quick_cache", 842 836 "rustc-hash", 843 837 "self_cell", 838 + "sfa", 844 839 "tempfile", 845 - "value-log", 846 840 "varint-rs", 847 841 "xxhash-rust", 848 842 ] ··· 967 961 ] 968 962 969 963 [[package]] 970 - name = "path-absolutize" 971 - version = "3.1.1" 972 - source = "registry+https://github.com/rust-lang/crates.io-index" 973 - checksum = "e4af381fe79fa195b4909485d99f73a80792331df0625188e707854f0b3383f5" 974 - dependencies = [ 975 - "path-dedot", 976 - ] 977 - 978 - [[package]] 979 - name = "path-dedot" 980 - version = "3.1.1" 981 - source = 
"registry+https://github.com/rust-lang/crates.io-index" 982 - checksum = "07ba0ad7e047712414213ff67533e6dd477af0a4e1d14fb52343e53d30ea9397" 983 - dependencies = [ 984 - "once_cell", 985 - ] 986 - 987 - [[package]] 988 964 name = "pin-project-lite" 989 965 version = "0.2.16" 990 966 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1279 1255 ] 1280 1256 1281 1257 [[package]] 1258 + name = "sfa" 1259 + version = "1.0.0" 1260 + source = "registry+https://github.com/rust-lang/crates.io-index" 1261 + checksum = "a1296838937cab56cd6c4eeeb8718ec777383700c33f060e2869867bd01d1175" 1262 + dependencies = [ 1263 + "byteorder-lite", 1264 + "log", 1265 + "xxhash-rust", 1266 + ] 1267 + 1268 + [[package]] 1282 1269 name = "sha2" 1283 1270 version = "0.10.9" 1284 1271 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1321 1308 ] 1322 1309 1323 1310 [[package]] 1324 - name = "std-semaphore" 1325 - version = "0.1.0" 1311 + name = "spin" 1312 + version = "0.9.8" 1326 1313 source = "registry+https://github.com/rust-lang/crates.io-index" 1327 - checksum = "33ae9eec00137a8eed469fb4148acd9fc6ac8c3f9b110f52cd34698c8b5bfa0e" 1314 + checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" 1315 + dependencies = [ 1316 + "lock_api", 1317 + ] 1328 1318 1329 1319 [[package]] 1330 1320 name = "strsim" ··· 1483 1473 version = "0.2.2" 1484 1474 source = "registry+https://github.com/rust-lang/crates.io-index" 1485 1475 checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" 1486 - 1487 - [[package]] 1488 - name = "value-log" 1489 - version = "1.9.0" 1490 - source = "registry+https://github.com/rust-lang/crates.io-index" 1491 - checksum = "62fc7c4ce161f049607ecea654dca3f2d727da5371ae85e2e4f14ce2b98ed67c" 1492 - dependencies = [ 1493 - "byteorder", 1494 - "byteview", 1495 - "interval-heap", 1496 - "log", 1497 - "path-absolutize", 1498 - "rustc-hash", 1499 - "tempfile", 1500 - "varint-rs", 1501 - "xxhash-rust", 1502 - ] 
1503 1476 1504 1477 [[package]] 1505 1478 name = "varint-rs"
+1 -1
Cargo.toml
··· 8 8 9 9 [dependencies] 10 10 bincode = { version = "2.0.1", features = ["serde"] } 11 - fjall = { version = "2.11.2", default-features = false } 11 + fjall = { version = "3.0.0", default-features = false } 12 12 futures = "0.3.31" 13 13 futures-core = "0.3.31" 14 14 ipld-core = { version = "0.4.2", features = ["serde"] }
+1 -3
examples/disk-read-file/main.rs
··· 82 82 83 83 log::info!("arrived! ({:?}) joining rx...", t0.elapsed()); 84 84 85 - let driver = join.await?; 86 - 87 - driver.reset_store().await?; 85 + join.await?; 88 86 89 87 log::info!("done. n={n} zeros={zeros}"); 90 88
-3
readme.md
··· 50 50 total_size += size; 51 51 } 52 52 } 53 - 54 - // clean up the disk store (drop tables etc) 55 - driver.reset_store().await?; 56 53 } 57 54 }; 58 55 println!("sum of size of all records: {total_size}");
+5 -24
src/disk.rs
··· 18 18 */ 19 19 20 20 use crate::drive::DriveError; 21 - use fjall::{Config, Error as FjallError, Keyspace, Partition, PartitionCreateOptions}; 21 + use fjall::{Database, Error as FjallError, Keyspace, KeyspaceCreateOptions}; 22 22 use std::path::PathBuf; 23 23 24 24 #[derive(Debug, thiserror::Error)] ··· 93 93 /// On-disk block storage 94 94 pub struct DiskStore { 95 95 #[allow(unused)] 96 - db: Keyspace, 97 - partition: Partition, 96 + db: Database, 97 + partition: Keyspace, 98 98 max_stored: usize, 99 99 stored: usize, 100 100 } ··· 108 108 ) -> Result<Self, DiskError> { 109 109 let max_stored = max_stored_mb * 2_usize.pow(20); 110 110 let (db, partition) = tokio::task::spawn_blocking(move || { 111 - let db = Config::new(path) 111 + let db = Database::builder(path) 112 112 // .manual_journal_persist(true) 113 113 // .flush_workers(1) 114 114 // .compaction_workers(1) 115 115 .cache_size(cache_mb as u64 * 2_u64.pow(20)) 116 116 .temporary(true) 117 117 .open()?; 118 - let partition = Self::get_partition(&db)?; 118 + let partition = db.keyspace("z", KeyspaceCreateOptions::default)?; 119 119 120 120 Ok::<_, DiskError>((db, partition)) 121 121 }) ··· 129 129 }) 130 130 } 131 131 132 - /// Drop and recreate the kv table 133 - pub async fn reset(mut self) -> Result<Self, DiskError> { 134 - tokio::task::spawn_blocking(move || { 135 - let partition = self.partition; 136 - Self::reset_partition(&self.db, partition)?; 137 - self.partition = Self::get_partition(&self.db)?; 138 - Ok(self) 139 - }) 140 - .await? 
141 - } 142 - 143 - fn get_partition(db: &Keyspace) -> Result<Partition, FjallError> { 144 - db.open_partition("z", PartitionCreateOptions::default()) 145 - } 146 - fn reset_partition(keyspace: &Keyspace, partition: Partition) -> Result<Partition, DiskError> { 147 - keyspace.delete_partition(partition)?; 148 - let partition = Self::get_partition(keyspace)?; 149 - Ok(partition) 150 - } 151 132 pub(crate) fn put_many( 152 133 &mut self, 153 134 kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>,
-15
src/drive.rs
··· 446 446 /// println!("{rkey}: size={}", record.len()); 447 447 /// } 448 448 /// } 449 - /// let store = disk_driver.reset_store().await?; 450 449 /// # Ok(()) 451 450 /// # } 452 451 /// ``` ··· 559 558 /// } 560 559 /// 561 560 /// } 562 - /// let store = join.await?.reset_store().await?; 563 561 /// # Ok(()) 564 562 /// # } 565 563 /// ``` ··· 581 579 }); 582 580 583 581 (rx, chan_task) 584 - } 585 - 586 - /// Reset the disk storage so it can be reused. You must call this. 587 - /// 588 - /// Ideally we'd put this in an `impl Drop`, but since it makes blocking 589 - /// calls, that would be risky in an async context. For now you just have to 590 - /// carefully make sure you call it. 591 - /// 592 - /// The sqlite store is returned, so it can be reused for another 593 - /// `DiskDriver`. 594 - pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> { 595 - let BigState { store, .. } = self.state.take().expect("valid state"); 596 - Ok(store.reset().await?) 597 582 } 598 583 }
-3
src/lib.rs
··· 53 53 total_size += size; 54 54 } 55 55 } 56 - 57 - // clean up the disk store (drop tables etc) 58 - driver.reset_store().await?; 59 56 } 60 57 }; 61 58 println!("sum of size of all records: {total_size}");