Fast and robust atproto CAR file processing in rust
14
fork

Configure Feed

Select the types of activity you want to include in your feed.

dump writes through a channel to a thread

phil 1e3371dc 97f4f2ee

+28 -18
+1 -1
Cargo.toml
··· 19 19 serde_bytes = "0.11.19" 20 20 serde_ipld_dagcbor = "0.6.4" 21 21 thiserror = "2.0.17" 22 - tokio = { version = "1.47.1", features = ["rt"] } 22 + tokio = { version = "1.47.1", features = ["rt", "sync"] } 23 23 24 24 [dev-dependencies] 25 25 clap = { version = "4.5.48", features = ["derive"] }
+27 -17
src/drive.rs
··· 215 215 .await 216 216 .unwrap()?; 217 217 218 + let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(2); 219 + 220 + let access_worker = tokio::task::spawn_blocking(move || { 221 + let mut writer = access.get_writer()?; 222 + 223 + while let Some(chunk) = rx.blocking_recv() { 224 + let kvs = chunk 225 + .into_iter() 226 + .map(|(k, v)| (k.to_bytes(), encode(v).unwrap())); 227 + writer.put_many(kvs)?; 228 + } 229 + 230 + drop(writer); // cannot outlive access 231 + Ok::<_, DiskDriveError>(access) 232 + }); // await later 233 + 218 234 // dump the rest to disk (in chunks) 235 + log::debug!("dumping the rest of the stream..."); 219 236 loop { 220 - let mut chunk = vec![]; 221 237 let mut mem_size = 0; 238 + let mut chunk = vec![]; 222 239 loop { 223 240 let Some((cid, data)) = self.car.next_block().await? else { 224 241 break; ··· 239 256 mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 240 257 chunk.push((cid, maybe_processed)); 241 258 if mem_size >= self.max_size { 259 + // soooooo if we're setting the db cache to max_size and then letting 260 + // multiple chunks in the queue that are >= max_size, then at any time 261 + // we might be using some multiple of max_size? 242 262 break; 243 263 } 244 264 } 245 265 if chunk.is_empty() { 246 266 break; 247 267 } 248 - 249 - // move access in and back out so we can manage lifetimes 250 - // dump mem blocks into the store 251 - access = tokio::task::spawn_blocking(move || { 252 - let mut writer = access.get_writer()?; 253 - 254 - let kvs = chunk 255 - .into_iter() 256 - .map(|(k, v)| (k.to_bytes(), encode(v).unwrap())); 268 + tx.send(chunk).await.unwrap(); 269 + } 270 + drop(tx); 271 + log::debug!("done. waiting for worker to finish..."); 257 272 258 - writer.put_many(kvs)?; 273 + access = access_worker.await.unwrap()?; 259 274 260 - drop(writer); // cannot outlive access 261 - Ok::<_, DiskDriveError>(access) 262 - }) 263 - .await 264 - .unwrap()?; // TODO 265 - } 275 + log::debug!("worker finished."); 266 276 267 277 let commit = self.commit.ok_or(DiskDriveError::MissingCommit)?; 268 278