Bluesky & more on desktop
lazurite.stormlightlabs.org/
tauri
rust
typescript
bluesky
appview
atproto
solid
1use super::actors::{
2 actor_unavailable_message, classify_actor_unavailability, requested_actor_hints, ActorAvailabilityReason,
3};
4use super::auth::LazuriteOAuthSession;
5use super::error::{AppError, Result};
6use super::state::AppState;
7use jacquard::api::app_bsky::actor::get_preferences::GetPreferences;
8use jacquard::api::app_bsky::actor::get_profile::GetProfile;
9use jacquard::api::app_bsky::actor::get_profiles::GetProfiles;
10use jacquard::api::app_bsky::actor::put_preferences::PutPreferences;
11use jacquard::api::app_bsky::actor::{
12 FeedViewPref, PreferencesItem, ProfileViewDetailed, SavedFeed, SavedFeedType, SavedFeedsPrefV2,
13 SavedFeedsPrefV2Builder,
14};
15use jacquard::api::app_bsky::bookmark::create_bookmark::CreateBookmark;
16use jacquard::api::app_bsky::bookmark::delete_bookmark::DeleteBookmark;
17use jacquard::api::app_bsky::embed::record::Record;
18use jacquard::api::app_bsky::feed::get_actor_likes::GetActorLikes;
19use jacquard::api::app_bsky::feed::get_author_feed::GetAuthorFeed;
20use jacquard::api::app_bsky::feed::get_feed::GetFeed;
21use jacquard::api::app_bsky::feed::get_feed_generator::GetFeedGenerator;
22use jacquard::api::app_bsky::feed::get_feed_generators::GetFeedGenerators;
23use jacquard::api::app_bsky::feed::get_list_feed::GetListFeed;
24use jacquard::api::app_bsky::feed::get_post_thread::GetPostThread;
25use jacquard::api::app_bsky::feed::get_timeline::GetTimeline;
26use jacquard::api::app_bsky::feed::like::Like;
27use jacquard::api::app_bsky::feed::post::{Post, PostEmbed, ReplyRef};
28use jacquard::api::app_bsky::feed::repost::Repost;
29use jacquard::api::app_bsky::feed::GeneratorView;
30use jacquard::api::app_bsky::graph::block::Block;
31use jacquard::api::app_bsky::graph::follow::Follow;
32use jacquard::api::app_bsky::graph::get_followers::GetFollowers;
33use jacquard::api::app_bsky::graph::get_follows::GetFollows;
34use jacquard::api::com_atproto::label::Label;
35use jacquard::api::com_atproto::repo::apply_writes::{
36 ApplyWrites, ApplyWritesOutput, ApplyWritesOutputResultsItem, ApplyWritesWritesItem, Delete,
37};
38use jacquard::api::com_atproto::repo::create_record::CreateRecord;
39use jacquard::api::com_atproto::repo::delete_record::DeleteRecord;
40use jacquard::api::com_atproto::repo::list_records::{ListRecords, ListRecordsOutput, Record as RepoListRecord};
41use jacquard::api::com_atproto::repo::strong_ref::StrongRef;
42use jacquard::identity::{resolver::IdentityResolver, JacquardResolver};
43use jacquard::richtext;
44use jacquard::types::aturi::AtUri;
45use jacquard::types::cid::Cid;
46use jacquard::types::datetime::Datetime;
47use jacquard::types::did::Did;
48use jacquard::types::handle::Handle;
49use jacquard::types::ident::AtIdentifier;
50use jacquard::types::nsid::Nsid;
51use jacquard::types::recordkey::RecordKey;
52use jacquard::types::value::Data;
53use jacquard::xrpc::XrpcClient;
54use jacquard::IntoStatic;
55use reqwest::StatusCode;
56use serde::{Deserialize, Serialize};
57use std::collections::{HashMap, HashSet};
58use std::sync::Arc;
59use std::time::Duration;
60use tauri::{AppHandle, Emitter};
61use tauri_plugin_log::log;
62use tokio::sync::Semaphore;
63use tokio::task::JoinSet;
64use tokio::time::sleep;
65
/// Number of feed URIs sent per `getFeedGenerators` request by `get_feed_generators`.
const FEED_GENERATOR_BATCH_SIZE: usize = 10;

/// Collection NSID of follow records.
const FOLLOW_COLLECTION_NSID: &str = "app.bsky.graph.follow";
/// Event name for follow-hygiene progress updates.
/// NOTE(review): the emitter is outside this section — presumably a Tauri event; confirm.
const FOLLOW_HYGIENE_PROGRESS_EVENT: &str = "follow-hygiene:progress";

// Follow-audit tuning values. Their consumers are outside this section, so the
// names are the only description available here.
const FOLLOW_AUDIT_PAGE_LIMIT: i64 = 100;
const FOLLOW_AUDIT_PROFILE_BATCH_SIZE: usize = 25;
const FOLLOW_AUDIT_PROFILE_BATCH_CONCURRENCY: usize = 3;
const FOLLOW_AUDIT_INTER_BATCH_DELAY: Duration = Duration::from_millis(250);
const FOLLOW_AUDIT_RETRY_AFTER_DEFAULT: Duration = Duration::from_secs(2);
const FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES: usize = 5;
const FOLLOW_UNFOLLOW_WRITE_CHUNK_SIZE: usize = 200;

