Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

wip more like garbage ip

phil 455306cb 3a93da4f

+1 -243
-126
examples/node-counts/main.rs
··· 1 - extern crate repo_stream; 2 - use std::sync::atomic::AtomicUsize; 3 - use std::sync::atomic::Ordering; 4 - use tokio::task::JoinSet; 5 - use tokio::io::AsyncRead; 6 - use std::sync::Arc; 7 - use tokio::sync::Mutex; 8 - use std::collections::BTreeMap; 9 - use clap::Parser; 10 - use repo_stream::DriverBuilder; 11 - use std::path::PathBuf; 12 - 13 - 14 - #[derive(Debug, thiserror::Error)] 15 - enum Error { 16 - #[error("io error: {0}")] 17 - IoError(#[from] std::io::Error), 18 - #[error("drive error: {0}")] 19 - DriveError(#[from] repo_stream::DriveError), 20 - #[error("send error: {0}")] 21 - SendError(String), 22 - #[error("failed to die")] 23 - FailedToDie, 24 - } 25 - 26 - type Result<T> = std::result::Result<T, Error>; 27 - 28 - #[derive(Debug, Parser)] 29 - struct Args { 30 - #[arg()] 31 - folder: PathBuf, 32 - } 33 - 34 - async fn get_cars( 35 - cars_folder: PathBuf, 36 - tx: async_channel::Sender<(tokio::io::BufReader<tokio::fs::File>, String)>, 37 - ) -> Result<()> { 38 - let mut dir = tokio::fs::read_dir(cars_folder).await?; 39 - while let Some(entry) = dir.next_entry().await? { 40 - if !entry.file_type().await?.is_file() { 41 - continue; 42 - } 43 - let reader = tokio::fs::File::open(&entry.path()).await?; 44 - let reader = tokio::io::BufReader::new(reader); 45 - tx.send((reader, entry.file_name().to_string_lossy().into())).await.map_err(|e| Error::SendError(e.to_string()))?; 46 - } 47 - Ok(()) 48 - } 49 - 50 - async fn counter<R: AsyncRead + Unpin + Send + Sync + 'static>( 51 - car_rx: async_channel::Receiver<(R, String)>, 52 - totals: Arc<Mutex<BTreeMap<usize, usize>>>, 53 - n: Arc<AtomicUsize>, 54 - ) -> Result<()> { 55 - 56 - let builder = DriverBuilder::new() 57 - .with_block_processor(|_| vec![]); 58 - 59 - while let Ok((f, name)) = car_rx.recv().await { 60 - n.fetch_add(1, Ordering::Relaxed); 61 - 62 - let Ok(Some(counts)) = builder 63 - .clone() 64 - .count_entries(f) 65 - .await 66 - .inspect_err(|e| eprintln!("{name} failed: {e}")) 67 - else { 68 - continue 69 - }; 70 - 71 - let mut t = totals.lock().await; 72 - for (entries, n) in counts { 73 - *t.entry(entries).or_default() += n; 74 - } 75 - drop(t); 76 - } 77 - Ok(()) 78 - } 79 - 80 - #[tokio::main] 81 - async fn main() -> Result<()> { 82 - env_logger::init(); 83 - 84 - let Args { folder } = Args::parse(); 85 - 86 - let mut set = JoinSet::<Result<()>>::new(); 87 - 88 - tokio::fs::create_dir_all(folder.clone()).await?; 89 - 90 - let (cars_tx, cars_rx) = async_channel::bounded(2); 91 - set.spawn(get_cars(folder, cars_tx)); 92 - 93 - let n: Arc<AtomicUsize> = Arc::new(0.into()); 94 - 95 - let totals = Arc::new(Mutex::new(BTreeMap::new())); 96 - 97 - for _ in 0..15 { 98 - set.spawn(counter(cars_rx.clone(), totals.clone(), n.clone())); 99 - } 100 - drop(cars_rx); 101 - 102 - let (die, mut til_death) = tokio::sync::oneshot::channel(); 103 - let monitor = n.clone(); 104 - tokio::task::spawn(async move { 105 - let mut interval = tokio::time::interval(std::time::Duration::from_secs(2)); 106 - loop { 107 - tokio::select! { 108 - _ = interval.tick() => {}, 109 - _ = &mut til_death => break, 110 - } 111 - eprintln!("repos: {}", monitor.load(Ordering::Relaxed)); 112 - } 113 - }); 114 - 115 - while let Some(res) = set.join_next().await { 116 - println!("task from set joined: {res:?}"); 117 - } 118 - die.send(()).map_err(|_| Error::FailedToDie)?; 119 - 120 - println!("repos: {}", n.load(Ordering::SeqCst)); 121 - for (n, c) in totals.lock().await.iter() { 122 - println!("{n}\t{c}"); 123 - } 124 - 125 - Ok(()) 126 - }
-55
src/drive.rs
··· 7 7 mst::MstNode, 8 8 walk::{MstError, Output}, 9 9 }; 10 - use std::collections::BTreeMap; 11 10 use cid::Cid; 12 11 use iroh_car::CarReader; 13 12 use std::convert::Infallible; ··· 185 184 pub async fn load_car<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Driver<R>, DriveError> { 186 185 Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await 187 186 } 188 - 189 - /// Begin processing an atproto MST from a CAR file 190 - pub async fn count_entries<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Option<BTreeMap<usize, usize>>, DriveError> { 191 - Driver::count_entries(reader).await 192 - } 193 187 } 194 188 195 189 impl<R: AsyncRead + Unpin> Driver<R> { 196 - pub async fn count_entries( 197 - reader: R, 198 - ) -> Result<Option<BTreeMap<usize, usize>>, DriveError> { 199 - let mut mem_blocks: HashMap<ObjectLink, _> = HashMap::new(); 200 - 201 - let mut car = CarReader::new(reader).await?; 202 - 203 - let roots = car.header().roots(); 204 - assert_eq!(roots.len(), 1); 205 - 206 - let root = *roots.first().ok_or(DriveError::MissingRoot)?; 207 - log::debug!("root: {root:?}"); 208 - 209 - let mut commit = None; 210 - 211 - 212 - // try to load all the blocks into memory 213 - while let Some((cid, data)) = car.next_block().await? { 214 - // the root commit is a Special Third Kind of block that we need to make 215 - // sure not to optimistically send to the processing function 216 - if cid == root { 217 - let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 218 - commit = Some(c); 219 - continue; 220 - } 221 - let maybe_processed = MaybeProcessedBlock::maybe(|_| vec![], data); 222 - 223 - // stash (maybe processed) blocks in memory as long as we have room 224 - mem_blocks.insert(cid.into(), maybe_processed); 225 - } 226 - 227 - let commit = commit.ok_or(DriveError::MissingCommit)?; 228 - 229 - // the commit always must point to a Node; empty node => empty MST special case 230 - let root_node: MstNode = match mem_blocks 231 - .get(&commit.data) 232 - .ok_or(DriveError::MissingCommit)? 233 - { 234 - MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 235 - MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?, 236 - }; 237 - 238 - if root_node.depth.unwrap_or(0) < 4 { 239 - return Ok(None); 240 - } 241 - 242 - let mut walker = Walker::new(root_node); 243 - Ok(Some(walker.count_entries(&mut mem_blocks)?)) 244 - } 245 190 246 191 /// Begin processing an atproto MST from a CAR file 247 192 ///
+1 -62
src/walk.rs
··· 3 3 use crate::link::{NodeThing, ObjectLink, ThingKind}; 4 4 use crate::mst::{Depth, MstNode}; 5 5 use crate::{Bytes, HashMap, Rkey, disk::DiskStore, drive::MaybeProcessedBlock, noop}; 6 - use std::collections::BTreeMap; 7 6 use cid::Cid; 8 7 use std::convert::Infallible; 9 8 ··· 85 84 links: 0, 86 85 prev_rkey: "".to_string(), 87 86 root_depth: root_node.depth.unwrap_or(0), // empty root node = empty mst 88 - todo: vec![root_node.things.into_iter().filter(|t| !t.is_record()).collect()], 89 - } 90 - } 91 - 92 - pub fn count_entries( 93 - &mut self, 94 - blocks: &HashMap<ObjectLink, MaybeProcessedBlock>, 95 - ) -> Result<BTreeMap<usize, usize>, WalkError> { 96 - let mut counts = BTreeMap::new(); 97 - 98 - while let Some(NodeThing { link, kind }) = self.next_todo() { 99 - let Some(mpb) = blocks.get(&link) else { 100 - return Err(WalkError::MissingBlock(NodeThing { link, kind }.into())); 101 - }; 102 - match kind { 103 - ThingKind::Record(_) => unreachable!(), 104 - ThingKind::ChildNode => { 105 - let MaybeProcessedBlock::Raw(data) = mpb else { 106 - return Err(WalkError::BadCommitFingerprint); 107 - }; 108 - 109 - let node: MstNode = 110 - serde_ipld_dagcbor::from_slice(data).map_err(WalkError::BadCommit)?; 111 - 112 - if node.is_empty() { 113 - return Err(WalkError::MstError(MstError::EmptyNode)); 114 - } 115 - 116 - let current_depth = self.root_depth - (self.todo.len() - 1) as u32; 117 - let next_depth = current_depth 118 - .checked_sub(1) 119 - .ok_or(MstError::DepthUnderflow)?; 120 - if let Some(d) = node.depth 121 - && d != next_depth 122 - { 123 - return Err(WalkError::MstError(MstError::WrongDepth { 124 - depth: d, 125 - expected: next_depth, 126 - })); 127 - } 128 - 129 - let mut entries = 0; 130 - let mut links = Vec::new(); 131 - for thing in node.things { 132 - if thing.is_record() { 133 - entries += 1; 134 - } else { 135 - links.push(thing); 136 - } 137 - } 138 - self.todo.push(links); 139 - if entries > 0 { 140 - *counts.entry(entries).or_default() += 1; 141 - if entries > 10_000 { 142 - eprintln!("whoa, found a {}-entry node", entries); 143 - } 144 - } 145 - } 146 - } 87 + todo: vec![root_node.things], 147 88 } 148 - 149 - Ok(counts) 150 89 } 151 90 152 91 pub fn viz(