tracks lexicons and how many times they appeared on the jetstream
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor(server): make encoding items actually par (first collect all items and then par iter the encoding)

dusk 48460bf4 11924417

+18 -16
+5 -6
server/src/db/handle.rs
··· 218 218 Err(std::io::Error::new(std::io::ErrorKind::WriteZero, "no items are in queue").into()) 219 219 } 220 220 221 - pub fn encode_block(&self, item_count: usize) -> AppResult<Block> { 221 + pub fn take_block_items(&self, item_count: usize) -> Vec<Item> { 222 222 let mut buf = self.buf.lock(); 223 223 let end = item_count.min(buf.len()); 224 - Self::encode_block_from_items( 225 - buf.drain(..end).map(|event| { 224 + buf.drain(..end) 225 + .map(|event| { 226 226 Item::new( 227 227 event.timestamp, 228 228 &NsidHit { 229 229 deleted: event.deleted, 230 230 }, 231 231 ) 232 - }), 233 - item_count, 234 - ) 232 + }) 233 + .collect() 235 234 } 236 235 }
+13 -10
server/src/db/mod.rs
··· 207 207 chunk 208 208 .into_iter() 209 209 .map(|(i, handle, max_block_size)| { 210 - handle 211 - .encode_block(max_block_size) 212 - .inspect(|block| { 213 - tracing::info!( 214 - "{}: encoded block with {} items", 215 - handle.nsid(), 216 - block.written, 217 - ) 218 - }) 219 - .map(|block| (i, block, handle)) 210 + (i, handle.take_block_items(max_block_size), handle) 211 + }) 212 + .collect::<Vec<_>>() 213 + .into_par_iter() 214 + .map(|(i, items, handle)| { 215 + let count = items.len(); 216 + let block = LexiconHandle::encode_block_from_items(items, count)?; 217 + tracing::info!( 218 + "{}: encoded block with {} items", 219 + handle.nsid(), 220 + block.written, 221 + ); 222 + AppResult::Ok((i, block, handle)) 220 223 }) 221 224 .collect::<Result<Vec<_>, _>>() 222 225 })