// Follow status flags: each constant occupies exactly one distinct bit of a `u8`.
const FOLLOW_STATUS_DELETED: u8 = 0b0000_0001;
const FOLLOW_STATUS_DEACTIVATED: u8 = 0b0000_0010;
const FOLLOW_STATUS_SUSPENDED: u8 = 0b0000_0100;
const FOLLOW_STATUS_BLOCKED_BY: u8 = 0b0000_1000;
const FOLLOW_STATUS_BLOCKING: u8 = 0b0001_0000;
const FOLLOW_STATUS_HIDDEN: u8 = 0b0010_0000;
const FOLLOW_STATUS_SELF_FOLLOW: u8 = 0b0100_0000;
85
86#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
87#[serde(tag = "status", rename_all = "camelCase")]
88pub enum ProfileLookupResult {
89 Available {
90 profile: serde_json::Value,
91 },
92 Unavailable {
93 requested_actor: String,
94 did: Option<String>,
95 handle: Option<String>,
96 reason: ActorAvailabilityReason,
97 message: String,
98 },
99}
100
101async fn get_session(state: &AppState) -> Result<Arc<LazuriteOAuthSession>> {
102 let did = state
103 .active_session
104 .read()
105 .map_err(|error| {
106 log::error!("active_session poisoned: {error}");
107 AppError::StatePoisoned("active_session")
108 })?
109 .as_ref()
110 .ok_or_else(|| {
111 log::error!("no active account");
112 AppError::Validation("no active account".into())
113 })?
114 .did
115 .clone();
116
117 state
118 .sessions
119 .read()
120 .map_err(|error| AppError::state_poisoned(format!("sessions {error}")))?
121 .get(&did)
122 .cloned()
123 .ok_or_else(|| AppError::validation(format!("session not found for active account {did}")))
124}
125
126fn active_did(state: &AppState) -> Result<String> {
127 state
128 .active_session
129 .read()
130 .map_err(|error| AppError::state_poisoned(format!("active_session poisoned with error {error}")))?
131 .as_ref()
132 .ok_or_else(|| AppError::Validation("no active account".into()))
133 .map(|s| s.did.clone())
134}
135
136fn parse_actor_identifier(actor: &str) -> Result<AtIdentifier<'static>> {
137 let trimmed = actor.trim();
138 if trimmed.is_empty() {
139 return Err(AppError::validation("actor must not be empty"));
140 }
141
142 if let Ok(did) = Did::new(trimmed) {
143 return Ok(AtIdentifier::Did(did.into_static()));
144 }
145
146 let normalized_handle = trimmed.trim_start_matches('@');
147 if let Ok(handle) = Handle::new(normalized_handle) {
148 return Ok(AtIdentifier::Handle(handle.into_static()));
149 }
150
151 Err(AppError::validation("actor must be a valid DID or handle"))
152}
153
154#[derive(Debug, Serialize, Deserialize)]
155#[serde(rename_all = "camelCase")]
156pub struct SavedFeedItem {
157 pub id: String,
158 pub r#type: String,
159 pub value: String,
160 pub pinned: bool,
161}
162
163#[derive(Debug, Serialize, Deserialize)]
164#[serde(rename_all = "camelCase")]
165pub struct FeedViewPrefItem {
166 pub feed: String,
167 pub hide_replies: bool,
168 pub hide_replies_by_unfollowed: bool,
169 pub hide_replies_by_like_count: Option<i64>,
170 pub hide_reposts: bool,
171 pub hide_quote_posts: bool,
172}
173
174#[derive(Debug, Serialize, Deserialize)]
175#[serde(rename_all = "camelCase")]
176pub struct UserPreferences {
177 pub saved_feeds: Vec<SavedFeedItem>,
178 pub feed_view_prefs: Vec<FeedViewPrefItem>,
179}
180
181type StoredPreferences = Vec<PreferencesItem<'static>>;
182
183fn extract_saved_feeds(pref: &SavedFeedsPrefV2<'_>) -> Vec<SavedFeedItem> {
184 pref.items
185 .iter()
186 .map(|f| SavedFeedItem {
187 id: f.id.to_string(),
188 r#type: match &f.r#type {
189 SavedFeedType::Timeline => "timeline".into(),
190 SavedFeedType::Feed => "feed".into(),
191 SavedFeedType::List => "list".into(),
192 SavedFeedType::Other(s) => s.to_string(),
193 },
194 value: f.value.to_string(),
195 pinned: f.pinned,
196 })
197 .collect()
198}
199
200fn extract_feed_view_pref(pref: &FeedViewPref<'_>) -> FeedViewPrefItem {
201 FeedViewPrefItem {
202 feed: pref.feed.to_string(),
203 hide_replies: pref.hide_replies.unwrap_or(false),
204 hide_replies_by_unfollowed: pref.hide_replies_by_unfollowed.unwrap_or(true),
205 hide_replies_by_like_count: pref.hide_replies_by_like_count,
206 hide_reposts: pref.hide_reposts.unwrap_or(false),
207 hide_quote_posts: pref.hide_quote_posts.unwrap_or(false),
208 }
209}
210
211fn user_preferences_from_items(items: &[PreferencesItem<'_>]) -> UserPreferences {
212 let mut saved_feeds = Vec::new();
213 let mut feed_view_prefs = Vec::new();
214
215 for item in items {
216 match item {
217 PreferencesItem::SavedFeedsPrefV2(pref) => saved_feeds = extract_saved_feeds(pref),
218 PreferencesItem::FeedViewPref(pref) => feed_view_prefs.push(extract_feed_view_pref(pref)),
219 _ => (),
220 }
221 }
222
223 UserPreferences { saved_feeds, feed_view_prefs }
224}
225
226async fn fetch_preference_items(state: &AppState) -> Result<StoredPreferences> {
227 let session = get_session(state).await?;
228 fetch_preference_items_with_session(&session).await
229}
230
231async fn fetch_preference_items_with_session(session: &Arc<LazuriteOAuthSession>) -> Result<StoredPreferences> {
232 let output = session
233 .send(GetPreferences)
234 .await
235 .map_err(|error| {
236 log::error!("fetch Preferences error: {error}");
237 AppError::validation("fetch Preferences error")
238 })?
239 .into_output()
240 .map_err(|error| {
241 log::error!("fetch Preferences output error: {error}");
242 AppError::validation("fetch Preferences output error")
243 })?;
244
245 Ok(output.preferences.into_iter().map(IntoStatic::into_static).collect())
246}
247
248fn accepts_empty_put_preferences_response(status: reqwest::StatusCode, body: &[u8]) -> bool {
249 status.is_success() && body.is_empty()
250}
251
252fn accepts_empty_bookmark_response(status: reqwest::StatusCode, body: &[u8]) -> bool {
253 status.is_success() && body.is_empty()
254}
255
256async fn store_preference_items(session: &Arc<LazuriteOAuthSession>, items: StoredPreferences) -> Result<()> {
257 let response = session
258 .send(PutPreferences::new().preferences(items).build())
259 .await
260 .map_err(|error| {
261 log::error!("putPreferences error: {error}");
262 AppError::validation("putPreferences error")
263 })?;
264
265 if accepts_empty_put_preferences_response(response.status(), response.buffer()) {
266 return Ok(());
267 }
268
269 response.into_output().map_err(|error| {
270 log::error!("putPreferences output error: {error}");
271 AppError::validation("putPreferences output error")
272 })?;
273
274 Ok(())
275}
276
277fn build_saved_feeds_preference_item(feeds: Vec<SavedFeedItem>) -> PreferencesItem<'static> {
278 let items = feeds
279 .into_iter()
280 .map(|feed| {
281 SavedFeed::new()
282 .id(feed.id)
283 .r#type(match feed.r#type.as_str() {
284 "timeline" => SavedFeedType::Timeline,
285 "feed" => SavedFeedType::Feed,
286 "list" => SavedFeedType::List,
287 _ => SavedFeedType::Other(feed.r#type.into()),
288 })
289 .value(feed.value)
290 .pinned(feed.pinned)
291 .build()
292 })
293 .collect::<Vec<_>>();
294
295 PreferencesItem::SavedFeedsPrefV2(Box::new(SavedFeedsPrefV2Builder::new().items(items).build()))
296}
297
298fn build_feed_view_pref_item(pref: FeedViewPrefItem) -> PreferencesItem<'static> {
299 PreferencesItem::FeedViewPref(Box::new(FeedViewPref {
300 feed: pref.feed.into(),
301 hide_quote_posts: Some(pref.hide_quote_posts),
302 hide_replies: Some(pref.hide_replies),
303 hide_replies_by_like_count: pref.hide_replies_by_like_count,
304 hide_replies_by_unfollowed: Some(pref.hide_replies_by_unfollowed),
305 hide_reposts: Some(pref.hide_reposts),
306 extra_data: Default::default(),
307 }))
308}
309
310fn merge_saved_feeds_preferences(preferences: StoredPreferences, feeds: Vec<SavedFeedItem>) -> StoredPreferences {
311 let mut merged = preferences
312 .into_iter()
313 .filter(|item| {
314 !matches!(
315 item,
316 PreferencesItem::SavedFeedsPref(_) | PreferencesItem::SavedFeedsPrefV2(_)
317 )
318 })
319 .collect::<Vec<_>>();
320 merged.push(build_saved_feeds_preference_item(feeds));
321 merged
322}
323
324fn merge_feed_view_preferences(preferences: StoredPreferences, pref: FeedViewPrefItem) -> StoredPreferences {
325 let feed = pref.feed.clone();
326 let mut merged = preferences
327 .into_iter()
328 .filter(|item| match item {
329 PreferencesItem::FeedViewPref(existing) => existing.feed.as_ref() != feed.as_str(),
330 _ => true,
331 })
332 .collect::<Vec<_>>();
333 merged.push(build_feed_view_pref_item(pref));
334 merged
335}
336
337#[derive(Debug, Deserialize)]
338#[serde(rename_all = "camelCase")]
339pub struct StrongRefInput {
340 pub uri: String,
341 pub cid: String,
342}
343
344#[derive(Debug, Deserialize)]
345#[serde(rename_all = "camelCase")]
346pub struct ReplyRefInput {
347 pub parent: StrongRefInput,
348 pub root: StrongRefInput,
349}
350
351#[derive(Debug, Deserialize)]
352#[serde(tag = "type", rename_all = "camelCase")]
353pub enum EmbedInput {
354 Record { record: StrongRefInput },
355}
356
357#[derive(Debug, Deserialize, Serialize)]
358#[serde(rename_all = "camelCase")]
359pub struct CreateRecordResult {
360 pub uri: String,
361 pub cid: String,
362}
363
364#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
365#[serde(rename_all = "camelCase")]
366pub struct FlaggedFollow {
367 pub did: String,
368 pub handle: String,
369 pub follow_uri: String,
370 pub status: u8,
371 pub status_label: String,
372}
373
374#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
375#[serde(rename_all = "camelCase")]
376pub struct BatchResult {
377 pub deleted: usize,
378 pub failed: Vec<String>,
379}
380
381#[derive(Debug, Clone, Serialize)]
382#[serde(rename_all = "camelCase")]
383struct FollowHygieneProgress {
384 batch_size: usize,
385 current: usize,
386 total: usize,
387}
388
/// Pairs a DID with the URI of a follow record.
///
/// NOTE(review): presumably one entry per listed follow record; the producer is
/// outside this section — confirm.
#[derive(Clone, Debug, PartialEq, Eq)]
struct FollowRecordEntry {
    did: String,
    follow_uri: String,
}
394
/// Handle plus status byte for an audited account.
///
/// NOTE(review): `status` presumably combines the `FOLLOW_STATUS_*` bit flags;
/// the producer is outside this section — confirm.
#[derive(Clone, Debug, PartialEq, Eq)]
struct FollowStatusInfo {
    handle: String,
    status: u8,
}
400
/// A record URI together with a record key.
///
/// NOTE(review): presumably `rkey` is the key extracted from `uri` for a
/// follow deletion; the producer is outside this section — confirm.
#[derive(Clone, Debug, PartialEq, Eq)]
struct FollowDeleteTarget {
    uri: String,
    rkey: String,
}
406
407pub async fn get_preferences(state: &AppState) -> Result<UserPreferences> {
408 let preferences = fetch_preference_items(state).await?;
409 Ok(user_preferences_from_items(&preferences))
410}
411
412pub async fn get_feed_generators(uris: Vec<String>, state: &AppState) -> Result<serde_json::Value> {
413 if uris.is_empty() {
414 return Ok(serde_json::json!({ "feeds": [] }));
415 }
416
417 let session = get_session(state).await?;
418 let feeds = uris
419 .iter()
420 .filter_map(|uri| {
421 AtUri::new(uri)
422 .map(IntoStatic::into_static)
423 .map_err(|error| {
424 log::warn!("skipping invalid feed URI in get_feed_generators input ({uri}): {error}");
425 error
426 })
427 .ok()
428 })
429 .collect::<Vec<AtUri<'static>>>();
430
431 if feeds.is_empty() {
432 return Ok(serde_json::json!({ "feeds": [] }));
433 }
434
435 let mut collected = Vec::<GeneratorView<'static>>::new();
436 let mut seen = HashSet::<String>::new();
437
438 for chunk in feeds.chunks(FEED_GENERATOR_BATCH_SIZE) {
439 let request = GetFeedGenerators::new().feeds(chunk.to_vec()).build();
440 match session.send(request).await {
441 Ok(response) => match response.into_output() {
442 Ok(output) => {
443 for feed in output.feeds.into_iter().map(IntoStatic::into_static) {
444 let uri = feed.uri.as_ref().to_string();
445 if seen.insert(uri) {
446 collected.push(feed);
447 }
448 }
449 }
450 Err(error) => {
451 log::info!(
452 "getFeedGenerators decode failed for batch ({} feeds); trying per-feed fallback: {error}",
453 chunk.len()
454 );
455 let recovered =
456 fetch_feed_generators_individually(chunk, session.as_ref(), &mut collected, &mut seen).await;
457 if recovered == 0 {
458 log::warn!(
459 "getFeedGenerators fallback failed for batch ({} feeds); no generator metadata recovered",
460 chunk.len()
461 );
462 }
463 }
464 },
465 Err(error) => {
466 log::info!(
467 "getFeedGenerators request failed for batch ({} feeds); trying per-feed fallback: {error}",
468 chunk.len()
469 );
470 let recovered =
471 fetch_feed_generators_individually(chunk, session.as_ref(), &mut collected, &mut seen).await;
472 if recovered == 0 {
473 log::warn!(
474 "getFeedGenerators fallback request failed for batch ({} feeds); no generator metadata recovered",
475 chunk.len()
476 );
477 }
478 }
479 }
480 }
481
482 Ok(serde_json::json!({ "feeds": collected }))
483}
484
485async fn fetch_feed_generators_individually(
486 feeds: &[AtUri<'static>], session: &LazuriteOAuthSession, out: &mut Vec<GeneratorView<'static>>,
487 seen: &mut HashSet<String>,
488) -> usize {
489 let mut recovered = 0;
490 for feed in feeds {
491 let request = GetFeedGenerator::new().feed(feed.clone()).build();
492 let response = match session.send(request).await {
493 Ok(response) => response,
494 Err(error) => {
495 log::debug!("getFeedGenerator request failed for {}: {error}", feed.as_ref());
496 continue;
497 }
498 };
499
500 let output = match response.into_output() {
501 Ok(output) => output,
502 Err(error) => {
503 log::debug!("getFeedGenerator decode failed for {}: {error}", feed.as_ref());
504 continue;
505 }
506 };
507
508 let view = output.view.into_static();
509 let uri = view.uri.as_ref().to_string();
510 if seen.insert(uri) {
511 out.push(view);
512 recovered += 1;
513 }
514 }
515
516 recovered
517}
518
519pub async fn get_timeline(cursor: Option<String>, limit: u32, state: &AppState) -> Result<serde_json::Value> {
520 let session = get_session(state).await?;
521 let mut req = GetTimeline::new().limit(limit as i64);
522 if let Some(c) = &cursor {
523 req = req.cursor(Some(c.into()));
524 }
525
526 let output = session
527 .send(req.build())
528 .await
529 .map_err(|error| {
530 log::error!("getTimeline error: {error}");
531 AppError::validation("getTimeline")
532 })?
533 .into_output()
534 .map_err(|error| {
535 log::error!("getTimeline output error: {error}");
536 AppError::validation("getTimeline output")
537 })?;
538
539 serde_json::to_value(&output).map_err(AppError::from)
540}
541
542pub async fn get_feed(uri: String, cursor: Option<String>, limit: u32, state: &AppState) -> Result<serde_json::Value> {
543 let session = get_session(state).await?;
544 let feed_uri = AtUri::new(&uri).map_err(|_| AppError::validation("invalid feed URI"))?;
545 let mut req = GetFeed::new().feed(feed_uri).limit(limit as i64);
546 if let Some(c) = &cursor {
547 req = req.cursor(Some(c.into()));
548 }
549
550 let output = session
551 .send(req.build())
552 .await
553 .map_err(|error| {
554 log::error!("getFeed error: {error}");
555 AppError::validation("getFeed")
556 })?
557 .into_output()
558 .map_err(|error| {
559 log::error!("getFeed output error: {error}");
560 AppError::validation("getFeed output")
561 })?;
562
563 serde_json::to_value(&output).map_err(AppError::from)
564}
565
566pub async fn get_list_feed(
567 uri: String, cursor: Option<String>, limit: u32, state: &AppState,
568) -> Result<serde_json::Value> {
569 let session = get_session(state).await?;
570 let list_uri = AtUri::new(&uri).map_err(|_| AppError::validation("invalid list URI"))?;
571 let mut req = GetListFeed::new().list(list_uri).limit(limit as i64);
572 if let Some(c) = &cursor {
573 req = req.cursor(Some(c.into()));
574 }
575
576 let output = session
577 .send(req.build())
578 .await
579 .map_err(|error| {
580 log::error!("getListFeed error: {error}");
581 AppError::validation("getListFeed")
582 })?
583 .into_output()
584 .map_err(|error| {
585 log::error!("getListFeed output error: {error}");
586 AppError::validation("getListFeed output")
587 })?;
588
589 serde_json::to_value(&output).map_err(AppError::from)
590}
591
592pub async fn get_post_thread(uri: String, state: &AppState) -> Result<serde_json::Value> {
593 let session = get_session(state).await?;
594 let post_uri = AtUri::new(&uri).map_err(|_| AppError::validation("invalid post URI"))?;
595
596 let output = session
597 .send(GetPostThread::new().uri(post_uri).build())
598 .await
599 .map_err(|error| {
600 log::error!("getPostThread error: {error}");
601 AppError::validation("getPostThread")
602 })?
603 .into_output()
604 .map_err(|error| {
605 log::error!("getPostThread output error: {error}");
606 AppError::validation("getPostThread output")
607 })?;
608
609 serde_json::to_value(&output).map_err(AppError::from)
610}
611
612pub async fn get_profile(actor: String, state: &AppState) -> Result<serde_json::Value> {
613 let session = get_session(state).await?;
614 let requested_actor = actor.trim().to_string();
615 let (did, handle) = requested_actor_hints(&requested_actor);
616 let actor = parse_actor_identifier(&actor)?;
617
618 let output = match session.send(GetProfile::new().actor(actor).build()).await {
619 Ok(output) => output,
620 Err(error) => {
621 log::error!("getProfile error: {error}");
622 if let Some(reason) = classify_actor_unavailability(&error) {
623 return serde_json::to_value(ProfileLookupResult::Unavailable {
624 requested_actor,
625 did,
626 handle,
627 reason,
628 message: actor_unavailable_message(reason).to_string(),
629 })
630 .map_err(AppError::from);
631 }
632
633 return Err(AppError::validation("Couldn't load this profile right now."));
634 }
635 };
636 let output = match output.into_output() {
637 Ok(output) => output,
638 Err(error) => {
639 log::error!("getProfile output error: {error}");
640 if let Some(reason) = classify_actor_unavailability(&error) {
641 return serde_json::to_value(ProfileLookupResult::Unavailable {
642 requested_actor,
643 did,
644 handle,
645 reason,
646 message: actor_unavailable_message(reason).to_string(),
647 })
648 .map_err(AppError::from);
649 }
650
651 return Err(AppError::validation("Couldn't load this profile right now."));
652 }
653 };
654
655 serde_json::to_value(ProfileLookupResult::Available { profile: serde_json::to_value(output.value)? })
656 .map_err(AppError::from)
657}
658
659pub async fn get_author_feed(
660 actor: String, cursor: Option<String>, limit: Option<u32>, state: &AppState,
661) -> Result<serde_json::Value> {
662 let session = get_session(state).await?;
663 let actor = parse_actor_identifier(&actor)?;
664 let mut req = GetAuthorFeed::new().actor(actor).limit(limit.map(|value| value as i64));
665 if let Some(c) = &cursor {
666 req = req.cursor(Some(c.as_str().into()));
667 }
668
669 let output = session
670 .send(req.build())
671 .await
672 .map_err(|error| {
673 log::error!("getAuthorFeed error: {error}");
674 AppError::validation("getAuthorFeed")
675 })?
676 .into_output()
677 .map_err(|error| {
678 log::error!("getAuthorFeed output error: {error}");
679 AppError::validation("getAuthorFeed output")
680 })?;
681
682 serde_json::to_value(&output).map_err(AppError::from)
683}
684
685pub async fn get_actor_likes(
686 actor: String, cursor: Option<String>, limit: Option<u32>, state: &AppState,
687) -> Result<serde_json::Value> {
688 let session = get_session(state).await?;
689 let actor = parse_actor_identifier(&actor)?;
690 let mut req = GetActorLikes::new().actor(actor).limit(limit.map(|value| value as i64));
691 if let Some(c) = &cursor {
692 req = req.cursor(Some(c.as_str().into()));
693 }
694
695 let output = session
696 .send(req.build())
697 .await
698 .map_err(|error| {
699 log::error!("getActorLikes error: {error}");
700 AppError::validation("getActorLikes")
701 })?
702 .into_output()
703 .map_err(|error| {
704 log::error!("getActorLikes output error: {error}");
705 AppError::validation("getActorLikes output")
706 })?;
707
708 serde_json::to_value(&output).map_err(AppError::from)
709}
710
711pub async fn create_post(
712 text: String, reply_to: Option<ReplyRefInput>, embed: Option<EmbedInput>, state: &AppState,
713) -> Result<CreateRecordResult> {
714 if text.trim().is_empty() && embed.is_none() {
715 return Err(AppError::validation("post requires text or embed"));
716 }
717
718 let session = get_session(state).await?;
719 let did = active_did(state)?;
720
721 let resolver = JacquardResolver::default();
722 let rich = richtext::parse(&text).build_async(&resolver).await.map_err(|error| {
723 log::error!("richtext parse error: {error}");
724 AppError::validation("failed to parse post text")
725 })?;
726
727 let mut post = Post::new().text(rich.text).created_at(Datetime::now());
728
729 if let Some(facets) = rich.facets {
730 post = post.facets(facets);
731 }
732
733 if let Some(reply) = reply_to {
734 let reply_ref = ReplyRef::new()
735 .parent(strong_ref_from_input(&reply.parent)?)
736 .root(strong_ref_from_input(&reply.root)?)
737 .build();
738 post = post.reply(reply_ref);
739 }
740
741 if let Some(embed) = embed {
742 post = post.embed(post_embed_from_input(embed)?);
743 }
744
745 let record_json = serde_json::to_value(post.build())?;
746 let record_data = Data::from_json_owned(record_json).map_err(|_| AppError::validation("serialize post"))?;
747
748 let repo = AtIdentifier::Did(Did::new(&did)?);
749 let collection = Nsid::new("app.bsky.feed.post").map_err(|_| AppError::validation("nsid"))?;
750
751 let output = session
752 .send(
753 CreateRecord::new()
754 .repo(repo)
755 .collection(collection)
756 .record(record_data)
757 .build(),
758 )
759 .await
760 .map_err(|error| {
761 log::error!("createRecord (post) error: {error}");
762 AppError::validation("failed to create record (post)")
763 })?
764 .into_output()
765 .map_err(|error| {
766 log::error!("createRecord (post) output error: {error}");
767 AppError::validation("failed to create record (post) output")
768 })?;
769
770 Ok(CreateRecordResult { uri: output.uri.to_string(), cid: output.cid.to_string() })
771}
772
773pub async fn like_post(uri: String, cid: String, state: &AppState) -> Result<CreateRecordResult> {
774 let session = get_session(state).await?;
775 let did = active_did(state)?;
776
777 let subject = StrongRef::new()
778 .uri(AtUri::new(&uri).map_err(|_| AppError::validation("invalid post URI"))?)
779 .cid(Cid::str(&cid))
780 .build();
781
782 let like = Like::new().created_at(Datetime::now()).subject(subject).build();
783
784 let record_json = serde_json::to_value(&like)?;
785 let record_data = Data::from_json_owned(record_json).map_err(|_| AppError::validation("serialize like"))?;
786
787 let repo = AtIdentifier::Did(Did::new(&did)?);
788 let collection = Nsid::new("app.bsky.feed.like").map_err(|_| AppError::validation("nsid"))?;
789
790 let output = session
791 .send(
792 CreateRecord::new()
793 .repo(repo)
794 .collection(collection)
795 .record(record_data)
796 .build(),
797 )
798 .await
799 .map_err(|error| {
800 log::error!("createRecord (like) error: {error}");
801 AppError::validation("failed to create record (like)")
802 })?
803 .into_output()
804 .map_err(|error| {
805 log::error!("createRecord (like) output error: {error}");
806 AppError::validation("failed to create record (like) output")
807 })?;
808
809 Ok(CreateRecordResult { uri: output.uri.to_string(), cid: output.cid.to_string() })
810}
811
812pub async fn unlike_post(like_uri: String, state: &AppState) -> Result<()> {
813 let session = get_session(state).await?;
814 let did = active_did(state)?;
815
816 let at_uri = AtUri::new(&like_uri).map_err(|_| AppError::validation("invalid like URI"))?;
817 let RecordKey(rkey) = at_uri
818 .rkey()
819 .ok_or_else(|| AppError::Validation("like URI has no rkey".into()))?;
820 let rkey_str = rkey.to_string();
821
822 let repo = AtIdentifier::Did(Did::new(&did)?);
823 let collection = Nsid::new("app.bsky.feed.like").map_err(|_| AppError::validation("nsid"))?;
824 let rkey = RecordKey::any(&rkey_str).map_err(|_| AppError::validation("invalid rkey"))?;
825
826 session
827 .send(DeleteRecord::new().repo(repo).collection(collection).rkey(rkey).build())
828 .await
829 .map_err(|error| {
830 log::error!("deleteRecord (unlike) error: {error}");
831 AppError::validation("failed to delete record (unlike)")
832 })?
833 .into_output()
834 .map_err(|error| {
835 log::error!("deleteRecord (unlike) output error: {error}");
836 AppError::validation("failed to delete record (unlike) output")
837 })?;
838
839 Ok(())
840}
841
842pub async fn repost(uri: String, cid: String, state: &AppState) -> Result<CreateRecordResult> {
843 let session = get_session(state).await?;
844 let did = active_did(state)?;
845
846 let subject = StrongRef::new()
847 .uri(AtUri::new(&uri).map_err(|_| AppError::validation("invalid post URI"))?)
848 .cid(Cid::str(&cid))
849 .build();
850
851 let repost = Repost::new().created_at(Datetime::now()).subject(subject).build();
852
853 let record_json = serde_json::to_value(&repost)?;
854 let record_data = Data::from_json_owned(record_json).map_err(|_| AppError::validation("serialize repost"))?;
855
856 let repo = AtIdentifier::Did(Did::new(&did)?);
857 let collection = Nsid::new("app.bsky.feed.repost").map_err(|_| AppError::validation("nsid"))?;
858
859 let output = session
860 .send(
861 CreateRecord::new()
862 .repo(repo)
863 .collection(collection)
864 .record(record_data)
865 .build(),
866 )
867 .await
868 .map_err(|error| {
869 log::error!("createRecord (repost) error: {error}");
870 AppError::validation("failed to create record (repost)")
871 })?
872 .into_output()
873 .map_err(|error| {
874 log::error!("createRecord (repost) output error: {error}");
875 AppError::validation("failed to create record (repost) output")
876 })?;
877
878 Ok(CreateRecordResult { uri: output.uri.to_string(), cid: output.cid.to_string() })
879}
880
881pub async fn unrepost(repost_uri: String, state: &AppState) -> Result<()> {
882 let session = get_session(state).await?;
883 let did = active_did(state)?;
884
885 let at_uri = AtUri::new(&repost_uri).map_err(|_| AppError::validation("invalid repost URI"))?;
886 let RecordKey(rkey) = at_uri
887 .rkey()
888 .ok_or_else(|| AppError::Validation("repost URI has no rkey".into()))?;
889
890 let rkey_str = rkey.to_string();
891
892 let repo = AtIdentifier::Did(Did::new(&did)?);
893 let collection = Nsid::new("app.bsky.feed.repost").map_err(|_| AppError::validation("nsid"))?;
894 let rkey = RecordKey::any(&rkey_str).map_err(|_| AppError::validation("invalid rkey"))?;
895
896 session
897 .send(DeleteRecord::new().repo(repo).collection(collection).rkey(rkey).build())
898 .await
899 .map_err(|error| {
900 log::error!("deleteRecord (unrepost) error: {error}");
901 AppError::validation("failed to delete record (unrepost)")
902 })?
903 .into_output()
904 .map_err(|error| {
905 log::error!("deleteRecord (unrepost) output error: {error}");
906 AppError::validation("failed to delete record (unrepost) output")
907 })?;
908
909 Ok(())
910}
911
912pub async fn bookmark_post(uri: String, cid: String, state: &AppState) -> Result<()> {
913 let session = get_session(state).await?;
914 let post_uri = AtUri::new(&uri).map_err(|_| AppError::validation("invalid post URI"))?;
915
916 let response = session
917 .send(CreateBookmark::new().uri(post_uri).cid(Cid::str(&cid)).build())
918 .await
919 .map_err(|error| {
920 log::error!("createBookmark error: {error}");
921 AppError::validation("Could not save this post.")
922 })?;
923
924 if accepts_empty_bookmark_response(response.status(), response.buffer()) {
925 return Ok(());
926 }
927
928 response.into_output().map_err(|error| {
929 log::error!("createBookmark output error: {error}");
930 AppError::validation("Could not save this post.")
931 })?;
932
933 Ok(())
934}
935
936pub async fn remove_bookmark(uri: String, state: &AppState) -> Result<()> {
937 let session = get_session(state).await?;
938 let post_uri = AtUri::new(&uri).map_err(|_| AppError::validation("invalid post URI"))?;
939
940 let response = session
941 .send(DeleteBookmark::new().uri(post_uri).build())
942 .await
943 .map_err(|error| {
944 log::error!("deleteBookmark error: {error}");
945 AppError::validation("Could not remove this saved post.")
946 })?;
947
948 if accepts_empty_bookmark_response(response.status(), response.buffer()) {
949 return Ok(());
950 }
951
952 response.into_output().map_err(|error| {
953 log::error!("deleteBookmark output error: {error}");
954 AppError::validation("Could not remove this saved post.")
955 })?;
956
957 Ok(())
958}
959
960pub async fn follow_actor(did: String, state: &AppState) -> Result<CreateRecordResult> {
961 let session = get_session(state).await?;
962 let active_did = active_did(state)?;
963
964 let follow = Follow::new()
965 .created_at(Datetime::now())
966 .subject(Did::new(&did)?)
967 .build();
968
969 let record_json = serde_json::to_value(&follow)?;
970 let record_data = Data::from_json_owned(record_json).map_err(|_| AppError::validation("serialize follow"))?;
971
972 let repo = AtIdentifier::Did(Did::new(&active_did)?);
973 let collection = Nsid::new("app.bsky.graph.follow").map_err(|_| AppError::validation("nsid"))?;
974
975 let output = session
976 .send(
977 CreateRecord::new()
978 .repo(repo)
979 .collection(collection)
980 .record(record_data)
981 .build(),
982 )
983 .await
984 .map_err(|error| {
985 log::error!("createRecord (follow) error: {error}");
986 AppError::validation("Could not follow this account.")
987 })?
988 .into_output()
989 .map_err(|error| {
990 log::error!("createRecord (follow) output error: {error}");
991 AppError::validation("Could not follow this account.")
992 })?;
993
994 Ok(CreateRecordResult { uri: output.uri.to_string(), cid: output.cid.to_string() })
995}
996
997pub async fn block_actor(did: String, state: &AppState) -> Result<CreateRecordResult> {
998 let session = get_session(state).await?;
999 let active_did = active_did(state)?;
1000
1001 let block = Block::new()
1002 .created_at(Datetime::now())
1003 .subject(Did::new(&did).map_err(|_| AppError::validation("invalid account DID"))?)
1004 .build();
1005
1006 let record_json = serde_json::to_value(&block)?;
1007 let record_data = Data::from_json_owned(record_json).map_err(|_| AppError::validation("serialize block"))?;
1008
1009 let repo = AtIdentifier::Did(Did::new(&active_did)?);
1010 let collection = Nsid::new("app.bsky.graph.block").map_err(|_| AppError::validation("nsid"))?;
1011
1012 let output = session
1013 .send(
1014 CreateRecord::new()
1015 .repo(repo)
1016 .collection(collection)
1017 .record(record_data)
1018 .build(),
1019 )
1020 .await
1021 .map_err(|error| {
1022 log::error!("createRecord (block) error: {error}");
1023 AppError::validation("Could not block this account.")
1024 })?
1025 .into_output()
1026 .map_err(|error| {
1027 log::error!("createRecord (block) output error: {error}");
1028 AppError::validation("Could not block this account.")
1029 })?;
1030
1031 Ok(CreateRecordResult { uri: output.uri.to_string(), cid: output.cid.to_string() })
1032}
1033
1034pub async fn unfollow_actor(follow_uri: String, state: &AppState) -> Result<()> {
1035 let session = get_session(state).await?;
1036 let did = active_did(state)?;
1037
1038 let at_uri = AtUri::new(&follow_uri).map_err(|_| AppError::validation("invalid follow URI"))?;
1039 let RecordKey(rkey) = at_uri
1040 .rkey()
1041 .ok_or_else(|| AppError::Validation("follow URI has no rkey".into()))?;
1042 let rkey_str = rkey.to_string();
1043
1044 let repo = AtIdentifier::Did(Did::new(&did)?);
1045 let collection = Nsid::new("app.bsky.graph.follow").map_err(|_| AppError::validation("nsid"))?;
1046 let rkey = RecordKey::any(&rkey_str).map_err(|_| AppError::validation("invalid rkey"))?;
1047
1048 session
1049 .send(DeleteRecord::new().repo(repo).collection(collection).rkey(rkey).build())
1050 .await
1051 .map_err(|error| {
1052 log::error!("deleteRecord (unfollow) error: {error}");
1053 AppError::validation("Could not unfollow this account.")
1054 })?
1055 .into_output()
1056 .map_err(|error| {
1057 log::error!("deleteRecord (unfollow) output error: {error}");
1058 AppError::validation("Could not unfollow this account.")
1059 })?;
1060
1061 Ok(())
1062}
1063
1064pub async fn get_followers(
1065 actor: String, cursor: Option<String>, limit: Option<u32>, state: &AppState,
1066) -> Result<serde_json::Value> {
1067 let session = get_session(state).await?;
1068 let actor = parse_actor_identifier(&actor)?;
1069 let mut req = GetFollowers::new().actor(actor).limit(limit.map(|value| value as i64));
1070 if let Some(c) = &cursor {
1071 req = req.cursor(Some(c.as_str().into()));
1072 }
1073
1074 let output = session
1075 .send(req.build())
1076 .await
1077 .map_err(|error| {
1078 log::error!("getFollowers error: {error}");
1079 AppError::validation("Could not load followers.")
1080 })?
1081 .into_output()
1082 .map_err(|error| {
1083 log::error!("getFollowers output error: {error}");
1084 AppError::validation("Could not load followers.")
1085 })?;
1086
1087 serde_json::to_value(&output).map_err(AppError::from)
1088}
1089
1090pub async fn get_follows(
1091 actor: String, cursor: Option<String>, limit: Option<u32>, state: &AppState,
1092) -> Result<serde_json::Value> {
1093 let session = get_session(state).await?;
1094 let actor = parse_actor_identifier(&actor)?;
1095 let mut req = GetFollows::new().actor(actor).limit(limit.map(|value| value as i64));
1096 if let Some(c) = &cursor {
1097 req = req.cursor(Some(c.as_str().into()));
1098 }
1099
1100 let output = session
1101 .send(req.build())
1102 .await
1103 .map_err(|error| {
1104 log::error!("getFollows error: {error}");
1105 AppError::validation("Could not load following.")
1106 })?
1107 .into_output()
1108 .map_err(|error| {
1109 log::error!("getFollows output error: {error}");
1110 AppError::validation("Could not load following.")
1111 })?;
1112
1113 serde_json::to_value(&output).map_err(AppError::from)
1114}
1115
1116pub async fn audit_follows(app: &AppHandle, state: &AppState) -> Result<Vec<FlaggedFollow>> {
1117 let session = get_session(state).await?;
1118 let active_did = active_did(state)?;
1119 let follow_records = list_follow_records_for_audit(&session, &active_did).await?;
1120 if follow_records.is_empty() {
1121 return Ok(Vec::new());
1122 }
1123
1124 let dids = follow_records
1125 .iter()
1126 .map(|record| record.did.clone())
1127 .collect::<Vec<_>>();
1128 let unique_dids = dedupe_preserve_order(dids);
1129 let follow_statuses = resolve_follow_statuses(&session, app, &active_did, unique_dids).await?;
1130
1131 Ok(follow_records
1132 .into_iter()
1133 .filter_map(|record| {
1134 follow_statuses
1135 .get(&record.did)
1136 .map(|status| build_flagged_follow(record, status.clone()))
1137 })
1138 .collect())
1139}
1140
1141pub async fn batch_unfollow(follow_uris: Vec<String>, state: &AppState) -> Result<BatchResult> {
1142 let session = get_session(state).await?;
1143 let active_did = active_did(state)?;
1144
1145 if follow_uris.is_empty() {
1146 return Ok(BatchResult { deleted: 0, failed: Vec::new() });
1147 }
1148
1149 let mut targets = Vec::new();
1150 let mut failed = Vec::new();
1151
1152 for uri in follow_uris {
1153 match parse_follow_delete_target(&uri) {
1154 Ok(target) => targets.push(target),
1155 Err(reason) => {
1156 log::warn!("skipping invalid follow URI for batch unfollow: {uri} ({reason})");
1157 failed.push(uri);
1158 }
1159 }
1160 }
1161
1162 let mut deleted = 0usize;
1163 for chunk in targets.chunks(FOLLOW_UNFOLLOW_WRITE_CHUNK_SIZE) {
1164 let (writes, chunk_uris, chunk_failed) = build_delete_writes(chunk);
1165 failed.extend(chunk_failed);
1166
1167 if writes.is_empty() {
1168 continue;
1169 }
1170
1171 match send_apply_writes_chunk_with_retry(&session, &active_did, writes).await {
1172 Ok(output) => {
1173 let (chunk_deleted, chunk_failures) = summarize_apply_writes_result(&chunk_uris, &output);
1174 deleted += chunk_deleted;
1175 failed.extend(chunk_failures);
1176 }
1177 Err(error) => {
1178 log::warn!(
1179 "applyWrites failed for unfollow batch ({} items): {error}",
1180 chunk_uris.len()
1181 );
1182 failed.extend(chunk_uris);
1183 }
1184 }
1185 }
1186
1187 Ok(BatchResult { deleted, failed })
1188}
1189
1190async fn list_follow_records_for_audit(
1191 session: &Arc<LazuriteOAuthSession>, active_did: &str,
1192) -> Result<Vec<FollowRecordEntry>> {
1193 let mut records = Vec::new();
1194 let mut cursor = None;
1195
1196 loop {
1197 let output = list_follow_records_page_with_retry(session, active_did, cursor.clone()).await?;
1198 for record in output.records {
1199 if let Some(entry) = follow_record_entry_from_list_record(&record) {
1200 records.push(entry);
1201 }
1202 }
1203
1204 cursor = output.cursor.map(|value| value.to_string());
1205 if cursor.is_none() {
1206 break;
1207 }
1208
1209 sleep(FOLLOW_AUDIT_INTER_BATCH_DELAY).await;
1210 }
1211
1212 Ok(records)
1213}
1214
1215async fn list_follow_records_page_with_retry(
1216 session: &Arc<LazuriteOAuthSession>, active_did: &str, cursor: Option<String>,
1217) -> Result<ListRecordsOutput<'static>> {
1218 let repo = AtIdentifier::Did(Did::new(active_did)?.into_static());
1219 let collection = Nsid::new(FOLLOW_COLLECTION_NSID)
1220 .map_err(|_| AppError::validation("invalid follow collection NSID"))?
1221 .into_static();
1222 let mut retries = 0usize;
1223
1224 loop {
1225 let response = session
1226 .send(
1227 ListRecords::new()
1228 .repo(repo.clone())
1229 .collection(collection.clone())
1230 .limit(FOLLOW_AUDIT_PAGE_LIMIT)
1231 .maybe_cursor(cursor.as_deref().map(Into::into))
1232 .build(),
1233 )
1234 .await
1235 .map_err(|error| {
1236 log::error!("follow hygiene listRecords request failed: {error}");
1237 AppError::validation("Couldn't scan your follows right now.")
1238 })?;
1239
1240 if response.status() == StatusCode::TOO_MANY_REQUESTS {
1241 retries += 1;
1242 if retries > FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES {
1243 log::warn!("follow hygiene listRecords exceeded max rate-limit retries");
1244 return Err(AppError::validation("Couldn't scan your follows right now."));
1245 }
1246
1247 let delay = retry_after_delay(response.buffer()).unwrap_or(FOLLOW_AUDIT_RETRY_AFTER_DEFAULT);
1248 log::warn!(
1249 "follow hygiene listRecords rate-limited (attempt {retries}/{}), retrying in {}s",
1250 FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES,
1251 delay.as_secs()
1252 );
1253 sleep(delay).await;
1254 continue;
1255 }
1256
1257 return response.into_output().map(IntoStatic::into_static).map_err(|error| {
1258 log::error!("follow hygiene listRecords output decode failed: {error}");
1259 AppError::validation("Couldn't scan your follows right now.")
1260 });
1261 }
1262}
1263
1264async fn resolve_follow_statuses(
1265 session: &Arc<LazuriteOAuthSession>, app: &AppHandle, active_did: &str, dids: Vec<String>,
1266) -> Result<HashMap<String, FollowStatusInfo>> {
1267 let mut resolved = HashMap::new();
1268 if dids.is_empty() {
1269 return Ok(resolved);
1270 }
1271
1272 let chunks = dids
1273 .chunks(FOLLOW_AUDIT_PROFILE_BATCH_SIZE)
1274 .map(|chunk| chunk.to_vec())
1275 .collect::<Vec<_>>();
1276 let total_batches = chunks.len();
1277 let semaphore = Arc::new(Semaphore::new(FOLLOW_AUDIT_PROFILE_BATCH_CONCURRENCY));
1278 let mut join_set = JoinSet::new();
1279
1280 for did_chunk in chunks {
1281 let session = session.clone();
1282 let semaphore = semaphore.clone();
1283 join_set.spawn(async move {
1284 let _permit = semaphore.acquire_owned().await.map_err(|error| {
1285 log::error!("follow hygiene semaphore acquisition failed: {error}");
1286 AppError::validation("Couldn't scan your follows right now.")
1287 })?;
1288 let profiles = get_profiles_batch_with_retry(&session, &did_chunk).await?;
1289 sleep(FOLLOW_AUDIT_INTER_BATCH_DELAY).await;
1290 Ok::<(Vec<String>, Vec<ProfileViewDetailed<'static>>), AppError>((did_chunk, profiles))
1291 });
1292 }
1293
1294 let mut missing = dids.into_iter().collect::<HashSet<_>>();
1295 let mut completed = 0usize;
1296
1297 while let Some(joined) = join_set.join_next().await {
1298 let (requested_dids, profiles) = joined.map_err(|error| {
1299 log::error!("follow hygiene profile batch task failed: {error}");
1300 AppError::validation("Couldn't scan your follows right now.")
1301 })??;
1302 let mut found_dids = HashSet::new();
1303
1304 for profile in profiles {
1305 let did = profile.did.to_string();
1306 found_dids.insert(did.clone());
1307 let status = follow_status_from_profile(&profile, active_did);
1308 if status != 0 {
1309 resolved.insert(
1310 did.clone(),
1311 FollowStatusInfo { handle: profile.handle.to_string(), status },
1312 );
1313 }
1314 missing.remove(&did);
1315 }
1316
1317 for did in requested_dids {
1318 if !found_dids.contains(&did) {
1319 missing.insert(did);
1320 } else {
1321 missing.remove(&did);
1322 }
1323 }
1324
1325 completed += 1;
1326 app.emit(
1327 FOLLOW_HYGIENE_PROGRESS_EVENT,
1328 FollowHygieneProgress {
1329 batch_size: FOLLOW_AUDIT_PROFILE_BATCH_SIZE,
1330 current: completed,
1331 total: total_batches,
1332 },
1333 )?;
1334 }
1335
1336 for did in dedupe_preserve_order(missing.into_iter().collect()) {
1337 if let Some(status) = resolve_missing_follow_status(session, &did, active_did).await {
1338 resolved.insert(did, status);
1339 }
1340 }
1341
1342 Ok(resolved)
1343}
1344
1345async fn get_profiles_batch_with_retry(
1346 session: &Arc<LazuriteOAuthSession>, dids: &[String],
1347) -> Result<Vec<ProfileViewDetailed<'static>>> {
1348 let actors = dids
1349 .iter()
1350 .map(|did| Did::new(did).map(|parsed| AtIdentifier::Did(parsed.into_static())))
1351 .collect::<std::result::Result<Vec<_>, _>>()?;
1352 let mut retries = 0usize;
1353
1354 loop {
1355 let response = session
1356 .send(GetProfiles::new().actors(actors.clone()).build())
1357 .await
1358 .map_err(|error| {
1359 log::error!("follow hygiene getProfiles request failed: {error}");
1360 AppError::validation("Couldn't scan your follows right now.")
1361 })?;
1362
1363 if response.status() == StatusCode::TOO_MANY_REQUESTS {
1364 retries += 1;
1365 if retries > FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES {
1366 log::warn!("follow hygiene getProfiles exceeded max rate-limit retries");
1367 return Err(AppError::validation("Couldn't scan your follows right now."));
1368 }
1369
1370 let delay = retry_after_delay(response.buffer()).unwrap_or(FOLLOW_AUDIT_RETRY_AFTER_DEFAULT);
1371 log::warn!(
1372 "follow hygiene getProfiles rate-limited (attempt {retries}/{}), retrying in {}s",
1373 FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES,
1374 delay.as_secs()
1375 );
1376 sleep(delay).await;
1377 continue;
1378 }
1379
1380 let output = response.into_output().map_err(|error| {
1381 log::error!("follow hygiene getProfiles output decode failed: {error}");
1382 AppError::validation("Couldn't scan your follows right now.")
1383 })?;
1384 return Ok(output.profiles.into_iter().map(IntoStatic::into_static).collect());
1385 }
1386}
1387
1388async fn resolve_missing_follow_status(
1389 session: &Arc<LazuriteOAuthSession>, did: &str, active_did: &str,
1390) -> Option<FollowStatusInfo> {
1391 let did_value = Did::new(did).ok()?.into_static();
1392 let self_follow = if did == active_did { FOLLOW_STATUS_SELF_FOLLOW } else { 0 };
1393
1394 match get_profile_for_did_with_retry(session, &did_value).await {
1395 Ok(profile) => {
1396 let status = follow_status_from_profile(&profile, active_did);
1397 if status == 0 {
1398 None
1399 } else {
1400 Some(FollowStatusInfo { handle: profile.handle.to_string(), status })
1401 }
1402 }
1403 Err(error_message) => {
1404 let mut status = follow_status_from_unavailability_reason(classify_actor_unavailability(&error_message));
1405 status |= self_follow;
1406
1407 if status == 0 {
1408 log::warn!("follow hygiene missing DID fallback unclassified for {did}: {error_message}");
1409 return None;
1410 }
1411
1412 let handle = resolve_handle_from_did_document(session, &did_value)
1413 .await
1414 .unwrap_or_else(|| did.to_string());
1415 Some(FollowStatusInfo { handle, status })
1416 }
1417 }
1418}
1419
1420async fn get_profile_for_did_with_retry(
1421 session: &Arc<LazuriteOAuthSession>, did: &Did<'_>,
1422) -> std::result::Result<ProfileViewDetailed<'static>, String> {
1423 let actor = AtIdentifier::Did(did.clone().into_static());
1424 let mut retries = 0usize;
1425
1426 loop {
1427 let response = session
1428 .send(GetProfile::new().actor(actor.clone()).build())
1429 .await
1430 .map_err(|error| error.to_string())?;
1431
1432 if response.status() == StatusCode::TOO_MANY_REQUESTS {
1433 retries += 1;
1434 if retries > FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES {
1435 return Err("rate limit retries exhausted".into());
1436 }
1437
1438 let delay = retry_after_delay(response.buffer()).unwrap_or(FOLLOW_AUDIT_RETRY_AFTER_DEFAULT);
1439 log::warn!(
1440 "follow hygiene getProfile rate-limited for {} (attempt {retries}/{}), retrying in {}s",
1441 did.as_ref(),
1442 FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES,
1443 delay.as_secs()
1444 );
1445 sleep(delay).await;
1446 continue;
1447 }
1448
1449 let output = response.into_output().map_err(|error| error.to_string())?;
1450 return Ok(output.value.into_static());
1451 }
1452}
1453
1454async fn resolve_handle_from_did_document(session: &Arc<LazuriteOAuthSession>, did: &Did<'_>) -> Option<String> {
1455 let did_doc = session.resolve_did_doc(did).await.ok()?.into_owned().ok()?;
1456
1457 did_doc.also_known_as.as_ref().and_then(|aliases| {
1458 aliases.iter().find_map(|alias| {
1459 alias
1460 .as_ref()
1461 .strip_prefix("at://")
1462 .and_then(|candidate| Handle::new(candidate).ok().map(|handle| handle.to_string()))
1463 })
1464 })
1465}
1466
1467fn follow_status_from_unavailability_reason(reason: Option<ActorAvailabilityReason>) -> u8 {
1468 match reason {
1469 Some(ActorAvailabilityReason::NotFound) => FOLLOW_STATUS_DELETED,
1470 Some(ActorAvailabilityReason::Deactivated) => FOLLOW_STATUS_DEACTIVATED,
1471 Some(ActorAvailabilityReason::Suspended) => FOLLOW_STATUS_SUSPENDED,
1472 _ => 0,
1473 }
1474}
1475
1476fn follow_status_from_profile(profile: &ProfileViewDetailed<'_>, active_did: &str) -> u8 {
1477 let mut status = 0u8;
1478
1479 if profile.did.as_ref() == active_did {
1480 status |= FOLLOW_STATUS_SELF_FOLLOW;
1481 }
1482
1483 if profile
1484 .viewer
1485 .as_ref()
1486 .and_then(|viewer| viewer.blocked_by)
1487 .unwrap_or(false)
1488 {
1489 status |= FOLLOW_STATUS_BLOCKED_BY;
1490 }
1491
1492 let is_blocking = profile
1493 .viewer
1494 .as_ref()
1495 .and_then(|viewer| viewer.blocking.as_ref())
1496 .is_some()
1497 || profile
1498 .viewer
1499 .as_ref()
1500 .and_then(|viewer| viewer.blocking_by_list.as_ref())
1501 .is_some();
1502 if is_blocking {
1503 status |= FOLLOW_STATUS_BLOCKING;
1504 }
1505
1506 if has_active_hide_label(profile.labels.as_deref()) {
1507 status |= FOLLOW_STATUS_HIDDEN;
1508 }
1509
1510 status
1511}
1512
1513fn has_active_hide_label(labels: Option<&[Label<'_>]>) -> bool {
1514 labels.is_some_and(|labels| {
1515 labels
1516 .iter()
1517 .any(|label| label.val.as_ref() == "!hide" && !label.neg.unwrap_or(false))
1518 })
1519}
1520
1521fn build_flagged_follow(record: FollowRecordEntry, status: FollowStatusInfo) -> FlaggedFollow {
1522 FlaggedFollow {
1523 did: record.did,
1524 handle: status.handle,
1525 follow_uri: record.follow_uri,
1526 status: status.status,
1527 status_label: follow_status_label(status.status),
1528 }
1529}
1530
1531fn follow_status_label(status: u8) -> String {
1532 if status == 0 {
1533 return "Unknown".to_string();
1534 }
1535
1536 let mut labels = Vec::new();
1537
1538 if status & FOLLOW_STATUS_DELETED != 0 {
1539 labels.push("Deleted");
1540 }
1541 if status & FOLLOW_STATUS_DEACTIVATED != 0 {
1542 labels.push("Deactivated");
1543 }
1544 if status & FOLLOW_STATUS_SUSPENDED != 0 {
1545 labels.push("Suspended");
1546 }
1547
1548 let has_blocked_by = status & FOLLOW_STATUS_BLOCKED_BY != 0;
1549 let has_blocking = status & FOLLOW_STATUS_BLOCKING != 0;
1550 if has_blocked_by && has_blocking {
1551 labels.push("Mutual Block");
1552 } else if has_blocked_by {
1553 labels.push("Blocked By");
1554 } else if has_blocking {
1555 labels.push("Blocking");
1556 }
1557
1558 if status & FOLLOW_STATUS_HIDDEN != 0 {
1559 labels.push("Hidden");
1560 }
1561 if status & FOLLOW_STATUS_SELF_FOLLOW != 0 {
1562 labels.push("Self-Follow");
1563 }
1564
1565 labels.join(", ")
1566}
1567
1568fn follow_record_entry_from_list_record(record: &RepoListRecord<'_>) -> Option<FollowRecordEntry> {
1569 let follow_uri = record.uri.to_string();
1570 let did = match record.value.get_at_path("subject").and_then(Data::as_str) {
1571 Some(subject) => match Did::new(subject) {
1572 Ok(did) => did.to_string(),
1573 Err(error) => {
1574 log::warn!("follow hygiene skipped invalid follow subject DID in {follow_uri}: {error}");
1575 return None;
1576 }
1577 },
1578 None => {
1579 log::warn!("follow hygiene skipped follow record with missing subject in {follow_uri}");
1580 return None;
1581 }
1582 };
1583
1584 Some(FollowRecordEntry { did, follow_uri })
1585}
1586
1587fn retry_after_delay(buffer: &[u8]) -> Option<Duration> {
1588 let payload = serde_json::from_slice::<serde_json::Value>(buffer).ok();
1589 if let Some(seconds) = payload
1590 .as_ref()
1591 .and_then(|value| value.get("retryAfter"))
1592 .and_then(serde_json::Value::as_u64)
1593 {
1594 return Some(Duration::from_secs(seconds));
1595 }
1596
1597 let text = payload
1598 .as_ref()
1599 .and_then(|value| value.get("message"))
1600 .and_then(serde_json::Value::as_str)
1601 .or_else(|| std::str::from_utf8(buffer).ok())
1602 .unwrap_or_default();
1603
1604 let lowered = text.to_ascii_lowercase();
1605 for marker in ["retry-after", "retry after", "retry_after"] {
1606 if let Some(index) = lowered.find(marker) {
1607 let seconds = lowered[index..]
1608 .chars()
1609 .skip_while(|ch| !ch.is_ascii_digit())
1610 .take_while(char::is_ascii_digit)
1611 .collect::<String>()
1612 .parse::<u64>()
1613 .ok()?;
1614 return Some(Duration::from_secs(seconds));
1615 }
1616 }
1617
1618 None
1619}
1620
1621fn parse_follow_delete_target(uri: &str) -> std::result::Result<FollowDeleteTarget, &'static str> {
1622 let at_uri = AtUri::new(uri).map_err(|_| "invalid URI")?;
1623 let collection = at_uri.collection().map(|value| value.to_string());
1624 if collection.as_deref() != Some(FOLLOW_COLLECTION_NSID) {
1625 return Err("URI does not point to follow collection");
1626 }
1627
1628 let rkey = at_uri
1629 .rkey()
1630 .map(|value| value.as_ref().to_string())
1631 .ok_or("URI missing rkey")?;
1632
1633 Ok(FollowDeleteTarget { uri: uri.to_string(), rkey })
1634}
1635
1636fn build_delete_writes(
1637 targets: &[FollowDeleteTarget],
1638) -> (Vec<ApplyWritesWritesItem<'static>>, Vec<String>, Vec<String>) {
1639 let mut writes = Vec::with_capacity(targets.len());
1640 let mut chunk_uris = Vec::with_capacity(targets.len());
1641 let mut chunk_failed = Vec::new();
1642 let collection = match Nsid::new(FOLLOW_COLLECTION_NSID) {
1643 Ok(collection) => collection.into_static(),
1644 Err(_) => {
1645 return (
1646 writes,
1647 chunk_uris,
1648 targets.iter().map(|target| target.uri.clone()).collect(),
1649 )
1650 }
1651 };
1652
1653 for target in targets {
1654 let rkey = match RecordKey::any(&target.rkey) {
1655 Ok(rkey) => rkey.into_static(),
1656 Err(error) => {
1657 log::warn!("failed to parse follow rkey from URI {}: {error}", target.uri);
1658 chunk_failed.push(target.uri.clone());
1659 continue;
1660 }
1661 };
1662
1663 writes.push(ApplyWritesWritesItem::Delete(Box::new(
1664 Delete::new().collection(collection.clone()).rkey(rkey).build(),
1665 )));
1666 chunk_uris.push(target.uri.clone());
1667 }
1668
1669 (writes, chunk_uris, chunk_failed)
1670}
1671
1672fn summarize_apply_writes_result(chunk_uris: &[String], output: &ApplyWritesOutput<'_>) -> (usize, Vec<String>) {
1673 let Some(results) = output.results.as_ref() else {
1674 return (chunk_uris.len(), Vec::new());
1675 };
1676
1677 let mut deleted = 0usize;
1678 let mut failed = Vec::new();
1679
1680 for (idx, uri) in chunk_uris.iter().enumerate() {
1681 match results.get(idx) {
1682 Some(ApplyWritesOutputResultsItem::DeleteResult(_)) => deleted += 1,
1683 _ => failed.push(uri.clone()),
1684 }
1685 }
1686
1687 (deleted, failed)
1688}
1689
1690async fn send_apply_writes_chunk_with_retry(
1691 session: &Arc<LazuriteOAuthSession>, active_did: &str, writes: Vec<ApplyWritesWritesItem<'static>>,
1692) -> Result<ApplyWritesOutput<'static>> {
1693 let repo = AtIdentifier::Did(Did::new(active_did)?.into_static());
1694 let mut retries = 0usize;
1695
1696 loop {
1697 let response = session
1698 .send(ApplyWrites::new().repo(repo.clone()).writes(writes.clone()).build())
1699 .await
1700 .map_err(|error| {
1701 log::warn!("follow hygiene applyWrites request failed: {error}");
1702 AppError::validation("Couldn't unfollow selected accounts right now.")
1703 })?;
1704
1705 if response.status() == StatusCode::TOO_MANY_REQUESTS {
1706 retries += 1;
1707 if retries > FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES {
1708 return Err(AppError::validation("Couldn't unfollow selected accounts right now."));
1709 }
1710
1711 let delay = retry_after_delay(response.buffer()).unwrap_or(FOLLOW_AUDIT_RETRY_AFTER_DEFAULT);
1712 log::warn!(
1713 "follow hygiene applyWrites rate-limited (attempt {retries}/{}), retrying in {}s",
1714 FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES,
1715 delay.as_secs()
1716 );
1717 sleep(delay).await;
1718 continue;
1719 }
1720
1721 return response.into_output().map(IntoStatic::into_static).map_err(|error| {
1722 log::warn!("follow hygiene applyWrites output decode failed: {error}");
1723 AppError::validation("Couldn't unfollow selected accounts right now.")
1724 });
1725 }
1726}
1727
/// Removes duplicate strings while keeping the first occurrence of each in its original order.
fn dedupe_preserve_order(values: Vec<String>) -> Vec<String> {
    let mut seen = HashSet::new();
    let mut unique = Vec::new();

    for value in values {
        if !seen.contains(&value) {
            seen.insert(value.clone());
            unique.push(value);
        }
    }

    unique
}
1732
1733fn strong_ref_from_input(input: &StrongRefInput) -> Result<StrongRef<'static>> {
1734 Ok(StrongRef::new()
1735 .uri(
1736 AtUri::new(&input.uri)
1737 .map_err(|_| AppError::validation("invalid URI in StrongRef"))?
1738 .into_static(),
1739 )
1740 .cid(Cid::from(input.cid.clone()).into_static())
1741 .build())
1742}
1743
1744fn post_embed_from_input(input: EmbedInput) -> Result<PostEmbed<'static>> {
1745 match input {
1746 EmbedInput::Record { record } => Ok(PostEmbed::Record(Box::new(
1747 Record::new().record(strong_ref_from_input(&record)?).build(),
1748 ))),
1749 }
1750}
1751
1752#[derive(Debug, Deserialize)]
1753#[serde(rename_all = "camelCase")]
1754pub struct UpdateSavedFeedsInput {
1755 pub feeds: Vec<SavedFeedItem>,
1756}
1757
1758pub async fn update_saved_feeds(input: UpdateSavedFeedsInput, state: &AppState) -> Result<()> {
1759 let session = get_session(state).await?;
1760 let preferences = fetch_preference_items_with_session(&session).await?;
1761 let merged = merge_saved_feeds_preferences(preferences, input.feeds);
1762 store_preference_items(&session, merged).await
1763}
1764
1765pub async fn update_feed_view_pref(pref: FeedViewPrefItem, state: &AppState) -> Result<()> {
1766 let session = get_session(state).await?;
1767 let preferences = fetch_preference_items_with_session(&session).await?;
1768 let merged = merge_feed_view_preferences(preferences, pref);
1769 store_preference_items(&session, merged).await
1770}
1771
1772#[cfg(test)]
1773mod tests {
1774 use super::{
1775 accepts_empty_bookmark_response, accepts_empty_put_preferences_response, build_delete_writes,
1776 follow_status_from_profile, follow_status_label, merge_feed_view_preferences, merge_saved_feeds_preferences,
1777 parse_follow_delete_target, retry_after_delay, summarize_apply_writes_result, user_preferences_from_items,
1778 FeedViewPrefItem, FollowDeleteTarget, SavedFeedItem, FOLLOW_STATUS_BLOCKED_BY, FOLLOW_STATUS_BLOCKING,
1779 FOLLOW_STATUS_HIDDEN, FOLLOW_STATUS_SELF_FOLLOW,
1780 };
1781 use jacquard::api::app_bsky::actor::ProfileViewDetailed;
1782 use jacquard::api::app_bsky::actor::{AdultContentPref, FeedViewPref, PreferencesItem};
1783 use jacquard::api::app_bsky::richtext::facet::FacetFeaturesItem;
1784 use jacquard::api::com_atproto::repo::apply_writes::{
1785 ApplyWritesOutput, ApplyWritesOutputResultsItem, DeleteResult,
1786 };
1787 use jacquard::richtext;
1788 use jacquard::types::aturi::AtUri;
1789 use jacquard::types::did::Did;
1790 use jacquard::types::handle::Handle;
1791 use jacquard::IntoStatic;
1792 use reqwest::StatusCode;
1793 use std::time::Duration;
1794
1795 fn adult_content_pref_item() -> PreferencesItem<'static> {
1796 PreferencesItem::AdultContentPref(Box::new(AdultContentPref::new().enabled(true).build()))
1797 }
1798
1799 fn feed_view_pref_item(feed: &str, hide_reposts: bool) -> PreferencesItem<'static> {
1800 PreferencesItem::FeedViewPref(Box::new(FeedViewPref {
1801 feed: feed.to_owned().into(),
1802 hide_quote_posts: Some(false),
1803 hide_replies: Some(false),
1804 hide_replies_by_like_count: None,
1805 hide_replies_by_unfollowed: Some(true),
1806 hide_reposts: Some(hide_reposts),
1807 extra_data: Default::default(),
1808 }))
1809 }
1810
1811 #[test]
1812 fn merging_saved_feeds_preserves_other_preferences() {
1813 let preferences = vec![adult_content_pref_item(), feed_view_pref_item("following", true)];
1814 let merged = merge_saved_feeds_preferences(
1815 preferences,
1816 vec![SavedFeedItem {
1817 id: "following".into(),
1818 r#type: "timeline".into(),
1819 value: "following".into(),
1820 pinned: true,
1821 }],
1822 );
1823
1824 assert!(merged
1825 .iter()
1826 .any(|item| matches!(item, PreferencesItem::AdultContentPref(_))));
1827 assert!(merged
1828 .iter()
1829 .any(|item| matches!(item, PreferencesItem::FeedViewPref(_))));
1830
1831 let user_preferences = user_preferences_from_items(&merged);
1832 assert_eq!(user_preferences.saved_feeds.len(), 1);
1833 assert_eq!(user_preferences.feed_view_prefs.len(), 1);
1834 assert!(user_preferences.feed_view_prefs[0].hide_reposts);
1835 }
1836
1837 #[test]
1838 fn merging_feed_view_pref_replaces_only_matching_feed() {
1839 let preferences = vec![
1840 adult_content_pref_item(),
1841 feed_view_pref_item("following", true),
1842 feed_view_pref_item("at://feed/custom", false),
1843 ];
1844 let merged = merge_feed_view_preferences(
1845 preferences,
1846 FeedViewPrefItem {
1847 feed: "following".into(),
1848 hide_replies: true,
1849 hide_replies_by_unfollowed: false,
1850 hide_replies_by_like_count: Some(4),
1851 hide_reposts: false,
1852 hide_quote_posts: true,
1853 },
1854 );
1855
1856 let user_preferences = user_preferences_from_items(&merged);
1857 assert_eq!(user_preferences.feed_view_prefs.len(), 2);
1858
1859 let following = user_preferences
1860 .feed_view_prefs
1861 .iter()
1862 .find(|pref| pref.feed == "following")
1863 .expect("following pref should exist");
1864 assert!(!following.hide_reposts);
1865 assert!(following.hide_quote_posts);
1866 assert_eq!(following.hide_replies_by_like_count, Some(4));
1867
1868 let custom = user_preferences
1869 .feed_view_prefs
1870 .iter()
1871 .find(|pref| pref.feed == "at://feed/custom")
1872 .expect("custom pref should exist");
1873 assert!(!custom.hide_quote_posts);
1874 assert!(!custom.hide_replies);
1875 }
1876
1877 #[test]
1878 fn empty_success_put_preferences_response_is_treated_as_valid() {
1879 assert!(accepts_empty_put_preferences_response(StatusCode::OK, b""));
1880 assert!(!accepts_empty_put_preferences_response(StatusCode::OK, b"null"));
1881 assert!(!accepts_empty_put_preferences_response(StatusCode::BAD_REQUEST, b""));
1882 }
1883
1884 #[test]
1885 fn empty_success_bookmark_response_is_treated_as_valid() {
1886 assert!(accepts_empty_bookmark_response(StatusCode::OK, b""));
1887 assert!(!accepts_empty_bookmark_response(StatusCode::OK, b"{}"));
1888 assert!(!accepts_empty_bookmark_response(StatusCode::BAD_REQUEST, b""));
1889 }
1890
1891 #[test]
1892 fn follow_status_label_collapses_mutual_block() {
1893 let status = FOLLOW_STATUS_BLOCKED_BY | FOLLOW_STATUS_BLOCKING | FOLLOW_STATUS_HIDDEN;
1894 assert_eq!(follow_status_label(status), "Mutual Block, Hidden");
1895 }
1896
/// A profile whose viewer state reports `blocked_by` and carries a `blocking` record URI must
/// yield the blocked-by and blocking bits. The DID passed as the second argument equals the
/// profile's own DID, and the self-follow bit is expected to be set as well.
#[test]
fn follow_status_from_profile_sets_expected_flags() {
    let block_record_uri = AtUri::new("at://did:plc:me/app.bsky.graph.block/abc123")
        .expect("uri should parse")
        .into_static();

    let mut viewer = jacquard::api::app_bsky::actor::ViewerState::default();
    viewer.blocked_by = Some(true);
    viewer.blocking = Some(block_record_uri);

    let alice_did = Did::new("did:plc:alice").expect("did should parse").into_static();
    let alice_handle = Handle::new("alice.test").expect("handle should parse").into_static();
    let profile = ProfileViewDetailed::new()
        .did(alice_did)
        .handle(alice_handle)
        .viewer(Some(viewer))
        .build();

    let status = follow_status_from_profile(&profile, "did:plc:alice");

    // Mask out each bit of interest so every assertion inspects exactly one flag.
    let blocked_by_bit = status & FOLLOW_STATUS_BLOCKED_BY;
    let blocking_bit = status & FOLLOW_STATUS_BLOCKING;
    let self_follow_bit = status & FOLLOW_STATUS_SELF_FOLLOW;

    assert_ne!(blocked_by_bit, 0);
    assert_ne!(blocking_bit, 0);
    assert_ne!(self_follow_bit, 0);
}
1919
/// Only a complete `app.bsky.graph.follow` record URI parses successfully. A URI from another
/// collection, a follow URI without a record key, and a non-URI string are all rejected.
#[test]
fn parse_follow_delete_target_rejects_invalid_inputs() {
    let well_formed = parse_follow_delete_target("at://did:plc:alice/app.bsky.graph.follow/abc123");
    assert!(well_formed.is_ok());

    let rejected_inputs = [
        "at://did:plc:alice/app.bsky.feed.like/abc123",
        "at://did:plc:alice/app.bsky.graph.follow",
        "not-a-uri",
    ];
    for input in rejected_inputs {
        assert!(parse_follow_delete_target(input).is_err());
    }
}
1927
/// Of two targets, the one with record key `"bad key"` must land in the failed list, while the
/// one with record key `"abc123"` produces the single write and the single chunk URI.
#[test]
fn build_delete_writes_skips_invalid_rkeys() {
    let valid_uri = "at://did:plc:alice/app.bsky.graph.follow/abc123";
    let invalid_uri = "at://did:plc:alice/app.bsky.graph.follow/bad";

    let targets = vec![
        FollowDeleteTarget { uri: valid_uri.into(), rkey: "abc123".into() },
        FollowDeleteTarget { uri: invalid_uri.into(), rkey: "bad key".into() },
    ];

    let (writes, accepted_uris, rejected_uris) = build_delete_writes(&targets);

    assert_eq!(writes.len(), 1);
    assert_eq!(accepted_uris, vec![valid_uri]);
    assert_eq!(rejected_uris, vec![invalid_uri]);
}
1940
/// With two URIs in the chunk but only one delete result in the output, one deletion is counted
/// and the second URI is reported as failed.
#[test]
fn summarize_apply_writes_result_handles_missing_entries_as_failures() {
    let first_uri = "at://did:plc:a/app.bsky.graph.follow/1";
    let second_uri = "at://did:plc:b/app.bsky.graph.follow/2";
    let chunk_uris: Vec<String> = [first_uri, second_uri].iter().copied().map(String::from).collect();

    // Exactly one result entry for a two-element chunk.
    let only_result = ApplyWritesOutputResultsItem::DeleteResult(Box::new(DeleteResult::default()));
    let output: ApplyWritesOutput<'_> =
        ApplyWritesOutput { results: Some(vec![only_result]), ..Default::default() };

    let (deleted, failed) = summarize_apply_writes_result(&chunk_uris, &output);

    assert_eq!(deleted, 1);
    assert_eq!(failed, vec![second_uri]);
}
1958
/// With a default-constructed output, every URI in the chunk is counted as deleted and
/// nothing is reported as failed.
#[test]
fn summarize_apply_writes_result_treats_missing_results_as_all_successful() {
    let chunk_uris: Vec<String> = [
        "at://did:plc:a/app.bsky.graph.follow/1",
        "at://did:plc:b/app.bsky.graph.follow/2",
    ]
    .iter()
    .copied()
    .map(String::from)
    .collect();
    let output: ApplyWritesOutput<'_> = Default::default();

    let (deleted, failed) = summarize_apply_writes_result(&chunk_uris, &output);

    assert_eq!(deleted, 2);
    assert!(failed.is_empty());
}
1971
/// A payload whose message says "retry after 7 seconds" yields a seven-second delay; a payload
/// whose message is only "slow down" yields no delay.
#[test]
fn retry_after_delay_reads_numeric_seconds_from_payload() {
    let with_seconds = br#"{"message":"rate limited, retry after 7 seconds"}"#;
    let without_seconds = br#"{"message":"slow down"}"#;

    let parsed_delay = retry_after_delay(with_seconds);
    let missing_delay = retry_after_delay(without_seconds);

    assert_eq!(parsed_delay, Some(Duration::from_secs(7)));
    assert_eq!(missing_delay, None);
}
1979
/// A lone markdown link `[example](https://example.com)` becomes the plain text `example` with a
/// single facet spanning bytes 0..7 whose first feature is a link to `https://example.com`.
#[test]
fn richtext_parse_converts_markdown_links_into_plain_text_and_link_facets() {
    let runtime = tokio::runtime::Runtime::new().expect("tokio runtime should build");
    let build = async {
        richtext::parse("[example](https://example.com)")
            .build_async(&super::JacquardResolver::default())
            .await
    };
    let rich = runtime.block_on(build).expect("richtext should build");

    assert_eq!(rich.text.as_ref(), "example");

    let facets = rich.facets.expect("markdown link should create a facet");
    assert_eq!(facets.len(), 1);

    let link_facet = &facets[0];
    assert_eq!(link_facet.index.byte_start, 0);
    assert_eq!(link_facet.index.byte_end, 7);

    let feature = &link_facet.features[0];
    if let FacetFeaturesItem::Link(link) = feature {
        assert_eq!(link.uri.as_ref(), "https://example.com");
    } else {
        panic!("expected link facet, got {feature:?}");
    }
}
2002
/// After the markdown link is reduced to its label, the remaining text still yields four facets
/// whose first features are, in order: link, tag, link, mention.
#[test]
fn richtext_parse_keeps_other_facets_after_markdown_link_normalization() {
    let runtime = tokio::runtime::Runtime::new().expect("tokio runtime should build");
    let build = async {
        richtext::parse("[example](https://example.com) #rust https://docs.rs @did:plc:alice")
            .build_async(&super::JacquardResolver::default())
            .await
    };
    let rich = runtime.block_on(build).expect("richtext should build");

    assert_eq!(rich.text.as_ref(), "example #rust https://docs.rs @did:plc:alice");

    let facets = rich.facets.expect("text should produce facets");
    assert_eq!(facets.len(), 4);

    // Only the first feature of each facet is inspected.
    let leading_features: Vec<_> = facets.iter().map(|facet| &facet.features[0]).collect();
    assert!(matches!(leading_features[0], FacetFeaturesItem::Link(_)));
    assert!(matches!(leading_features[1], FacetFeaturesItem::Tag(_)));
    assert!(matches!(leading_features[2], FacetFeaturesItem::Link(_)));
    assert!(matches!(leading_features[3], FacetFeaturesItem::Mention(_)));
}
2023
/// An unterminated markdown link must pass through verbatim and produce no facets.
#[test]
fn richtext_parse_leaves_invalid_markdown_link_syntax_unchanged() {
    let runtime = tokio::runtime::Runtime::new().expect("tokio runtime should build");
    let build = async {
        richtext::parse("[broken](not a url")
            .build_async(&super::JacquardResolver::default())
            .await
    };
    let rich = runtime.block_on(build).expect("richtext should build");

    assert_eq!(rich.text.as_ref(), "[broken](not a url");
    assert!(rich.facets.is_none(), "invalid markdown should not produce facets");
}
2038}