···38383939 let mut n = 0;
4040 let mut zeros = 0;
4141- let (mut rx, worker) = driver.rx(512).await?;
4141+ let mut rx = driver.to_channel(512);
42424343 log::debug!("walking...");
4444- while let Some(pairs) = rx.recv().await {
4444+ while let Some(r) = rx.recv().await {
4545+ let pairs = r?;
4546 n += pairs.len();
4647 for (_, block) in pairs {
4748 zeros += block.into_iter().filter(|&b| b == b'0').count()
4849 }
4950 }
5050- log::debug!("done walking! joining...");
5151-5252- worker.await.unwrap().unwrap();
5353-5454- log::debug!("joined.");
5151+ log::debug!("done walking!");
55525653 // log::info!("now is the time to check mem...");
5754 // tokio::time::sleep(std::time::Duration::from_secs(22)).await;
+89-79
src/drive.rs
···77use serde::{Deserialize, Serialize};
88use std::collections::HashMap;
99use std::convert::Infallible;
1010-use tokio::io::AsyncRead;
1010+use tokio::{io::AsyncRead, sync::mpsc};
11111212use crate::mst::{Commit, Node};
1313use crate::walk::{Step, WalkError, Walker};
···4444 #[error("extra bytes remained after decoding")]
4545 ExtraGarbage,
4646}
4747+4848+pub type BlockChunk<T> = Vec<(String, T)>;
47494850#[derive(Debug, Clone, Serialize, Deserialize)]
4951pub enum MaybeProcessedBlock<T> {
···161163 }
162164}
163165166166+/// The core driver between the block stream and MST walker
167167+///
168168+/// In the future, PDSs will export CARs in a stream-friendly order that will
169169+/// enable processing them with tiny memory overhead. But that future is not
170170+/// here yet.
171171+///
172172+/// CARs are almost always in a stream-unfriendly order, so I'm reverting the
173173+/// optimistic stream features: we load all block first, then walk the MST.
174174+///
175175+/// This makes things much simpler: we only need to worry about spilling to disk
176176+/// in one place, and we always have a reasonable expecatation about how much
177177+/// work the init function will do. We can drop the CAR reader before walking,
178178+/// so the sync/async boundaries become a little easier to work around.
179179+#[derive(Debug)]
180180+pub struct MemDriver<T: Processable> {
181181+ blocks: HashMap<Cid, MaybeProcessedBlock<T>>,
182182+ walker: Walker,
183183+ process: fn(Vec<u8>) -> T,
184184+}
185185+186186+impl<T: Processable> MemDriver<T> {
187187+ /// Manually step through the record outputs
188188+ pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> {
189189+ let mut out = Vec::with_capacity(n);
190190+ for _ in 0..n {
191191+ // walk as far as we can until we run out of blocks or find a record
192192+ match self.walker.step(&mut self.blocks, self.process)? {
193193+ Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)),
194194+ Step::Finish => break,
195195+ Step::Step { rkey, data } => {
196196+ out.push((rkey, data));
197197+ continue;
198198+ }
199199+ };
200200+ }
201201+202202+ if out.is_empty() {
203203+ Ok(None)
204204+ } else {
205205+ Ok(Some(out))
206206+ }
207207+ }
208208+}
209209+164210/// a partially memory-loaded car file that needs disk spillover to continue
165211pub struct BigCar<R: AsyncRead + Unpin, T: Processable> {
166212 car: CarReader<R>,
···204250 })
205251 .await??;
206252207207- let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(2);
253253+ let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(2);
208254209255 let store_worker = tokio::task::spawn_blocking(move || {
210256 let mut writer = store.get_writer()?;
···290336 pub async fn next_chunk(
291337 mut self,
292338 n: usize,
293293- ) -> Result<(Self, Option<Vec<(String, T)>>), DriveError> {
339339+ ) -> Result<(Self, Option<BlockChunk<T>>), DriveError> {
294340 let mut out = Vec::with_capacity(n);
295341 (self, out) = tokio::task::spawn_blocking(move || {
296342 let store = self.store;
···321367 }
322368 }
323369324324- pub async fn rx(
370370+ fn read_tx_blocking(
325371 mut self,
326372 n: usize,
327327- ) -> Result<
328328- (
329329- tokio::sync::mpsc::Receiver<Vec<(String, T)>>,
330330- tokio::task::JoinHandle<Result<(), DriveError>>,
331331- ),
332332- DriveError,
333333- > {
334334- let (tx, rx) = tokio::sync::mpsc::channel::<Vec<(String, T)>>(1);
373373+ tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>,
374374+ ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> {
375375+ let mut reader = match self.store.get_reader() {
376376+ Ok(r) => r,
377377+ Err(e) => return tx.blocking_send(Err(e.into())),
378378+ };
335379336336- // sketch: this worker is going to be allowed to execute without a join handle
337337- // ...should we return the join handle here so the caller at least knows about it?
338338- // yes probably for error handling?? (orrr put errors in the channel)
339339- let worker = tokio::task::spawn_blocking(move || {
340340- let mut reader = self.store.get_reader()?;
380380+ loop {
381381+ let mut out: BlockChunk<T> = Vec::with_capacity(n);
341382342342- loop {
343343- let mut out = Vec::with_capacity(n);
383383+ for _ in 0..n {
384384+ // walk as far as we can until we run out of blocks or find a record
344385345345- for _ in 0..n {
346346- // walk as far as we can until we run out of blocks or find a record
347347- match self.walker.disk_step(&mut reader, self.process)? {
348348- Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)),
349349- Step::Finish => break,
350350- Step::Step { rkey, data } => {
351351- out.push((rkey, data));
352352- continue;
353353- }
354354- };
355355- }
386386+ let step = match self.walker.disk_step(&mut reader, self.process) {
387387+ Ok(s) => s,
388388+ Err(e) => return tx.blocking_send(Err(e.into())),
389389+ };
356390357357- if out.is_empty() {
358358- break;
359359- }
360360- tx.blocking_send(out)
361361- .map_err(|_| DriveError::ChannelSendError)?;
391391+ match step {
392392+ Step::Missing(cid) => {
393393+ return tx.blocking_send(Err(DriveError::MissingBlock(cid)));
394394+ }
395395+ Step::Finish => return Ok(()),
396396+ Step::Step { rkey, data } => {
397397+ out.push((rkey, data));
398398+ continue;
399399+ }
400400+ };
362401 }
363402364364- drop(reader); // cannot outlive store
365365- Ok(())
366366- }); // await later
403403+ if out.is_empty() {
404404+ break;
405405+ }
406406+ tx.blocking_send(Ok(out))?;
407407+ }
367408368368- Ok((rx, worker))
409409+ Ok(())
369410 }
370370-}
371411372372-/// The core driver between the block stream and MST walker
373373-///
374374-/// In the future, PDSs will export CARs in a stream-friendly order that will
375375-/// enable processing them with tiny memory overhead. But that future is not
376376-/// here yet.
377377-///
378378-/// CARs are almost always in a stream-unfriendly order, so I'm reverting the
379379-/// optimistic stream features: we load all block first, then walk the MST.
380380-///
381381-/// This makes things much simpler: we only need to worry about spilling to disk
382382-/// in one place, and we always have a reasonable expecatation about how much
383383-/// work the init function will do. We can drop the CAR reader before walking,
384384-/// so the sync/async boundaries become a little easier to work around.
385385-#[derive(Debug)]
386386-pub struct MemDriver<T: Processable> {
387387- blocks: HashMap<Cid, MaybeProcessedBlock<T>>,
388388- walker: Walker,
389389- process: fn(Vec<u8>) -> T,
390390-}
412412+ pub fn to_channel(self, n: usize) -> mpsc::Receiver<Result<BlockChunk<T>, DriveError>> {
413413+ let (tx, rx) = mpsc::channel::<Result<BlockChunk<T>, DriveError>>(1);
391414392392-impl<T: Processable> MemDriver<T> {
393393- /// Manually step through the record outputs
394394- pub async fn next_chunk(&mut self, n: usize) -> Result<Option<Vec<(String, T)>>, DriveError> {
395395- let mut out = Vec::with_capacity(n);
396396- for _ in 0..n {
397397- // walk as far as we can until we run out of blocks or find a record
398398- match self.walker.step(&mut self.blocks, self.process)? {
399399- Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)),
400400- Step::Finish => break,
401401- Step::Step { rkey, data } => {
402402- out.push((rkey, data));
403403- continue;
404404- }
405405- };
406406- }
415415+ // sketch: this worker is going to be allowed to execute without a join handle
416416+ tokio::task::spawn_blocking(move || {
417417+ if let Err(mpsc::error::SendError(_)) = self.read_tx_blocking(n, tx) {
418418+ log::debug!("big car reader exited early due to dropped receiver channel");
419419+ }
420420+ });
407421408408- if out.is_empty() {
409409- Ok(None)
410410- } else {
411411- Ok(Some(out))
412412- }
422422+ rx
413423 }
414424}