Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
75
fork

Configure Feed

Select the types of activity you want to include in your feed.

batch rw tasks

phil 11d62fd8 ac5105c2

+69 -52
+69 -52
ufos/src/store.rs
··· 17 17 use std::time::{Duration, Instant}; 18 18 use tokio::{sync::mpsc::Receiver, time::sleep}; 19 19 20 - const MAX_BATCHED_DELETE_ACCOUNT_RECORDS: usize = 32; // there are probably some efficiency gains for higher, at cost of more memory 20 + /// Commit the RW batch immediately if this number of events have been read off the mod queue 21 + const MAX_BATCHED_RW_EVENTS: usize = 96; 22 + 23 + /// Commit the RW batch immediately if this number of records is reached 24 + /// 25 + /// there are probably some efficiency gains for higher, at cost of more memory. 26 + /// interestingly, this kind of sets a priority weight for the RW loop: 27 + /// - doing more work whenever scheduled means getting more CPU time in general 28 + const MAX_BATCHED_RW_ITEMS: usize = 32; 21 29 22 30 /** 23 31 * data format, roughly: ··· 132 140 pub async fn rw_loop(&self) -> anyhow::Result<()> { 133 141 // TODO: lock so that only one rw loop can possibly be run. or even better, take a mutable resource thing to enforce at compile time. 134 142 loop { 135 - sleep(Duration::from_secs_f64(0.001)).await; 143 + sleep(Duration::from_secs_f64(0.001)).await; // todo: interval rate-limit instead 136 144 let keyspace = self.keyspace.clone(); 137 145 let partition = self.partition.clone(); 138 146 tokio::task::spawn_blocking(move || -> anyhow::Result<()> { 139 147 let mod_cursor = get_static::<ModCursorKey, ModCursorValue>(&partition)? 
140 148 .unwrap_or(Cursor::from_start()); 141 149 let range = ModQueueItemKey::new(mod_cursor.clone()).range_to_prefix_end()?; 142 - let Some(pair) = partition.range(range.clone()).next() else { 143 - // eprintln!("mod queue empty."); 144 - return Ok(()); 145 - }; 150 + 151 + let mut db_batch = keyspace.batch(); 152 + let mut batched_rw_items = 0; 146 153 147 - let (key_bytes, val_bytes) = pair?; 148 - let mod_key = match db_complete::<ModQueueItemKey>(&key_bytes) { 149 - Ok(k) => k, 150 - Err(EncodingError::WrongStaticPrefix(_, _)) => { 151 - panic!("wsp: mod queue empty."); 154 + for (i, pair) in partition.range(range.clone()).enumerate() { 155 + if i >= MAX_BATCHED_RW_EVENTS { 156 + break; 152 157 } 153 - otherwise => otherwise?, 154 - }; 158 + 159 + let (key_bytes, val_bytes) = pair?; 160 + let mod_key = match db_complete::<ModQueueItemKey>(&key_bytes) { 161 + Ok(k) => k, 162 + Err(EncodingError::WrongStaticPrefix(_, _)) => { 163 + panic!("wsp: mod queue empty."); 164 + } 165 + otherwise => otherwise?, 166 + }; 155 167 156 - let mod_value: ModQueueItemValue = 157 - db_complete::<ModQueueItemStringValue>(&val_bytes)?.try_into()?; 168 + let mod_value: ModQueueItemValue = 169 + db_complete::<ModQueueItemStringValue>(&val_bytes)?.try_into()?; 158 170 159 - DBWriter { 160 - keyspace, 161 - partition, 171 + batched_rw_items += DBWriter { 172 + keyspace: keyspace.clone(), 173 + partition: partition.clone(), 174 + } 175 + .write_rw(&mut db_batch, mod_key, mod_value)?; 176 + 177 + if batched_rw_items >= MAX_BATCHED_RW_ITEMS { 178 + break; 179 + } 162 180 } 163 - .write_rw(mod_key, mod_value)?; 181 + 182 + db_batch.commit()?; 164 183 Ok(()) 165 184 }) 166 185 .await??; ··· 304 323 305 324 let mut scanned = 0; 306 325 let mut rolled = 0; 307 - for (i, pair) in partition.range(range).enumerate() { 326 + for pair in partition.range(range) { 308 327 let (key_bytes, value_bytes) = pair?; 309 328 let key = db_complete::<ByCursorSeenKey>(&key_bytes)?; 310 329 let val = 
db_complete::<ByCursorSeenValue>(&value_bytes)?; 311 - if i >= 20 { 312 - eprintln!("{key:?} => {val:?}") 313 - } 330 + 314 331 if *key.collection() == collection { 315 332 let SeenCounter(n) = val; 316 333 collection_total += n; ··· 338 355 339 356 fn write_rw( 340 357 self, 358 + db_batch: &mut FjallBatch, 341 359 mod_key: ModQueueItemKey, 342 360 mod_value: ModQueueItemValue, 343 - ) -> anyhow::Result<()> { 344 - let mut db_batch = self.keyspace.batch(); 345 - 361 + ) -> anyhow::Result<usize> { 346 362 // update the current rw cursor to this item (atomically with the batch if it succeeds) 347 363 let mod_cursor: Cursor = (&mod_key).into(); 348 - insert_batch_static::<ModCursorKey>(&mut db_batch, &self.partition, mod_cursor.clone())?; 364 + insert_batch_static::<ModCursorKey>(db_batch, &self.partition, mod_cursor.clone())?; 349 365 350 - let completed = match mod_value { 366 + let items_modified = match mod_value { 351 367 ModQueueItemValue::DeleteAccount(did) => { 352 - self.delete_account(&mut db_batch, mod_cursor, did)? 368 + let (items, finished) = self.delete_account(db_batch, mod_cursor, did)?; 369 + if finished { 370 + // only remove the queued rw task if we have actually completed its account removal work 371 + remove_batch::<ModQueueItemKey>(db_batch, &self.partition, mod_key)?; 372 + } 373 + items 353 374 } 354 375 ModQueueItemValue::DeleteRecord(did, collection, rkey) => { 355 - self.delete_record(&mut db_batch, mod_cursor, did, collection, rkey)?; 356 - true 376 + remove_batch::<ModQueueItemKey>(db_batch, &self.partition, mod_key)?; 377 + self.delete_record(db_batch, mod_cursor, did, collection, rkey)? 357 378 } 358 379 ModQueueItemValue::UpdateRecord(did, collection, rkey, record) => { 359 - self.update_record(&mut db_batch, mod_cursor, did, collection, rkey, record)?; 360 - true 380 + remove_batch::<ModQueueItemKey>(db_batch, &self.partition, mod_key)?; 381 + self.update_record(db_batch, mod_cursor, did, collection, rkey, record)? 
361 382 } 362 383 }; 363 - if completed { 364 - // remove the queued rw task so that we'll continue with the *next* one (atomically with batch) 365 - remove_batch::<ModQueueItemKey>(&mut db_batch, &self.partition, mod_key)?; 366 - } 367 - Ok(db_batch.commit()?) 384 + Ok(items_modified) 368 385 } 369 386 370 387 fn update_record( ··· 377 394 record: serde_json::Value, 378 395 ) -> anyhow::Result<usize> { 379 396 // 1. delete any existing versions older than us 380 - let n_deleted = self.delete_record( 397 + let items_deleted = self.delete_record( 381 398 db_batch, 382 399 cursor.clone(), 383 400 did.clone(), ··· 388 405 // 2. insert the updated version, at our new cursor 389 406 self.add_record(db_batch, cursor, did, collection, rkey, record)?; 390 407 391 - Ok(n_deleted) 408 + let items_total = items_deleted + 1; 409 + Ok(items_total) 392 410 } 393 411 394 412 fn delete_record( ··· 402 420 let key_prefix_bytes = 403 421 ByIdKey::record_prefix(did, collection.clone(), rkey).to_db_bytes()?; 404 422 405 - let mut n_removed = 0; 423 + let mut items_removed = 0; 406 424 for pair in self.partition.prefix(&key_prefix_bytes) { 407 425 // find all (hopefully 1) 408 426 let (key_bytes, _) = pair?; ··· 422 440 ByCollectionKey::new(collection.clone(), found_cursor).to_db_bytes()?; 423 441 db_batch.remove(&self.partition, by_collection_key_bytes); 424 442 425 - n_removed += 1; 443 + items_removed += 1; 426 444 } 427 445 428 - if n_removed > 1 { 429 - eprintln!("odd, removed {n_removed} records for one record removal:"); 446 + if items_removed > 1 { 447 + eprintln!("odd, removed {items_removed} records for one record removal:"); 430 448 for (i, pair) in self.partition.prefix(&key_prefix_bytes).enumerate() { 431 449 // find all (hopefully 1) 432 450 let (key_bytes, _) = pair?; ··· 439 457 eprintln!(" {i}: key {key:?}"); 440 458 } 441 459 } 442 - Ok(n_removed) 460 + Ok(items_removed) 443 461 } 444 462 445 463 fn delete_account( ··· 447 465 db_batch: &mut FjallBatch, 448 466 
cursor: Cursor, 449 467 did: Did, 450 - ) -> anyhow::Result<bool> { 468 + ) -> anyhow::Result<(usize, bool)> { 451 469 let key_prefix_bytes = ByIdKey::did_prefix(did).to_db_bytes()?; 452 470 453 - let mut n_found = 0; 471 + let mut items_added = 0; 454 472 for pair in self.partition.prefix(&key_prefix_bytes) { 455 473 let (key_bytes, _) = pair?; 456 474 ··· 470 488 ByCollectionKey::new(collection, found_cursor).to_db_bytes()?; 471 489 db_batch.remove(&self.partition, by_collection_key_bytes); 472 490 473 - n_found += 1; 474 - if n_found >= MAX_BATCHED_DELETE_ACCOUNT_RECORDS { 475 - return Ok(false); // there might be more records but we've done enough for this batch 491 + items_added += 1; 492 + if items_added >= MAX_BATCHED_RW_ITEMS { 493 + return Ok((items_added, false)); // there might be more records but we've done enough for this batch 476 494 } 477 495 } 478 496 479 - // eprintln!("removed {n_found} account records."); 480 - Ok(true) 497 + Ok((items_added, true)) 481 498 } 482 499 483 500 fn add_record_creates(