STreaming ARchives: stricter, verifiable, deterministic, highly compressible alternatives to CAR files for atproto repositories.
atproto car
9
fork

Configure Feed

Select the types of activity you want to include in your feed.

testing it out

phil 7053dfc9 c41699d3

+390 -31
+97
Cargo.lock
··· 27 27 ] 28 28 29 29 [[package]] 30 + name = "async-channel" 31 + version = "2.5.0" 32 + source = "registry+https://github.com/rust-lang/crates.io-index" 33 + checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" 34 + dependencies = [ 35 + "concurrent-queue", 36 + "event-listener-strategy", 37 + "futures-core", 38 + "pin-project-lite", 39 + ] 40 + 41 + [[package]] 30 42 name = "backtrace" 31 43 version = "0.3.76" 32 44 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 88 100 ] 89 101 90 102 [[package]] 103 + name = "cc" 104 + version = "1.2.53" 105 + source = "registry+https://github.com/rust-lang/crates.io-index" 106 + checksum = "755d2fce177175ffca841e9a06afdb2c4ab0f593d53b4dee48147dfaade85932" 107 + dependencies = [ 108 + "find-msvc-tools", 109 + "shlex", 110 + ] 111 + 112 + [[package]] 91 113 name = "cfg-if" 92 114 version = "1.0.4" 93 115 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 108 130 ] 109 131 110 132 [[package]] 133 + name = "concurrent-queue" 134 + version = "2.5.0" 135 + source = "registry+https://github.com/rust-lang/crates.io-index" 136 + checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" 137 + dependencies = [ 138 + "crossbeam-utils", 139 + ] 140 + 141 + [[package]] 111 142 name = "const-str" 112 143 version = "0.4.3" 113 144 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 130 161 dependencies = [ 131 162 "libc", 132 163 ] 164 + 165 + [[package]] 166 + name = "crossbeam-utils" 167 + version = "0.8.21" 168 + source = "registry+https://github.com/rust-lang/crates.io-index" 169 + checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" 133 170 134 171 [[package]] 135 172 name = "crypto-common" ··· 188 225 ] 189 226 190 227 [[package]] 228 + name = "event-listener" 229 + version = "5.4.1" 230 + source = "registry+https://github.com/rust-lang/crates.io-index" 231 + checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" 232 + dependencies = [ 233 + "concurrent-queue", 234 + "parking", 235 + "pin-project-lite", 236 + ] 237 + 238 + [[package]] 239 + name = "event-listener-strategy" 240 + version = "0.5.4" 241 + source = "registry+https://github.com/rust-lang/crates.io-index" 242 + checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" 243 + dependencies = [ 244 + "event-listener", 245 + "pin-project-lite", 246 + ] 247 + 248 + [[package]] 249 + name = "find-msvc-tools" 250 + version = "0.1.8" 251 + source = "registry+https://github.com/rust-lang/crates.io-index" 252 + checksum = "8591b0bcc8a98a64310a2fae1bb3e9b8564dd10e381e6e28010fde8e8e8568db" 253 + 254 + [[package]] 191 255 name = "futures" 192 256 version = "0.3.31" 193 257 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 326 390 checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc" 327 391 328 392 [[package]] 393 + name = "libmimalloc-sys" 394 + version = "0.1.44" 395 + source = "registry+https://github.com/rust-lang/crates.io-index" 396 + checksum = "667f4fec20f29dfc6bc7357c582d91796c169ad7e2fce709468aefeb2c099870" 397 + dependencies = [ 398 + "cc", 399 + "libc", 400 + ] 401 + 402 + [[package]] 329 403 name = "lock_api" 330 404 version = "0.4.14" 331 405 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 350 424 version = "2.7.6" 351 425 source = "registry+https://github.com/rust-lang/crates.io-index" 352 426 checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" 427 + 428 + [[package]] 429 + name = "mimalloc" 430 + version = "0.1.48" 431 + source = "registry+https://github.com/rust-lang/crates.io-index" 432 + checksum = "e1ee66a4b64c74f4ef288bcbb9192ad9c3feaad75193129ac8509af543894fd8" 433 + dependencies = [ 434 + "libmimalloc-sys", 435 + ] 353 436 354 437 [[package]] 355 438 name = "miniz_oxide" ··· 404 487 ] 405 488 406 489 [[package]] 490 + name = "parking" 491 + version = "2.2.1" 492 + source = "registry+https://github.com/rust-lang/crates.io-index" 493 + checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" 494 + 495 + [[package]] 407 496 name = "parking_lot" 408 497 version = "0.12.5" 409 498 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 541 630 ] 542 631 543 632 [[package]] 633 + name = "shlex" 634 + version = "1.3.0" 635 + source = "registry+https://github.com/rust-lang/crates.io-index" 636 + checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" 637 + 638 + [[package]] 544 639 name = "signal-hook-registry" 545 640 version = "1.4.8" 546 641 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 577 672 version = "0.1.0" 578 673 dependencies = [ 579 674 "anyhow", 675 + "async-channel", 580 676 "bytes", 581 677 "cid", 582 678 "futures", 583 679 "iroh-car", 680 + "mimalloc", 584 681 "serde", 585 682 "serde_bytes", 586 683 "serde_ipld_dagcbor",
+11
Cargo.toml
··· 27 27 anyhow = { version = "1.0.100", features = ["backtrace"] } 28 28 tokio = { version = "1", features = ["full"] } 29 29 iroh-car = "0.5.1" 30 + mimalloc = "0.1.48" 31 + async-channel = "2.5.0" 32 + 33 + [profile.profiling] 34 + inherits = "release" 35 + debug = true 30 36 31 37 [[example]] 32 38 name = "star-debug" ··· 37 43 name = "car-to-star" 38 44 path = "src/examples/car-to-star.rs" 39 45 required-features = ["async"] 46 + 47 + [[example]] 48 + name = "convert-bulk" 49 + path = "src/examples/convert-bulk.rs" 50 + required-features = ["async"]
+244
src/examples/convert-bulk.rs
··· 1 + use anyhow::Context; 2 + use star::StarMstEntry; 3 + use star::StarMstNode; 4 + use star::calculate_height; 5 + use star::StarSerializer; 6 + use star::StarCommit; 7 + use star::RepoMstNode; 8 + use cid::Cid; 9 + use std::collections::HashMap; 10 + use iroh_car::CarReader; 11 + use std::env; 12 + use std::path::PathBuf; 13 + use std::sync::{ 14 + Arc, 15 + atomic::{AtomicUsize, Ordering}, 16 + }; 17 + use tokio::{io::AsyncRead, task::JoinSet}; 18 + use anyhow::Result; 19 + 20 + use mimalloc::MiMalloc; 21 + #[global_allocator] 22 + static GLOBAL: MiMalloc = MiMalloc; 23 + 24 + 25 + async fn get_cars( 26 + cars_folder: PathBuf, 27 + tx: async_channel::Sender<(tokio::io::BufReader<tokio::fs::File>, String)>, 28 + ) -> Result<()> { 29 + let mut dir = tokio::fs::read_dir(cars_folder).await?; 30 + while let Some(entry) = dir.next_entry().await? { 31 + if !entry.file_type().await?.is_file() { 32 + continue; 33 + } 34 + let reader = tokio::fs::File::open(&entry.path()).await?; 35 + let reader = tokio::io::BufReader::new(reader); 36 + tx.send((reader, entry.file_name().to_string_lossy().into())).await?; 37 + } 38 + Ok(()) 39 + } 40 + 41 + async fn converter<R: AsyncRead + Unpin + Send + Sync + 'static>( 42 + car_rx: async_channel::Receiver<(R, String)>, 43 + star_folder: String, 44 + n: Arc<AtomicUsize>, 45 + ) -> Result<()> { 46 + while let Ok((f, name)) = car_rx.recv().await { 47 + n.fetch_add(1, Ordering::Relaxed); 48 + // eprintln!("failed to drive mem for {name:?}: {e:?}, skipping"); 49 + 50 + let mut car = match CarReader::new(f).await { 51 + Ok(c) => c, 52 + Err(e) => { 53 + eprintln!("skipping car: {e}"); 54 + continue; 55 + } 56 + }; 57 + 58 + let roots = car.header().roots(); 59 + assert_eq!(roots.len(), 1); 60 + 61 + let commit_cid = *roots.first().expect("a root to be present"); 62 + 63 + let mut blocks: HashMap<Cid, Vec<u8>> = HashMap::new(); 64 + 65 + while let Some((cid, data)) = car.next_block().await? { 66 + blocks.insert(cid, data); 67 + } 68 + let star_path = format!("{star_folder}/{}", name.replace(".car", ".star")); 69 + let res = tokio::task::spawn_blocking(move || { 70 + let output_file = std::fs::File::create(star_path)?; 71 + 72 + let commit_bytes = blocks.get(&commit_cid).context("Commit block not found")?; 73 + 74 + #[derive(serde::Deserialize)] 75 + struct RepoCommit { 76 + did: String, 77 + version: i64, 78 + data: Cid, 79 + rev: String, 80 + prev: Option<Cid>, 81 + sig: Option<serde_bytes::ByteBuf>, 82 + } 83 + 84 + let repo_commit: RepoCommit = serde_ipld_dagcbor::from_slice(commit_bytes)?; 85 + 86 + let root_bytes = blocks 87 + .get(&repo_commit.data) 88 + .context("repo data cannot be null")?; 89 + let root_node: RepoMstNode = 90 + serde_ipld_dagcbor::from_slice(root_bytes).context("root must be an mst node")?; 91 + 92 + let star_data = if root_node.l.is_none() && root_node.e.is_empty() { 93 + None 94 + } else { 95 + Some(repo_commit.data) 96 + }; 97 + 98 + let star_commit = StarCommit { 99 + did: repo_commit.did, 100 + version: repo_commit.version, 101 + data: star_data, 102 + rev: repo_commit.rev, 103 + prev: repo_commit.prev, 104 + sig: repo_commit.sig, 105 + }; 106 + 107 + let mut serializer = StarSerializer::new(output_file); 108 + 109 + serializer.write_header(&star_commit)?; 110 + 111 + if let Some(root_cid) = star_commit.data { 112 + write_tree(root_cid, &blocks, &mut serializer)?; 113 + } 114 + 115 + serializer.finish()?; 116 + Ok::<(), anyhow::Error>(()) 117 + }).await?; 118 + 119 + if let Err(e) = res { 120 + eprintln!("failed: {name}: {e}"); 121 + } 122 + } 123 + Ok(()) 124 + } 125 + 126 + fn write_tree( 127 + node_cid: Cid, 128 + blocks: &HashMap<Cid, Vec<u8>>, 129 + serializer: &mut StarSerializer<std::fs::File>, 130 + ) -> Result<(usize, usize)> { 131 + // println!("writing tree under {node_cid:?}..."); 132 + 133 + let mut nodes_written = 0; 134 + let mut records_written = 0; 135 + 136 + let block_bytes = blocks 137 + .get(&node_cid) 138 + .with_context(|| format!("Missing block {}", node_cid))?; 139 + 140 + let repo_node: RepoMstNode = serde_ipld_dagcbor::from_slice(block_bytes)?; 141 + 142 + let height = if let Some(first_entry) = repo_node.e.first() { 143 + calculate_height(&first_entry.k) 144 + } else { 145 + 0 146 + }; 147 + 148 + let star_node = StarMstNode { 149 + l: repo_node.l, 150 + l_archived: repo_node.l.map(|_| true), 151 + e: repo_node 152 + .e 153 + .iter() 154 + .map(|e| { 155 + let v = if height == 0 { None } else { Some(e.v) }; 156 + StarMstEntry { 157 + p: e.p, 158 + k: e.k.clone(), 159 + v, 160 + v_archived: Some(true), 161 + t: e.t, 162 + t_archived: e.t.map(|_| true), 163 + } 164 + }) 165 + .collect(), 166 + }; 167 + 168 + serializer.write_node(&star_node)?; 169 + nodes_written += 1; 170 + 171 + if let Some(l_cid) = repo_node.l { 172 + let (n, r) = write_tree(l_cid, blocks, serializer)?; 173 + nodes_written += n; 174 + records_written += r; 175 + } 176 + 177 + for e in repo_node.e { 178 + let record_bytes = blocks 179 + .get(&e.v) 180 + .with_context(|| format!("Missing record {}", e.v))?; 181 + // eprintln!("writing record {:?} (<= {node_cid:?})", e.v); 182 + serializer.write_record(record_bytes)?; 183 + records_written += 1; 184 + 185 + if let Some(t_cid) = e.t { 186 + let (n, r) = write_tree(t_cid, blocks, serializer)?; 187 + nodes_written += n; 188 + records_written += r; 189 + } 190 + } 191 + 192 + Ok((nodes_written, records_written)) 193 + } 194 + 195 + #[tokio::main] 196 + async fn main() -> Result<()> { 197 + let args: Vec<String> = env::args().collect(); 198 + if args.len() != 3 { 199 + eprintln!("Usage: car-to-star <cars-folder> <stars-folder>"); 200 + std::process::exit(1); 201 + } 202 + let cars_path = &args[1]; 203 + let stars_path = &args[2]; 204 + 205 + let mut set = JoinSet::<Result<()>>::new(); 206 + 207 + let (cars_tx, cars_rx) = async_channel::bounded(2); 208 + set.spawn(get_cars(cars_path.into(), cars_tx)); 209 + 210 + let n: Arc<AtomicUsize> = Arc::new(0.into()); 211 + 212 + tokio::fs::create_dir_all(stars_path.clone()).await?; 213 + for _ in 0..10 { 214 + set.spawn(converter(cars_rx.clone(), stars_path.clone(), n.clone())); 215 + } 216 + drop(cars_rx); 217 + 218 + tokio::fs::create_dir_all(stars_path.clone()).await?; 219 + set.spawn({ 220 + let n = n.clone(); 221 + let mut last_n: usize = 0; 222 + let mut interval = tokio::time::interval(std::time::Duration::from_secs(1)); 223 + async move { 224 + loop { 225 + interval.tick().await; 226 + let this_n = n.load(Ordering::Relaxed); 227 + println!("{} ({this_n})", this_n - last_n); 228 + if this_n > 1 && this_n == last_n { 229 + eprintln!("done?"); 230 + break Ok(()); 231 + } 232 + last_n = this_n; 233 + } 234 + } 235 + }); 236 + 237 + while let Some(res) = set.join_next().await { 238 + println!("task from set joined: {res:?}"); 239 + } 240 + 241 + eprintln!("total repos converted: {n:?}"); 242 + 243 + Ok(()) 244 + }
+38 -31
src/examples/star-debug.rs
··· 1 - use star::{StarItem, StarIterator, StarMstNode}; 1 + use star::{StarItem, StarIterator}; 2 2 use std::env; 3 3 use std::fs::File; 4 4 use std::io::BufReader; ··· 14 14 let reader = BufReader::new(file); 15 15 let iter = StarIterator::new(reader); 16 16 17 + let mut nodes = 0; 18 + let mut records = 0; 19 + 17 20 for item in iter.iter_tree() { 18 21 let item = item?; 19 22 match item { ··· 23 26 println!(" Rev: {}", c.rev); 24 27 println!(" Root: {:?}", c.data); 25 28 } 26 - StarItem::Node(n) => { 27 - print_node(&n); 29 + StarItem::Node(_n) => { 30 + // print_node(&n); 31 + nodes += 1; 28 32 } 29 - StarItem::Record { key, cid, .. } => { 30 - println!(" Record: key={}, cid={}", to_hex(&key), cid); 33 + StarItem::Record { .. } => { 34 + // println!(" Record: key={}, cid={}", to_hex(&key), cid); 35 + records += 1; 31 36 } 32 37 } 33 38 } 34 39 40 + println!("Tree contained {nodes} nodes, {records} records."); 41 + 35 42 Ok(()) 36 43 } 37 44 38 - fn print_node(node: &StarMstNode) { 39 - println!("Node:"); 40 - if let Some(l) = node.l { 41 - println!(" Left -> {}", l); 42 - } 43 - for (i, e) in node.e.iter().enumerate() { 44 - print!(" Entry {}: key={}", i, to_hex(&e.k)); 45 - if let Some(v) = e.v { 46 - print!(", val={}", v); 47 - } else { 48 - print!(", val=(implicit)"); 49 - } 50 - if let Some(t) = e.t { 51 - print!(", right->{}", t); 52 - } 53 - println!(); 54 - } 55 - } 45 + // fn print_node(node: &StarMstNode) { 46 + // println!("Node:"); 47 + // if let Some(l) = node.l { 48 + // println!(" Left -> {}", l); 49 + // } 50 + // for (i, e) in node.e.iter().enumerate() { 51 + // print!(" Entry {}: key={}", i, to_hex(&e.k)); 52 + // if let Some(v) = e.v { 53 + // print!(", val={}", v); 54 + // } else { 55 + // print!(", val=(implicit)"); 56 + // } 57 + // if let Some(t) = e.t { 58 + // print!(", right->{}", t); 59 + // } 60 + // println!(); 61 + // } 62 + // } 56 63 57 - fn to_hex(bytes: &[u8]) -> String { 58 - let mut s = String::with_capacity(bytes.len() * 2); 59 - for b in bytes { 60 - use std::fmt::Write; 61 - write!(&mut s, "{:02x}", b).unwrap(); 62 - } 63 - s 64 - } 64 + // fn to_hex(bytes: &[u8]) -> String { 65 + // let mut s = String::with_capacity(bytes.len() * 2); 66 + // for b in bytes { 67 + // use std::fmt::Write; 68 + // write!(&mut s, "{:02x}", b).unwrap(); 69 + // } 70 + // s 71 + // }