Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
75
fork

Configure Feed

Select the types of activity you want to include in your feed.

copy pasta impls and failing tests for filtering

phil 34266979 66f83f22

+232
+54
constellation/src/storage/mem_store.rs
··· 278 278 }) 279 279 } 280 280 281 + fn get_links_from_dids( 282 + &self, 283 + target: &str, 284 + collection: &str, 285 + path: &str, 286 + limit: u64, 287 + until: Option<u64>, 288 + dids: &[Did], 289 + ) -> Result<PagedAppendingCollection<RecordId>> { 290 + let data = self.0.lock().unwrap(); 291 + let Some(paths) = data.targets.get(&Target::new(target)) else { 292 + return Ok(PagedAppendingCollection { 293 + version: (0, 0), 294 + items: Vec::new(), 295 + next: None, 296 + }); 297 + }; 298 + let Some(did_rkeys) = paths.get(&Source::new(collection, path)) else { 299 + return Ok(PagedAppendingCollection { 300 + version: (0, 0), 301 + items: Vec::new(), 302 + next: None, 303 + }); 304 + }; 305 + 306 + let total = did_rkeys.len(); 307 + let end = until 308 + .map(|u| std::cmp::min(u as usize, total)) 309 + .unwrap_or(total); 310 + let begin = end.saturating_sub(limit as usize); 311 + let next = if begin == 0 { None } else { Some(begin as u64) }; 312 + 313 + let alive = did_rkeys.iter().flatten().count(); 314 + let gone = total - alive; 315 + 316 + let items: Vec<_> = did_rkeys[begin..end] 317 + .iter() 318 + .rev() 319 + .flatten() 320 + .filter(|(did, _)| *data.dids.get(did).expect("did must be in dids")) 321 + .map(|(did, rkey)| RecordId { 322 + did: did.clone(), 323 + rkey: rkey.0.clone(), 324 + collection: collection.to_string(), 325 + }) 326 + .collect(); 327 + 328 + Ok(PagedAppendingCollection { 329 + version: (total as u64, gone as u64), 330 + items, 331 + next, 332 + }) 333 + } 334 + 281 335 fn get_all_record_counts(&self, target: &str) -> Result<HashMap<String, HashMap<String, u64>>> { 282 336 let data = self.0.lock().unwrap(); 283 337 let mut out: HashMap<String, HashMap<String, u64>> = HashMap::new();
+113
constellation/src/storage/mod.rs
··· 69 69 until: Option<u64>, 70 70 ) -> Result<PagedAppendingCollection<Did>>; // TODO: reflect dedups in cursor 71 71 72 + fn get_links_from_dids( 73 + &self, 74 + target: &str, 75 + collection: &str, 76 + path: &str, 77 + limit: u64, 78 + until: Option<u64>, 79 + dids: &[Did], 80 + ) -> Result<PagedAppendingCollection<RecordId>>; 81 + 72 82 fn get_all_record_counts(&self, _target: &str) 73 83 -> Result<HashMap<String, HashMap<String, u64>>>; 74 84 ··· 758 768 } 759 769 ); 760 770 assert_stats(storage.get_stats()?, 5..=5, 1..=1, 5..=5); 771 + }); 772 + 773 + test_each_storage!(get_filtered_links, |storage| { 774 + let links = storage.get_links_from_dids("a.com", "app.t.c", ".abc.uri", 2, None, &[Did("did:plc:linker".to_string())])?; 775 + assert_eq!( 776 + links, 777 + PagedAppendingCollection { 778 + version: (0, 0), 779 + items: vec![], 780 + next: None, 781 + } 782 + ); 783 + 784 + storage.push( 785 + &ActionableEvent::CreateLinks { 786 + record_id: RecordId { 787 + did: format!("did:plc:linker").into(), 788 + collection: "app.t.c".into(), 789 + rkey: "asdf".into(), 790 + }, 791 + links: vec![CollectedLink { 792 + target: Link::Uri("a.com".into()), 793 + path: ".abc.uri".into(), 794 + }], 795 + }, 796 + 0, 797 + )?; 798 + 799 + let links = storage.get_links_from_dids("a.com", "app.t.c", ".abc.uri", 2, None, &[Did("did:plc:linker".to_string())])?; 800 + assert_eq!( 801 + links, 802 + PagedAppendingCollection { 803 + version: (1, 0), 804 + items: vec![ 805 + RecordId { 806 + did: "did:plc:linker".into(), 807 + collection: "app.t.c".into(), 808 + rkey: "asdf".into(), 809 + }, 810 + ], 811 + next: None, 812 + } 813 + ); 814 + 815 + let links = storage.get_links_from_dids("a.com", "app.t.c", ".abc.uri", 2, None, &[Did("did:plc:someone-else".to_string())])?; 816 + assert_eq!( 817 + links, 818 + PagedAppendingCollection { 819 + version: (0, 0), 820 + items: vec![], 821 + next: None, 822 + } 823 + ); 824 + 825 + storage.push( 826 + &ActionableEvent::CreateLinks { 827 + record_id: RecordId { 828 + did: format!("did:plc:linker").into(), 829 + collection: "app.t.c".into(), 830 + rkey: "asdf-2".into(), 831 + }, 832 + links: vec![CollectedLink { 833 + target: Link::Uri("a.com".into()), 834 + path: ".abc.uri".into(), 835 + }], 836 + }, 837 + 0, 838 + )?; 839 + storage.push( 840 + &ActionableEvent::CreateLinks { 841 + record_id: RecordId { 842 + did: format!("did:plc:someone-else").into(), 843 + collection: "app.t.c".into(), 844 + rkey: "asdf".into(), 845 + }, 846 + links: vec![CollectedLink { 847 + target: Link::Uri("a.com".into()), 848 + path: ".abc.uri".into(), 849 + }], 850 + }, 851 + 0, 852 + )?; 853 + 854 + let links = storage.get_links_from_dids("a.com", "app.t.c", ".abc.uri", 2, None, &[Did("did:plc:linker".to_string())])?; 855 + assert_eq!( 856 + links, 857 + PagedAppendingCollection { 858 + version: (2, 0), 859 + items: vec![ 860 + RecordId { 861 + did: "did:plc:linker".into(), 862 + collection: "app.t.c".into(), 863 + rkey: "asdf".into(), 864 + }, 865 + RecordId { 866 + did: "did:plc:linker".into(), 867 + collection: "app.t.c".into(), 868 + rkey: "asdf-2".into(), 869 + }, 870 + ], 871 + next: None, 872 + } 873 + ); 761 874 }); 762 875 763 876 test_each_storage!(get_links_exact_multiple, |storage| {
+65
constellation/src/storage/rocks_store.rs
··· 977 977 }) 978 978 } 979 979 980 + fn get_links_from_dids( 981 + &self, 982 + target: &str, 983 + collection: &str, 984 + path: &str, 985 + limit: u64, 986 + until: Option<u64>, 987 + dids: &[Did], 988 + ) -> Result<PagedAppendingCollection<RecordId>> { 989 + let target_key = TargetKey( 990 + Target(target.to_string()), 991 + Collection(collection.to_string()), 992 + RPath(path.to_string()), 993 + ); 994 + 995 + let Some(target_id) = self.target_id_table.get_id_val(&self.db, &target_key)? else { 996 + return Ok(PagedAppendingCollection { 997 + version: (0, 0), 998 + items: Vec::new(), 999 + next: None, 1000 + }); 1001 + }; 1002 + 1003 + let linkers = self.get_target_linkers(&target_id)?; 1004 + 1005 + let (alive, gone) = linkers.count(); 1006 + let total = alive + gone; 1007 + let end = until.map(|u| std::cmp::min(u, total)).unwrap_or(total) as usize; 1008 + let begin = end.saturating_sub(limit as usize); 1009 + let next = if begin == 0 { None } else { Some(begin as u64) }; 1010 + 1011 + let did_id_rkeys = linkers.0[begin..end].iter().rev().collect::<Vec<_>>(); 1012 + 1013 + let mut items = Vec::with_capacity(did_id_rkeys.len()); 1014 + // TODO: use get-many (or multi-get or whatever it's called) 1015 + for (did_id, rkey) in did_id_rkeys { 1016 + if did_id.is_empty() { 1017 + continue; 1018 + } 1019 + if let Some(did) = self.did_id_table.get_val_from_id(&self.db, did_id.0)? { 1020 + let Some(DidIdValue(_, active)) = self.did_id_table.get_id_val(&self.db, &did)? 1021 + else { 1022 + eprintln!("failed to look up did_value from did_id {did_id:?}: {did:?}: data consistency bug?"); 1023 + continue; 1024 + }; 1025 + if !active { 1026 + continue; 1027 + } 1028 + items.push(RecordId { 1029 + did, 1030 + collection: collection.to_string(), 1031 + rkey: rkey.0.clone(), 1032 + }); 1033 + } else { 1034 + eprintln!("failed to look up did from did_id {did_id:?}"); 1035 + } 1036 + } 1037 + 1038 + Ok(PagedAppendingCollection { 1039 + version: (total, gone), 1040 + items, 1041 + next, 1042 + }) 1043 + } 1044 + 980 1045 fn get_all_record_counts(&self, target: &str) -> Result<HashMap<String, HashMap<String, u64>>> { 981 1046 let mut out: HashMap<String, HashMap<String, u64>> = HashMap::new(); 982 1047 for (target_key, target_id) in self.iter_targets_for_target(&Target(target.into())) {