BlueSky & more on desktop lazurite.stormlightlabs.org/
tauri rust typescript bluesky appview atproto solid
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 2038 lines 73 kB view raw
1use super::actors::{ 2 actor_unavailable_message, classify_actor_unavailability, requested_actor_hints, ActorAvailabilityReason, 3}; 4use super::auth::LazuriteOAuthSession; 5use super::error::{AppError, Result}; 6use super::state::AppState; 7use jacquard::api::app_bsky::actor::get_preferences::GetPreferences; 8use jacquard::api::app_bsky::actor::get_profile::GetProfile; 9use jacquard::api::app_bsky::actor::get_profiles::GetProfiles; 10use jacquard::api::app_bsky::actor::put_preferences::PutPreferences; 11use jacquard::api::app_bsky::actor::{ 12 FeedViewPref, PreferencesItem, ProfileViewDetailed, SavedFeed, SavedFeedType, SavedFeedsPrefV2, 13 SavedFeedsPrefV2Builder, 14}; 15use jacquard::api::app_bsky::bookmark::create_bookmark::CreateBookmark; 16use jacquard::api::app_bsky::bookmark::delete_bookmark::DeleteBookmark; 17use jacquard::api::app_bsky::embed::record::Record; 18use jacquard::api::app_bsky::feed::get_actor_likes::GetActorLikes; 19use jacquard::api::app_bsky::feed::get_author_feed::GetAuthorFeed; 20use jacquard::api::app_bsky::feed::get_feed::GetFeed; 21use jacquard::api::app_bsky::feed::get_feed_generator::GetFeedGenerator; 22use jacquard::api::app_bsky::feed::get_feed_generators::GetFeedGenerators; 23use jacquard::api::app_bsky::feed::get_list_feed::GetListFeed; 24use jacquard::api::app_bsky::feed::get_post_thread::GetPostThread; 25use jacquard::api::app_bsky::feed::get_timeline::GetTimeline; 26use jacquard::api::app_bsky::feed::like::Like; 27use jacquard::api::app_bsky::feed::post::{Post, PostEmbed, ReplyRef}; 28use jacquard::api::app_bsky::feed::repost::Repost; 29use jacquard::api::app_bsky::feed::GeneratorView; 30use jacquard::api::app_bsky::graph::block::Block; 31use jacquard::api::app_bsky::graph::follow::Follow; 32use jacquard::api::app_bsky::graph::get_followers::GetFollowers; 33use jacquard::api::app_bsky::graph::get_follows::GetFollows; 34use jacquard::api::com_atproto::label::Label; 35use jacquard::api::com_atproto::repo::apply_writes::{ 36 ApplyWrites, ApplyWritesOutput, ApplyWritesOutputResultsItem, ApplyWritesWritesItem, Delete, 37}; 38use jacquard::api::com_atproto::repo::create_record::CreateRecord; 39use jacquard::api::com_atproto::repo::delete_record::DeleteRecord; 40use jacquard::api::com_atproto::repo::list_records::{ListRecords, ListRecordsOutput, Record as RepoListRecord}; 41use jacquard::api::com_atproto::repo::strong_ref::StrongRef; 42use jacquard::identity::{resolver::IdentityResolver, JacquardResolver}; 43use jacquard::richtext; 44use jacquard::types::aturi::AtUri; 45use jacquard::types::cid::Cid; 46use jacquard::types::datetime::Datetime; 47use jacquard::types::did::Did; 48use jacquard::types::handle::Handle; 49use jacquard::types::ident::AtIdentifier; 50use jacquard::types::nsid::Nsid; 51use jacquard::types::recordkey::RecordKey; 52use jacquard::types::value::Data; 53use jacquard::xrpc::XrpcClient; 54use jacquard::IntoStatic; 55use reqwest::StatusCode; 56use serde::{Deserialize, Serialize}; 57use std::collections::{HashMap, HashSet}; 58use std::sync::Arc; 59use std::time::Duration; 60use tauri::{AppHandle, Emitter}; 61use tauri_plugin_log::log; 62use tokio::sync::Semaphore; 63use tokio::task::JoinSet; 64use tokio::time::sleep; 65 66const FEED_GENERATOR_BATCH_SIZE: usize = 10; 67 68const FOLLOW_COLLECTION_NSID: &str = "app.bsky.graph.follow"; 69const FOLLOW_HYGIENE_PROGRESS_EVENT: &str = "follow-hygiene:progress"; 70const FOLLOW_AUDIT_PAGE_LIMIT: i64 = 100; 71const FOLLOW_AUDIT_PROFILE_BATCH_SIZE: usize = 25; 72const FOLLOW_AUDIT_PROFILE_BATCH_CONCURRENCY: usize = 3; 73const FOLLOW_AUDIT_INTER_BATCH_DELAY: Duration = Duration::from_millis(250); 74const FOLLOW_AUDIT_RETRY_AFTER_DEFAULT: Duration = Duration::from_secs(2); 75const FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES: usize = 5; 76const FOLLOW_UNFOLLOW_WRITE_CHUNK_SIZE: usize = 200; 77 78const FOLLOW_STATUS_DELETED: u8 = 1 << 0; 79const FOLLOW_STATUS_DEACTIVATED: u8 = 1 << 1; 80const FOLLOW_STATUS_SUSPENDED: u8 = 1 << 2; 81const FOLLOW_STATUS_BLOCKED_BY: u8 = 1 << 3; 82const FOLLOW_STATUS_BLOCKING: u8 = 1 << 4; 83const FOLLOW_STATUS_HIDDEN: u8 = 1 << 5; 84const FOLLOW_STATUS_SELF_FOLLOW: u8 = 1 << 6; 85 86#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] 87#[serde(tag = "status", rename_all = "camelCase")] 88pub enum ProfileLookupResult { 89 Available { 90 profile: serde_json::Value, 91 }, 92 Unavailable { 93 requested_actor: String, 94 did: Option<String>, 95 handle: Option<String>, 96 reason: ActorAvailabilityReason, 97 message: String, 98 }, 99} 100 101async fn get_session(state: &AppState) -> Result<Arc<LazuriteOAuthSession>> { 102 let did = state 103 .active_session 104 .read() 105 .map_err(|error| { 106 log::error!("active_session poisoned: {error}"); 107 AppError::StatePoisoned("active_session") 108 })? 109 .as_ref() 110 .ok_or_else(|| { 111 log::error!("no active account"); 112 AppError::Validation("no active account".into()) 113 })? 114 .did 115 .clone(); 116 117 state 118 .sessions 119 .read() 120 .map_err(|error| AppError::state_poisoned(format!("sessions {error}")))? 121 .get(&did) 122 .cloned() 123 .ok_or_else(|| AppError::validation(format!("session not found for active account {did}"))) 124} 125 126fn active_did(state: &AppState) -> Result<String> { 127 state 128 .active_session 129 .read() 130 .map_err(|error| AppError::state_poisoned(format!("active_session poisoned with error {error}")))? 131 .as_ref() 132 .ok_or_else(|| AppError::Validation("no active account".into())) 133 .map(|s| s.did.clone()) 134} 135 136fn parse_actor_identifier(actor: &str) -> Result<AtIdentifier<'static>> { 137 let trimmed = actor.trim(); 138 if trimmed.is_empty() { 139 return Err(AppError::validation("actor must not be empty")); 140 } 141 142 if let Ok(did) = Did::new(trimmed) { 143 return Ok(AtIdentifier::Did(did.into_static())); 144 } 145 146 let normalized_handle = trimmed.trim_start_matches('@'); 147 if let Ok(handle) = Handle::new(normalized_handle) { 148 return Ok(AtIdentifier::Handle(handle.into_static())); 149 } 150 151 Err(AppError::validation("actor must be a valid DID or handle")) 152} 153 154#[derive(Debug, Serialize, Deserialize)] 155#[serde(rename_all = "camelCase")] 156pub struct SavedFeedItem { 157 pub id: String, 158 pub r#type: String, 159 pub value: String, 160 pub pinned: bool, 161} 162 163#[derive(Debug, Serialize, Deserialize)] 164#[serde(rename_all = "camelCase")] 165pub struct FeedViewPrefItem { 166 pub feed: String, 167 pub hide_replies: bool, 168 pub hide_replies_by_unfollowed: bool, 169 pub hide_replies_by_like_count: Option<i64>, 170 pub hide_reposts: bool, 171 pub hide_quote_posts: bool, 172} 173 174#[derive(Debug, Serialize, Deserialize)] 175#[serde(rename_all = "camelCase")] 176pub struct UserPreferences { 177 pub saved_feeds: Vec<SavedFeedItem>, 178 pub feed_view_prefs: Vec<FeedViewPrefItem>, 179} 180 181type StoredPreferences = Vec<PreferencesItem<'static>>; 182 183fn extract_saved_feeds(pref: &SavedFeedsPrefV2<'_>) -> Vec<SavedFeedItem> { 184 pref.items 185 .iter() 186 .map(|f| SavedFeedItem { 187 id: f.id.to_string(), 188 r#type: match &f.r#type { 189 SavedFeedType::Timeline => "timeline".into(), 190 SavedFeedType::Feed => "feed".into(), 191 SavedFeedType::List => "list".into(), 192 SavedFeedType::Other(s) => s.to_string(), 193 }, 194 value: f.value.to_string(), 195 pinned: f.pinned, 196 }) 197 .collect() 198} 199 200fn extract_feed_view_pref(pref: &FeedViewPref<'_>) -> FeedViewPrefItem { 201 FeedViewPrefItem { 202 feed: pref.feed.to_string(), 203 hide_replies: pref.hide_replies.unwrap_or(false), 204 hide_replies_by_unfollowed: pref.hide_replies_by_unfollowed.unwrap_or(true), 205 hide_replies_by_like_count: pref.hide_replies_by_like_count, 206 hide_reposts: pref.hide_reposts.unwrap_or(false), 207 hide_quote_posts: pref.hide_quote_posts.unwrap_or(false), 208 } 209} 210 211fn user_preferences_from_items(items: &[PreferencesItem<'_>]) -> UserPreferences { 212 let mut saved_feeds = Vec::new(); 213 let mut feed_view_prefs = Vec::new(); 214 215 for item in items { 216 match item { 217 PreferencesItem::SavedFeedsPrefV2(pref) => saved_feeds = extract_saved_feeds(pref), 218 PreferencesItem::FeedViewPref(pref) => feed_view_prefs.push(extract_feed_view_pref(pref)), 219 _ => (), 220 } 221 } 222 223 UserPreferences { saved_feeds, feed_view_prefs } 224} 225 226async fn fetch_preference_items(state: &AppState) -> Result<StoredPreferences> { 227 let session = get_session(state).await?; 228 fetch_preference_items_with_session(&session).await 229} 230 231async fn fetch_preference_items_with_session(session: &Arc<LazuriteOAuthSession>) -> Result<StoredPreferences> { 232 let output = session 233 .send(GetPreferences) 234 .await 235 .map_err(|error| { 236 log::error!("fetch Preferences error: {error}"); 237 AppError::validation("fetch Preferences error") 238 })? 239 .into_output() 240 .map_err(|error| { 241 log::error!("fetch Preferences output error: {error}"); 242 AppError::validation("fetch Preferences output error") 243 })?; 244 245 Ok(output.preferences.into_iter().map(IntoStatic::into_static).collect()) 246} 247 248fn accepts_empty_put_preferences_response(status: reqwest::StatusCode, body: &[u8]) -> bool { 249 status.is_success() && body.is_empty() 250} 251 252fn accepts_empty_bookmark_response(status: reqwest::StatusCode, body: &[u8]) -> bool { 253 status.is_success() && body.is_empty() 254} 255 256async fn store_preference_items(session: &Arc<LazuriteOAuthSession>, items: StoredPreferences) -> Result<()> { 257 let response = session 258 .send(PutPreferences::new().preferences(items).build()) 259 .await 260 .map_err(|error| { 261 log::error!("putPreferences error: {error}"); 262 AppError::validation("putPreferences error") 263 })?; 264 265 if accepts_empty_put_preferences_response(response.status(), response.buffer()) { 266 return Ok(()); 267 } 268 269 response.into_output().map_err(|error| { 270 log::error!("putPreferences output error: {error}"); 271 AppError::validation("putPreferences output error") 272 })?; 273 274 Ok(()) 275} 276 277fn build_saved_feeds_preference_item(feeds: Vec<SavedFeedItem>) -> PreferencesItem<'static> { 278 let items = feeds 279 .into_iter() 280 .map(|feed| { 281 SavedFeed::new() 282 .id(feed.id) 283 .r#type(match feed.r#type.as_str() { 284 "timeline" => SavedFeedType::Timeline, 285 "feed" => SavedFeedType::Feed, 286 "list" => SavedFeedType::List, 287 _ => SavedFeedType::Other(feed.r#type.into()), 288 }) 289 .value(feed.value) 290 .pinned(feed.pinned) 291 .build() 292 }) 293 .collect::<Vec<_>>(); 294 295 PreferencesItem::SavedFeedsPrefV2(Box::new(SavedFeedsPrefV2Builder::new().items(items).build())) 296} 297 298fn build_feed_view_pref_item(pref: FeedViewPrefItem) -> PreferencesItem<'static> { 299 PreferencesItem::FeedViewPref(Box::new(FeedViewPref { 300 feed: pref.feed.into(), 301 hide_quote_posts: Some(pref.hide_quote_posts), 302 hide_replies: Some(pref.hide_replies), 303 hide_replies_by_like_count: pref.hide_replies_by_like_count, 304 hide_replies_by_unfollowed: Some(pref.hide_replies_by_unfollowed), 305 hide_reposts: Some(pref.hide_reposts), 306 extra_data: Default::default(), 307 })) 308} 309 310fn merge_saved_feeds_preferences(preferences: StoredPreferences, feeds: Vec<SavedFeedItem>) -> StoredPreferences { 311 let mut merged = preferences 312 .into_iter() 313 .filter(|item| { 314 !matches!( 315 item, 316 PreferencesItem::SavedFeedsPref(_) | PreferencesItem::SavedFeedsPrefV2(_) 317 ) 318 }) 319 .collect::<Vec<_>>(); 320 merged.push(build_saved_feeds_preference_item(feeds)); 321 merged 322} 323 324fn merge_feed_view_preferences(preferences: StoredPreferences, pref: FeedViewPrefItem) -> StoredPreferences { 325 let feed = pref.feed.clone(); 326 let mut merged = preferences 327 .into_iter() 328 .filter(|item| match item { 329 PreferencesItem::FeedViewPref(existing) => existing.feed.as_ref() != feed.as_str(), 330 _ => true, 331 }) 332 .collect::<Vec<_>>(); 333 merged.push(build_feed_view_pref_item(pref)); 334 merged 335} 336 337#[derive(Debug, Deserialize)] 338#[serde(rename_all = "camelCase")] 339pub struct StrongRefInput { 340 pub uri: String, 341 pub cid: String, 342} 343 344#[derive(Debug, Deserialize)] 345#[serde(rename_all = "camelCase")] 346pub struct ReplyRefInput { 347 pub parent: StrongRefInput, 348 pub root: StrongRefInput, 349} 350 351#[derive(Debug, Deserialize)] 352#[serde(tag = "type", rename_all = "camelCase")] 353pub enum EmbedInput { 354 Record { record: StrongRefInput }, 355} 356 357#[derive(Debug, Deserialize, Serialize)] 358#[serde(rename_all = "camelCase")] 359pub struct CreateRecordResult { 360 pub uri: String, 361 pub cid: String, 362} 363 364#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 365#[serde(rename_all = "camelCase")] 366pub struct FlaggedFollow { 367 pub did: String, 368 pub handle: String, 369 pub follow_uri: String, 370 pub status: u8, 371 pub status_label: String, 372} 373 374#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 375#[serde(rename_all = "camelCase")] 376pub struct BatchResult { 377 pub deleted: usize, 378 pub failed: Vec<String>, 379} 380 381#[derive(Debug, Clone, Serialize)] 382#[serde(rename_all = "camelCase")] 383struct FollowHygieneProgress { 384 batch_size: usize, 385 current: usize, 386 total: usize, 387} 388 389#[derive(Debug, Clone, PartialEq, Eq)] 390struct FollowRecordEntry { 391 did: String, 392 follow_uri: String, 393} 394 395#[derive(Debug, Clone, PartialEq, Eq)] 396struct FollowStatusInfo { 397 handle: String, 398 status: u8, 399} 400 401#[derive(Debug, Clone, PartialEq, Eq)] 402struct FollowDeleteTarget { 403 uri: String, 404 rkey: String, 405} 406 407pub async fn get_preferences(state: &AppState) -> Result<UserPreferences> { 408 let preferences = fetch_preference_items(state).await?; 409 Ok(user_preferences_from_items(&preferences)) 410} 411 412pub async fn get_feed_generators(uris: Vec<String>, state: &AppState) -> Result<serde_json::Value> { 413 if uris.is_empty() { 414 return Ok(serde_json::json!({ "feeds": [] })); 415 } 416 417 let session = get_session(state).await?; 418 let feeds = uris 419 .iter() 420 .filter_map(|uri| { 421 AtUri::new(uri) 422 .map(IntoStatic::into_static) 423 .map_err(|error| { 424 log::warn!("skipping invalid feed URI in get_feed_generators input ({uri}): {error}"); 425 error 426 }) 427 .ok() 428 }) 429 .collect::<Vec<AtUri<'static>>>(); 430 431 if feeds.is_empty() { 432 return Ok(serde_json::json!({ "feeds": [] })); 433 } 434 435 let mut collected = Vec::<GeneratorView<'static>>::new(); 436 let mut seen = HashSet::<String>::new(); 437 438 for chunk in feeds.chunks(FEED_GENERATOR_BATCH_SIZE) { 439 let request = GetFeedGenerators::new().feeds(chunk.to_vec()).build(); 440 match session.send(request).await { 441 Ok(response) => match response.into_output() { 442 Ok(output) => { 443 for feed in output.feeds.into_iter().map(IntoStatic::into_static) { 444 let uri = feed.uri.as_ref().to_string(); 445 if seen.insert(uri) { 446 collected.push(feed); 447 } 448 } 449 } 450 Err(error) => { 451 log::info!( 452 "getFeedGenerators decode failed for batch ({} feeds); trying per-feed fallback: {error}", 453 chunk.len() 454 ); 455 let recovered = 456 fetch_feed_generators_individually(chunk, session.as_ref(), &mut collected, &mut seen).await; 457 if recovered == 0 { 458 log::warn!( 459 "getFeedGenerators fallback failed for batch ({} feeds); no generator metadata recovered", 460 chunk.len() 461 ); 462 } 463 } 464 }, 465 Err(error) => { 466 log::info!( 467 "getFeedGenerators request failed for batch ({} feeds); trying per-feed fallback: {error}", 468 chunk.len() 469 ); 470 let recovered = 471 fetch_feed_generators_individually(chunk, session.as_ref(), &mut collected, &mut seen).await; 472 if recovered == 0 { 473 log::warn!( 474 "getFeedGenerators fallback request failed for batch ({} feeds); no generator metadata recovered", 475 chunk.len() 476 ); 477 } 478 } 479 } 480 } 481 482 Ok(serde_json::json!({ "feeds": collected })) 483} 484 485async fn fetch_feed_generators_individually( 486 feeds: &[AtUri<'static>], session: &LazuriteOAuthSession, out: &mut Vec<GeneratorView<'static>>, 487 seen: &mut HashSet<String>, 488) -> usize { 489 let mut recovered = 0; 490 for feed in feeds { 491 let request = GetFeedGenerator::new().feed(feed.clone()).build(); 492 let response = match session.send(request).await { 493 Ok(response) => response, 494 Err(error) => { 495 log::debug!("getFeedGenerator request failed for {}: {error}", feed.as_ref()); 496 continue; 497 } 498 }; 499 500 let output = match response.into_output() { 501 Ok(output) => output, 502 Err(error) => { 503 log::debug!("getFeedGenerator decode failed for {}: {error}", feed.as_ref()); 504 continue; 505 } 506 }; 507 508 let view = output.view.into_static(); 509 let uri = view.uri.as_ref().to_string(); 510 if seen.insert(uri) { 511 out.push(view); 512 recovered += 1; 513 } 514 } 515 516 recovered 517} 518 519pub async fn get_timeline(cursor: Option<String>, limit: u32, state: &AppState) -> Result<serde_json::Value> { 520 let session = get_session(state).await?; 521 let mut req = GetTimeline::new().limit(limit as i64); 522 if let Some(c) = &cursor { 523 req = req.cursor(Some(c.into())); 524 } 525 526 let output = session 527 .send(req.build()) 528 .await 529 .map_err(|error| { 530 log::error!("getTimeline error: {error}"); 531 AppError::validation("getTimeline") 532 })? 533 .into_output() 534 .map_err(|error| { 535 log::error!("getTimeline output error: {error}"); 536 AppError::validation("getTimeline output") 537 })?; 538 539 serde_json::to_value(&output).map_err(AppError::from) 540} 541 542pub async fn get_feed(uri: String, cursor: Option<String>, limit: u32, state: &AppState) -> Result<serde_json::Value> { 543 let session = get_session(state).await?; 544 let feed_uri = AtUri::new(&uri).map_err(|_| AppError::validation("invalid feed URI"))?; 545 let mut req = GetFeed::new().feed(feed_uri).limit(limit as i64); 546 if let Some(c) = &cursor { 547 req = req.cursor(Some(c.into())); 548 } 549 550 let output = session 551 .send(req.build()) 552 .await 553 .map_err(|error| { 554 log::error!("getFeed error: {error}"); 555 AppError::validation("getFeed") 556 })? 557 .into_output() 558 .map_err(|error| { 559 log::error!("getFeed output error: {error}"); 560 AppError::validation("getFeed output") 561 })?; 562 563 serde_json::to_value(&output).map_err(AppError::from) 564} 565 566pub async fn get_list_feed( 567 uri: String, cursor: Option<String>, limit: u32, state: &AppState, 568) -> Result<serde_json::Value> { 569 let session = get_session(state).await?; 570 let list_uri = AtUri::new(&uri).map_err(|_| AppError::validation("invalid list URI"))?; 571 let mut req = GetListFeed::new().list(list_uri).limit(limit as i64); 572 if let Some(c) = &cursor { 573 req = req.cursor(Some(c.into())); 574 } 575 576 let output = session 577 .send(req.build()) 578 .await 579 .map_err(|error| { 580 log::error!("getListFeed error: {error}"); 581 AppError::validation("getListFeed") 582 })? 583 .into_output() 584 .map_err(|error| { 585 log::error!("getListFeed output error: {error}"); 586 AppError::validation("getListFeed output") 587 })?; 588 589 serde_json::to_value(&output).map_err(AppError::from) 590} 591 592pub async fn get_post_thread(uri: String, state: &AppState) -> Result<serde_json::Value> { 593 let session = get_session(state).await?; 594 let post_uri = AtUri::new(&uri).map_err(|_| AppError::validation("invalid post URI"))?; 595 596 let output = session 597 .send(GetPostThread::new().uri(post_uri).build()) 598 .await 599 .map_err(|error| { 600 log::error!("getPostThread error: {error}"); 601 AppError::validation("getPostThread") 602 })? 603 .into_output() 604 .map_err(|error| { 605 log::error!("getPostThread output error: {error}"); 606 AppError::validation("getPostThread output") 607 })?; 608 609 serde_json::to_value(&output).map_err(AppError::from) 610} 611 612pub async fn get_profile(actor: String, state: &AppState) -> Result<serde_json::Value> { 613 let session = get_session(state).await?; 614 let requested_actor = actor.trim().to_string(); 615 let (did, handle) = requested_actor_hints(&requested_actor); 616 let actor = parse_actor_identifier(&actor)?; 617 618 let output = match session.send(GetProfile::new().actor(actor).build()).await { 619 Ok(output) => output, 620 Err(error) => { 621 log::error!("getProfile error: {error}"); 622 if let Some(reason) = classify_actor_unavailability(&error) { 623 return serde_json::to_value(ProfileLookupResult::Unavailable { 624 requested_actor, 625 did, 626 handle, 627 reason, 628 message: actor_unavailable_message(reason).to_string(), 629 }) 630 .map_err(AppError::from); 631 } 632 633 return Err(AppError::validation("Couldn't load this profile right now.")); 634 } 635 }; 636 let output = match output.into_output() { 637 Ok(output) => output, 638 Err(error) => { 639 log::error!("getProfile output error: {error}"); 640 if let Some(reason) = classify_actor_unavailability(&error) { 641 return serde_json::to_value(ProfileLookupResult::Unavailable { 642 requested_actor, 643 did, 644 handle, 645 reason, 646 message: actor_unavailable_message(reason).to_string(), 647 }) 648 .map_err(AppError::from); 649 } 650 651 return Err(AppError::validation("Couldn't load this profile right now.")); 652 } 653 }; 654 655 serde_json::to_value(ProfileLookupResult::Available { profile: serde_json::to_value(output.value)? }) 656 .map_err(AppError::from) 657} 658 659pub async fn get_author_feed( 660 actor: String, cursor: Option<String>, limit: Option<u32>, state: &AppState, 661) -> Result<serde_json::Value> { 662 let session = get_session(state).await?; 663 let actor = parse_actor_identifier(&actor)?; 664 let mut req = GetAuthorFeed::new().actor(actor).limit(limit.map(|value| value as i64)); 665 if let Some(c) = &cursor { 666 req = req.cursor(Some(c.as_str().into())); 667 } 668 669 let output = session 670 .send(req.build()) 671 .await 672 .map_err(|error| { 673 log::error!("getAuthorFeed error: {error}"); 674 AppError::validation("getAuthorFeed") 675 })? 676 .into_output() 677 .map_err(|error| { 678 log::error!("getAuthorFeed output error: {error}"); 679 AppError::validation("getAuthorFeed output") 680 })?; 681 682 serde_json::to_value(&output).map_err(AppError::from) 683} 684 685pub async fn get_actor_likes( 686 actor: String, cursor: Option<String>, limit: Option<u32>, state: &AppState, 687) -> Result<serde_json::Value> { 688 let session = get_session(state).await?; 689 let actor = parse_actor_identifier(&actor)?; 690 let mut req = GetActorLikes::new().actor(actor).limit(limit.map(|value| value as i64)); 691 if let Some(c) = &cursor { 692 req = req.cursor(Some(c.as_str().into())); 693 } 694 695 let output = session 696 .send(req.build()) 697 .await 698 .map_err(|error| { 699 log::error!("getActorLikes error: {error}"); 700 AppError::validation("getActorLikes") 701 })? 702 .into_output() 703 .map_err(|error| { 704 log::error!("getActorLikes output error: {error}"); 705 AppError::validation("getActorLikes output") 706 })?; 707 708 serde_json::to_value(&output).map_err(AppError::from) 709} 710 711pub async fn create_post( 712 text: String, reply_to: Option<ReplyRefInput>, embed: Option<EmbedInput>, state: &AppState, 713) -> Result<CreateRecordResult> { 714 if text.trim().is_empty() && embed.is_none() { 715 return Err(AppError::validation("post requires text or embed")); 716 } 717 718 let session = get_session(state).await?; 719 let did = active_did(state)?; 720 721 let resolver = JacquardResolver::default(); 722 let rich = richtext::parse(&text).build_async(&resolver).await.map_err(|error| { 723 log::error!("richtext parse error: {error}"); 724 AppError::validation("failed to parse post text") 725 })?; 726 727 let mut post = Post::new().text(rich.text).created_at(Datetime::now()); 728 729 if let Some(facets) = rich.facets { 730 post = post.facets(facets); 731 } 732 733 if let Some(reply) = reply_to { 734 let reply_ref = ReplyRef::new() 735 .parent(strong_ref_from_input(&reply.parent)?) 736 .root(strong_ref_from_input(&reply.root)?) 737 .build(); 738 post = post.reply(reply_ref); 739 } 740 741 if let Some(embed) = embed { 742 post = post.embed(post_embed_from_input(embed)?); 743 } 744 745 let record_json = serde_json::to_value(post.build())?; 746 let record_data = Data::from_json_owned(record_json).map_err(|_| AppError::validation("serialize post"))?; 747 748 let repo = AtIdentifier::Did(Did::new(&did)?); 749 let collection = Nsid::new("app.bsky.feed.post").map_err(|_| AppError::validation("nsid"))?; 750 751 let output = session 752 .send( 753 CreateRecord::new() 754 .repo(repo) 755 .collection(collection) 756 .record(record_data) 757 .build(), 758 ) 759 .await 760 .map_err(|error| { 761 log::error!("createRecord (post) error: {error}"); 762 AppError::validation("failed to create record (post)") 763 })? 764 .into_output() 765 .map_err(|error| { 766 log::error!("createRecord (post) output error: {error}"); 767 AppError::validation("failed to create record (post) output") 768 })?; 769 770 Ok(CreateRecordResult { uri: output.uri.to_string(), cid: output.cid.to_string() }) 771} 772 773pub async fn like_post(uri: String, cid: String, state: &AppState) -> Result<CreateRecordResult> { 774 let session = get_session(state).await?; 775 let did = active_did(state)?; 776 777 let subject = StrongRef::new() 778 .uri(AtUri::new(&uri).map_err(|_| AppError::validation("invalid post URI"))?) 779 .cid(Cid::str(&cid)) 780 .build(); 781 782 let like = Like::new().created_at(Datetime::now()).subject(subject).build(); 783 784 let record_json = serde_json::to_value(&like)?; 785 let record_data = Data::from_json_owned(record_json).map_err(|_| AppError::validation("serialize like"))?; 786 787 let repo = AtIdentifier::Did(Did::new(&did)?); 788 let collection = Nsid::new("app.bsky.feed.like").map_err(|_| AppError::validation("nsid"))?; 789 790 let output = session 791 .send( 792 CreateRecord::new() 793 .repo(repo) 794 .collection(collection) 795 .record(record_data) 796 .build(), 797 ) 798 .await 799 .map_err(|error| { 800 log::error!("createRecord (like) error: {error}"); 801 AppError::validation("failed to create record (like)") 802 })? 803 .into_output() 804 .map_err(|error| { 805 log::error!("createRecord (like) output error: {error}"); 806 AppError::validation("failed to create record (like) output") 807 })?; 808 809 Ok(CreateRecordResult { uri: output.uri.to_string(), cid: output.cid.to_string() }) 810} 811 812pub async fn unlike_post(like_uri: String, state: &AppState) -> Result<()> { 813 let session = get_session(state).await?; 814 let did = active_did(state)?; 815 816 let at_uri = AtUri::new(&like_uri).map_err(|_| AppError::validation("invalid like URI"))?; 817 let RecordKey(rkey) = at_uri 818 .rkey() 819 .ok_or_else(|| AppError::Validation("like URI has no rkey".into()))?; 820 let rkey_str = rkey.to_string(); 821 822 let repo = AtIdentifier::Did(Did::new(&did)?); 823 let collection = Nsid::new("app.bsky.feed.like").map_err(|_| AppError::validation("nsid"))?; 824 let rkey = RecordKey::any(&rkey_str).map_err(|_| AppError::validation("invalid rkey"))?; 825 826 session 827 .send(DeleteRecord::new().repo(repo).collection(collection).rkey(rkey).build()) 828 .await 829 .map_err(|error| { 830 log::error!("deleteRecord (unlike) error: {error}"); 831 AppError::validation("failed to delete record (unlike)") 832 })? 833 .into_output() 834 .map_err(|error| { 835 log::error!("deleteRecord (unlike) output error: {error}"); 836 AppError::validation("failed to delete record (unlike) output") 837 })?; 838 839 Ok(()) 840} 841 842pub async fn repost(uri: String, cid: String, state: &AppState) -> Result<CreateRecordResult> { 843 let session = get_session(state).await?; 844 let did = active_did(state)?; 845 846 let subject = StrongRef::new() 847 .uri(AtUri::new(&uri).map_err(|_| AppError::validation("invalid post URI"))?) 848 .cid(Cid::str(&cid)) 849 .build(); 850 851 let repost = Repost::new().created_at(Datetime::now()).subject(subject).build(); 852 853 let record_json = serde_json::to_value(&repost)?; 854 let record_data = Data::from_json_owned(record_json).map_err(|_| AppError::validation("serialize repost"))?; 855 856 let repo = AtIdentifier::Did(Did::new(&did)?); 857 let collection = Nsid::new("app.bsky.feed.repost").map_err(|_| AppError::validation("nsid"))?; 858 859 let output = session 860 .send( 861 CreateRecord::new() 862 .repo(repo) 863 .collection(collection) 864 .record(record_data) 865 .build(), 866 ) 867 .await 868 .map_err(|error| { 869 log::error!("createRecord (repost) error: {error}"); 870 AppError::validation("failed to create record (repost)") 871 })? 872 .into_output() 873 .map_err(|error| { 874 log::error!("createRecord (repost) output error: {error}"); 875 AppError::validation("failed to create record (repost) output") 876 })?; 877 878 Ok(CreateRecordResult { uri: output.uri.to_string(), cid: output.cid.to_string() }) 879} 880 881pub async fn unrepost(repost_uri: String, state: &AppState) -> Result<()> { 882 let session = get_session(state).await?; 883 let did = active_did(state)?; 884 885 let at_uri = AtUri::new(&repost_uri).map_err(|_| AppError::validation("invalid repost URI"))?; 886 let RecordKey(rkey) = at_uri 887 .rkey() 888 .ok_or_else(|| AppError::Validation("repost URI has no rkey".into()))?; 889 890 let rkey_str = rkey.to_string(); 891 892 let repo = AtIdentifier::Did(Did::new(&did)?); 893 let collection = Nsid::new("app.bsky.feed.repost").map_err(|_| AppError::validation("nsid"))?; 894 let rkey = RecordKey::any(&rkey_str).map_err(|_| AppError::validation("invalid rkey"))?; 895 896 session 897 .send(DeleteRecord::new().repo(repo).collection(collection).rkey(rkey).build()) 898 .await 899 .map_err(|error| { 900 log::error!("deleteRecord (unrepost) error: {error}"); 901 AppError::validation("failed to delete record (unrepost)") 902 })? 903 .into_output() 904 .map_err(|error| { 905 log::error!("deleteRecord (unrepost) output error: {error}"); 906 AppError::validation("failed to delete record (unrepost) output") 907 })?; 908 909 Ok(()) 910} 911 912pub async fn bookmark_post(uri: String, cid: String, state: &AppState) -> Result<()> { 913 let session = get_session(state).await?; 914 let post_uri = AtUri::new(&uri).map_err(|_| AppError::validation("invalid post URI"))?; 915 916 let response = session 917 .send(CreateBookmark::new().uri(post_uri).cid(Cid::str(&cid)).build()) 918 .await 919 .map_err(|error| { 920 log::error!("createBookmark error: {error}"); 921 AppError::validation("Could not save this post.") 922 })?; 923 924 if accepts_empty_bookmark_response(response.status(), response.buffer()) { 925 return Ok(()); 926 } 927 928 response.into_output().map_err(|error| { 929 log::error!("createBookmark output error: {error}"); 930 AppError::validation("Could not save this post.") 931 })?; 932 933 Ok(()) 934} 935 936pub async fn remove_bookmark(uri: String, state: &AppState) -> Result<()> { 937 let session = get_session(state).await?; 938 let post_uri = AtUri::new(&uri).map_err(|_| AppError::validation("invalid post URI"))?; 939 940 let response = session 941 .send(DeleteBookmark::new().uri(post_uri).build()) 942 .await 943 .map_err(|error| { 944 log::error!("deleteBookmark error: {error}"); 945 AppError::validation("Could not remove this saved post.") 946 })?; 947 948 if accepts_empty_bookmark_response(response.status(), response.buffer()) { 949 return Ok(()); 950 } 951 952 response.into_output().map_err(|error| { 953 log::error!("deleteBookmark output error: {error}"); 954 AppError::validation("Could not remove this saved post.") 955 })?; 956 957 Ok(()) 958} 959 960pub async fn follow_actor(did: String, state: &AppState) -> Result<CreateRecordResult> { 961 let session = get_session(state).await?; 962 let active_did = active_did(state)?; 963 964 let follow = Follow::new() 965 .created_at(Datetime::now()) 966 .subject(Did::new(&did)?) 967 .build(); 968 969 let record_json = serde_json::to_value(&follow)?; 970 let record_data = Data::from_json_owned(record_json).map_err(|_| AppError::validation("serialize follow"))?; 971 972 let repo = AtIdentifier::Did(Did::new(&active_did)?); 973 let collection = Nsid::new("app.bsky.graph.follow").map_err(|_| AppError::validation("nsid"))?; 974 975 let output = session 976 .send( 977 CreateRecord::new() 978 .repo(repo) 979 .collection(collection) 980 .record(record_data) 981 .build(), 982 ) 983 .await 984 .map_err(|error| { 985 log::error!("createRecord (follow) error: {error}"); 986 AppError::validation("Could not follow this account.") 987 })? 988 .into_output() 989 .map_err(|error| { 990 log::error!("createRecord (follow) output error: {error}"); 991 AppError::validation("Could not follow this account.") 992 })?; 993 994 Ok(CreateRecordResult { uri: output.uri.to_string(), cid: output.cid.to_string() }) 995} 996 997pub async fn block_actor(did: String, state: &AppState) -> Result<CreateRecordResult> { 998 let session = get_session(state).await?; 999 let active_did = active_did(state)?; 1000 1001 let block = Block::new() 1002 .created_at(Datetime::now()) 1003 .subject(Did::new(&did).map_err(|_| AppError::validation("invalid account DID"))?) 1004 .build(); 1005 1006 let record_json = serde_json::to_value(&block)?; 1007 let record_data = Data::from_json_owned(record_json).map_err(|_| AppError::validation("serialize block"))?; 1008 1009 let repo = AtIdentifier::Did(Did::new(&active_did)?); 1010 let collection = Nsid::new("app.bsky.graph.block").map_err(|_| AppError::validation("nsid"))?; 1011 1012 let output = session 1013 .send( 1014 CreateRecord::new() 1015 .repo(repo) 1016 .collection(collection) 1017 .record(record_data) 1018 .build(), 1019 ) 1020 .await 1021 .map_err(|error| { 1022 log::error!("createRecord (block) error: {error}"); 1023 AppError::validation("Could not block this account.") 1024 })? 1025 .into_output() 1026 .map_err(|error| { 1027 log::error!("createRecord (block) output error: {error}"); 1028 AppError::validation("Could not block this account.") 1029 })?; 1030 1031 Ok(CreateRecordResult { uri: output.uri.to_string(), cid: output.cid.to_string() }) 1032} 1033 1034pub async fn unfollow_actor(follow_uri: String, state: &AppState) -> Result<()> { 1035 let session = get_session(state).await?; 1036 let did = active_did(state)?; 1037 1038 let at_uri = AtUri::new(&follow_uri).map_err(|_| AppError::validation("invalid follow URI"))?; 1039 let RecordKey(rkey) = at_uri 1040 .rkey() 1041 .ok_or_else(|| AppError::Validation("follow URI has no rkey".into()))?; 1042 let rkey_str = rkey.to_string(); 1043 1044 let repo = AtIdentifier::Did(Did::new(&did)?); 1045 let collection = Nsid::new("app.bsky.graph.follow").map_err(|_| AppError::validation("nsid"))?; 1046 let rkey = RecordKey::any(&rkey_str).map_err(|_| AppError::validation("invalid rkey"))?; 1047 1048 session 1049 .send(DeleteRecord::new().repo(repo).collection(collection).rkey(rkey).build()) 1050 .await 1051 .map_err(|error| { 1052 log::error!("deleteRecord (unfollow) error: {error}"); 1053 AppError::validation("Could not unfollow this account.") 1054 })? 1055 .into_output() 1056 .map_err(|error| { 1057 log::error!("deleteRecord (unfollow) output error: {error}"); 1058 AppError::validation("Could not unfollow this account.") 1059 })?; 1060 1061 Ok(()) 1062} 1063 1064pub async fn get_followers( 1065 actor: String, cursor: Option<String>, limit: Option<u32>, state: &AppState, 1066) -> Result<serde_json::Value> { 1067 let session = get_session(state).await?; 1068 let actor = parse_actor_identifier(&actor)?; 1069 let mut req = GetFollowers::new().actor(actor).limit(limit.map(|value| value as i64)); 1070 if let Some(c) = &cursor { 1071 req = req.cursor(Some(c.as_str().into())); 1072 } 1073 1074 let output = session 1075 .send(req.build()) 1076 .await 1077 .map_err(|error| { 1078 log::error!("getFollowers error: {error}"); 1079 AppError::validation("Could not load followers.") 1080 })? 1081 .into_output() 1082 .map_err(|error| { 1083 log::error!("getFollowers output error: {error}"); 1084 AppError::validation("Could not load followers.") 1085 })?; 1086 1087 serde_json::to_value(&output).map_err(AppError::from) 1088} 1089 1090pub async fn get_follows( 1091 actor: String, cursor: Option<String>, limit: Option<u32>, state: &AppState, 1092) -> Result<serde_json::Value> { 1093 let session = get_session(state).await?; 1094 let actor = parse_actor_identifier(&actor)?; 1095 let mut req = GetFollows::new().actor(actor).limit(limit.map(|value| value as i64)); 1096 if let Some(c) = &cursor { 1097 req = req.cursor(Some(c.as_str().into())); 1098 } 1099 1100 let output = session 1101 .send(req.build()) 1102 .await 1103 .map_err(|error| { 1104 log::error!("getFollows error: {error}"); 1105 AppError::validation("Could not load following.") 1106 })? 1107 .into_output() 1108 .map_err(|error| { 1109 log::error!("getFollows output error: {error}"); 1110 AppError::validation("Could not load following.") 1111 })?; 1112 1113 serde_json::to_value(&output).map_err(AppError::from) 1114} 1115 1116pub async fn audit_follows(app: &AppHandle, state: &AppState) -> Result<Vec<FlaggedFollow>> { 1117 let session = get_session(state).await?; 1118 let active_did = active_did(state)?; 1119 let follow_records = list_follow_records_for_audit(&session, &active_did).await?; 1120 if follow_records.is_empty() { 1121 return Ok(Vec::new()); 1122 } 1123 1124 let dids = follow_records 1125 .iter() 1126 .map(|record| record.did.clone()) 1127 .collect::<Vec<_>>(); 1128 let unique_dids = dedupe_preserve_order(dids); 1129 let follow_statuses = resolve_follow_statuses(&session, app, &active_did, unique_dids).await?; 1130 1131 Ok(follow_records 1132 .into_iter() 1133 .filter_map(|record| { 1134 follow_statuses 1135 .get(&record.did) 1136 .map(|status| build_flagged_follow(record, status.clone())) 1137 }) 1138 .collect()) 1139} 1140 1141pub async fn batch_unfollow(follow_uris: Vec<String>, state: &AppState) -> Result<BatchResult> { 1142 let session = get_session(state).await?; 1143 let active_did = active_did(state)?; 1144 1145 if follow_uris.is_empty() { 1146 return Ok(BatchResult { deleted: 0, failed: Vec::new() }); 1147 } 1148 1149 let mut targets = Vec::new(); 1150 let mut failed = Vec::new(); 1151 1152 for uri in follow_uris { 1153 match parse_follow_delete_target(&uri) { 1154 Ok(target) => targets.push(target), 1155 Err(reason) => { 1156 log::warn!("skipping invalid follow URI for batch unfollow: {uri} ({reason})"); 1157 failed.push(uri); 1158 } 1159 } 1160 } 1161 1162 let mut deleted = 0usize; 1163 for chunk in targets.chunks(FOLLOW_UNFOLLOW_WRITE_CHUNK_SIZE) { 1164 let (writes, chunk_uris, chunk_failed) = build_delete_writes(chunk); 1165 failed.extend(chunk_failed); 1166 1167 if writes.is_empty() { 1168 continue; 1169 } 1170 1171 match send_apply_writes_chunk_with_retry(&session, &active_did, writes).await { 1172 Ok(output) => { 1173 let (chunk_deleted, chunk_failures) = summarize_apply_writes_result(&chunk_uris, &output); 1174 deleted += chunk_deleted; 1175 failed.extend(chunk_failures); 1176 } 1177 Err(error) => { 1178 log::warn!( 1179 "applyWrites failed for unfollow batch ({} items): {error}", 1180 chunk_uris.len() 1181 ); 1182 failed.extend(chunk_uris); 1183 } 1184 } 1185 } 1186 1187 Ok(BatchResult { deleted, failed }) 1188} 1189 1190async fn list_follow_records_for_audit( 1191 session: &Arc<LazuriteOAuthSession>, active_did: &str, 1192) -> Result<Vec<FollowRecordEntry>> { 1193 let mut records = Vec::new(); 1194 let mut cursor = None; 1195 1196 loop { 1197 let output = list_follow_records_page_with_retry(session, active_did, cursor.clone()).await?; 1198 for record in output.records { 1199 if let Some(entry) = follow_record_entry_from_list_record(&record) { 1200 records.push(entry); 1201 } 1202 } 1203 1204 cursor = output.cursor.map(|value| value.to_string()); 1205 if cursor.is_none() { 1206 break; 1207 } 1208 1209 sleep(FOLLOW_AUDIT_INTER_BATCH_DELAY).await; 1210 } 1211 1212 Ok(records) 1213} 1214 1215async fn list_follow_records_page_with_retry( 1216 session: &Arc<LazuriteOAuthSession>, active_did: &str, cursor: Option<String>, 1217) -> Result<ListRecordsOutput<'static>> { 1218 let repo = AtIdentifier::Did(Did::new(active_did)?.into_static()); 1219 let collection = Nsid::new(FOLLOW_COLLECTION_NSID) 1220 .map_err(|_| AppError::validation("invalid follow collection NSID"))? 1221 .into_static(); 1222 let mut retries = 0usize; 1223 1224 loop { 1225 let response = session 1226 .send( 1227 ListRecords::new() 1228 .repo(repo.clone()) 1229 .collection(collection.clone()) 1230 .limit(FOLLOW_AUDIT_PAGE_LIMIT) 1231 .maybe_cursor(cursor.as_deref().map(Into::into)) 1232 .build(), 1233 ) 1234 .await 1235 .map_err(|error| { 1236 log::error!("follow hygiene listRecords request failed: {error}"); 1237 AppError::validation("Couldn't scan your follows right now.") 1238 })?; 1239 1240 if response.status() == StatusCode::TOO_MANY_REQUESTS { 1241 retries += 1; 1242 if retries > FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES { 1243 log::warn!("follow hygiene listRecords exceeded max rate-limit retries"); 1244 return Err(AppError::validation("Couldn't scan your follows right now.")); 1245 } 1246 1247 let delay = retry_after_delay(response.buffer()).unwrap_or(FOLLOW_AUDIT_RETRY_AFTER_DEFAULT); 1248 log::warn!( 1249 "follow hygiene listRecords rate-limited (attempt {retries}/{}), retrying in {}s", 1250 FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES, 1251 delay.as_secs() 1252 ); 1253 sleep(delay).await; 1254 continue; 1255 } 1256 1257 return response.into_output().map(IntoStatic::into_static).map_err(|error| { 1258 log::error!("follow hygiene listRecords output decode failed: {error}"); 1259 AppError::validation("Couldn't scan your follows right now.") 1260 }); 1261 } 1262} 1263 1264async fn resolve_follow_statuses( 1265 session: &Arc<LazuriteOAuthSession>, app: &AppHandle, active_did: &str, dids: Vec<String>, 1266) -> Result<HashMap<String, FollowStatusInfo>> { 1267 let mut resolved = HashMap::new(); 1268 if dids.is_empty() { 1269 return Ok(resolved); 1270 } 1271 1272 let chunks = dids 1273 .chunks(FOLLOW_AUDIT_PROFILE_BATCH_SIZE) 1274 .map(|chunk| chunk.to_vec()) 1275 .collect::<Vec<_>>(); 1276 let total_batches = chunks.len(); 1277 let semaphore = Arc::new(Semaphore::new(FOLLOW_AUDIT_PROFILE_BATCH_CONCURRENCY)); 1278 let mut join_set = JoinSet::new(); 1279 1280 for did_chunk in chunks { 1281 let session = session.clone(); 1282 let semaphore = semaphore.clone(); 1283 join_set.spawn(async move { 1284 let _permit = semaphore.acquire_owned().await.map_err(|error| { 1285 log::error!("follow hygiene semaphore acquisition failed: {error}"); 1286 AppError::validation("Couldn't scan your follows right now.") 1287 })?; 1288 let profiles = get_profiles_batch_with_retry(&session, &did_chunk).await?; 1289 sleep(FOLLOW_AUDIT_INTER_BATCH_DELAY).await; 1290 Ok::<(Vec<String>, Vec<ProfileViewDetailed<'static>>), AppError>((did_chunk, profiles)) 1291 }); 1292 } 1293 1294 let mut missing = dids.into_iter().collect::<HashSet<_>>(); 1295 let mut completed = 0usize; 1296 1297 while let Some(joined) = join_set.join_next().await { 1298 let (requested_dids, profiles) = joined.map_err(|error| { 1299 log::error!("follow hygiene profile batch task failed: {error}"); 1300 AppError::validation("Couldn't scan your follows right now.") 1301 })??; 1302 let mut found_dids = HashSet::new(); 1303 1304 for profile in profiles { 1305 let did = profile.did.to_string(); 1306 found_dids.insert(did.clone()); 1307 let status = follow_status_from_profile(&profile, active_did); 1308 if status != 0 { 1309 resolved.insert( 1310 did.clone(), 1311 FollowStatusInfo { handle: profile.handle.to_string(), status }, 1312 ); 1313 } 1314 missing.remove(&did); 1315 } 1316 1317 for did in requested_dids { 1318 if !found_dids.contains(&did) { 1319 missing.insert(did); 1320 } else { 1321 missing.remove(&did); 1322 } 1323 } 1324 1325 completed += 1; 1326 app.emit( 1327 FOLLOW_HYGIENE_PROGRESS_EVENT, 1328 FollowHygieneProgress { 1329 batch_size: FOLLOW_AUDIT_PROFILE_BATCH_SIZE, 1330 current: completed, 1331 total: total_batches, 1332 }, 1333 )?; 1334 } 1335 1336 for did in dedupe_preserve_order(missing.into_iter().collect()) { 1337 if let Some(status) = resolve_missing_follow_status(session, &did, active_did).await { 1338 resolved.insert(did, status); 1339 } 1340 } 1341 1342 Ok(resolved) 1343} 1344 1345async fn get_profiles_batch_with_retry( 1346 session: &Arc<LazuriteOAuthSession>, dids: &[String], 1347) -> Result<Vec<ProfileViewDetailed<'static>>> { 1348 let actors = dids 1349 .iter() 1350 .map(|did| Did::new(did).map(|parsed| AtIdentifier::Did(parsed.into_static()))) 1351 .collect::<std::result::Result<Vec<_>, _>>()?; 1352 let mut retries = 0usize; 1353 1354 loop { 1355 let response = session 1356 .send(GetProfiles::new().actors(actors.clone()).build()) 1357 .await 1358 .map_err(|error| { 1359 log::error!("follow hygiene getProfiles request failed: {error}"); 1360 AppError::validation("Couldn't scan your follows right now.") 1361 })?; 1362 1363 if response.status() == StatusCode::TOO_MANY_REQUESTS { 1364 retries += 1; 1365 if retries > FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES { 1366 log::warn!("follow hygiene getProfiles exceeded max rate-limit retries"); 1367 return Err(AppError::validation("Couldn't scan your follows right now.")); 1368 } 1369 1370 let delay = retry_after_delay(response.buffer()).unwrap_or(FOLLOW_AUDIT_RETRY_AFTER_DEFAULT); 1371 log::warn!( 1372 "follow hygiene getProfiles rate-limited (attempt {retries}/{}), retrying in {}s", 1373 FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES, 1374 delay.as_secs() 1375 ); 1376 sleep(delay).await; 1377 continue; 1378 } 1379 1380 let output = response.into_output().map_err(|error| { 1381 log::error!("follow hygiene getProfiles output decode failed: {error}"); 1382 AppError::validation("Couldn't scan your follows right now.") 1383 })?; 1384 return Ok(output.profiles.into_iter().map(IntoStatic::into_static).collect()); 1385 } 1386} 1387 1388async fn resolve_missing_follow_status( 1389 session: &Arc<LazuriteOAuthSession>, did: &str, active_did: &str, 1390) -> Option<FollowStatusInfo> { 1391 let did_value = Did::new(did).ok()?.into_static(); 1392 let self_follow = if did == active_did { FOLLOW_STATUS_SELF_FOLLOW } else { 0 }; 1393 1394 match get_profile_for_did_with_retry(session, &did_value).await { 1395 Ok(profile) => { 1396 let status = follow_status_from_profile(&profile, active_did); 1397 if status == 0 { 1398 None 1399 } else { 1400 Some(FollowStatusInfo { handle: profile.handle.to_string(), status }) 1401 } 1402 } 1403 Err(error_message) => { 1404 let mut status = follow_status_from_unavailability_reason(classify_actor_unavailability(&error_message)); 1405 status |= self_follow; 1406 1407 if status == 0 { 1408 log::warn!("follow hygiene missing DID fallback unclassified for {did}: {error_message}"); 1409 return None; 1410 } 1411 1412 let handle = resolve_handle_from_did_document(session, &did_value) 1413 .await 1414 .unwrap_or_else(|| did.to_string()); 1415 Some(FollowStatusInfo { handle, status }) 1416 } 1417 } 1418} 1419 1420async fn get_profile_for_did_with_retry( 1421 session: &Arc<LazuriteOAuthSession>, did: &Did<'_>, 1422) -> std::result::Result<ProfileViewDetailed<'static>, String> { 1423 let actor = AtIdentifier::Did(did.clone().into_static()); 1424 let mut retries = 0usize; 1425 1426 loop { 1427 let response = session 1428 .send(GetProfile::new().actor(actor.clone()).build()) 1429 .await 1430 .map_err(|error| error.to_string())?; 1431 1432 if response.status() == StatusCode::TOO_MANY_REQUESTS { 1433 retries += 1; 1434 if retries > FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES { 1435 return Err("rate limit retries exhausted".into()); 1436 } 1437 1438 let delay = retry_after_delay(response.buffer()).unwrap_or(FOLLOW_AUDIT_RETRY_AFTER_DEFAULT); 1439 log::warn!( 1440 "follow hygiene getProfile rate-limited for {} (attempt {retries}/{}), retrying in {}s", 1441 did.as_ref(), 1442 FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES, 1443 delay.as_secs() 1444 ); 1445 sleep(delay).await; 1446 continue; 1447 } 1448 1449 let output = response.into_output().map_err(|error| error.to_string())?; 1450 return Ok(output.value.into_static()); 1451 } 1452} 1453 1454async fn resolve_handle_from_did_document(session: &Arc<LazuriteOAuthSession>, did: &Did<'_>) -> Option<String> { 1455 let did_doc = session.resolve_did_doc(did).await.ok()?.into_owned().ok()?; 1456 1457 did_doc.also_known_as.as_ref().and_then(|aliases| { 1458 aliases.iter().find_map(|alias| { 1459 alias 1460 .as_ref() 1461 .strip_prefix("at://") 1462 .and_then(|candidate| Handle::new(candidate).ok().map(|handle| handle.to_string())) 1463 }) 1464 }) 1465} 1466 1467fn follow_status_from_unavailability_reason(reason: Option<ActorAvailabilityReason>) -> u8 { 1468 match reason { 1469 Some(ActorAvailabilityReason::NotFound) => FOLLOW_STATUS_DELETED, 1470 Some(ActorAvailabilityReason::Deactivated) => FOLLOW_STATUS_DEACTIVATED, 1471 Some(ActorAvailabilityReason::Suspended) => FOLLOW_STATUS_SUSPENDED, 1472 _ => 0, 1473 } 1474} 1475 1476fn follow_status_from_profile(profile: &ProfileViewDetailed<'_>, active_did: &str) -> u8 { 1477 let mut status = 0u8; 1478 1479 if profile.did.as_ref() == active_did { 1480 status |= FOLLOW_STATUS_SELF_FOLLOW; 1481 } 1482 1483 if profile 1484 .viewer 1485 .as_ref() 1486 .and_then(|viewer| viewer.blocked_by) 1487 .unwrap_or(false) 1488 { 1489 status |= FOLLOW_STATUS_BLOCKED_BY; 1490 } 1491 1492 let is_blocking = profile 1493 .viewer 1494 .as_ref() 1495 .and_then(|viewer| viewer.blocking.as_ref()) 1496 .is_some() 1497 || profile 1498 .viewer 1499 .as_ref() 1500 .and_then(|viewer| viewer.blocking_by_list.as_ref()) 1501 .is_some(); 1502 if is_blocking { 1503 status |= FOLLOW_STATUS_BLOCKING; 1504 } 1505 1506 if has_active_hide_label(profile.labels.as_deref()) { 1507 status |= FOLLOW_STATUS_HIDDEN; 1508 } 1509 1510 status 1511} 1512 1513fn has_active_hide_label(labels: Option<&[Label<'_>]>) -> bool { 1514 labels.is_some_and(|labels| { 1515 labels 1516 .iter() 1517 .any(|label| label.val.as_ref() == "!hide" && !label.neg.unwrap_or(false)) 1518 }) 1519} 1520 1521fn build_flagged_follow(record: FollowRecordEntry, status: FollowStatusInfo) -> FlaggedFollow { 1522 FlaggedFollow { 1523 did: record.did, 1524 handle: status.handle, 1525 follow_uri: record.follow_uri, 1526 status: status.status, 1527 status_label: follow_status_label(status.status), 1528 } 1529} 1530 1531fn follow_status_label(status: u8) -> String { 1532 if status == 0 { 1533 return "Unknown".to_string(); 1534 } 1535 1536 let mut labels = Vec::new(); 1537 1538 if status & FOLLOW_STATUS_DELETED != 0 { 1539 labels.push("Deleted"); 1540 } 1541 if status & FOLLOW_STATUS_DEACTIVATED != 0 { 1542 labels.push("Deactivated"); 1543 } 1544 if status & FOLLOW_STATUS_SUSPENDED != 0 { 1545 labels.push("Suspended"); 1546 } 1547 1548 let has_blocked_by = status & FOLLOW_STATUS_BLOCKED_BY != 0; 1549 let has_blocking = status & FOLLOW_STATUS_BLOCKING != 0; 1550 if has_blocked_by && has_blocking { 1551 labels.push("Mutual Block"); 1552 } else if has_blocked_by { 1553 labels.push("Blocked By"); 1554 } else if has_blocking { 1555 labels.push("Blocking"); 1556 } 1557 1558 if status & FOLLOW_STATUS_HIDDEN != 0 { 1559 labels.push("Hidden"); 1560 } 1561 if status & FOLLOW_STATUS_SELF_FOLLOW != 0 { 1562 labels.push("Self-Follow"); 1563 } 1564 1565 labels.join(", ") 1566} 1567 1568fn follow_record_entry_from_list_record(record: &RepoListRecord<'_>) -> Option<FollowRecordEntry> { 1569 let follow_uri = record.uri.to_string(); 1570 let did = match record.value.get_at_path("subject").and_then(Data::as_str) { 1571 Some(subject) => match Did::new(subject) { 1572 Ok(did) => did.to_string(), 1573 Err(error) => { 1574 log::warn!("follow hygiene skipped invalid follow subject DID in {follow_uri}: {error}"); 1575 return None; 1576 } 1577 }, 1578 None => { 1579 log::warn!("follow hygiene skipped follow record with missing subject in {follow_uri}"); 1580 return None; 1581 } 1582 }; 1583 1584 Some(FollowRecordEntry { did, follow_uri }) 1585} 1586 1587fn retry_after_delay(buffer: &[u8]) -> Option<Duration> { 1588 let payload = serde_json::from_slice::<serde_json::Value>(buffer).ok(); 1589 if let Some(seconds) = payload 1590 .as_ref() 1591 .and_then(|value| value.get("retryAfter")) 1592 .and_then(serde_json::Value::as_u64) 1593 { 1594 return Some(Duration::from_secs(seconds)); 1595 } 1596 1597 let text = payload 1598 .as_ref() 1599 .and_then(|value| value.get("message")) 1600 .and_then(serde_json::Value::as_str) 1601 .or_else(|| std::str::from_utf8(buffer).ok()) 1602 .unwrap_or_default(); 1603 1604 let lowered = text.to_ascii_lowercase(); 1605 for marker in ["retry-after", "retry after", "retry_after"] { 1606 if let Some(index) = lowered.find(marker) { 1607 let seconds = lowered[index..] 1608 .chars() 1609 .skip_while(|ch| !ch.is_ascii_digit()) 1610 .take_while(char::is_ascii_digit) 1611 .collect::<String>() 1612 .parse::<u64>() 1613 .ok()?; 1614 return Some(Duration::from_secs(seconds)); 1615 } 1616 } 1617 1618 None 1619} 1620 1621fn parse_follow_delete_target(uri: &str) -> std::result::Result<FollowDeleteTarget, &'static str> { 1622 let at_uri = AtUri::new(uri).map_err(|_| "invalid URI")?; 1623 let collection = at_uri.collection().map(|value| value.to_string()); 1624 if collection.as_deref() != Some(FOLLOW_COLLECTION_NSID) { 1625 return Err("URI does not point to follow collection"); 1626 } 1627 1628 let rkey = at_uri 1629 .rkey() 1630 .map(|value| value.as_ref().to_string()) 1631 .ok_or("URI missing rkey")?; 1632 1633 Ok(FollowDeleteTarget { uri: uri.to_string(), rkey }) 1634} 1635 1636fn build_delete_writes( 1637 targets: &[FollowDeleteTarget], 1638) -> (Vec<ApplyWritesWritesItem<'static>>, Vec<String>, Vec<String>) { 1639 let mut writes = Vec::with_capacity(targets.len()); 1640 let mut chunk_uris = Vec::with_capacity(targets.len()); 1641 let mut chunk_failed = Vec::new(); 1642 let collection = match Nsid::new(FOLLOW_COLLECTION_NSID) { 1643 Ok(collection) => collection.into_static(), 1644 Err(_) => { 1645 return ( 1646 writes, 1647 chunk_uris, 1648 targets.iter().map(|target| target.uri.clone()).collect(), 1649 ) 1650 } 1651 }; 1652 1653 for target in targets { 1654 let rkey = match RecordKey::any(&target.rkey) { 1655 Ok(rkey) => rkey.into_static(), 1656 Err(error) => { 1657 log::warn!("failed to parse follow rkey from URI {}: {error}", target.uri); 1658 chunk_failed.push(target.uri.clone()); 1659 continue; 1660 } 1661 }; 1662 1663 writes.push(ApplyWritesWritesItem::Delete(Box::new( 1664 Delete::new().collection(collection.clone()).rkey(rkey).build(), 1665 ))); 1666 chunk_uris.push(target.uri.clone()); 1667 } 1668 1669 (writes, chunk_uris, chunk_failed) 1670} 1671 1672fn summarize_apply_writes_result(chunk_uris: &[String], output: &ApplyWritesOutput<'_>) -> (usize, Vec<String>) { 1673 let Some(results) = output.results.as_ref() else { 1674 return (chunk_uris.len(), Vec::new()); 1675 }; 1676 1677 let mut deleted = 0usize; 1678 let mut failed = Vec::new(); 1679 1680 for (idx, uri) in chunk_uris.iter().enumerate() { 1681 match results.get(idx) { 1682 Some(ApplyWritesOutputResultsItem::DeleteResult(_)) => deleted += 1, 1683 _ => failed.push(uri.clone()), 1684 } 1685 } 1686 1687 (deleted, failed) 1688} 1689 1690async fn send_apply_writes_chunk_with_retry( 1691 session: &Arc<LazuriteOAuthSession>, active_did: &str, writes: Vec<ApplyWritesWritesItem<'static>>, 1692) -> Result<ApplyWritesOutput<'static>> { 1693 let repo = AtIdentifier::Did(Did::new(active_did)?.into_static()); 1694 let mut retries = 0usize; 1695 1696 loop { 1697 let response = session 1698 .send(ApplyWrites::new().repo(repo.clone()).writes(writes.clone()).build()) 1699 .await 1700 .map_err(|error| { 1701 log::warn!("follow hygiene applyWrites request failed: {error}"); 1702 AppError::validation("Couldn't unfollow selected accounts right now.") 1703 })?; 1704 1705 if response.status() == StatusCode::TOO_MANY_REQUESTS { 1706 retries += 1; 1707 if retries > FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES { 1708 return Err(AppError::validation("Couldn't unfollow selected accounts right now.")); 1709 } 1710 1711 let delay = retry_after_delay(response.buffer()).unwrap_or(FOLLOW_AUDIT_RETRY_AFTER_DEFAULT); 1712 log::warn!( 1713 "follow hygiene applyWrites rate-limited (attempt {retries}/{}), retrying in {}s", 1714 FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES, 1715 delay.as_secs() 1716 ); 1717 sleep(delay).await; 1718 continue; 1719 } 1720 1721 return response.into_output().map(IntoStatic::into_static).map_err(|error| { 1722 log::warn!("follow hygiene applyWrites output decode failed: {error}"); 1723 AppError::validation("Couldn't unfollow selected accounts right now.") 1724 }); 1725 } 1726} 1727 1728fn dedupe_preserve_order(values: Vec<String>) -> Vec<String> { 1729 let mut seen = HashSet::new(); 1730 values.into_iter().filter(|value| seen.insert(value.clone())).collect() 1731} 1732 1733fn strong_ref_from_input(input: &StrongRefInput) -> Result<StrongRef<'static>> { 1734 Ok(StrongRef::new() 1735 .uri( 1736 AtUri::new(&input.uri) 1737 .map_err(|_| AppError::validation("invalid URI in StrongRef"))? 1738 .into_static(), 1739 ) 1740 .cid(Cid::from(input.cid.clone()).into_static()) 1741 .build()) 1742} 1743 1744fn post_embed_from_input(input: EmbedInput) -> Result<PostEmbed<'static>> { 1745 match input { 1746 EmbedInput::Record { record } => Ok(PostEmbed::Record(Box::new( 1747 Record::new().record(strong_ref_from_input(&record)?).build(), 1748 ))), 1749 } 1750} 1751 1752#[derive(Debug, Deserialize)] 1753#[serde(rename_all = "camelCase")] 1754pub struct UpdateSavedFeedsInput { 1755 pub feeds: Vec<SavedFeedItem>, 1756} 1757 1758pub async fn update_saved_feeds(input: UpdateSavedFeedsInput, state: &AppState) -> Result<()> { 1759 let session = get_session(state).await?; 1760 let preferences = fetch_preference_items_with_session(&session).await?; 1761 let merged = merge_saved_feeds_preferences(preferences, input.feeds); 1762 store_preference_items(&session, merged).await 1763} 1764 1765pub async fn update_feed_view_pref(pref: FeedViewPrefItem, state: &AppState) -> Result<()> { 1766 let session = get_session(state).await?; 1767 let preferences = fetch_preference_items_with_session(&session).await?; 1768 let merged = merge_feed_view_preferences(preferences, pref); 1769 store_preference_items(&session, merged).await 1770} 1771 1772#[cfg(test)] 1773mod tests { 1774 use super::{ 1775 accepts_empty_bookmark_response, accepts_empty_put_preferences_response, build_delete_writes, 1776 follow_status_from_profile, follow_status_label, merge_feed_view_preferences, merge_saved_feeds_preferences, 1777 parse_follow_delete_target, retry_after_delay, summarize_apply_writes_result, user_preferences_from_items, 1778 FeedViewPrefItem, FollowDeleteTarget, SavedFeedItem, FOLLOW_STATUS_BLOCKED_BY, FOLLOW_STATUS_BLOCKING, 1779 FOLLOW_STATUS_HIDDEN, FOLLOW_STATUS_SELF_FOLLOW, 1780 }; 1781 use jacquard::api::app_bsky::actor::ProfileViewDetailed; 1782 use jacquard::api::app_bsky::actor::{AdultContentPref, FeedViewPref, PreferencesItem}; 1783 use jacquard::api::app_bsky::richtext::facet::FacetFeaturesItem; 1784 use jacquard::api::com_atproto::repo::apply_writes::{ 1785 ApplyWritesOutput, ApplyWritesOutputResultsItem, DeleteResult, 1786 }; 1787 use jacquard::richtext; 1788 use jacquard::types::aturi::AtUri; 1789 use jacquard::types::did::Did; 1790 use jacquard::types::handle::Handle; 1791 use jacquard::IntoStatic; 1792 use reqwest::StatusCode; 1793 use std::time::Duration; 1794 1795 fn adult_content_pref_item() -> PreferencesItem<'static> { 1796 PreferencesItem::AdultContentPref(Box::new(AdultContentPref::new().enabled(true).build())) 1797 } 1798 1799 fn feed_view_pref_item(feed: &str, hide_reposts: bool) -> PreferencesItem<'static> { 1800 PreferencesItem::FeedViewPref(Box::new(FeedViewPref { 1801 feed: feed.to_owned().into(), 1802 hide_quote_posts: Some(false), 1803 hide_replies: Some(false), 1804 hide_replies_by_like_count: None, 1805 hide_replies_by_unfollowed: Some(true), 1806 hide_reposts: Some(hide_reposts), 1807 extra_data: Default::default(), 1808 })) 1809 } 1810 1811 #[test] 1812 fn merging_saved_feeds_preserves_other_preferences() { 1813 let preferences = vec![adult_content_pref_item(), feed_view_pref_item("following", true)]; 1814 let merged = merge_saved_feeds_preferences( 1815 preferences, 1816 vec![SavedFeedItem { 1817 id: "following".into(), 1818 r#type: "timeline".into(), 1819 value: "following".into(), 1820 pinned: true, 1821 }], 1822 ); 1823 1824 assert!(merged 1825 .iter() 1826 .any(|item| matches!(item, PreferencesItem::AdultContentPref(_)))); 1827 assert!(merged 1828 .iter() 1829 .any(|item| matches!(item, PreferencesItem::FeedViewPref(_)))); 1830 1831 let user_preferences = user_preferences_from_items(&merged); 1832 assert_eq!(user_preferences.saved_feeds.len(), 1); 1833 assert_eq!(user_preferences.feed_view_prefs.len(), 1); 1834 assert!(user_preferences.feed_view_prefs[0].hide_reposts); 1835 } 1836 1837 #[test] 1838 fn merging_feed_view_pref_replaces_only_matching_feed() { 1839 let preferences = vec![ 1840 adult_content_pref_item(), 1841 feed_view_pref_item("following", true), 1842 feed_view_pref_item("at://feed/custom", false), 1843 ]; 1844 let merged = merge_feed_view_preferences( 1845 preferences, 1846 FeedViewPrefItem { 1847 feed: "following".into(), 1848 hide_replies: true, 1849 hide_replies_by_unfollowed: false, 1850 hide_replies_by_like_count: Some(4), 1851 hide_reposts: false, 1852 hide_quote_posts: true, 1853 }, 1854 ); 1855 1856 let user_preferences = user_preferences_from_items(&merged); 1857 assert_eq!(user_preferences.feed_view_prefs.len(), 2); 1858 1859 let following = user_preferences 1860 .feed_view_prefs 1861 .iter() 1862 .find(|pref| pref.feed == "following") 1863 .expect("following pref should exist"); 1864 assert!(!following.hide_reposts); 1865 assert!(following.hide_quote_posts); 1866 assert_eq!(following.hide_replies_by_like_count, Some(4)); 1867 1868 let custom = user_preferences 1869 .feed_view_prefs 1870 .iter() 1871 .find(|pref| pref.feed == "at://feed/custom") 1872 .expect("custom pref should exist"); 1873 assert!(!custom.hide_quote_posts); 1874 assert!(!custom.hide_replies); 1875 } 1876 1877 #[test] 1878 fn empty_success_put_preferences_response_is_treated_as_valid() { 1879 assert!(accepts_empty_put_preferences_response(StatusCode::OK, b"")); 1880 assert!(!accepts_empty_put_preferences_response(StatusCode::OK, b"null")); 1881 assert!(!accepts_empty_put_preferences_response(StatusCode::BAD_REQUEST, b"")); 1882 } 1883 1884 #[test] 1885 fn empty_success_bookmark_response_is_treated_as_valid() { 1886 assert!(accepts_empty_bookmark_response(StatusCode::OK, b"")); 1887 assert!(!accepts_empty_bookmark_response(StatusCode::OK, b"{}")); 1888 assert!(!accepts_empty_bookmark_response(StatusCode::BAD_REQUEST, b"")); 1889 } 1890 1891 #[test] 1892 fn follow_status_label_collapses_mutual_block() { 1893 let status = FOLLOW_STATUS_BLOCKED_BY | FOLLOW_STATUS_BLOCKING | FOLLOW_STATUS_HIDDEN; 1894 assert_eq!(follow_status_label(status), "Mutual Block, Hidden"); 1895 } 1896 1897 #[test] 1898 fn follow_status_from_profile_sets_expected_flags() { 1899 let mut viewer = jacquard::api::app_bsky::actor::ViewerState::default(); 1900 viewer.blocked_by = Some(true); 1901 viewer.blocking = Some( 1902 AtUri::new("at://did:plc:me/app.bsky.graph.block/abc123") 1903 .expect("uri should parse") 1904 .into_static(), 1905 ); 1906 1907 let profile = ProfileViewDetailed::new() 1908 .did(Did::new("did:plc:alice").expect("did should parse").into_static()) 1909 .handle(Handle::new("alice.test").expect("handle should parse").into_static()) 1910 .viewer(Some(viewer)) 1911 .build(); 1912 1913 let status = follow_status_from_profile(&profile, "did:plc:alice"); 1914 1915 assert_ne!(status & FOLLOW_STATUS_BLOCKED_BY, 0); 1916 assert_ne!(status & FOLLOW_STATUS_BLOCKING, 0); 1917 assert_ne!(status & FOLLOW_STATUS_SELF_FOLLOW, 0); 1918 } 1919 1920 #[test] 1921 fn parse_follow_delete_target_rejects_invalid_inputs() { 1922 assert!(parse_follow_delete_target("at://did:plc:alice/app.bsky.graph.follow/abc123").is_ok()); 1923 assert!(parse_follow_delete_target("at://did:plc:alice/app.bsky.feed.like/abc123").is_err()); 1924 assert!(parse_follow_delete_target("at://did:plc:alice/app.bsky.graph.follow").is_err()); 1925 assert!(parse_follow_delete_target("not-a-uri").is_err()); 1926 } 1927 1928 #[test] 1929 fn build_delete_writes_skips_invalid_rkeys() { 1930 let targets = vec![ 1931 FollowDeleteTarget { uri: "at://did:plc:alice/app.bsky.graph.follow/abc123".into(), rkey: "abc123".into() }, 1932 FollowDeleteTarget { uri: "at://did:plc:alice/app.bsky.graph.follow/bad".into(), rkey: "bad key".into() }, 1933 ]; 1934 1935 let (writes, chunk_uris, failed) = build_delete_writes(&targets); 1936 assert_eq!(writes.len(), 1); 1937 assert_eq!(chunk_uris, vec!["at://did:plc:alice/app.bsky.graph.follow/abc123"]); 1938 assert_eq!(failed, vec!["at://did:plc:alice/app.bsky.graph.follow/bad"]); 1939 } 1940 1941 #[test] 1942 fn summarize_apply_writes_result_handles_missing_entries_as_failures() { 1943 let output: ApplyWritesOutput<'_> = ApplyWritesOutput { 1944 results: Some(vec![ApplyWritesOutputResultsItem::DeleteResult(Box::new( 1945 DeleteResult::default(), 1946 ))]), 1947 ..Default::default() 1948 }; 1949 let chunk_uris = vec![ 1950 "at://did:plc:a/app.bsky.graph.follow/1".to_string(), 1951 "at://did:plc:b/app.bsky.graph.follow/2".to_string(), 1952 ]; 1953 1954 let (deleted, failed) = summarize_apply_writes_result(&chunk_uris, &output); 1955 assert_eq!(deleted, 1); 1956 assert_eq!(failed, vec!["at://did:plc:b/app.bsky.graph.follow/2"]); 1957 } 1958 1959 #[test] 1960 fn summarize_apply_writes_result_treats_missing_results_as_all_successful() { 1961 let output: ApplyWritesOutput<'_> = ApplyWritesOutput::default(); 1962 let chunk_uris = vec![ 1963 "at://did:plc:a/app.bsky.graph.follow/1".to_string(), 1964 "at://did:plc:b/app.bsky.graph.follow/2".to_string(), 1965 ]; 1966 1967 let (deleted, failed) = summarize_apply_writes_result(&chunk_uris, &output); 1968 assert_eq!(deleted, 2); 1969 assert!(failed.is_empty()); 1970 } 1971 1972 #[test] 1973 fn retry_after_delay_reads_numeric_seconds_from_payload() { 1974 let body = br#"{"message":"rate limited, retry after 7 seconds"}"#; 1975 assert_eq!(retry_after_delay(body), Some(Duration::from_secs(7))); 1976 1977 assert_eq!(retry_after_delay(br#"{"message":"slow down"}"#), None); 1978 } 1979 1980 #[test] 1981 fn richtext_parse_converts_markdown_links_into_plain_text_and_link_facets() { 1982 let rich = tokio::runtime::Runtime::new() 1983 .expect("tokio runtime should build") 1984 .block_on(async { 1985 richtext::parse("[example](https://example.com)") 1986 .build_async(&super::JacquardResolver::default()) 1987 .await 1988 }) 1989 .expect("richtext should build"); 1990 1991 assert_eq!(rich.text.as_ref(), "example"); 1992 let facets = rich.facets.expect("markdown link should create a facet"); 1993 assert_eq!(facets.len(), 1); 1994 assert_eq!(facets[0].index.byte_start, 0); 1995 assert_eq!(facets[0].index.byte_end, 7); 1996 1997 match &facets[0].features[0] { 1998 FacetFeaturesItem::Link(link) => assert_eq!(link.uri.as_ref(), "https://example.com"), 1999 other => panic!("expected link facet, got {other:?}"), 2000 } 2001 } 2002 2003 #[test] 2004 fn richtext_parse_keeps_other_facets_after_markdown_link_normalization() { 2005 let rich = tokio::runtime::Runtime::new() 2006 .expect("tokio runtime should build") 2007 .block_on(async { 2008 richtext::parse("[example](https://example.com) #rust https://docs.rs @did:plc:alice") 2009 .build_async(&super::JacquardResolver::default()) 2010 .await 2011 }) 2012 .expect("richtext should build"); 2013 2014 assert_eq!(rich.text.as_ref(), "example #rust https://docs.rs @did:plc:alice"); 2015 let facets = rich.facets.expect("text should produce facets"); 2016 2017 assert_eq!(facets.len(), 4); 2018 assert!(matches!(facets[0].features[0], FacetFeaturesItem::Link(_))); 2019 assert!(matches!(facets[1].features[0], FacetFeaturesItem::Tag(_))); 2020 assert!(matches!(facets[2].features[0], FacetFeaturesItem::Link(_))); 2021 assert!(matches!(facets[3].features[0], FacetFeaturesItem::Mention(_))); 2022 } 2023 2024 #[test] 2025 fn richtext_parse_leaves_invalid_markdown_link_syntax_unchanged() { 2026 let rich = tokio::runtime::Runtime::new() 2027 .expect("tokio runtime should build") 2028 .block_on(async { 2029 richtext::parse("[broken](not a url") 2030 .build_async(&super::JacquardResolver::default()) 2031 .await 2032 }) 2033 .expect("richtext should build"); 2034 2035 assert_eq!(rich.text.as_ref(), "[broken](not a url"); 2036 assert!(rich.facets.is_none(), "invalid markdown should not produce facets"); 2037 } 2038}