Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

undo some explicit iterator drop things

phil 6c7ef0f6 89f8041d

+62 -77
+62 -77
ufos/src/store.rs
··· 65 65 * } 66 66 * Collection and rollup meta: 67 67 * ["seen_by_js_cursor_collection"|js_cursor|collection] => u64 // batched total, gets cleaned up by rollup 68 - * ["total_by_collection"|collection] => [u64, js_cursor] // live total requires scanning seen_by_collection after js_cursor 69 - * ["hour_by_collection"|hour(u64)|collection] => u64 // rollup: computed by helper task based on dirty collections 68 + * ["total_by_collection"|collection] => [u64, js_cursor] // rollup; live total requires scanning seen_by_collection after js_cursor 69 + * ["hour_by_collection"|hour(u64)|collection] => u64 // rollup from seen_by_js_cursor_collection 70 70 * Samples: 71 71 * ["by_collection"|collection|js_cursor] => [did|rkey|record] 72 72 * ["by_id"|did|collection|rkey|js_cursor] => [] // required to support deletes; did first prefix for account deletes. ··· 258 258 tokio::task::spawn_blocking(move || { 259 259 let mut output = Vec::new(); 260 260 261 - ////// ITER 262 - { 263 - for pair in partition.prefix(&prefix).rev().take(limit) { 264 - let (k_bytes, v_bytes) = pair?; 265 - let (_, cursor) = db_complete::<ByCollectionKey>(&k_bytes)?.into(); 266 - let (did, rkey, record) = db_complete::<ByCollectionValue>(&v_bytes)?.into(); 267 - output.push(CreateRecord { 268 - did, 269 - rkey, 270 - record, 271 - cursor, 272 - }) 273 - } 261 + for pair in partition.prefix(&prefix).rev().take(limit) { 262 + let (k_bytes, v_bytes) = pair?; 263 + let (_, cursor) = db_complete::<ByCollectionKey>(&k_bytes)?.into(); 264 + let (did, rkey, record) = db_complete::<ByCollectionValue>(&v_bytes)?.into(); 265 + output.push(CreateRecord { 266 + did, 267 + rkey, 268 + record, 269 + cursor, 270 + }) 274 271 } 275 272 Ok(output) 276 273 }) ··· 400 397 let mut scanned = 0; 401 398 let mut rolled = 0; 402 399 403 - ////// ITER 404 - 405 - { 406 - for pair in partition.range(range) { 407 - let (key_bytes, value_bytes) = pair?; 408 - let key = db_complete::<ByCursorSeenKey>(&key_bytes)?; 409 - let val = db_complete::<ByCursorSeenValue>(&value_bytes)?; 400 + for pair in partition.range(range) { 401 + let (key_bytes, value_bytes) = pair?; 402 + let key = db_complete::<ByCursorSeenKey>(&key_bytes)?; 403 + let val = db_complete::<ByCursorSeenValue>(&value_bytes)?; 410 404 411 - if *key.collection() == collection { 412 - let SeenCounter(n) = val; 413 - collection_total += n; 414 - rolled += 1; 415 - } 416 - scanned += 1; 405 + if *key.collection() == collection { 406 + let SeenCounter(n) = val; 407 + collection_total += n; 408 + rolled += 1; 417 409 } 410 + scanned += 1; 418 411 } 419 412 420 413 eprintln!("scanned: {scanned}, rolled: {rolled}"); ··· 553 546 554 547 log::trace!("delete_record: iterate over up to current cursor..."); 555 548 556 - ////////// ITER 557 - 549 + for (i, pair) in self 550 + .partition 551 + .range(key_prefix_bytes..key_limit) 552 + .enumerate() 558 553 { 559 - for (i, pair) in self 560 - .partition 561 - .range(key_prefix_bytes..key_limit) 562 - .enumerate() 563 - { 564 - log::trace!("delete_record iter {i}: found"); 565 - // find all (hopefully 1) 566 - let (key_bytes, _) = pair?; 567 - let key = db_complete::<ByIdKey>(&key_bytes)?; 568 - let found_cursor = key.cursor(); 569 - if found_cursor > cursor { 570 - // we are *only* allowed to delete records that came before the record delete event 571 - // log::trace!("delete_record: found (and ignoring) newer version(s). key: {key:?}"); 572 - panic!("wtf, found newer version than cursor limit we tried to set."); 573 - // break; 574 - } 554 + log::trace!("delete_record iter {i}: found"); 555 + // find all (hopefully 1) 556 + let (key_bytes, _) = pair?; 557 + let key = db_complete::<ByIdKey>(&key_bytes)?; 558 + let found_cursor = key.cursor(); 559 + if found_cursor > cursor { 560 + // we are *only* allowed to delete records that came before the record delete event 561 + // log::trace!("delete_record: found (and ignoring) newer version(s). key: {key:?}"); 562 + panic!("wtf, found newer version than cursor limit we tried to set."); 563 + // break; 564 + } 575 565 576 - // remove the by_id entry 577 - db_batch.remove(&self.partition, key_bytes); 566 + // remove the by_id entry 567 + db_batch.remove(&self.partition, key_bytes); 578 568 579 - // remove its record sample 580 - let by_collection_key_bytes = 581 - ByCollectionKey::new(collection.clone(), found_cursor).to_db_bytes()?; 582 - db_batch.remove(&self.partition, by_collection_key_bytes); 569 + // remove its record sample 570 + let by_collection_key_bytes = 571 + ByCollectionKey::new(collection.clone(), found_cursor).to_db_bytes()?; 572 + db_batch.remove(&self.partition, by_collection_key_bytes); 583 573 584 - items_removed += 1; 585 - } 574 + items_removed += 1; 586 575 } 587 576 588 577 // if items_removed > 1 { ··· 612 601 613 602 let mut items_added = 0; 614 603 615 - ////////// ITER 604 + for pair in self.partition.prefix(&key_prefix_bytes) { 605 + let (key_bytes, _) = pair?; 616 606 617 - { 618 - for pair in self.partition.prefix(&key_prefix_bytes) { 619 - let (key_bytes, _) = pair?; 607 + let (_, collection, _rkey, found_cursor) = 608 + db_complete::<ByIdKey>(&key_bytes)?.into(); 609 + if found_cursor > cursor { 610 + log::trace!( 611 + "delete account: found (and ignoring) newer records than the delete event??" 612 + ); 613 + continue; 614 + } 620 615 621 - let (_, collection, _rkey, found_cursor) = 622 - db_complete::<ByIdKey>(&key_bytes)?.into(); 623 - if found_cursor > cursor { 624 - log::trace!( 625 - "delete account: found (and ignoring) newer records than the delete event??" 626 - ); 627 - continue; 628 - } 629 - 630 - // remove the by_id entry 631 - db_batch.remove(&self.partition, key_bytes); 616 + // remove the by_id entry 617 + db_batch.remove(&self.partition, key_bytes); 632 618 633 - // remove its record sample 634 - let by_collection_key_bytes = 635 - ByCollectionKey::new(collection, found_cursor).to_db_bytes()?; 636 - db_batch.remove(&self.partition, by_collection_key_bytes); 619 + // remove its record sample 620 + let by_collection_key_bytes = 621 + ByCollectionKey::new(collection, found_cursor).to_db_bytes()?; 622 + db_batch.remove(&self.partition, by_collection_key_bytes); 637 623 638 - items_added += 1; 639 - if items_added >= MAX_BATCHED_RW_ITEMS { 640 - return Ok((items_added, false)); // there might be more records but we've done enough for this batch 641 - } 624 + items_added += 1; 625 + if items_added >= MAX_BATCHED_RW_ITEMS { 626 + return Ok((items_added, false)); // there might be more records but we've done enough for this batch 642 627 } 643 628 } 644 629