BlueSky & more on desktop lazurite.stormlightlabs.org/
tauri rust typescript bluesky appview atproto solid
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 997 lines 34 kB view raw
1use crate::actors::{actor_unavailable_message, classify_actor_unavailability}; 2use crate::actors::{ActorAvailability, ActorAvailabilityReason}; 3use crate::constellation::{BacklinksResponse, ConstellationClient, ConstellationLinkRecord}; 4use crate::error::{AppError, Result}; 5use crate::explorer; 6use crate::settings; 7use crate::state::AppState; 8use jacquard::api::app_bsky::actor::get_profile::GetProfile; 9use jacquard::api::app_bsky::actor::get_profiles::GetProfiles; 10use jacquard::api::app_bsky::graph::get_list::GetList; 11use jacquard::api::app_bsky::graph::get_relationships::{GetRelationships, GetRelationshipsOutputRelationshipsItem}; 12use jacquard::api::app_bsky::graph::get_starter_packs::GetStarterPacks; 13use jacquard::api::com_atproto::label::query_labels::QueryLabels; 14use jacquard::client::{Agent, UnauthenticatedSession}; 15use jacquard::identity::JacquardResolver; 16use jacquard::types::aturi::AtUri; 17use jacquard::types::did::Did; 18use jacquard::types::ident::AtIdentifier; 19use jacquard::xrpc::XrpcClient; 20use jacquard::IntoStatic; 21use serde::{Deserialize, Serialize}; 22use serde_json::Value; 23use std::collections::{BTreeMap, BTreeSet, HashMap}; 24use tauri_plugin_log::log; 25 26const LIST_MEMBERSHIP_PATH_TO_OTHER: &str = "list"; 27const BLOCK_COLLECTION: &str = "app.bsky.graph.block"; 28 29// TODO: this should be a source enum 30const LIST_MEMBERSHIP_SOURCE: &str = "app.bsky.graph.listitem:subject"; 31const BLOCK_SOURCE: &str = "app.bsky.graph.block:subject"; 32const STARTER_PACK_SOURCE: &str = "app.bsky.graph.starterpack:listItemsSample[].subject"; 33const LIKES_SOURCE: &str = "app.bsky.feed.like:subject.uri"; 34const REPOSTS_SOURCE: &str = "app.bsky.feed.repost:subject.uri"; 35const REPLIES_SOURCE: &str = "app.bsky.feed.post:reply.parent.uri"; 36const QUOTES_SOURCE: &str = "app.bsky.feed.post:embed.record.uri"; 37 38// TODO: this should be a Limit enum 39const PUBLIC_BATCH_LIMIT: usize = 25; 40const ACCOUNT_LIST_PAGE_LIMIT: u32 = 100; 41const ACCOUNT_LIST_MAX_ITEMS: usize = 200; 42const STARTER_PACK_LIMIT: u32 = 100; 43const STARTER_PACK_MAX_ITEMS: usize = 200; 44const BACKLINK_PREVIEW_LIMIT: u32 = 25; 45const BLOCK_PREVIEW_LIMIT: u32 = 50; 46const LABEL_LIMIT: i64 = 100; 47 48type PublicClient = Agent<UnauthenticatedSession<JacquardResolver>>; 49 50#[derive(Debug, Clone, Serialize)] 51#[serde(rename_all = "camelCase")] 52pub struct AccountListsResult { 53 pub total: usize, 54 pub lists: Vec<Value>, 55 pub truncated: bool, 56} 57 58#[derive(Debug, Clone, Serialize)] 59#[serde(rename_all = "camelCase")] 60pub struct AccountLabelsResult { 61 pub labels: Vec<Value>, 62 pub source_profiles: BTreeMap<String, Value>, 63 pub cursor: Option<String>, 64} 65 66#[derive(Debug, Clone, Serialize)] 67#[serde(rename_all = "camelCase")] 68pub struct DidProfileItem { 69 pub did: String, 70 pub availability: ActorAvailability, 71 pub profile: Option<Value>, 72 pub unavailable_reason: Option<ActorAvailabilityReason>, 73 pub unavailable_message: Option<String>, 74} 75 76#[derive(Debug, Clone, Serialize)] 77#[serde(rename_all = "camelCase")] 78pub struct AccountBlockedByResult { 79 pub total: u64, 80 pub items: Vec<DidProfileItem>, 81 pub cursor: Option<String>, 82} 83 84#[derive(Debug, Clone, Serialize)] 85#[serde(rename_all = "camelCase")] 86pub struct AccountBlockingResult { 87 pub items: Vec<AccountBlockingItem>, 88 pub cursor: Option<String>, 89} 90 91#[derive(Debug, Clone, Serialize)] 92#[serde(rename_all = "camelCase")] 93pub struct AccountBlockingItem { 94 pub uri: String, 95 pub cid: String, 96 pub subject_did: String, 97 pub created_at: Option<String>, 98 pub availability: ActorAvailability, 99 pub value: Value, 100 pub profile: Option<Value>, 101 pub unavailable_reason: Option<ActorAvailabilityReason>, 102 pub unavailable_message: Option<String>, 103} 104 105#[derive(Debug, Clone, Serialize)] 106#[serde(rename_all = "camelCase")] 107pub struct AccountStarterPacksResult { 108 pub total: u64, 109 pub starter_packs: Vec<Value>, 110 pub truncated: bool, 111} 112 113#[derive(Debug, Clone, Serialize)] 114#[serde(rename_all = "camelCase")] 115pub struct RecordBacklinksResult { 116 pub likes: BacklinkGroup, 117 pub reposts: BacklinkGroup, 118 pub replies: BacklinkGroup, 119 pub quotes: BacklinkGroup, 120} 121 122#[derive(Debug, Clone, Serialize)] 123#[serde(rename_all = "camelCase")] 124pub struct BacklinkGroup { 125 pub total: u64, 126 pub records: Vec<BacklinkRecordItem>, 127 pub cursor: Option<String>, 128} 129 130#[derive(Debug, Clone, Serialize)] 131#[serde(rename_all = "camelCase")] 132pub struct BacklinkRecordItem { 133 pub uri: String, 134 pub did: String, 135 pub collection: String, 136 pub rkey: String, 137 pub profile: Option<Value>, 138 pub value: Option<Value>, 139} 140 141#[derive(Debug, Clone, Deserialize)] 142#[serde(rename_all = "camelCase")] 143struct RepoListRecordsOutput { 144 cursor: Option<String>, 145 #[serde(default)] 146 records: Vec<RepoRecord>, 147} 148 149#[derive(Debug, Clone, Deserialize)] 150#[serde(rename_all = "camelCase")] 151struct RepoRecord { 152 uri: String, 153 cid: String, 154 value: Value, 155} 156 157pub async fn get_account_lists(did: String, state: &AppState) -> Result<AccountListsResult> { 158 let normalized_did = normalize_did(&did)?; 159 let client = constellation_client(state)?; 160 let counts = match client 161 .get_many_to_many_counts( 162 normalized_did.clone(), 163 LIST_MEMBERSHIP_SOURCE.to_string(), 164 LIST_MEMBERSHIP_PATH_TO_OTHER.to_string(), 165 ) 166 .await 167 { 168 Ok(counts) => counts, 169 Err(error) if should_skip_missing_resource(&error) => { 170 log_missing_resource("account lists", &normalized_did, &error); 171 return Ok(AccountListsResult { total: 0, lists: Vec::new(), truncated: false }); 172 } 173 Err(error) => return Err(AppError::diagnostics("Couldn't load lists for this account.", error)), 174 }; 175 176 let mut list_uris = Vec::new(); 177 let mut cursor = None; 178 let mut truncated = false; 179 180 while list_uris.len() < ACCOUNT_LIST_MAX_ITEMS { 181 let response = match client 182 .get_many_to_many( 183 normalized_did.clone(), 184 LIST_MEMBERSHIP_SOURCE.to_string(), 185 LIST_MEMBERSHIP_PATH_TO_OTHER.to_string(), 186 Some(ACCOUNT_LIST_PAGE_LIMIT), 187 cursor.clone(), 188 ) 189 .await 190 { 191 Ok(response) => response, 192 Err(error) if should_skip_missing_resource(&error) => { 193 log_missing_resource("account lists", &normalized_did, &error); 194 break; 195 } 196 Err(error) => return Err(AppError::diagnostics("Couldn't load lists for this account.", error)), 197 }; 198 199 if response.items.is_empty() { 200 break; 201 } 202 203 for item in response.items { 204 if list_uris.len() >= ACCOUNT_LIST_MAX_ITEMS { 205 truncated = true; 206 break; 207 } 208 list_uris.push(item.other_subject); 209 } 210 211 match response.cursor { 212 Some(next_cursor) if list_uris.len() < ACCOUNT_LIST_MAX_ITEMS => cursor = Some(next_cursor), 213 Some(_) => { 214 truncated = true; 215 break; 216 } 217 None => break, 218 } 219 } 220 221 let unique_list_uris = dedupe_preserve_order(list_uris); 222 let lists = fetch_lists(&unique_list_uris).await?; 223 224 Ok(AccountListsResult { total: counts.counts_by_other_subject.len(), lists, truncated }) 225} 226 227pub async fn get_account_labels(did: String) -> Result<AccountLabelsResult> { 228 let normalized_did = normalize_did(&did)?; 229 let client = public_client(); 230 let output = client 231 .send( 232 QueryLabels::new() 233 .uri_patterns(vec![normalized_did.clone().into()]) 234 .limit(LABEL_LIMIT) 235 .build(), 236 ) 237 .await 238 .map_err(|error| AppError::diagnostics("Couldn't load labels for this account.", error))? 239 .into_output() 240 .map_err(|error| AppError::diagnostics("Couldn't read labels for this account.", error))? 241 .into_static(); 242 243 let labels = output 244 .labels 245 .iter() 246 .map(serde_json::to_value) 247 .collect::<std::result::Result<Vec<_>, _>>()?; 248 249 let source_dids = output 250 .labels 251 .iter() 252 .map(|label| label.src.to_string()) 253 .collect::<Vec<_>>(); 254 let source_profiles = fetch_profiles_map(&source_dids).await?; 255 256 Ok(AccountLabelsResult { labels, source_profiles, cursor: output.cursor.map(|cursor| cursor.to_string()) }) 257} 258 259pub async fn get_account_blocked_by( 260 did: String, limit: Option<u32>, cursor: Option<String>, state: &AppState, 261) -> Result<AccountBlockedByResult> { 262 let normalized_did = normalize_did(&did)?; 263 let client = constellation_client(state)?; 264 let response = match client 265 .get_backlinks( 266 normalized_did.clone(), 267 BLOCK_SOURCE.to_string(), 268 limit.or(Some(BLOCK_PREVIEW_LIMIT)), 269 cursor, 270 ) 271 .await 272 { 273 Ok(response) => response, 274 Err(error) if should_skip_missing_resource(&error) => { 275 return Ok(AccountBlockedByResult { total: 0, items: Vec::new(), cursor: None }); 276 } 277 Err(error) => { 278 return Err(AppError::diagnostics( 279 "Couldn't load the accounts blocking this profile.", 280 error, 281 )) 282 } 283 }; 284 285 let candidate_dids = extract_blocker_dids(&response.records); 286 let confirmed_dids = confirm_blocked_by(&normalized_did, &candidate_dids).await?; 287 let actor_states = fetch_actor_states(&confirmed_dids).await?; 288 let items = confirmed_dids 289 .into_iter() 290 .map(|entry_did| build_did_profile_item(entry_did.clone(), actor_states.get(&entry_did))) 291 .collect::<Vec<_>>(); 292 293 Ok(AccountBlockedByResult { total: response.total, items, cursor: response.cursor }) 294} 295 296pub async fn get_account_blocking(did: String, cursor: Option<String>) -> Result<AccountBlockingResult> { 297 let normalized_did = normalize_did(&did)?; 298 let output = match explorer::list_records(normalized_did.clone(), BLOCK_COLLECTION.to_string(), cursor).await { 299 Ok(output) => output, 300 Err(error) if should_skip_missing_resource(&error) => { 301 log_missing_resource("block records", &normalized_did, &error); 302 return Ok(AccountBlockingResult { items: Vec::new(), cursor: None }); 303 } 304 Err(error) => { 305 return Err(AppError::diagnostics( 306 "Couldn't load this account's block records.", 307 error, 308 )) 309 } 310 }; 311 let parsed: RepoListRecordsOutput = serde_json::from_value(output).map_err(|error| { 312 log::error!("failed to decode block listRecords output: {error}"); 313 AppError::validation("Lazurite couldn't read this account's block records.") 314 })?; 315 316 let subject_dids = parsed 317 .records 318 .iter() 319 .filter_map(|record| extract_subject_did(&record.value)) 320 .collect::<Vec<_>>(); 321 let actor_states = fetch_actor_states(&subject_dids).await?; 322 323 let items = parsed 324 .records 325 .into_iter() 326 .filter_map(|record| { 327 let subject_did = extract_subject_did(&record.value)?; 328 let actor_state = actor_states.get(&subject_did); 329 Some(AccountBlockingItem { 330 created_at: extract_created_at(&record.value), 331 availability: actor_state 332 .map(|state| state.availability) 333 .unwrap_or(ActorAvailability::Unavailable), 334 profile: actor_state.and_then(|state| state.profile.clone()), 335 uri: record.uri, 336 cid: record.cid, 337 unavailable_reason: actor_state.and_then(|state| state.unavailable_reason), 338 unavailable_message: actor_state.and_then(|state| state.unavailable_message.clone()), 339 subject_did, 340 value: record.value, 341 }) 342 }) 343 .collect(); 344 345 Ok(AccountBlockingResult { items, cursor: parsed.cursor }) 346} 347 348pub async fn get_account_starter_packs(did: String, state: &AppState) -> Result<AccountStarterPacksResult> { 349 let normalized_did = normalize_did(&did)?; 350 let client = constellation_client(state)?; 351 let count = match client 352 .get_backlinks_count(normalized_did.clone(), STARTER_PACK_SOURCE.to_string()) 353 .await 354 { 355 Ok(count) => count, 356 Err(error) if should_skip_missing_resource(&error) => { 357 log_missing_resource("starter packs", &normalized_did, &error); 358 return Ok(AccountStarterPacksResult { total: 0, starter_packs: Vec::new(), truncated: false }); 359 } 360 Err(error) => { 361 return Err(AppError::diagnostics( 362 "Couldn't load starter packs for this account.", 363 error, 364 )) 365 } 366 }; 367 368 let mut pack_uris = Vec::new(); 369 let mut cursor = None; 370 let mut truncated = false; 371 372 while pack_uris.len() < STARTER_PACK_MAX_ITEMS { 373 let response = match client 374 .get_backlinks( 375 normalized_did.clone(), 376 STARTER_PACK_SOURCE.to_string(), 377 Some(STARTER_PACK_LIMIT), 378 cursor.clone(), 379 ) 380 .await 381 { 382 Ok(response) => response, 383 Err(error) if should_skip_missing_resource(&error) => { 384 log_missing_resource("starter packs", &normalized_did, &error); 385 break; 386 } 387 Err(error) => { 388 return Err(AppError::diagnostics( 389 "Couldn't load starter packs for this account.", 390 error, 391 )) 392 } 393 }; 394 395 if response.records.is_empty() { 396 break; 397 } 398 399 for record in response.records { 400 if pack_uris.len() >= STARTER_PACK_MAX_ITEMS { 401 truncated = true; 402 break; 403 } 404 pack_uris.push(link_record_uri(&record)); 405 } 406 407 match response.cursor { 408 Some(next_cursor) if pack_uris.len() < STARTER_PACK_MAX_ITEMS => cursor = Some(next_cursor), 409 Some(_) => { 410 truncated = true; 411 break; 412 } 413 None => break, 414 } 415 } 416 417 let starter_packs = fetch_starter_packs(&dedupe_preserve_order(pack_uris)).await?; 418 Ok(AccountStarterPacksResult { total: count.total, starter_packs, truncated }) 419} 420 421pub async fn get_record_backlinks(uri: String, state: &AppState) -> Result<RecordBacklinksResult> { 422 let normalized_uri = normalize_at_uri(&uri)?; 423 let client = constellation_client(state)?; 424 425 let likes = fetch_backlink_group(&client, &normalized_uri, LIKES_SOURCE, false).await?; 426 let reposts = fetch_backlink_group(&client, &normalized_uri, REPOSTS_SOURCE, false).await?; 427 let replies = fetch_backlink_group(&client, &normalized_uri, REPLIES_SOURCE, false).await?; 428 let quotes = fetch_backlink_group(&client, &normalized_uri, QUOTES_SOURCE, true).await?; 429 430 Ok(RecordBacklinksResult { likes, reposts, replies, quotes }) 431} 432 433fn constellation_client(state: &AppState) -> Result<ConstellationClient> { 434 ConstellationClient::new(&settings::get_constellation_url(state)?) 435} 436 437fn public_client() -> PublicClient { 438 Agent::new(UnauthenticatedSession::new_public()) 439} 440 441fn normalize_did(input: &str) -> Result<String> { 442 let trimmed = input.trim(); 443 if trimmed.is_empty() { 444 return Err(AppError::validation("A DID is required.")); 445 } 446 447 Did::new(trimmed) 448 .map(|did| did.to_string()) 449 .map_err(|_| AppError::validation("Enter a valid DID.")) 450} 451 452fn normalize_at_uri(input: &str) -> Result<String> { 453 let trimmed = input.trim(); 454 if trimmed.is_empty() { 455 return Err(AppError::validation("A record URI is required.")); 456 } 457 458 AtUri::new(trimmed) 459 .map(|uri| uri.to_string()) 460 .map_err(|_| AppError::validation("Enter a valid AT-URI.")) 461} 462 463fn log_missing_resource(kind: &str, identifier: &str, error: impl std::fmt::Display) { 464 log::warn!("Skipping missing {kind} for {identifier}: {error}"); 465} 466 467fn should_skip_missing_resource(error: &impl std::fmt::Display) -> bool { 468 let message = error.to_string().to_ascii_lowercase(); 469 let mentions_missing = message.contains("not found") || message.contains("notfound"); 470 let mentions_resource = message.contains("list") 471 || message.contains("record") 472 || message.contains("repo") 473 || message.contains("profile") 474 || message.contains("starter pack") 475 || message.contains("starterpack"); 476 477 mentions_missing && mentions_resource 478} 479 480fn link_record_uri(record: &ConstellationLinkRecord) -> String { 481 format!("at://{}/{}/{}", record.did, record.collection, record.rkey) 482} 483 484fn dedupe_preserve_order(values: Vec<String>) -> Vec<String> { 485 let mut seen = BTreeSet::new(); 486 let mut deduped = Vec::new(); 487 488 for value in values { 489 if seen.insert(value.clone()) { 490 deduped.push(value); 491 } 492 } 493 494 deduped 495} 496 497fn did_identifier(did: &str) -> Result<AtIdentifier<'static>> { 498 Ok(AtIdentifier::Did(Did::new(did)?.into_static())) 499} 500 501#[derive(Debug, Clone)] 502struct ActorState { 503 availability: ActorAvailability, 504 profile: Option<Value>, 505 unavailable_reason: Option<ActorAvailabilityReason>, 506 unavailable_message: Option<String>, 507} 508 509async fn fetch_actor_states(dids: &[String]) -> Result<BTreeMap<String, ActorState>> { 510 let unique_dids = dedupe_preserve_order(dids.to_vec()); 511 if unique_dids.is_empty() { 512 return Ok(BTreeMap::new()); 513 } 514 515 let profiles = fetch_profiles_map(&unique_dids).await?; 516 let mut states = profiles 517 .into_iter() 518 .map(|(did, profile)| { 519 ( 520 did, 521 ActorState { 522 availability: ActorAvailability::Available, 523 profile: Some(profile), 524 unavailable_reason: None, 525 unavailable_message: None, 526 }, 527 ) 528 }) 529 .collect::<BTreeMap<_, _>>(); 530 531 for did in unique_dids { 532 if states.contains_key(&did) { 533 continue; 534 } 535 536 states.insert(did.clone(), fetch_missing_actor_state(&did).await); 537 } 538 539 Ok(states) 540} 541 542async fn fetch_profiles_map(dids: &[String]) -> Result<BTreeMap<String, Value>> { 543 let unique_dids = dedupe_preserve_order(dids.to_vec()); 544 if unique_dids.is_empty() { 545 return Ok(BTreeMap::new()); 546 } 547 548 let client = public_client(); 549 let mut profiles = BTreeMap::new(); 550 551 for chunk in unique_dids.chunks(PUBLIC_BATCH_LIMIT) { 552 let actors = chunk 553 .iter() 554 .filter_map(|did| match did_identifier(did) { 555 Ok(actor) => Some(actor), 556 Err(error) => { 557 log_missing_resource("profile", did, error); 558 None 559 } 560 }) 561 .collect::<Vec<_>>(); 562 if actors.is_empty() { 563 continue; 564 } 565 566 let output = match client.send(GetProfiles::new().actors(actors).build()).await { 567 Ok(output) => output, 568 Err(error) if should_skip_missing_resource(&error) => { 569 log_missing_resource("profiles", &chunk.join(","), error); 570 continue; 571 } 572 Err(error) => return Err(AppError::diagnostics("Couldn't load account profiles.", error)), 573 }; 574 let output = match output.into_output() { 575 Ok(output) => output.into_static(), 576 Err(error) if should_skip_missing_resource(&error) => { 577 log_missing_resource("profiles", &chunk.join(","), error); 578 continue; 579 } 580 Err(error) => return Err(AppError::diagnostics("Couldn't read account profiles.", error)), 581 }; 582 583 for profile in output.profiles { 584 profiles.insert(profile.did.to_string(), serde_json::to_value(profile)?); 585 } 586 } 587 588 Ok(profiles) 589} 590 591async fn fetch_missing_actor_state(did: &str) -> ActorState { 592 let actor = match did_identifier(did) { 593 Ok(actor) => actor, 594 Err(error) => { 595 log_missing_resource("profile", did, error); 596 return unavailable_actor_state(ActorAvailabilityReason::Unavailable); 597 } 598 }; 599 let client = public_client(); 600 601 let output = match client.send(GetProfile::new().actor(actor).build()).await { 602 Ok(output) => output, 603 Err(error) => { 604 log::warn!("failed to load missing actor profile for {did}: {error}"); 605 return actor_state_from_error(&error); 606 } 607 }; 608 609 match output.into_output() { 610 Ok(output) => match serde_json::to_value(output.value) { 611 Ok(profile) => ActorState { 612 availability: ActorAvailability::Available, 613 profile: Some(profile), 614 unavailable_reason: None, 615 unavailable_message: None, 616 }, 617 Err(error) => { 618 log::warn!("failed to serialize actor profile for {did}: {error}"); 619 unavailable_actor_state(ActorAvailabilityReason::Unavailable) 620 } 621 }, 622 Err(error) => { 623 log::warn!("failed to decode actor profile for {did}: {error}"); 624 actor_state_from_error(&error) 625 } 626 } 627} 628 629fn actor_state_from_error(error: &impl std::fmt::Display) -> ActorState { 630 unavailable_actor_state(classify_actor_unavailability(error).unwrap_or(ActorAvailabilityReason::Unavailable)) 631} 632 633fn unavailable_actor_state(reason: ActorAvailabilityReason) -> ActorState { 634 ActorState { 635 availability: ActorAvailability::Unavailable, 636 profile: None, 637 unavailable_reason: Some(reason), 638 unavailable_message: Some(actor_unavailable_message(reason).to_string()), 639 } 640} 641 642fn build_did_profile_item(did: String, actor_state: Option<&ActorState>) -> DidProfileItem { 643 DidProfileItem { 644 availability: actor_state 645 .map(|state| state.availability) 646 .unwrap_or(ActorAvailability::Unavailable), 647 did, 648 profile: actor_state.and_then(|state| state.profile.clone()), 649 unavailable_reason: actor_state.and_then(|state| state.unavailable_reason), 650 unavailable_message: actor_state.and_then(|state| state.unavailable_message.clone()), 651 } 652} 653 654async fn fetch_lists(list_uris: &[String]) -> Result<Vec<Value>> { 655 let client = public_client(); 656 let mut lists = Vec::new(); 657 658 for list_uri in list_uris { 659 let parsed_uri = match AtUri::new(list_uri) { 660 Ok(uri) => uri, 661 Err(error) => { 662 log_missing_resource("list", list_uri, error); 663 continue; 664 } 665 }; 666 let output = match client 667 .send(GetList::new().list(parsed_uri.into_static()).limit(1).build()) 668 .await 669 { 670 Ok(output) => output, 671 Err(error) if should_skip_missing_resource(&error) => { 672 log_missing_resource("list", list_uri, error); 673 continue; 674 } 675 Err(error) => return Err(AppError::diagnostics("Couldn't load one of the matching lists.", error)), 676 }; 677 let output = match output.into_output() { 678 Ok(output) => output.into_static(), 679 Err(error) if should_skip_missing_resource(&error) => { 680 log_missing_resource("list", list_uri, error); 681 continue; 682 } 683 Err(error) => return Err(AppError::diagnostics("Couldn't read one of the matching lists.", error)), 684 }; 685 lists.push(serde_json::to_value(output.list)?); 686 } 687 688 Ok(lists) 689} 690 691async fn fetch_starter_packs(uris: &[String]) -> Result<Vec<Value>> { 692 if uris.is_empty() { 693 return Ok(Vec::new()); 694 } 695 696 let client = public_client(); 697 let mut starter_packs = Vec::new(); 698 699 for uri in uris { 700 let parsed_uri = match AtUri::new(uri).map(IntoStatic::into_static) { 701 Ok(parsed_uri) => parsed_uri, 702 Err(error) => { 703 log_missing_resource("starter pack", uri, error); 704 continue; 705 } 706 }; 707 let output = match client.send(GetStarterPacks::new().uris(vec![parsed_uri]).build()).await { 708 Ok(output) => output, 709 Err(error) if should_skip_missing_resource(&error) => { 710 log_missing_resource("starter pack", uri, error); 711 continue; 712 } 713 Err(error) => { 714 return Err(AppError::diagnostics( 715 "Couldn't load starter packs for this account.", 716 error, 717 )) 718 } 719 }; 720 let output = match output.into_output() { 721 Ok(output) => output.into_static(), 722 Err(error) if should_skip_missing_resource(&error) => { 723 log_missing_resource("starter pack", uri, error); 724 continue; 725 } 726 Err(error) => return Err(AppError::diagnostics("Couldn't read starter pack details.", error)), 727 }; 728 729 for starter_pack in output.starter_packs { 730 starter_packs.push(serde_json::to_value(starter_pack)?); 731 } 732 } 733 734 Ok(starter_packs) 735} 736 737async fn fetch_backlink_group( 738 client: &ConstellationClient, subject: &str, source: &str, include_record_value: bool, 739) -> Result<BacklinkGroup> { 740 let response = client 741 .get_backlinks( 742 subject.to_string(), 743 source.to_string(), 744 Some(BACKLINK_PREVIEW_LIMIT), 745 None, 746 ) 747 .await 748 .map_err(|error| AppError::diagnostics("Couldn't load record backlinks right now.", error))?; 749 750 build_backlink_group(response, include_record_value).await 751} 752 753async fn build_backlink_group(response: BacklinksResponse, include_record_value: bool) -> Result<BacklinkGroup> { 754 let dids = response 755 .records 756 .iter() 757 .map(|record| record.did.clone()) 758 .collect::<Vec<_>>(); 759 let profiles = fetch_profiles_lookup(&dids).await?; 760 let values = if include_record_value { 761 fetch_backlink_record_values(&response.records).await 762 } else { 763 HashMap::new() 764 }; 765 766 let records = response 767 .records 768 .into_iter() 769 .map(|record| { 770 let uri = link_record_uri(&record); 771 BacklinkRecordItem { 772 value: values.get(&uri).cloned(), 773 profile: profiles.get(&record.did).cloned(), 774 did: record.did, 775 collection: record.collection, 776 rkey: record.rkey, 777 uri, 778 } 779 }) 780 .collect(); 781 782 Ok(BacklinkGroup { total: response.total, records, cursor: response.cursor }) 783} 784 785async fn fetch_backlink_record_values(records: &[ConstellationLinkRecord]) -> HashMap<String, Value> { 786 let mut values = HashMap::with_capacity(records.len()); 787 788 for record in records { 789 let uri = link_record_uri(record); 790 match explorer::get_record(record.did.clone(), record.collection.clone(), record.rkey.clone()).await { 791 Ok(payload) => { 792 if let Some(value) = extract_backlink_record_value(payload) { 793 values.insert(uri, value); 794 } 795 } 796 Err(error) => { 797 log::warn!("failed to load backlink record payload for {uri}: {error}"); 798 } 799 } 800 } 801 802 values 803} 804 805fn extract_backlink_record_value(payload: Value) -> Option<Value> { 806 match payload { 807 Value::Object(mut object) => object.remove("value").or(Some(Value::Object(object))), 808 _ => None, 809 } 810} 811 812async fn fetch_profiles_lookup(dids: &[String]) -> Result<HashMap<String, Value>> { 813 Ok(fetch_profiles_map(dids).await?.into_iter().collect()) 814} 815 816fn extract_blocker_dids(records: &[ConstellationLinkRecord]) -> Vec<String> { 817 dedupe_preserve_order(records.iter().map(|record| record.did.clone()).collect()) 818} 819 820async fn confirm_blocked_by(actor_did: &str, candidate_dids: &[String]) -> Result<Vec<String>> { 821 if candidate_dids.is_empty() { 822 return Ok(Vec::new()); 823 } 824 825 let actor = did_identifier(actor_did)?; 826 let client = public_client(); 827 let mut confirmed = BTreeSet::new(); 828 829 for chunk in candidate_dids.chunks(PUBLIC_BATCH_LIMIT) { 830 let others = chunk 831 .iter() 832 .filter_map(|did| match did_identifier(did) { 833 Ok(actor) => Some(actor), 834 Err(error) => { 835 log_missing_resource("relationship", did, error); 836 None 837 } 838 }) 839 .collect::<Vec<_>>(); 840 if others.is_empty() { 841 continue; 842 } 843 844 let output = client 845 .send(GetRelationships::new().actor(actor.clone()).others(others).build()) 846 .await 847 .map_err(|error| AppError::diagnostics("Couldn't confirm who blocks this profile.", error))? 848 .into_output() 849 .map_err(|error| AppError::diagnostics("Couldn't read who blocks this profile.", error))? 850 .into_static(); 851 852 for did in extract_confirmed_blocked_by_dids(&output.relationships) { 853 confirmed.insert(did); 854 } 855 } 856 857 Ok(candidate_dids 858 .iter() 859 .filter(|did| confirmed.contains(did.as_str())) 860 .cloned() 861 .collect()) 862} 863 864fn extract_confirmed_blocked_by_dids(relationships: &[GetRelationshipsOutputRelationshipsItem<'_>]) -> Vec<String> { 865 relationships 866 .iter() 867 .filter_map(|relationship| match relationship { 868 GetRelationshipsOutputRelationshipsItem::Relationship(relationship) 869 if relationship.blocked_by.is_some() => 870 { 871 Some(relationship.did.to_string()) 872 } 873 _ => None, 874 }) 875 .collect() 876} 877 878fn extract_subject_did(value: &Value) -> Option<String> { 879 value.get("subject").and_then(Value::as_str).map(str::to_string) 880} 881 882fn extract_created_at(value: &Value) -> Option<String> { 883 value.get("createdAt").and_then(Value::as_str).map(str::to_string) 884} 885 886#[cfg(test)] 887mod tests { 888 use super::{ 889 dedupe_preserve_order, extract_backlink_record_value, extract_blocker_dids, extract_confirmed_blocked_by_dids, 890 extract_created_at, extract_subject_did, should_skip_missing_resource, 891 }; 892 use crate::constellation::ConstellationLinkRecord; 893 use jacquard::api::app_bsky::graph::{get_relationships::GetRelationshipsOutputRelationshipsItem, Relationship}; 894 use jacquard::types::{aturi::AtUri, did::Did}; 895 use serde_json::json; 896 897 #[test] 898 fn dedupe_preserve_order_keeps_first_occurrence() { 899 let values = vec!["at://one".to_string(), "at://two".to_string(), "at://one".to_string()]; 900 901 assert_eq!( 902 dedupe_preserve_order(values), 903 vec!["at://one".to_string(), "at://two".to_string()] 904 ); 905 } 906 907 #[test] 908 fn extract_subject_and_created_at_from_block_value() { 909 let value = json!({ 910 "subject": "did:plc:blocked", 911 "createdAt": "2025-01-01T00:00:00Z" 912 }); 913 914 assert_eq!(extract_subject_did(&value).as_deref(), Some("did:plc:blocked")); 915 assert_eq!(extract_created_at(&value).as_deref(), Some("2025-01-01T00:00:00Z")); 916 } 917 918 #[test] 919 fn treats_missing_list_errors_as_skippable() { 920 assert!(should_skip_missing_resource( 921 &"XRPC error: Object(Object({\"error\":\"InvalidRequest\",\"message\":\"List not found\"}))" 922 )); 923 assert!(should_skip_missing_resource(&"repo not found")); 924 assert!(!should_skip_missing_resource(&"rate limit exceeded")); 925 } 926 927 #[test] 928 fn extract_blocker_dids_preserves_order_and_dedupes() { 929 let records = vec![ 930 ConstellationLinkRecord { 931 did: "did:plc:one".to_string(), 932 collection: "app.bsky.graph.block".to_string(), 933 rkey: "1".to_string(), 934 }, 935 ConstellationLinkRecord { 936 did: "did:plc:two".to_string(), 937 collection: "app.bsky.graph.block".to_string(), 938 rkey: "2".to_string(), 939 }, 940 ConstellationLinkRecord { 941 did: "did:plc:one".to_string(), 942 collection: "app.bsky.graph.block".to_string(), 943 rkey: "3".to_string(), 944 }, 945 ]; 946 947 assert_eq!( 948 extract_blocker_dids(&records), 949 vec!["did:plc:one".to_string(), "did:plc:two".to_string()] 950 ); 951 } 952 953 #[test] 954 fn extracts_only_confirmed_blocked_by_relationships() { 955 let relationships = vec![ 956 GetRelationshipsOutputRelationshipsItem::Relationship(Box::new( 957 Relationship::new() 958 .did(Did::new("did:plc:one").expect("did should parse")) 959 .blocked_by(AtUri::new("at://did:plc:one/app.bsky.graph.block/1").expect("uri should parse")) 960 .build(), 961 )), 962 GetRelationshipsOutputRelationshipsItem::Relationship(Box::new( 963 Relationship::new() 964 .did(Did::new("did:plc:two").expect("did should parse")) 965 .build(), 966 )), 967 ]; 968 969 assert_eq!( 970 extract_confirmed_blocked_by_dids(&relationships), 971 vec!["did:plc:one".to_string()] 972 ); 973 } 974 975 #[test] 976 fn extracts_backlink_record_value_field() { 977 let payload = json!({ 978 "uri": "at://did:plc:alice/app.bsky.feed.post/1", 979 "value": { "text": "quoted body" } 980 }); 981 982 assert_eq!( 983 extract_backlink_record_value(payload), 984 Some(json!({ "text": "quoted body" })) 985 ); 986 } 987 988 #[test] 989 fn keeps_object_payload_when_value_field_missing() { 990 let payload = json!({ "text": "direct record body" }); 991 992 assert_eq!( 993 extract_backlink_record_value(payload), 994 Some(json!({ "text": "direct record body" })) 995 ); 996 } 997}