Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
75
fork

Configure Feed

Select the types of activity you want to include in your feed.

memstore: links as distinct dids

phil b600dd4d 382a125f

+199 -5
+4
constellation/readme.md
··· 136 136 - [x] either count or estimate the total number of links added (distinct from link targets) 137 137 - [x] jetstream: don't crash on connection refused (retry * backoff) 138 138 - [x] allow cors requests (ie. atproto-browser. (but it's really meant for backends)) 139 + - [~] api: get distinct linking dids (https://bsky.app/profile/bnewbold.net/post/3lhhzejv7zc2h) 140 + - [~] endpoint for count 141 + - [~] endpoint for listing them 142 + - [ ] add to exploratory /all endpoint 139 143 140 144 cache 141 145 - [ ] set api response headers
+86 -4
constellation/src/storage/mem_store.rs
··· 2 2 use anyhow::Result; 3 3 use constellation::{ActionableEvent, Did, RecordId}; 4 4 use links::CollectedLink; 5 - use std::collections::HashMap; 5 + use std::collections::{HashMap, HashSet}; 6 6 use std::sync::{Arc, Mutex}; 7 7 8 8 // hopefully-correct simple hashmap version, intended only for tests to verify disk impl ··· 131 131 let Some(paths) = data.targets.get(&Target::new(target)) else { 132 132 return Ok(0); 133 133 }; 134 - let Some(dids) = paths.get(&Source::new(collection, path)) else { 134 + let Some(linkers) = paths.get(&Source::new(collection, path)) else { 135 135 return Ok(0); 136 136 }; 137 - Ok(dids.iter().flatten().count().try_into()?) 137 + Ok(linkers.iter().flatten().count() as u64) 138 + } 139 + 140 + fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64> { 141 + let data = self.0.lock().unwrap(); 142 + let Some(paths) = data.targets.get(&Target::new(target)) else { 143 + return Ok(0); 144 + }; 145 + let Some(linkers) = paths.get(&Source::new(collection, path)) else { 146 + return Ok(0); 147 + }; 148 + Ok(linkers 149 + .iter() 150 + .flatten() 151 + .map(|(did, _)| did) 152 + .collect::<HashSet<_>>() 153 + .len() as u64) 138 154 } 139 155 140 156 fn get_links( ··· 190 206 }) 191 207 } 192 208 209 + fn get_distinct_dids( 210 + &self, 211 + target: &str, 212 + collection: &str, 213 + path: &str, 214 + limit: u64, 215 + until: Option<u64>, 216 + ) -> Result<PagedAppendingCollection<Did>> { 217 + let data = self.0.lock().unwrap(); 218 + let Some(paths) = data.targets.get(&Target::new(target)) else { 219 + return Ok(PagedAppendingCollection { 220 + version: (0, 0), 221 + items: Vec::new(), 222 + next: None, 223 + }); 224 + }; 225 + let Some(did_rkeys) = paths.get(&Source::new(collection, path)) else { 226 + return Ok(PagedAppendingCollection { 227 + version: (0, 0), 228 + items: Vec::new(), 229 + next: None, 230 + }); 231 + }; 232 + 233 + let dids: Vec<Option<Did>> = { 234 + let mut seen = HashSet::new(); 235 + did_rkeys 236 + .iter() 237 + .map(|o| { 238 + o.clone().and_then(|(did, _)| { 239 + if seen.contains(&did) { 240 + None 241 + } else { 242 + seen.insert(did.clone()); 243 + Some(did) 244 + } 245 + }) 246 + }) 247 + .collect() 248 + }; 249 + 250 + let total = dids.len(); 251 + let end = until 252 + .map(|u| std::cmp::min(u as usize, total)) 253 + .unwrap_or(total); 254 + let begin = end.saturating_sub(limit as usize); 255 + let next = if begin == 0 { None } else { Some(begin as u64) }; 256 + 257 + let alive = dids.iter().flatten().count(); 258 + let gone = total - alive; 259 + 260 + let items: Vec<Did> = dids[begin..end] 261 + .iter() 262 + .rev() 263 + .flatten() 264 + .filter(|did| *data.dids.get(did).expect("did must be in dids")) 265 + .cloned() 266 + .collect(); 267 + 268 + Ok(PagedAppendingCollection { 269 + version: (total as u64, gone as u64), 270 + items, 271 + next, 272 + }) 273 + } 274 + 193 275 fn get_all_counts(&self, target: &str) -> Result<HashMap<String, HashMap<String, u64>>> { 194 276 let data = self.0.lock().unwrap(); 195 277 let mut out: HashMap<String, HashMap<String, u64>> = HashMap::new(); 196 278 if let Some(asdf) = data.targets.get(&Target::new(target)) { 197 279 for (Source { collection, path }, linkers) in asdf { 198 - let count = linkers.iter().flatten().count().try_into()?; 280 + let count = linkers.iter().flatten().count() as u64; 199 281 out.entry(collection.to_string()) 200 282 .or_default() 201 283 .insert(path.to_string(), count);
+109 -1
constellation/src/storage/mod.rs
··· 1 1 use anyhow::Result; 2 - use constellation::{ActionableEvent, RecordId}; 2 + use constellation::{ActionableEvent, Did, RecordId}; 3 3 use std::collections::HashMap; 4 4 5 5 pub mod mem_store; ··· 48 48 pub trait LinkReader: Clone + Send + Sync + 'static { 49 49 fn get_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>; 50 50 51 + fn get_distinct_did_count(&self, target: &str, collection: &str, path: &str) -> Result<u64>; 52 + 51 53 fn get_links( 52 54 &self, 53 55 target: &str, ··· 57 59 until: Option<u64>, 58 60 ) -> Result<PagedAppendingCollection<RecordId>>; 59 61 62 + fn get_distinct_dids( 63 + &self, 64 + target: &str, 65 + collection: &str, 66 + path: &str, 67 + limit: u64, 68 + until: Option<u64>, 69 + ) -> Result<PagedAppendingCollection<Did>>; 70 + 60 71 fn get_all_counts(&self, _target: &str) -> Result<HashMap<String, HashMap<String, u64>>>; 61 72 62 73 /// assume all stats are estimates, since exact counts are very challenging for LSMs ··· 124 135 )?, 125 136 0 126 137 ); 138 + assert_eq!(storage.get_distinct_did_count("", "", "")?, 0); 127 139 assert_eq!( 128 140 storage.get_links("a.com", "app.t.c", ".abc.uri", 100, None)?, 129 141 PagedAppendingCollection { ··· 132 144 next: None, 133 145 } 134 146 ); 147 + assert_eq!( 148 + storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?, 149 + PagedAppendingCollection { 150 + version: (0, 0), 151 + items: vec![], 152 + next: None, 153 + } 154 + ); 135 155 assert_eq!(storage.get_all_counts("bad-example.com")?, HashMap::new()); 156 + 136 157 assert_stats(storage.get_stats()?, 0..=0, 0..=0, 0..=0); 137 158 }); 138 159 ··· 152 173 0, 153 174 )?; 154 175 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 176 + assert_eq!( 177 + storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 178 + 1 179 + ); 155 180 assert_eq!(storage.get_count("bad.com", "app.t.c", ".abc.uri")?, 0); 156 181 assert_eq!(storage.get_count("e.com", "app.t.c", ".bad.uri")?, 0); 182 + assert_eq!( 183 + storage.get_distinct_did_count("e.com", "app.t.c", ".bad.uri")?, 184 + 0 185 + ); 157 186 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1); 158 187 }); 159 188 ··· 222 251 0, 223 252 )?; 224 253 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 254 + assert_eq!( 255 + storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 256 + 1 257 + ); 225 258 226 259 // add another link from this user 227 260 storage.push( ··· 239 272 0, 240 273 )?; 241 274 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2); 275 + assert_eq!( 276 + storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 277 + 1 278 + ); 242 279 243 280 // add a link from someone else 244 281 storage.push( ··· 256 293 0, 257 294 )?; 258 295 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 3); 296 + assert_eq!( 297 + storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 298 + 2 299 + ); 259 300 260 301 // aaaand delete the first one again 261 302 storage.push( ··· 267 308 0, 268 309 )?; 269 310 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2); 311 + assert_eq!( 312 + storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 313 + 2 314 + ); 270 315 assert_stats(storage.get_stats()?, 2..=2, 1..=1, 2..=2); 271 316 }); 272 317 ··· 287 332 0, 288 333 )?; 289 334 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 335 + assert_eq!( 336 + storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 337 + 1 338 + ); 290 339 291 340 // create the second link (same user, different rkey) 292 341 storage.push( ··· 304 353 0, 305 354 )?; 306 355 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 2); 356 + assert_eq!( 357 + storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 358 + 1 359 + ); 307 360 308 361 // aaaand delete the first link 309 362 storage.push( ··· 315 368 0, 316 369 )?; 317 370 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 371 + assert_eq!( 372 + storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 373 + 1 374 + ); 318 375 assert_stats(storage.get_stats()?, 1..=1, 1..=1, 1..=1); 319 376 }); 320 377 ··· 401 458 0, 402 459 )?; 403 460 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 1); 461 + assert_eq!( 462 + storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 463 + 1 464 + ); 404 465 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 1); 466 + assert_eq!( 467 + storage.get_distinct_did_count("f.com", "app.t.c", ".xyz[].uri")?, 468 + 1 469 + ); 405 470 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 1); 471 + assert_eq!( 472 + storage.get_distinct_did_count("g.com", "app.t.c", ".xyz[].uri")?, 473 + 1 474 + ); 406 475 407 476 storage.push( 408 477 &ActionableEvent::DeleteRecord(RecordId { ··· 413 482 0, 414 483 )?; 415 484 assert_eq!(storage.get_count("e.com", "app.t.c", ".abc.uri")?, 0); 485 + assert_eq!( 486 + storage.get_distinct_did_count("e.com", "app.t.c", ".abc.uri")?, 487 + 0 488 + ); 416 489 assert_eq!(storage.get_count("f.com", "app.t.c", ".xyz[].uri")?, 0); 417 490 assert_eq!(storage.get_count("g.com", "app.t.c", ".xyz[].uri")?, 0); 418 491 assert_stats(storage.get_stats()?, 1..=1, 3..=3, 0..=0); ··· 562 635 collection: "app.t.c".into(), 563 636 rkey: "asdf".into(), 564 637 }], 638 + next: None, 639 + } 640 + ); 641 + assert_eq!( 642 + storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 100, None)?, 643 + PagedAppendingCollection { 644 + version: (1, 0), 645 + items: vec!["did:plc:asdf".into()], 565 646 next: None, 566 647 } 567 648 ); ··· 586 667 )?; 587 668 } 588 669 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, None)?; 670 + let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, None)?; 589 671 assert_eq!( 590 672 links, 591 673 PagedAppendingCollection { ··· 605 687 next: Some(3), 606 688 } 607 689 ); 690 + assert_eq!( 691 + dids, 692 + PagedAppendingCollection { 693 + version: (5, 0), 694 + items: vec!["did:plc:asdf-5".into(), "did:plc:asdf-4".into()], 695 + next: Some(3), 696 + } 697 + ); 608 698 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next)?; 699 + let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, dids.next)?; 609 700 assert_eq!( 610 701 links, 611 702 PagedAppendingCollection { ··· 625 716 next: Some(1), 626 717 } 627 718 ); 719 + assert_eq!( 720 + dids, 721 + PagedAppendingCollection { 722 + version: (5, 0), 723 + items: vec!["did:plc:asdf-3".into(), "did:plc:asdf-2".into()], 724 + next: Some(1), 725 + } 726 + ); 628 727 let links = storage.get_links("a.com", "app.t.c", ".abc.uri", 2, links.next)?; 728 + let dids = storage.get_distinct_dids("a.com", "app.t.c", ".abc.uri", 2, dids.next)?; 629 729 assert_eq!( 630 730 links, 631 731 PagedAppendingCollection { ··· 635 735 collection: "app.t.c".into(), 636 736 rkey: "asdf".into(), 637 737 },], 738 + next: None, 739 + } 740 + ); 741 + assert_eq!( 742 + dids, 743 + PagedAppendingCollection { 744 + version: (5, 0), 745 + items: vec!["did:plc:asdf-1".into()], 638 746 next: None, 639 747 } 640 748 );