Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

wip

phil 3a93da4f bec73def

+492 -18
+49
Cargo.lock
··· 95 95 checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" 96 96 97 97 [[package]] 98 + name = "async-channel" 99 + version = "2.5.0" 100 + source = "registry+https://github.com/rust-lang/crates.io-index" 101 + checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" 102 + dependencies = [ 103 + "concurrent-queue", 104 + "event-listener-strategy", 105 + "futures-core", 106 + "pin-project-lite", 107 + ] 108 + 109 + [[package]] 98 110 name = "autocfg" 99 111 version = "1.5.0" 100 112 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 295 307 checksum = "ea0095f6103c2a8b44acd6fd15960c801dafebf02e21940360833e0673f48ba7" 296 308 297 309 [[package]] 310 + name = "concurrent-queue" 311 + version = "2.5.0" 312 + source = "registry+https://github.com/rust-lang/crates.io-index" 313 + checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" 314 + dependencies = [ 315 + "crossbeam-utils", 316 + ] 317 + 318 + [[package]] 298 319 name = "const-str" 299 320 version = "0.4.3" 300 321 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 511 532 ] 512 533 513 534 [[package]] 535 + name = "event-listener" 536 + version = "5.4.1" 537 + source = "registry+https://github.com/rust-lang/crates.io-index" 538 + checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" 539 + dependencies = [ 540 + "concurrent-queue", 541 + "parking", 542 + "pin-project-lite", 543 + ] 544 + 545 + [[package]] 546 + name = "event-listener-strategy" 547 + version = "0.5.4" 548 + source = "registry+https://github.com/rust-lang/crates.io-index" 549 + checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" 550 + dependencies = [ 551 + "event-listener", 552 + "pin-project-lite", 553 + ] 554 + 555 + [[package]] 514 556 name = "fastrand" 515 557 version = "2.3.0" 516 558 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 976 1018 checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" 977 1019 978 1020 [[package]] 1021 + name = "parking" 1022 + version = "2.2.1" 1023 + source = "registry+https://github.com/rust-lang/crates.io-index" 1024 + checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" 1025 + 1026 + [[package]] 979 1027 name = "parking_lot" 980 1028 version = "0.12.5" 981 1029 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1149 1197 name = "repo-stream" 1150 1198 version = "0.4.0" 1151 1199 dependencies = [ 1200 + "async-channel", 1152 1201 "cid", 1153 1202 "clap", 1154 1203 "criterion",
+1
Cargo.toml
··· 32 32 tokio = { version = "1.47.1", features = ["full"] } 33 33 mimalloc = "0.1.48" 34 34 hmac-sha256 = "1.1.12" 35 + async-channel = "2.5.0" 35 36 36 37 [profile.profiling] 37 38 inherits = "release"
+123
examples/mst-node-survival/main.rs
··· 1 + extern crate repo_stream; 2 + use repo_stream::link::ThingKind; 3 + use clap::Parser; 4 + use repo_stream::{Driver, DriverBuilder}; 5 + use std::path::PathBuf; 6 + 7 + type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; 8 + 9 + #[derive(Debug, Parser)] 10 + struct Args { 11 + #[arg()] 12 + a: PathBuf, 13 + // #[arg()] 14 + // b: PathBuf, 15 + } 16 + 17 + #[tokio::main] 18 + async fn main() -> Result<()> { 19 + env_logger::init(); 20 + 21 + let Args { a } = Args::parse(); 22 + let reader_a = tokio::fs::File::open(a.clone()).await?; 23 + let reader_a = tokio::io::BufReader::new(reader_a); 24 + 25 + // let reader_b = tokio::fs::File::open(b.clone()).await?; 26 + // let reader_b = tokio::io::BufReader::new(reader_b); 27 + 28 + let builder = DriverBuilder::new() 29 + .with_mem_limit_mb(1000) 30 + .with_block_processor(|_| vec![]); 31 + 32 + log::info!("loading {a:?}..."); 33 + let driver_a = match builder.clone().load_car(reader_a).await? { 34 + Driver::Memory(_, _, mem_driver) => mem_driver, 35 + Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 36 + }; 37 + 38 + // log::info!("loading {b:?}..."); 39 + // let driver_b = match builder.load_car(reader_b).await? { 40 + // Driver::Memory(_, _, mem_driver) => mem_driver, 41 + // Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 42 + // }; 43 + 44 + let mut total_referenced_nodes = 0; 45 + let mut total_l0_nodes = 0; 46 + let mut total_referenced_records = 0; 47 + let mut total_low_records = 0; 48 + // let mut records_a = 0; 49 + // let mut layer0_a = 0; 50 + 51 + for (_link, mpb) in driver_a.blocks { 52 + // // skips (probably nodes) 53 + // if !mpb.unknown_depth() { 54 + // continue; 55 + // } 56 + 57 + // // records 58 + // if mpb.to_node().is_some() { 59 + // continue; 60 + // } 61 + 62 + 63 + // nodes 64 + let Some(node) = mpb.to_node() else { 65 + continue; 66 + }; 67 + let Some(depth) = node.depth else { 68 + continue; 69 + }; 70 + for thing in node.things { 71 + match thing.kind { 72 + ThingKind::Record(_) => { 73 + total_referenced_records += 1; 74 + if depth <= 1 { 75 + total_low_records += 1; 76 + } 77 + } 78 + ThingKind::ChildNode => { 79 + total_referenced_nodes += 1; 80 + if depth == 1 { 81 + total_l0_nodes += 1; 82 + } 83 + } 84 + } 85 + } 86 + 87 + // // levels 88 + // let Some(node) = mpb.to_node() else { 89 + // continue; 90 + // }; 91 + // if node.depth != Some(1) { 92 + // continue; 93 + // } 94 + 95 + // total_a += 1; 96 + 97 + // if driver_b.blocks.contains_key(&link) { 98 + // surviving_a += 1; 99 + // } 100 + } 101 + 102 + eprintln!("referenced nodes: {total_referenced_nodes}"); 103 + eprintln!("referenced records: {total_referenced_records}"); 104 + let total_links = total_referenced_nodes + total_referenced_records; 105 + eprintln!("total links: {}", total_links); 106 + 107 + eprintln!("layer 0+1 records: {total_low_records}"); 108 + eprintln!("low recs of records: {:.1}", 109 + 100. * f64::try_from(total_low_records).unwrap() / f64::try_from(total_referenced_records).unwrap() 110 + ); 111 + 112 + eprintln!("layer 0 nodes: {total_l0_nodes}"); 113 + eprintln!("low nodes of nodes: {:.1}", 114 + 100. * f64::try_from(total_l0_nodes).unwrap() / f64::try_from(total_referenced_nodes).unwrap() 115 + ); 116 + 117 + let low_links = total_l0_nodes + total_low_records; 118 + eprintln!("low links of all links: {:.1}", 119 + 100. * f64::try_from(low_links).unwrap() / f64::try_from(total_links).unwrap() 120 + ); 121 + 122 + Ok(()) 123 + }
+126
examples/node-counts/main.rs
··· 1 + extern crate repo_stream; 2 + use std::sync::atomic::AtomicUsize; 3 + use std::sync::atomic::Ordering; 4 + use tokio::task::JoinSet; 5 + use tokio::io::AsyncRead; 6 + use std::sync::Arc; 7 + use tokio::sync::Mutex; 8 + use std::collections::BTreeMap; 9 + use clap::Parser; 10 + use repo_stream::DriverBuilder; 11 + use std::path::PathBuf; 12 + 13 + 14 + #[derive(Debug, thiserror::Error)] 15 + enum Error { 16 + #[error("io error: {0}")] 17 + IoError(#[from] std::io::Error), 18 + #[error("drive error: {0}")] 19 + DriveError(#[from] repo_stream::DriveError), 20 + #[error("send error: {0}")] 21 + SendError(String), 22 + #[error("failed to die")] 23 + FailedToDie, 24 + } 25 + 26 + type Result<T> = std::result::Result<T, Error>; 27 + 28 + #[derive(Debug, Parser)] 29 + struct Args { 30 + #[arg()] 31 + folder: PathBuf, 32 + } 33 + 34 + async fn get_cars( 35 + cars_folder: PathBuf, 36 + tx: async_channel::Sender<(tokio::io::BufReader<tokio::fs::File>, String)>, 37 + ) -> Result<()> { 38 + let mut dir = tokio::fs::read_dir(cars_folder).await?; 39 + while let Some(entry) = dir.next_entry().await? { 40 + if !entry.file_type().await?.is_file() { 41 + continue; 42 + } 43 + let reader = tokio::fs::File::open(&entry.path()).await?; 44 + let reader = tokio::io::BufReader::new(reader); 45 + tx.send((reader, entry.file_name().to_string_lossy().into())).await.map_err(|e| Error::SendError(e.to_string()))?; 46 + } 47 + Ok(()) 48 + } 49 + 50 + async fn counter<R: AsyncRead + Unpin + Send + Sync + 'static>( 51 + car_rx: async_channel::Receiver<(R, String)>, 52 + totals: Arc<Mutex<BTreeMap<usize, usize>>>, 53 + n: Arc<AtomicUsize>, 54 + ) -> Result<()> { 55 + 56 + let builder = DriverBuilder::new() 57 + .with_block_processor(|_| vec![]); 58 + 59 + while let Ok((f, name)) = car_rx.recv().await { 60 + n.fetch_add(1, Ordering::Relaxed); 61 + 62 + let Ok(Some(counts)) = builder 63 + .clone() 64 + .count_entries(f) 65 + .await 66 + .inspect_err(|e| eprintln!("{name} failed: {e}")) 67 + else { 68 + continue 69 + }; 70 + 71 + let mut t = totals.lock().await; 72 + for (entries, n) in counts { 73 + *t.entry(entries).or_default() += n; 74 + } 75 + drop(t); 76 + } 77 + Ok(()) 78 + } 79 + 80 + #[tokio::main] 81 + async fn main() -> Result<()> { 82 + env_logger::init(); 83 + 84 + let Args { folder } = Args::parse(); 85 + 86 + let mut set = JoinSet::<Result<()>>::new(); 87 + 88 + tokio::fs::create_dir_all(folder.clone()).await?; 89 + 90 + let (cars_tx, cars_rx) = async_channel::bounded(2); 91 + set.spawn(get_cars(folder, cars_tx)); 92 + 93 + let n: Arc<AtomicUsize> = Arc::new(0.into()); 94 + 95 + let totals = Arc::new(Mutex::new(BTreeMap::new())); 96 + 97 + for _ in 0..15 { 98 + set.spawn(counter(cars_rx.clone(), totals.clone(), n.clone())); 99 + } 100 + drop(cars_rx); 101 + 102 + let (die, mut til_death) = tokio::sync::oneshot::channel(); 103 + let monitor = n.clone(); 104 + tokio::task::spawn(async move { 105 + let mut interval = tokio::time::interval(std::time::Duration::from_secs(2)); 106 + loop { 107 + tokio::select! { 108 + _ = interval.tick() => {}, 109 + _ = &mut til_death => break, 110 + } 111 + eprintln!("repos: {}", monitor.load(Ordering::Relaxed)); 112 + } 113 + }); 114 + 115 + while let Some(res) = set.join_next().await { 116 + println!("task from set joined: {res:?}"); 117 + } 118 + die.send(()).map_err(|_| Error::FailedToDie)?; 119 + 120 + println!("repos: {}", n.load(Ordering::SeqCst)); 121 + for (n, c) in totals.lock().await.iter() { 122 + println!("{n}\t{c}"); 123 + } 124 + 125 + Ok(()) 126 + }
+7 -6
examples/read-file/main.rs
··· 24 24 let reader = tokio::io::BufReader::new(reader); 25 25 26 26 let (commit, mut driver) = match DriverBuilder::new() 27 + .with_mem_limit_mb(1000) 27 28 .with_block_processor(|block| block.len().to_ne_bytes().to_vec()) 28 29 .load_car(reader) 29 30 .await? ··· 36 37 37 38 while let Step::Value(records) = driver.next_chunk(256).await? { 38 39 for Output { rkey, cid, data } in records { 39 - let size = usize::from_ne_bytes(data.try_into().unwrap()); 40 - print!("0x"); 41 - for byte in cid.to_bytes() { 42 - print!("{byte:>02x}"); 43 - } 44 - println!(": {rkey} => record of len {}", size); 40 + // let size = usize::from_ne_bytes(data.try_into().unwrap()); 41 + // print!("0x"); 42 + // for byte in cid.to_bytes() { 43 + // print!("{byte:>02x}"); 44 + // } 45 + // println!(": {rkey} => record of len {}", size); 45 46 } 46 47 } 47 48
+84 -7
src/drive.rs
··· 7 7 mst::MstNode, 8 8 walk::{MstError, Output}, 9 9 }; 10 + use std::collections::BTreeMap; 10 11 use cid::Cid; 11 12 use iroh_car::CarReader; 12 13 use std::convert::Infallible; ··· 46 47 pub type BlockChunk = Vec<Output>; 47 48 48 49 #[derive(Debug, Clone)] 49 - pub(crate) enum MaybeProcessedBlock { 50 + pub enum MaybeProcessedBlock { 50 51 /// A block that's *probably* a Node (but we can't know yet) 51 52 /// 52 53 /// It *can be* a record that suspiciously looks a lot like a node, so we ··· 72 73 } 73 74 74 75 impl MaybeProcessedBlock { 76 + pub fn to_node(&self) -> Option<MstNode> { 77 + let Self::Raw(bytes) = self else { 78 + return None; 79 + }; 80 + serde_ipld_dagcbor::from_slice(bytes).ok() 81 + } 82 + pub fn unknown_depth(&self) -> bool { 83 + let Self::Raw(bytes) = self else { 84 + return false; 85 + }; 86 + let Ok(node) = serde_ipld_dagcbor::from_slice::<MstNode>(bytes) else { 87 + return false; 88 + }; 89 + node.depth.is_none() 90 + } 75 91 pub(crate) fn maybe(process: fn(Bytes) -> Bytes, data: Bytes) -> Self { 76 92 if MstNode::could_be(&data) { 77 93 MaybeProcessedBlock::Raw(data) ··· 168 184 /// Begin processing an atproto MST from a CAR file 169 185 pub async fn load_car<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Driver<R>, DriveError> { 170 186 Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await 187 + } 188 + 189 + /// Begin processing an atproto MST from a CAR file 190 + pub async fn count_entries<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Option<BTreeMap<usize, usize>>, DriveError> { 191 + Driver::count_entries(reader).await 171 192 } 172 193 } 173 194 174 195 impl<R: AsyncRead + Unpin> Driver<R> { 196 + pub async fn count_entries( 197 + reader: R, 198 + ) -> Result<Option<BTreeMap<usize, usize>>, DriveError> { 199 + let mut mem_blocks: HashMap<ObjectLink, _> = HashMap::new(); 200 + 201 + let mut car = CarReader::new(reader).await?; 202 + 203 + let roots = car.header().roots(); 204 + assert_eq!(roots.len(), 1); 205 + 206 + let root = *roots.first().ok_or(DriveError::MissingRoot)?; 207 + log::debug!("root: {root:?}"); 208 + 209 + let mut commit = None; 210 + 211 + 212 + // try to load all the blocks into memory 213 + while let Some((cid, data)) = car.next_block().await? { 214 + // the root commit is a Special Third Kind of block that we need to make 215 + // sure not to optimistically send to the processing function 216 + if cid == root { 217 + let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 218 + commit = Some(c); 219 + continue; 220 + } 221 + let maybe_processed = MaybeProcessedBlock::maybe(|_| vec![], data); 222 + 223 + // stash (maybe processed) blocks in memory as long as we have room 224 + mem_blocks.insert(cid.into(), maybe_processed); 225 + } 226 + 227 + let commit = commit.ok_or(DriveError::MissingCommit)?; 228 + 229 + // the commit always must point to a Node; empty node => empty MST special case 230 + let root_node: MstNode = match mem_blocks 231 + .get(&commit.data) 232 + .ok_or(DriveError::MissingCommit)? 233 + { 234 + MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 235 + MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 236 + }; 237 + 238 + if root_node.depth.unwrap_or(0) < 4 { 239 + return Ok(None); 240 + } 241 + 242 + let mut walker = Walker::new(root_node); 243 + Ok(Some(walker.count_entries(&mut mem_blocks)?)) 244 + } 245 + 175 246 /// Begin processing an atproto MST from a CAR file 176 247 /// 177 248 /// Blocks will be loaded, processed, and buffered in memory. If the entire ··· 186 257 process: fn(Bytes) -> Bytes, 187 258 mem_limit_mb: usize, 188 259 ) -> Result<Driver<R>, DriveError> { 260 + let mut block_count = 0; 261 + 189 262 let max_size = mem_limit_mb * 2_usize.pow(20); 190 263 let mut mem_blocks = HashMap::new(); 191 264 192 265 let mut car = CarReader::new(reader).await?; 193 266 194 - let root = *car 195 - .header() 196 - .roots() 197 - .first() 198 - .ok_or(DriveError::MissingRoot)?; 267 + let roots = car.header().roots(); 268 + assert_eq!(roots.len(), 1); 269 + 270 + let root = *roots.first().ok_or(DriveError::MissingRoot)?; 199 271 log::debug!("root: {root:?}"); 200 272 201 273 let mut commit = None; ··· 203 275 // try to load all the blocks into memory 204 276 let mut mem_size = 0; 205 277 while let Some((cid, data)) = car.next_block().await? { 278 + block_count += 1; 206 279 // the root commit is a Special Third Kind of block that we need to make 207 280 // sure not to optimistically send to the processing function 208 281 if cid == root { ··· 218 291 mem_size += maybe_processed.len(); 219 292 mem_blocks.insert(cid.into(), maybe_processed); 220 293 if mem_size >= max_size { 294 + log::debug!("blocks loaded before disk needed: {block_count}"); 295 + 221 296 return Ok(Driver::Disk(NeedDisk { 222 297 car, 223 298 root, ··· 228 303 })); 229 304 } 230 305 } 306 + 307 + log::debug!("blocks: {block_count}"); 231 308 232 309 // all blocks loaded and we fit in memory! hopefully we found the commit... 233 310 let commit = commit.ok_or(DriveError::MissingCommit)?; ··· 274 351 /// so the sync/async boundaries become a little easier to work around. 275 352 #[derive(Debug)] 276 353 pub struct MemDriver { 277 - blocks: HashMap<ObjectLink, MaybeProcessedBlock>, 354 + pub blocks: HashMap<ObjectLink, MaybeProcessedBlock>, 278 355 walker: Walker, 279 356 process: fn(Bytes) -> Bytes, // TODO: impl Fn(bytes) -> Bytes? 280 357 next_missing: Option<NodeThing>,
+1 -1
src/link.rs
··· 21 21 } 22 22 } 23 23 24 - #[derive(Debug, Clone)] 24 + #[derive(Debug, Clone, PartialEq)] 25 25 pub struct NodeThing { 26 26 pub link: ObjectLink, 27 27 pub kind: ThingKind,
+1 -1
src/mst.rs
··· 48 48 } 49 49 50 50 #[derive(Debug, Clone)] 51 - pub(crate) struct MstNode { 51 + pub struct MstNode { 52 52 pub depth: Option<Depth>, // known for nodes with entries (required for root) 53 53 pub things: Vec<NodeThing>, 54 54 }
+100 -3
src/walk.rs
··· 3 3 use crate::link::{NodeThing, ObjectLink, ThingKind}; 4 4 use crate::mst::{Depth, MstNode}; 5 5 use crate::{Bytes, HashMap, Rkey, disk::DiskStore, drive::MaybeProcessedBlock, noop}; 6 + use std::collections::BTreeMap; 6 7 use cid::Cid; 7 8 use std::convert::Infallible; 8 9 ··· 50 51 End(Option<Rkey>), 51 52 } 52 53 54 + // #[derive(Debug, PartialEq)] 55 + // pub struct LowStep { 56 + // pub cid: Cid, 57 + // pub kind: LowKind, 58 + // } 59 + 60 + // #[derive(Debug, PartialEq)] 61 + // pub enum LowKind { 62 + // Node { 63 + // children: Option<Vec<NodeThing>>, 64 + // }, 65 + // Record { 66 + // key: Rkey, 67 + // data: Option<Bytes>, 68 + // }, 69 + // } 70 + 53 71 /// Traverser of an atproto MST 54 72 /// 55 73 /// Walks the tree from left-to-right in depth-first order 56 74 #[derive(Debug, Clone)] 57 75 pub struct Walker { 76 + links: usize, 58 77 prev_rkey: Rkey, 59 78 root_depth: Depth, 60 79 todo: Vec<Vec<NodeThing>>, ··· 63 82 impl Walker { 64 83 pub fn new(root_node: MstNode) -> Self { 65 84 Self { 85 + links: 0, 66 86 prev_rkey: "".to_string(), 67 87 root_depth: root_node.depth.unwrap_or(0), // empty root node = empty mst 68 - todo: vec![root_node.things], 88 + todo: vec![root_node.things.into_iter().filter(|t| !t.is_record()).collect()], 89 + } 90 + } 91 + 92 + pub fn count_entries( 93 + &mut self, 94 + blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 95 + ) -> Result<BTreeMap<usize, usize>, WalkError> { 96 + let mut counts = BTreeMap::new(); 97 + 98 + while let Some(NodeThing { link, kind }) = self.next_todo() { 99 + let Some(mpb) = blocks.get(&link) else { 100 + return Err(WalkError::MissingBlock(NodeThing { link, kind }.into())); 101 + }; 102 + match kind { 103 + ThingKind::Record(_) => unreachable!(), 104 + ThingKind::ChildNode => { 105 + let MaybeProcessedBlock::Raw(data) = mpb else { 106 + return Err(WalkError::BadCommitFingerprint); 107 + }; 108 + 109 + let node: MstNode = 110 + serde_ipld_dagcbor::from_slice(data).map_err(WalkError::BadCommit)?; 111 + 112 + if node.is_empty() { 113 + return Err(WalkError::MstError(MstError::EmptyNode)); 114 + } 115 + 116 + let current_depth = self.root_depth - (self.todo.len() - 1) as u32; 117 + let next_depth = current_depth 118 + .checked_sub(1) 119 + .ok_or(MstError::DepthUnderflow)?; 120 + if let Some(d) = node.depth 121 + && d != next_depth 122 + { 123 + return Err(WalkError::MstError(MstError::WrongDepth { 124 + depth: d, 125 + expected: next_depth, 126 + })); 127 + } 128 + 129 + let mut entries = 0; 130 + let mut links = Vec::new(); 131 + for thing in node.things { 132 + if thing.is_record() { 133 + entries += 1; 134 + } else { 135 + links.push(thing); 136 + } 137 + } 138 + self.todo.push(links); 139 + if entries > 0 { 140 + *counts.entry(entries).or_default() += 1; 141 + if entries > 10_000 { 142 + eprintln!("whoa, found a {}-entry node", entries); 143 + } 144 + } 145 + } 146 + } 69 147 } 148 + 149 + Ok(counts) 70 150 } 71 151 72 152 pub fn viz( ··· 245 325 })); 246 326 } 247 327 248 - log::trace!("node into depth {next_depth}"); 328 + let n = node.things.len(); 329 + log::trace!("node into depth {next_depth} with {n} links"); 249 330 self.todo.push(node.things); 331 + self.links += n; 250 332 Ok(None) 251 333 } 252 334 } ··· 278 360 return Ok(Step::Value(out)); 279 361 } 280 362 } 363 + log::debug!("total links: {}", self.links); 281 364 Ok(Step::End(None)) 282 365 } 283 366 367 + // /// Emit every step including MST nodes 368 + // pub fn step_low( 369 + // &mut self, 370 + // blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 371 + // process: impl Fn(Bytes) -> Bytes, 372 + // ) -> Result<Option<LowStep>, WalkError> { 373 + // let Some(NodeThing { link, kind }) = self.next_todo() else { 374 + // return Ok(None); 375 + // }; 376 + // let Some(mpb) = blocks.get(&link) else { 377 + 378 + // } 379 + // } 380 + 284 381 /// Advance through nodes, allowing for missing records 285 382 pub fn step_sparse( 286 383 &mut self, ··· 331 428 } 332 429 Err(anyother) => return Err(anyother), 333 430 Ok(z) => { 334 - eprintln!("apparently we are too far at {z:?}"); 431 + log::info!("apparently we are too far at {z:?}"); 335 432 return Ok(rkey_prev); // oop real record, mutant went too far 336 433 } 337 434 }