Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

fmt

phil 3d85059c 02732591

+224 -220
+4 -6
examples/read-file/main.rs
··· 1 1 extern crate repo_stream; 2 - use std::convert::Infallible; 3 2 use clap::Parser; 4 3 use futures::TryStreamExt; 5 4 use iroh_car::CarReader; 5 + use std::convert::Infallible; 6 6 use std::path::PathBuf; 7 7 8 8 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; ··· 36 36 // let stream = Box::pin(reader.stream()); 37 37 let stream = std::pin::pin!(reader.stream()); 38 38 39 - let (commit, v) = repo_stream::drive::Vehicle::init( 40 - root, 41 - stream, 42 - |block| Ok::<_, Infallible>(block.len()), 43 - ).await?; 39 + let (commit, v) = 40 + repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len())) 41 + .await?; 44 42 let mut record_stream = std::pin::pin!(v.stream()); 45 43 46 44 log::info!("got commit: {commit:?}");
+18 -12
src/drive.rs
··· 104 104 commit = Some(c); 105 105 break; // inner while 106 106 } else { 107 - blocks.insert(cid, if Node::could_be(&data) { 108 - MaybeProcessedBlock::Raw(data) 109 - } else { 110 - MaybeProcessedBlock::Processed(process(&data)) 111 - }); 107 + blocks.insert( 108 + cid, 109 + if Node::could_be(&data) { 110 + MaybeProcessedBlock::Raw(data) 111 + } else { 112 + MaybeProcessedBlock::Processed(process(&data)) 113 + }, 114 + ); 112 115 } 113 116 } 114 117 ··· 133 136 .await 134 137 .map_err(|e| DriveError::CarBlockError(e.into()))? 135 138 { 136 - self.blocks.insert(cid, if Node::could_be(&data) { 137 - MaybeProcessedBlock::Raw(data) 138 - } else { 139 - MaybeProcessedBlock::Processed((self.process)(&data)) 140 - }); 139 + self.blocks.insert( 140 + cid, 141 + if Node::could_be(&data) { 142 + MaybeProcessedBlock::Raw(data) 143 + } else { 144 + MaybeProcessedBlock::Processed((self.process)(&data)) 145 + }, 146 + ); 141 147 if cid == cid_needed { 142 148 return Ok(()); 143 149 } 144 - }; 150 + } 145 151 146 152 // if we never found the block 147 - return Err(DriveError::MissingBlock(cid_needed)); 153 + Err(DriveError::MissingBlock(cid_needed)) 148 154 } 149 155 150 156 pub async fn next_record(&mut self) -> Result<Option<(Rkey, T)>, DriveError<PE>> {
+2 -3
src/mst.rs
··· 67 67 0xA2, // map length 2 (for "l" and "e" keys) 68 68 0x61, // text length 1 69 69 b'e', // "e" before "l" because map keys have to be lex-sorted 70 - // 0x8?: "e" contains an array (0x8 nibble) of some length (low nib) 70 + // 0x8?: "e" contains an array (0x8 nibble) of some length (low nib) 71 71 ]; 72 72 let bytes = bytes.as_ref(); 73 73 bytes.starts_with(&NODE_FINGERPRINT) 74 - // && bytes.get(3).map(|b| b & 0xF0 == 0x80).unwrap_or(false) 74 + // && bytes.get(3).map(|b| b & 0xF0 == 0x80).unwrap_or(false) 75 75 } 76 76 } 77 - 78 77 79 78 impl Node { 80 79 /// Check if a node has any entries
+200 -199
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 - use std::error::Error; 4 - use crate::mst::Node; 5 3 use crate::drive::{MaybeProcessedBlock, ProcRes}; 4 + use crate::mst::Node; 6 5 use ipld_core::cid::Cid; 7 6 use std::collections::HashMap; 7 + use std::error::Error; 8 8 use std::fmt; 9 9 10 10 #[derive(Debug, thiserror::Error)] ··· 227 227 }; 228 228 let rkey = rkey.clone(); 229 229 let data = match data { 230 - MaybeProcessedBlock::Raw(data) => process(&data), 230 + MaybeProcessedBlock::Raw(data) => process(data), 231 231 MaybeProcessedBlock::Processed(Ok(t)) => Ok(t.clone()), 232 - MaybeProcessedBlock::Processed(_e) => 233 - return Err(Trip::RecordFailedProcessing("booo".into())), // TODO 232 + MaybeProcessedBlock::Processed(_e) => { 233 + return Err(Trip::RecordFailedProcessing("booo".into())); 234 + } // TODO 234 235 }; 235 236 236 237 // found node, make sure we remember ··· 255 256 .parse() 256 257 .unwrap() 257 258 } 258 - // fn cid2() -> Cid { 259 - // "QmY7Yh4UquoXHLPFo2XbhXkhBvFoPwmQUSa92pxnxjQuPU" 260 - // .parse() 261 - // .unwrap() 262 - // } 263 - // fn cid3() -> Cid { 264 - // "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi" 265 - // .parse() 266 - // .unwrap() 267 - // } 268 - // fn cid4() -> Cid { 269 - // "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR" 270 - // .parse() 271 - // .unwrap() 272 - // } 273 - // fn cid5() -> Cid { 274 - // "QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D" 275 - // .parse() 276 - // .unwrap() 277 - // } 278 - // fn cid6() -> Cid { 279 - // "QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm" 280 - // .parse() 281 - // .unwrap() 282 - // } 283 - // fn cid7() -> Cid { 284 - // "bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze" 285 - // .parse() 286 - // .unwrap() 287 - // } 288 - // fn cid8() -> Cid { 289 - // "bafyreif3tfdpr5n4jdrbielmcapwvbpcthepfkwq2vwonmlhirbjmotedi" 290 - // .parse() 291 - // .unwrap() 292 - // } 293 - // fn cid9() -> Cid { 294 - // "bafyreicnokmhmrnlp2wjhyk2haep4tqxiptwfrp2rrs7rzq7uk766chqvq" 295 - // .parse() 296 - // .unwrap() 297 - // } 259 + // fn cid2() -> Cid { 260 + // "QmY7Yh4UquoXHLPFo2XbhXkhBvFoPwmQUSa92pxnxjQuPU" 261 + // .parse() 262 + // .unwrap() 263 + // } 264 + // fn cid3() -> Cid { 265 + // "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi" 266 + // .parse() 267 + // .unwrap() 268 + // } 269 + // fn cid4() -> Cid { 270 + // "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR" 271 + // .parse() 272 + // .unwrap() 273 + // } 274 + // fn cid5() -> Cid { 275 + // "QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D" 276 + // .parse() 277 + // .unwrap() 278 + // } 279 + // fn cid6() -> Cid { 280 + // "QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm" 281 + // .parse() 282 + // .unwrap() 283 + // } 284 + // fn cid7() -> Cid { 285 + // "bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze" 286 + // .parse() 287 + // .unwrap() 288 + // } 289 + // fn cid8() -> Cid { 290 + // "bafyreif3tfdpr5n4jdrbielmcapwvbpcthepfkwq2vwonmlhirbjmotedi" 291 + // .parse() 292 + // .unwrap() 293 + // } 294 + // fn cid9() -> Cid { 295 + // "bafyreicnokmhmrnlp2wjhyk2haep4tqxiptwfrp2rrs7rzq7uk766chqvq" 296 + // .parse() 297 + // .unwrap() 298 + // } 298 299 299 300 #[test] 300 301 fn test_next_from_node_empty() { ··· 316 317 assert_eq!(action_node.next(), Some(Need::Node(cid1()))); 317 318 } 318 319 319 - // #[test] 320 - // fn test_needs_from_node_just_one_record() { 321 - // let node = Node { 322 - // left: None, 323 - // entries: vec![Entry { 324 - // keysuffix: "asdf".into(), 325 - // prefix_len: 0, 326 - // value: cid1(), 327 - // tree: None, 328 - // }], 329 - // }; 330 - // assert_eq!( 331 - // needs_from_node(node).unwrap(), 332 - // vec![Need::Record { 333 - // rkey: "asdf".into(), 334 - // cid: cid1(), 335 - // },] 336 - // ); 337 - // } 320 + // #[test] 321 + // fn test_needs_from_node_just_one_record() { 322 + // let node = Node { 323 + // left: None, 324 + // entries: vec![Entry { 325 + // keysuffix: "asdf".into(), 326 + // prefix_len: 0, 327 + // value: cid1(), 328 + // tree: None, 329 + // }], 330 + // }; 331 + // assert_eq!( 332 + // needs_from_node(node).unwrap(), 333 + // vec![Need::Record { 334 + // rkey: "asdf".into(), 335 + // cid: cid1(), 336 + // },] 337 + // ); 338 + // } 338 339 339 - // #[test] 340 - // fn test_needs_from_node_two_records() { 341 - // let node = Node { 342 - // left: None, 343 - // entries: vec![ 344 - // Entry { 345 - // keysuffix: "asdf".into(), 346 - // prefix_len: 0, 347 - // value: cid1(), 348 - // tree: None, 349 - // }, 350 - // Entry { 351 - // keysuffix: "gh".into(), 352 - // prefix_len: 2, 353 - // value: cid2(), 354 - // tree: None, 355 - // }, 356 - // ], 357 - // }; 358 - // assert_eq!( 359 - // needs_from_node(node).unwrap(), 360 - // vec![ 361 - // Need::Record { 362 - // rkey: "asdf".into(), 363 - // cid: cid1(), 364 - // }, 365 - // Need::Record { 366 - // rkey: "asgh".into(), 367 - // cid: cid2(), 368 - // }, 369 - // ] 370 - // ); 371 - // } 340 + // #[test] 341 + // fn test_needs_from_node_two_records() { 342 + // let node = Node { 343 + // left: None, 344 + // entries: vec![ 345 + // Entry { 346 + // keysuffix: "asdf".into(), 347 + // prefix_len: 0, 348 + // value: cid1(), 349 + // tree: None, 350 + // }, 351 + // Entry { 352 + // keysuffix: "gh".into(), 353 + // prefix_len: 2, 354 + // value: cid2(), 355 + // tree: None, 356 + // }, 357 + // ], 358 + // }; 359 + // assert_eq!( 360 + // needs_from_node(node).unwrap(), 361 + // vec![ 362 + // Need::Record { 363 + // rkey: "asdf".into(), 364 + // cid: cid1(), 365 + // }, 366 + // Need::Record { 367 + // rkey: "asgh".into(), 368 + // cid: cid2(), 369 + // }, 370 + // ] 371 + // ); 372 + // } 372 373 373 - // #[test] 374 - // fn test_needs_from_node_with_both() { 375 - // let node = Node { 376 - // left: None, 377 - // entries: vec![Entry { 378 - // keysuffix: "asdf".into(), 379 - // prefix_len: 0, 380 - // value: cid1(), 381 - // tree: Some(cid2()), 382 - // }], 383 - // }; 384 - // assert_eq!( 385 - // needs_from_node(node).unwrap(), 386 - // vec![ 387 - // Need::Record { 388 - // rkey: "asdf".into(), 389 - // cid: cid1(), 390 - // }, 391 - // Need::Node(cid2()), 392 - // ] 393 - // ); 394 - // } 374 + // #[test] 375 + // fn test_needs_from_node_with_both() { 376 + // let node = Node { 377 + // left: None, 378 + // entries: vec![Entry { 379 + // keysuffix: "asdf".into(), 380 + // prefix_len: 0, 381 + // value: cid1(), 382 + // tree: Some(cid2()), 383 + // }], 384 + // }; 385 + // assert_eq!( 386 + // needs_from_node(node).unwrap(), 387 + // vec![ 388 + // Need::Record { 389 + // rkey: "asdf".into(), 390 + // cid: cid1(), 391 + // }, 392 + // Need::Node(cid2()), 393 + // ] 394 + // ); 395 + // } 395 396 396 - // #[test] 397 - // fn test_needs_from_node_left_and_record() { 398 - // let node = Node { 399 - // left: Some(cid1()), 400 - // entries: vec![Entry { 401 - // keysuffix: "asdf".into(), 402 - // prefix_len: 0, 403 - // value: cid2(), 404 - // tree: None, 405 - // }], 406 - // }; 407 - // assert_eq!( 408 - // needs_from_node(node).unwrap(), 409 - // vec![ 410 - // Need::Node(cid1()), 411 - // Need::Record { 412 - // rkey: "asdf".into(), 413 - // cid: cid2(), 414 - // }, 415 - // ] 416 - // ); 417 - // } 397 + // #[test] 398 + // fn test_needs_from_node_left_and_record() { 399 + // let node = Node { 400 + // left: Some(cid1()), 401 + // entries: vec![Entry { 402 + // keysuffix: "asdf".into(), 403 + // prefix_len: 0, 404 + // value: cid2(), 405 + // tree: None, 406 + // }], 407 + // }; 408 + // assert_eq!( 409 + // needs_from_node(node).unwrap(), 410 + // vec![ 411 + // Need::Node(cid1()), 412 + // Need::Record { 413 + // rkey: "asdf".into(), 414 + // cid: cid2(), 415 + // }, 416 + // ] 417 + // ); 418 + // } 418 419 419 - // #[test] 420 - // fn test_needs_from_full_node() { 421 - // let node = Node { 422 - // left: Some(cid1()), 423 - // entries: vec![ 424 - // Entry { 425 - // keysuffix: "asdf".into(), 426 - // prefix_len: 0, 427 - // value: cid2(), 428 - // tree: Some(cid3()), 429 - // }, 430 - // Entry { 431 - // keysuffix: "ghi".into(), 432 - // prefix_len: 1, 433 - // value: cid4(), 434 - // tree: Some(cid5()), 435 - // }, 436 - // Entry { 437 - // keysuffix: "jkl".into(), 438 - // prefix_len: 2, 439 - // value: cid6(), 440 - // tree: Some(cid7()), 441 - // }, 442 - // Entry { 443 - // keysuffix: "mno".into(), 444 - // prefix_len: 4, 445 - // value: cid8(), 446 - // tree: Some(cid9()), 447 - // }, 448 - // ], 449 - // }; 450 - // assert_eq!( 451 - // needs_from_node(node).unwrap(), 452 - // vec![ 453 - // Need::Node(cid1()), 454 - // Need::Record { 455 - // rkey: "asdf".into(), 456 - // cid: cid2(), 457 - // }, 458 - // Need::Node(cid3()), 459 - // Need::Record { 460 - // rkey: "aghi".into(), 461 - // cid: cid4(), 462 - // }, 463 - // Need::Node(cid5()), 464 - // Need::Record { 465 - // rkey: "agjkl".into(), 466 - // cid: cid6(), 467 - // }, 468 - // Need::Node(cid7()), 469 - // Need::Record { 470 - // rkey: "agjkmno".into(), 471 - // cid: cid8(), 472 - // }, 473 - // Need::Node(cid9()), 474 - // ] 475 - // ); 476 - // } 420 + // #[test] 421 + // fn test_needs_from_full_node() { 422 + // let node = Node { 423 + // left: Some(cid1()), 424 + // entries: vec![ 425 + // Entry { 426 + // keysuffix: "asdf".into(), 427 + // prefix_len: 0, 428 + // value: cid2(), 429 + // tree: Some(cid3()), 430 + // }, 431 + // Entry { 432 + // keysuffix: "ghi".into(), 433 + // prefix_len: 1, 434 + // value: cid4(), 435 + // tree: Some(cid5()), 436 + // }, 437 + // Entry { 438 + // keysuffix: "jkl".into(), 439 + // prefix_len: 2, 440 + // value: cid6(), 441 + // tree: Some(cid7()), 442 + // }, 443 + // Entry { 444 + // keysuffix: "mno".into(), 445 + // prefix_len: 4, 446 + // value: cid8(), 447 + // tree: Some(cid9()), 448 + // }, 449 + // ], 450 + // }; 451 + // assert_eq!( 452 + // needs_from_node(node).unwrap(), 453 + // vec![ 454 + // Need::Node(cid1()), 455 + // Need::Record { 456 + // rkey: "asdf".into(), 457 + // cid: cid2(), 458 + // }, 459 + // Need::Node(cid3()), 460 + // Need::Record { 461 + // rkey: "aghi".into(), 462 + // cid: cid4(), 463 + // }, 464 + // Need::Node(cid5()), 465 + // Need::Record { 466 + // rkey: "agjkl".into(), 467 + // cid: cid6(), 468 + // }, 469 + // Need::Node(cid7()), 470 + // Need::Record { 471 + // rkey: "agjkmno".into(), 472 + // cid: cid8(), 473 + // }, 474 + // Need::Node(cid9()), 475 + // ] 476 + // ); 477 + // } 477 478 }