Like Wrapped / Replay, but for teal.fm and rocksky.app (currently on hiatus)
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

speed up imports by loading whilst unpacking

Mia c347c57d 12545ef5

+21 -8
+21 -8
src/mbz/replica.rs
··· 7 7 use std::str::FromStr; 8 8 use std::sync::Arc; 9 9 use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt}; 10 + use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; 10 11 use tokio_util::compat::FuturesAsyncReadCompatExt; 11 12 use tracing::{debug, info}; 12 13 ··· 130 131 _ => eyre::bail!("tried to load a dump with no defined tables!"), 131 132 }; 132 133 134 + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); 135 + let db = self.db.clone(); 136 + let d = dir.to_path_buf(); 137 + 133 138 let compressed = replica_get_dump(&self.client, dump).await?; 134 - replica_extract(compressed, wanted, dir).await?; 135 - replica_import_dump(&self.db, wanted, dir).await 139 + let bh = tokio::task::spawn_blocking(move || replica_import_dump(&db, rx, &d)); 140 + 141 + replica_extract(compressed, wanted, dir, tx).await?; 142 + bh.await??; 143 + 144 + Ok(()) 136 145 } 137 146 138 147 fn connect_and_run_sql_batch(&self, batch: &str) -> duckdb::Result<()> { ··· 182 191 Ok(dump) 183 192 } 184 193 185 - async fn replica_extract<T>(compressed: T, wanted: &[&str], tmpdir: &Path) -> eyre::Result<u32> 194 + async fn replica_extract<T>(compressed: T, wanted: &[&str], tmpdir: &Path, tx: UnboundedSender<String>) -> eyre::Result<u32> 186 195 where 187 196 T: AsyncRead + AsyncBufRead + Unpin, 188 197 { ··· 215 224 } else if let Ok(path) = path.strip_prefix("mbdump") 216 225 && wanted.contains(&path.to_str().unwrap()) 217 226 { 218 - debug!("unpacking {} ({} bytes)", path.display(), hdr.size()?); 227 + let file = path.to_str().unwrap().to_owned(); 228 + debug!("unpacking {file} ({} bytes)", hdr.size()?); 219 229 entry.unpack_in(&tmpdir).await?; 230 + debug!("finished unpacking {file}"); 231 + tx.send(file)?; 220 232 } 221 233 } 222 234 ··· 224 236 } 225 237 226 238 /// Imports the downloaded dump into duckdb 227 - async fn replica_import_dump( 239 + fn replica_import_dump( 228 240 db: &DuckdbConnectionManager, 229 - wanted: &[&str], 241 + mut recv: UnboundedReceiver<String>, 
230 242 dump: &Path, 231 243 ) -> eyre::Result<()> { 232 244 let db = db.connect()?; 233 245 let dump = dump.join("mbdump"); 234 246 235 - for table in wanted { 247 + while let Some(table) = recv.blocking_recv() { 236 248 debug!("importing dump - mbz.{table}"); 237 249 db.execute( 238 250 &format!("COPY mbz.{table} FROM ? (NULLSTR '\\N', QUOTE '')"), 239 - params![dump.join(table).to_string_lossy()], 251 + params![dump.join(&table).to_string_lossy()], 240 252 )?; 253 + debug!("finished importing dump mbz.{table}") 241 254 } 242 255 243 256 Ok(())