Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
75
fork

Configure Feed

Select the types of activity you want to include in your feed.

implement distinct dids for rocksdb

phil ad544d4d b600dd4d

+94 -3
+2 -2
constellation/src/storage/mod.rs
··· 12 12 13 13 #[derive(Debug, PartialEq)] 14 14 pub struct PagedAppendingCollection<T> { 15 - pub version: (u64, u64), // (collection length, deleted item count) 15 + pub version: (u64, u64), // (collection length, deleted item count) // TODO: change to (total, active)? since dedups isn't "deleted" 16 16 pub items: Vec<T>, 17 17 pub next: Option<u64>, 18 18 } ··· 66 66 path: &str, 67 67 limit: u64, 68 68 until: Option<u64>, 69 - ) -> Result<PagedAppendingCollection<Did>>; 69 + ) -> Result<PagedAppendingCollection<Did>>; // TODO: reflect dedups in cursor 70 70 71 71 fn get_all_counts(&self, _target: &str) -> Result<HashMap<String, HashMap<String, u64>>>; 72 72
+92 -1
constellation/src/storage/rocks_store.rs
··· 8 8 MultiThreaded, Options, PrefixRange, ReadOptions, WriteBatch, 9 9 }; 10 10 use serde::{Deserialize, Serialize}; 11 - use std::collections::HashMap; 11 + use std::collections::{HashMap, HashSet}; 12 12 use std::io::Read; 13 13 use std::marker::PhantomData; 14 14 use std::path::Path; ··· 349 349 }; 350 350 _vr(&linkers_bytes) 351 351 } 352 + /// zero out every duplicate did. bit of a hack, looks the same as deleted, but eh 353 + fn get_distinct_target_linkers(&self, target_id: &TargetId) -> Result<TargetLinkers> { 354 + let mut seen = HashSet::new(); 355 + let mut linkers = self.get_target_linkers(target_id)?; 356 + for (did_id, _) in linkers.0.iter_mut() { 357 + if seen.contains(did_id) { 358 + did_id.0 = 0; 359 + } else { 360 + seen.insert(*did_id); 361 + } 362 + } 363 + Ok(linkers) 364 + } 352 365 fn merge_target_linker( 353 366 &self, 354 367 batch: &mut WriteBatch, ··· 634 647 } 635 648 } 636 649 650 + fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> { 651 + let target_key = TargetKey( 652 + Target(target.to_string()), 653 + Collection(collection.to_string()), 654 + RPath(path.to_string()), 655 + ); 656 + if let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? { 657 + let TargetLinkers(alives) = self.get_target_linkers(&target_id)?; 658 + Ok(alives // TODO: maybe make this a method on TargetLinkers? 659 + .iter() 660 + .filter_map(|(DidId(id), _)| if *id == 0 { None } else { Some(id) }) 661 + .collect::<HashSet<_>>() 662 + .len() as u64) 663 + } else { 664 + Ok(0) 665 + } 666 + } 667 + 637 668 fn get_links( 638 669 &self, 639 670 target: &str, ··· 686 717 collection: collection.to_string(), 687 718 rkey: rkey.0.clone(), 688 719 }); 720 + } else { 721 + eprintln!("failed to look up did from did_id {did_id:?}"); 722 + } 723 + } 724 + 725 + Ok(PagedAppendingCollection { 726 + version: (total, gone), 727 + items, 728 + next, 729 + }) 730 + } 731 + 732 + fn get_distinct_dids( 733 + &self, 734 + target: &str, 735 + collection: &str, 736 + path: &str, 737 + limit: u64, 738 + until: Option<u64>, 739 + ) -> Result<PagedAppendingCollection<Did>> { 740 + let target_key = TargetKey( 741 + Target(target.to_string()), 742 + Collection(collection.to_string()), 743 + RPath(path.to_string()), 744 + ); 745 + 746 + let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else { 747 + return Ok(PagedAppendingCollection { 748 + version: (0, 0), 749 + items: Vec::new(), 750 + next: None, 751 + }); 752 + }; 753 + 754 + let linkers = self.get_distinct_target_linkers(&target_id)?; 755 + 756 + let (alive, gone) = linkers.count(); 757 + let total = alive + gone; 758 + let end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize; 759 + let begin = end.saturating_sub(limit as usize); 760 + let next = if begin == 0 { None } else { Some(begin as u64) }; 761 + 762 + let did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>(); 763 + 764 + let mut items = Vec::with_capacity(did_id_rkeys.len()); 765 + // TODO: use get-many (or multi-get or whatever it's called) 766 + for (did_id, _) in did_id_rkeys { 767 + if did_id.is_empty() { 768 + continue; 769 + } 770 + if let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? { 771 + let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? 772 + else { 773 + eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?"); 774 + continue; 775 + }; 776 + if !active { 777 + continue; 778 + } 779 + items.push(did); 689 780 } else { 690 781 eprintln!("failed to look up did from did_id {did_id:?}"); 691 782 }