BlueSky & more on desktop lazurite.stormlightlabs.org/
tauri rust typescript bluesky appview atproto solid
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: follower mgmt/audit commands

+838 -30
+10 -7
docs/tasks/16-follows.md
··· 11 11 12 12 ### Backend 13 13 14 - - [ ] **Add `FlaggedFollow` type** to `src-tauri/src/feed.rs` (or a new `graph.rs` module if feed.rs is getting large). 14 + - [x] **Add `FlaggedFollow` type** to `src-tauri/src/feed.rs` (or a new `graph.rs` module if feed.rs is getting large). 15 15 Bitflag status field matching the spec's status table. 16 - - [ ] **Implement `audit_follows` command.** Paginate `com.atproto.repo.listRecords` for the follow collection, batch-resolve via `getProfiles` (25/batch, bounded concurrency via semaphore), individually resolve missing DIDs via `getProfile` + DID document handle resolution. 16 + - [x] **Implement `audit_follows` command.** Paginate `com.atproto.repo.listRecords` for the follow collection, batch-resolve via `getProfiles` (25/batch, bounded concurrency via semaphore), individually resolve missing DIDs via `getProfile` + DID document handle resolution. 17 17 Emit `follow-hygiene:progress` events per batch. Return only accounts with non-zero status. 18 - - [ ] **Implement `batch_unfollow` command.** Accept a `Vec<String>` of follow AT-URIs. 18 + - [x] **Implement `batch_unfollow` command.** Accept a `Vec<String>` of follow AT-URIs. 19 19 Extract rkeys, build `Delete` operations, chunk into groups of 200, send via `applyWrites`. Return `BatchResult` with deleted count and any failed URIs. 20 - - [ ] **Rate-limit handling.** Add inter-batch delays and respect `429` / `Retry-After` headers in the audit scan. 20 + - [x] **Rate-limit handling.** Add inter-batch delays and respect `429` in the audit scan. 21 21 Log warnings on rate-limit hits. 22 22 23 23 ### Frontend 24 24 25 + #### Core 26 + 25 27 - [ ] **Create `FollowHygienePanel` component** (`src/components/profile/FollowHygienePanel.tsx`). Local state via `createStore<FollowHygieneState>`. Phases: idle → scanning → ready → unfollowing → done. 26 28 - [ ] **Progress bar.** Listen to `follow-hygiene:progress` Tauri events during scan. Determinate bar with animated fill. 27 29 - [ ] **Flagged account list.** Scrollable list with per-row checkbox, handle, DID, status label chip. Selected rows get background tint. Use `For` (not map). 28 30 - [ ] **Category filter sidebar.** Sticky sidebar with visibility toggles and select-all checkboxes per status category. Selection counter. 29 - - [ ] **Unfollow flow.** Confirmation dialog before destructive action. Invoke `batch_unfollow`, remove completed rows with exit animation, show result summary. 30 - - [ ] **Entry points.** Add "Audit follows" button to the authenticated user's own profile panel. Add secondary entry in Settings > Account section. 31 + - [ ] **Unfollow flow.** Confirmation dialog before destructive action should invoke `batch_unfollow`, remove completed rows with exit animation, show result summary. 32 + - [ ] **Entry points.** Add "Audit follows" button to the authenticated user's own profile panel. 33 + - [ ] Add secondary entry in Settings > Account section. 31 34 32 - ### Polish 35 + #### Polish 33 36 34 37 - [ ] Keyboard shortcuts: `Space` toggle, `Ctrl+A` select all, `Escape` close 35 38 - [ ] `Motion` staggered fade-in on scan results, exit animation on unfollow
+13 -1
src-tauri/src/commands/mod.rs
··· 2 2 use super::auth::{self, LoginSuggestion}; 3 3 use super::conversations; 4 4 use super::error::Result; 5 - use super::feed::{self, CreateRecordResult, EmbedInput, FeedViewPrefItem, ReplyRefInput, UserPreferences}; 5 + use super::feed::{ 6 + self, BatchResult, CreateRecordResult, EmbedInput, FeedViewPrefItem, FlaggedFollow, ReplyRefInput, UserPreferences, 7 + }; 6 8 use super::notifications; 7 9 use super::state::{AccountSummary, AppBootstrap, AppState}; 8 10 use serde_json::Value; ··· 165 167 #[tauri::command] 166 168 pub async fn get_follows(actor: String, cursor: Option<String>, limit: Option<u32>, state: State<'_>) -> Result<Value> { 167 169 feed::get_follows(actor, cursor, limit, &state).await 170 + } 171 + 172 + #[tauri::command] 173 + pub async fn audit_follows(app: AppHandle, state: State<'_>) -> Result<Vec<FlaggedFollow>> { 174 + feed::audit_follows(&app, &state).await 175 + } 176 + 177 + #[tauri::command] 178 + pub async fn batch_unfollow(follow_uris: Vec<String>, state: State<'_>) -> Result<BatchResult> { 179 + feed::batch_unfollow(follow_uris, &state).await 168 180 } 169 181 170 182 #[tauri::command]
+1 -5
src-tauri/src/commands/moderation.rs
··· 1 1 use super::super::error::Result; 2 2 use super::super::moderation::{ 3 - self, 4 - ModerationLabelerPolicyDefinition, 5 - ModerationUI, 6 - ReportSubjectInput, 7 - StoredModerationPrefs, 3 + self, ModerationLabelerPolicyDefinition, ModerationUI, ReportSubjectInput, StoredModerationPrefs, 8 4 }; 9 5 use super::super::state::AppState; 10 6 use tauri_plugin_log::log;
+792 -4
src-tauri/src/feed.rs
··· 6 6 use super::state::AppState; 7 7 use jacquard::api::app_bsky::actor::get_preferences::GetPreferences; 8 8 use jacquard::api::app_bsky::actor::get_profile::GetProfile; 9 + use jacquard::api::app_bsky::actor::get_profiles::GetProfiles; 9 10 use jacquard::api::app_bsky::actor::put_preferences::PutPreferences; 10 11 use jacquard::api::app_bsky::actor::{ 11 - FeedViewPref, PreferencesItem, SavedFeed, SavedFeedType, SavedFeedsPrefV2, SavedFeedsPrefV2Builder, 12 + FeedViewPref, PreferencesItem, ProfileViewDetailed, SavedFeed, SavedFeedType, SavedFeedsPrefV2, 13 + SavedFeedsPrefV2Builder, 12 14 }; 13 15 use jacquard::api::app_bsky::bookmark::create_bookmark::CreateBookmark; 14 16 use jacquard::api::app_bsky::bookmark::delete_bookmark::DeleteBookmark; ··· 27 29 use jacquard::api::app_bsky::graph::follow::Follow; 28 30 use jacquard::api::app_bsky::graph::get_followers::GetFollowers; 29 31 use jacquard::api::app_bsky::graph::get_follows::GetFollows; 32 + use jacquard::api::com_atproto::label::Label; 33 + use jacquard::api::com_atproto::repo::apply_writes::{ 34 + ApplyWrites, ApplyWritesOutput, ApplyWritesOutputResultsItem, ApplyWritesWritesItem, Delete, 35 + }; 30 36 use jacquard::api::com_atproto::repo::create_record::CreateRecord; 31 37 use jacquard::api::com_atproto::repo::delete_record::DeleteRecord; 38 + use jacquard::api::com_atproto::repo::list_records::{ListRecords, ListRecordsOutput, Record as RepoListRecord}; 32 39 use jacquard::api::com_atproto::repo::strong_ref::StrongRef; 33 - use jacquard::identity::JacquardResolver; 40 + use jacquard::identity::{resolver::IdentityResolver, JacquardResolver}; 34 41 use jacquard::richtext; 35 42 use jacquard::types::aturi::AtUri; 36 43 use jacquard::types::cid::Cid; ··· 43 50 use jacquard::types::value::Data; 44 51 use jacquard::xrpc::XrpcClient; 45 52 use jacquard::IntoStatic; 53 + use reqwest::StatusCode; 46 54 use serde::{Deserialize, Serialize}; 55 + use std::collections::{HashMap, HashSet}; 47 56 use std::sync::Arc; 57 + use std::time::Duration; 58 + use tauri::{AppHandle, Emitter}; 48 59 use tauri_plugin_log::log; 60 + use tokio::sync::Semaphore; 61 + use tokio::task::JoinSet; 62 + use tokio::time::sleep; 49 63 50 64 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] 51 65 #[serde(tag = "status", rename_all = "camelCase")] ··· 323 337 pub struct CreateRecordResult { 324 338 pub uri: String, 325 339 pub cid: String, 340 + } 341 + 342 + const FOLLOW_COLLECTION_NSID: &str = "app.bsky.graph.follow"; 343 + const FOLLOW_HYGIENE_PROGRESS_EVENT: &str = "follow-hygiene:progress"; 344 + const FOLLOW_AUDIT_PAGE_LIMIT: i64 = 100; 345 + const FOLLOW_AUDIT_PROFILE_BATCH_SIZE: usize = 25; 346 + const FOLLOW_AUDIT_PROFILE_BATCH_CONCURRENCY: usize = 3; 347 + const FOLLOW_AUDIT_INTER_BATCH_DELAY: Duration = Duration::from_millis(250); 348 + const FOLLOW_AUDIT_RETRY_AFTER_DEFAULT: Duration = Duration::from_secs(2); 349 + const FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES: usize = 5; 350 + const FOLLOW_UNFOLLOW_WRITE_CHUNK_SIZE: usize = 200; 351 + 352 + const FOLLOW_STATUS_DELETED: u8 = 1 << 0; 353 + const FOLLOW_STATUS_DEACTIVATED: u8 = 1 << 1; 354 + const FOLLOW_STATUS_SUSPENDED: u8 = 1 << 2; 355 + const FOLLOW_STATUS_BLOCKED_BY: u8 = 1 << 3; 356 + const FOLLOW_STATUS_BLOCKING: u8 = 1 << 4; 357 + const FOLLOW_STATUS_HIDDEN: u8 = 1 << 5; 358 + const FOLLOW_STATUS_SELF_FOLLOW: u8 = 1 << 6; 359 + 360 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 361 + #[serde(rename_all = "camelCase")] 362 + pub struct FlaggedFollow { 363 + pub did: String, 364 + pub handle: String, 365 + pub follow_uri: String, 366 + pub status: u8, 367 + pub status_label: String, 368 + } 369 + 370 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 371 + #[serde(rename_all = "camelCase")] 372 + pub struct BatchResult { 373 + pub deleted: usize, 374 + pub failed: Vec<String>, 375 + } 376 + 377 + #[derive(Debug, Clone, Serialize)] 378 + #[serde(rename_all = "camelCase")] 379 + struct FollowHygieneProgress { 380 + current: usize, 381 + total: usize, 382 + } 383 + 384 + #[derive(Debug, Clone, PartialEq, Eq)] 385 + struct FollowRecordEntry { 386 + did: String, 387 + follow_uri: String, 388 + } 389 + 390 + #[derive(Debug, Clone, PartialEq, Eq)] 391 + struct FollowStatusInfo { 392 + handle: String, 393 + status: u8, 394 + } 395 + 396 + #[derive(Debug, Clone, PartialEq, Eq)] 397 + struct FollowDeleteTarget { 398 + uri: String, 399 + rkey: String, 326 400 } 327 401 328 402 pub async fn get_preferences(state: &AppState) -> Result<UserPreferences> { ··· 955 1029 serde_json::to_value(&output).map_err(AppError::from) 956 1030 } 957 1031 1032 + pub async fn audit_follows(app: &AppHandle, state: &AppState) -> Result<Vec<FlaggedFollow>> { 1033 + let session = get_session(state).await?; 1034 + let active_did = active_did(state)?; 1035 + let follow_records = list_follow_records_for_audit(&session, &active_did).await?; 1036 + if follow_records.is_empty() { 1037 + return Ok(Vec::new()); 1038 + } 1039 + 1040 + let dids = follow_records 1041 + .iter() 1042 + .map(|record| record.did.clone()) 1043 + .collect::<Vec<_>>(); 1044 + let unique_dids = dedupe_preserve_order(dids); 1045 + let follow_statuses = resolve_follow_statuses(&session, app, &active_did, unique_dids).await?; 1046 + 1047 + Ok(follow_records 1048 + .into_iter() 1049 + .filter_map(|record| { 1050 + follow_statuses 1051 + .get(&record.did) 1052 + .map(|status| build_flagged_follow(record, status.clone())) 1053 + }) 1054 + .collect()) 1055 + } 1056 + 1057 + pub async fn batch_unfollow(follow_uris: Vec<String>, state: &AppState) -> Result<BatchResult> { 1058 + let session = get_session(state).await?; 1059 + let active_did = active_did(state)?; 1060 + 1061 + if follow_uris.is_empty() { 1062 + return Ok(BatchResult { deleted: 0, failed: Vec::new() }); 1063 + } 1064 + 1065 + let mut targets = Vec::new(); 1066 + let mut failed = Vec::new(); 1067 + 1068 + for uri in follow_uris { 1069 + match parse_follow_delete_target(&uri) { 1070 + Ok(target) => targets.push(target), 1071 + Err(reason) => { 1072 + log::warn!("skipping invalid follow URI for batch unfollow: {uri} ({reason})"); 1073 + failed.push(uri); 1074 + } 1075 + } 1076 + } 1077 + 1078 + let mut deleted = 0usize; 1079 + for chunk in targets.chunks(FOLLOW_UNFOLLOW_WRITE_CHUNK_SIZE) { 1080 + let (writes, chunk_uris, chunk_failed) = build_delete_writes(chunk); 1081 + failed.extend(chunk_failed); 1082 + 1083 + if writes.is_empty() { 1084 + continue; 1085 + } 1086 + 1087 + match send_apply_writes_chunk_with_retry(&session, &active_did, writes).await { 1088 + Ok(output) => { 1089 + let (chunk_deleted, chunk_failures) = summarize_apply_writes_result(&chunk_uris, &output); 1090 + deleted += chunk_deleted; 1091 + failed.extend(chunk_failures); 1092 + } 1093 + Err(error) => { 1094 + log::warn!( 1095 + "applyWrites failed for unfollow batch ({} items): {error}", 1096 + chunk_uris.len() 1097 + ); 1098 + failed.extend(chunk_uris); 1099 + } 1100 + } 1101 + } 1102 + 1103 + Ok(BatchResult { deleted, failed }) 1104 + } 1105 + 1106 + async fn list_follow_records_for_audit( 1107 + session: &Arc<LazuriteOAuthSession>, active_did: &str, 1108 + ) -> Result<Vec<FollowRecordEntry>> { 1109 + let mut records = Vec::new(); 1110 + let mut cursor = None; 1111 + 1112 + loop { 1113 + let output = list_follow_records_page_with_retry(session, active_did, cursor.clone()).await?; 1114 + for record in output.records { 1115 + if let Some(entry) = follow_record_entry_from_list_record(&record) { 1116 + records.push(entry); 1117 + } 1118 + } 1119 + 1120 + cursor = output.cursor.map(|value| value.to_string()); 1121 + if cursor.is_none() { 1122 + break; 1123 + } 1124 + 1125 + sleep(FOLLOW_AUDIT_INTER_BATCH_DELAY).await; 1126 + } 1127 + 1128 + Ok(records) 1129 + } 1130 + 1131 + async fn list_follow_records_page_with_retry( 1132 + session: &Arc<LazuriteOAuthSession>, active_did: &str, cursor: Option<String>, 1133 + ) -> Result<ListRecordsOutput<'static>> { 1134 + let repo = AtIdentifier::Did(Did::new(active_did)?.into_static()); 1135 + let collection = Nsid::new(FOLLOW_COLLECTION_NSID) 1136 + .map_err(|_| AppError::validation("invalid follow collection NSID"))? 1137 + .into_static(); 1138 + let mut retries = 0usize; 1139 + 1140 + loop { 1141 + let response = session 1142 + .send( 1143 + ListRecords::new() 1144 + .repo(repo.clone()) 1145 + .collection(collection.clone()) 1146 + .limit(FOLLOW_AUDIT_PAGE_LIMIT) 1147 + .maybe_cursor(cursor.as_deref().map(Into::into)) 1148 + .build(), 1149 + ) 1150 + .await 1151 + .map_err(|error| { 1152 + log::error!("follow hygiene listRecords request failed: {error}"); 1153 + AppError::validation("Couldn't scan your follows right now.") 1154 + })?; 1155 + 1156 + if response.status() == StatusCode::TOO_MANY_REQUESTS { 1157 + retries += 1; 1158 + if retries > FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES { 1159 + log::warn!("follow hygiene listRecords exceeded max rate-limit retries"); 1160 + return Err(AppError::validation("Couldn't scan your follows right now.")); 1161 + } 1162 + 1163 + let delay = retry_after_delay(response.buffer()).unwrap_or(FOLLOW_AUDIT_RETRY_AFTER_DEFAULT); 1164 + log::warn!( 1165 + "follow hygiene listRecords rate-limited (attempt {retries}/{}), retrying in {}s", 1166 + FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES, 1167 + delay.as_secs() 1168 + ); 1169 + sleep(delay).await; 1170 + continue; 1171 + } 1172 + 1173 + return response.into_output().map(IntoStatic::into_static).map_err(|error| { 1174 + log::error!("follow hygiene listRecords output decode failed: {error}"); 1175 + AppError::validation("Couldn't scan your follows right now.") 1176 + }); 1177 + } 1178 + } 1179 + 1180 + async fn resolve_follow_statuses( 1181 + session: &Arc<LazuriteOAuthSession>, app: &AppHandle, active_did: &str, dids: Vec<String>, 1182 + ) -> Result<HashMap<String, FollowStatusInfo>> { 1183 + let mut resolved = HashMap::new(); 1184 + if dids.is_empty() { 1185 + return Ok(resolved); 1186 + } 1187 + 1188 + let chunks = dids 1189 + .chunks(FOLLOW_AUDIT_PROFILE_BATCH_SIZE) 1190 + .map(|chunk| chunk.to_vec()) 1191 + .collect::<Vec<_>>(); 1192 + let total_batches = chunks.len(); 1193 + let semaphore = Arc::new(Semaphore::new(FOLLOW_AUDIT_PROFILE_BATCH_CONCURRENCY)); 1194 + let mut join_set = JoinSet::new(); 1195 + 1196 + for did_chunk in chunks { 1197 + let session = session.clone(); 1198 + let semaphore = semaphore.clone(); 1199 + join_set.spawn(async move { 1200 + let _permit = semaphore.acquire_owned().await.map_err(|error| { 1201 + log::error!("follow hygiene semaphore acquisition failed: {error}"); 1202 + AppError::validation("Couldn't scan your follows right now.") 1203 + })?; 1204 + let profiles = get_profiles_batch_with_retry(&session, &did_chunk).await?; 1205 + sleep(FOLLOW_AUDIT_INTER_BATCH_DELAY).await; 1206 + Ok::<(Vec<String>, Vec<ProfileViewDetailed<'static>>), AppError>((did_chunk, profiles)) 1207 + }); 1208 + } 1209 + 1210 + let mut missing = dids.into_iter().collect::<HashSet<_>>(); 1211 + let mut completed = 0usize; 1212 + 1213 + while let Some(joined) = join_set.join_next().await { 1214 + let (requested_dids, profiles) = joined.map_err(|error| { 1215 + log::error!("follow hygiene profile batch task failed: {error}"); 1216 + AppError::validation("Couldn't scan your follows right now.") 1217 + })??; 1218 + let mut found_dids = HashSet::new(); 1219 + 1220 + for profile in profiles { 1221 + let did = profile.did.to_string(); 1222 + found_dids.insert(did.clone()); 1223 + let status = follow_status_from_profile(&profile, active_did); 1224 + if status != 0 { 1225 + resolved.insert( 1226 + did.clone(), 1227 + FollowStatusInfo { handle: profile.handle.to_string(), status }, 1228 + ); 1229 + } 1230 + missing.remove(&did); 1231 + } 1232 + 1233 + for did in requested_dids { 1234 + if !found_dids.contains(&did) { 1235 + missing.insert(did); 1236 + } else { 1237 + missing.remove(&did); 1238 + } 1239 + } 1240 + 1241 + completed += 1; 1242 + app.emit( 1243 + FOLLOW_HYGIENE_PROGRESS_EVENT, 1244 + FollowHygieneProgress { current: completed, total: total_batches }, 1245 + )?; 1246 + } 1247 + 1248 + for did in dedupe_preserve_order(missing.into_iter().collect()) { 1249 + if let Some(status) = resolve_missing_follow_status(session, &did, active_did).await { 1250 + resolved.insert(did, status); 1251 + } 1252 + } 1253 + 1254 + Ok(resolved) 1255 + } 1256 + 1257 + async fn get_profiles_batch_with_retry( 1258 + session: &Arc<LazuriteOAuthSession>, dids: &[String], 1259 + ) -> Result<Vec<ProfileViewDetailed<'static>>> { 1260 + let actors = dids 1261 + .iter() 1262 + .map(|did| Did::new(did).map(|parsed| AtIdentifier::Did(parsed.into_static()))) 1263 + .collect::<std::result::Result<Vec<_>, _>>()?; 1264 + let mut retries = 0usize; 1265 + 1266 + loop { 1267 + let response = session 1268 + .send(GetProfiles::new().actors(actors.clone()).build()) 1269 + .await 1270 + .map_err(|error| { 1271 + log::error!("follow hygiene getProfiles request failed: {error}"); 1272 + AppError::validation("Couldn't scan your follows right now.") 1273 + })?; 1274 + 1275 + if response.status() == StatusCode::TOO_MANY_REQUESTS { 1276 + retries += 1; 1277 + if retries > FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES { 1278 + log::warn!("follow hygiene getProfiles exceeded max rate-limit retries"); 1279 + return Err(AppError::validation("Couldn't scan your follows right now.")); 1280 + } 1281 + 1282 + let delay = retry_after_delay(response.buffer()).unwrap_or(FOLLOW_AUDIT_RETRY_AFTER_DEFAULT); 1283 + log::warn!( 1284 + "follow hygiene getProfiles rate-limited (attempt {retries}/{}), retrying in {}s", 1285 + FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES, 1286 + delay.as_secs() 1287 + ); 1288 + sleep(delay).await; 1289 + continue; 1290 + } 1291 + 1292 + let output = response.into_output().map_err(|error| { 1293 + log::error!("follow hygiene getProfiles output decode failed: {error}"); 1294 + AppError::validation("Couldn't scan your follows right now.") 1295 + })?; 1296 + return Ok(output.profiles.into_iter().map(IntoStatic::into_static).collect()); 1297 + } 1298 + } 1299 + 1300 + async fn resolve_missing_follow_status( 1301 + session: &Arc<LazuriteOAuthSession>, did: &str, active_did: &str, 1302 + ) -> Option<FollowStatusInfo> { 1303 + let did_value = Did::new(did).ok()?.into_static(); 1304 + let self_follow = if did == active_did { FOLLOW_STATUS_SELF_FOLLOW } else { 0 }; 1305 + 1306 + match get_profile_for_did_with_retry(session, &did_value).await { 1307 + Ok(profile) => { 1308 + let status = follow_status_from_profile(&profile, active_did); 1309 + if status == 0 { 1310 + None 1311 + } else { 1312 + Some(FollowStatusInfo { handle: profile.handle.to_string(), status }) 1313 + } 1314 + } 1315 + Err(error_message) => { 1316 + let mut status = follow_status_from_unavailability_reason(classify_actor_unavailability(&error_message)); 1317 + status |= self_follow; 1318 + 1319 + if status == 0 { 1320 + log::warn!("follow hygiene missing DID fallback unclassified for {did}: {error_message}"); 1321 + return None; 1322 + } 1323 + 1324 + let handle = resolve_handle_from_did_document(session, &did_value) 1325 + .await 1326 + .unwrap_or_else(|| did.to_string()); 1327 + Some(FollowStatusInfo { handle, status }) 1328 + } 1329 + } 1330 + } 1331 + 1332 + async fn get_profile_for_did_with_retry( 1333 + session: &Arc<LazuriteOAuthSession>, did: &Did<'_>, 1334 + ) -> std::result::Result<ProfileViewDetailed<'static>, String> { 1335 + let actor = AtIdentifier::Did(did.clone().into_static()); 1336 + let mut retries = 0usize; 1337 + 1338 + loop { 1339 + let response = session 1340 + .send(GetProfile::new().actor(actor.clone()).build()) 1341 + .await 1342 + .map_err(|error| error.to_string())?; 1343 + 1344 + if response.status() == StatusCode::TOO_MANY_REQUESTS { 1345 + retries += 1; 1346 + if retries > FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES { 1347 + return Err("rate limit retries exhausted".into()); 1348 + } 1349 + 1350 + let delay = retry_after_delay(response.buffer()).unwrap_or(FOLLOW_AUDIT_RETRY_AFTER_DEFAULT); 1351 + log::warn!( 1352 + "follow hygiene getProfile rate-limited for {} (attempt {retries}/{}), retrying in {}s", 1353 + did.as_ref(), 1354 + FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES, 1355 + delay.as_secs() 1356 + ); 1357 + sleep(delay).await; 1358 + continue; 1359 + } 1360 + 1361 + let output = response.into_output().map_err(|error| error.to_string())?; 1362 + return Ok(output.value.into_static()); 1363 + } 1364 + } 1365 + 1366 + async fn resolve_handle_from_did_document(session: &Arc<LazuriteOAuthSession>, did: &Did<'_>) -> Option<String> { 1367 + let did_doc = session.resolve_did_doc(did).await.ok()?.into_owned().ok()?; 1368 + 1369 + did_doc.also_known_as.as_ref().and_then(|aliases| { 1370 + aliases.iter().find_map(|alias| { 1371 + alias 1372 + .as_ref() 1373 + .strip_prefix("at://") 1374 + .and_then(|candidate| Handle::new(candidate).ok().map(|handle| handle.to_string())) 1375 + }) 1376 + }) 1377 + } 1378 + 1379 + fn follow_status_from_unavailability_reason(reason: Option<ActorAvailabilityReason>) -> u8 { 1380 + match reason { 1381 + Some(ActorAvailabilityReason::NotFound) => FOLLOW_STATUS_DELETED, 1382 + Some(ActorAvailabilityReason::Deactivated) => FOLLOW_STATUS_DEACTIVATED, 1383 + Some(ActorAvailabilityReason::Suspended) => FOLLOW_STATUS_SUSPENDED, 1384 + _ => 0, 1385 + } 1386 + } 1387 + 1388 + fn follow_status_from_profile(profile: &ProfileViewDetailed<'_>, active_did: &str) -> u8 { 1389 + let mut status = 0u8; 1390 + 1391 + if profile.did.as_ref() == active_did { 1392 + status |= FOLLOW_STATUS_SELF_FOLLOW; 1393 + } 1394 + 1395 + if profile 1396 + .viewer 1397 + .as_ref() 1398 + .and_then(|viewer| viewer.blocked_by) 1399 + .unwrap_or(false) 1400 + { 1401 + status |= FOLLOW_STATUS_BLOCKED_BY; 1402 + } 1403 + 1404 + let is_blocking = profile 1405 + .viewer 1406 + .as_ref() 1407 + .and_then(|viewer| viewer.blocking.as_ref()) 1408 + .is_some() 1409 + || profile 1410 + .viewer 1411 + .as_ref() 1412 + .and_then(|viewer| viewer.blocking_by_list.as_ref()) 1413 + .is_some(); 1414 + if is_blocking { 1415 + status |= FOLLOW_STATUS_BLOCKING; 1416 + } 1417 + 1418 + if has_active_hide_label(profile.labels.as_deref()) { 1419 + status |= FOLLOW_STATUS_HIDDEN; 1420 + } 1421 + 1422 + status 1423 + } 1424 + 1425 + fn has_active_hide_label(labels: Option<&[Label<'_>]>) -> bool { 1426 + labels.is_some_and(|labels| { 1427 + labels 1428 + .iter() 1429 + .any(|label| label.val.as_ref() == "!hide" && !label.neg.unwrap_or(false)) 1430 + }) 1431 + } 1432 + 1433 + fn build_flagged_follow(record: FollowRecordEntry, status: FollowStatusInfo) -> FlaggedFollow { 1434 + FlaggedFollow { 1435 + did: record.did, 1436 + handle: status.handle, 1437 + follow_uri: record.follow_uri, 1438 + status: status.status, 1439 + status_label: follow_status_label(status.status), 1440 + } 1441 + } 1442 + 1443 + fn follow_status_label(status: u8) -> String { 1444 + if status == 0 { 1445 + return "Unknown".to_string(); 1446 + } 1447 + 1448 + let mut labels = Vec::new(); 1449 + 1450 + if status & FOLLOW_STATUS_DELETED != 0 { 1451 + labels.push("Deleted"); 1452 + } 1453 + if status & FOLLOW_STATUS_DEACTIVATED != 0 { 1454 + labels.push("Deactivated"); 1455 + } 1456 + if status & FOLLOW_STATUS_SUSPENDED != 0 { 1457 + labels.push("Suspended"); 1458 + } 1459 + 1460 + let has_blocked_by = status & FOLLOW_STATUS_BLOCKED_BY != 0; 1461 + let has_blocking = status & FOLLOW_STATUS_BLOCKING != 0; 1462 + if has_blocked_by && has_blocking { 1463 + labels.push("Mutual Block"); 1464 + } else if has_blocked_by { 1465 + labels.push("Blocked By"); 1466 + } else if has_blocking { 1467 + labels.push("Blocking"); 1468 + } 1469 + 1470 + if status & FOLLOW_STATUS_HIDDEN != 0 { 1471 + labels.push("Hidden"); 1472 + } 1473 + if status & FOLLOW_STATUS_SELF_FOLLOW != 0 { 1474 + labels.push("Self-Follow"); 1475 + } 1476 + 1477 + labels.join(", ") 1478 + } 1479 + 1480 + fn follow_record_entry_from_list_record(record: &RepoListRecord<'_>) -> Option<FollowRecordEntry> { 1481 + let follow_uri = record.uri.to_string(); 1482 + let did = match record.value.get_at_path("subject").and_then(Data::as_str) { 1483 + Some(subject) => match Did::new(subject) { 1484 + Ok(did) => did.to_string(), 1485 + Err(error) => { 1486 + log::warn!("follow hygiene skipped invalid follow subject DID in {follow_uri}: {error}"); 1487 + return None; 1488 + } 1489 + }, 1490 + None => { 1491 + log::warn!("follow hygiene skipped follow record with missing subject in {follow_uri}"); 1492 + return None; 1493 + } 1494 + }; 1495 + 1496 + Some(FollowRecordEntry { did, follow_uri }) 1497 + } 1498 + 1499 + fn retry_after_delay(buffer: &[u8]) -> Option<Duration> { 1500 + let payload = serde_json::from_slice::<serde_json::Value>(buffer).ok(); 1501 + if let Some(seconds) = payload 1502 + .as_ref() 1503 + .and_then(|value| value.get("retryAfter")) 1504 + .and_then(serde_json::Value::as_u64) 1505 + { 1506 + return Some(Duration::from_secs(seconds)); 1507 + } 1508 + 1509 + let text = payload 1510 + .as_ref() 1511 + .and_then(|value| value.get("message")) 1512 + .and_then(serde_json::Value::as_str) 1513 + .or_else(|| std::str::from_utf8(buffer).ok()) 1514 + .unwrap_or_default(); 1515 + 1516 + let lowered = text.to_ascii_lowercase(); 1517 + for marker in ["retry-after", "retry after", "retry_after"] { 1518 + if let Some(index) = lowered.find(marker) { 1519 + let seconds = lowered[index..] 1520 + .chars() 1521 + .skip_while(|ch| !ch.is_ascii_digit()) 1522 + .take_while(char::is_ascii_digit) 1523 + .collect::<String>() 1524 + .parse::<u64>() 1525 + .ok()?; 1526 + return Some(Duration::from_secs(seconds)); 1527 + } 1528 + } 1529 + 1530 + None 1531 + } 1532 + 1533 + fn parse_follow_delete_target(uri: &str) -> std::result::Result<FollowDeleteTarget, &'static str> { 1534 + let at_uri = AtUri::new(uri).map_err(|_| "invalid URI")?; 1535 + let collection = at_uri.collection().map(|value| value.to_string()); 1536 + if collection.as_deref() != Some(FOLLOW_COLLECTION_NSID) { 1537 + return Err("URI does not point to follow collection"); 1538 + } 1539 + 1540 + let rkey = at_uri 1541 + .rkey() 1542 + .map(|value| value.as_ref().to_string()) 1543 + .ok_or("URI missing rkey")?; 1544 + 1545 + Ok(FollowDeleteTarget { uri: uri.to_string(), rkey }) 1546 + } 1547 + 1548 + fn build_delete_writes( 1549 + targets: &[FollowDeleteTarget], 1550 + ) -> (Vec<ApplyWritesWritesItem<'static>>, Vec<String>, Vec<String>) { 1551 + let mut writes = Vec::with_capacity(targets.len()); 1552 + let mut chunk_uris = Vec::with_capacity(targets.len()); 1553 + let mut chunk_failed = Vec::new(); 1554 + let collection = match Nsid::new(FOLLOW_COLLECTION_NSID) { 1555 + Ok(collection) => collection.into_static(), 1556 + Err(_) => { 1557 + return ( 1558 + writes, 1559 + chunk_uris, 1560 + targets.iter().map(|target| target.uri.clone()).collect(), 1561 + ) 1562 + } 1563 + }; 1564 + 1565 + for target in targets { 1566 + let rkey = match RecordKey::any(&target.rkey) { 1567 + Ok(rkey) => rkey.into_static(), 1568 + Err(error) => { 1569 + log::warn!("failed to parse follow rkey from URI {}: {error}", target.uri); 1570 + chunk_failed.push(target.uri.clone()); 1571 + continue; 1572 + } 1573 + }; 1574 + 1575 + writes.push(ApplyWritesWritesItem::Delete(Box::new( 1576 + Delete::new().collection(collection.clone()).rkey(rkey).build(), 1577 + ))); 1578 + chunk_uris.push(target.uri.clone()); 1579 + } 1580 + 1581 + (writes, chunk_uris, chunk_failed) 1582 + } 1583 + 1584 + fn summarize_apply_writes_result(chunk_uris: &[String], output: &ApplyWritesOutput<'_>) -> (usize, Vec<String>) { 1585 + let Some(results) = output.results.as_ref() else { 1586 + return (chunk_uris.len(), Vec::new()); 1587 + }; 1588 + 1589 + let mut deleted = 0usize; 1590 + let mut failed = Vec::new(); 1591 + 1592 + for (idx, uri) in chunk_uris.iter().enumerate() { 1593 + match results.get(idx) { 1594 + Some(ApplyWritesOutputResultsItem::DeleteResult(_)) => deleted += 1, 1595 + _ => failed.push(uri.clone()), 1596 + } 1597 + } 1598 + 1599 + (deleted, failed) 1600 + } 1601 + 1602 + async fn send_apply_writes_chunk_with_retry( 1603 + session: &Arc<LazuriteOAuthSession>, active_did: &str, writes: Vec<ApplyWritesWritesItem<'static>>, 1604 + ) -> Result<ApplyWritesOutput<'static>> { 1605 + let repo = AtIdentifier::Did(Did::new(active_did)?.into_static()); 1606 + let mut retries = 0usize; 1607 + 1608 + loop { 1609 + let response = session 1610 + .send(ApplyWrites::new().repo(repo.clone()).writes(writes.clone()).build()) 1611 + .await 1612 + .map_err(|error| { 1613 + log::warn!("follow hygiene applyWrites request failed: {error}"); 1614 + AppError::validation("Couldn't unfollow selected accounts right now.") 1615 + })?; 1616 + 1617 + if response.status() == StatusCode::TOO_MANY_REQUESTS { 1618 + retries += 1; 1619 + if retries > FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES { 1620 + return Err(AppError::validation("Couldn't unfollow selected accounts right now.")); 1621 + } 1622 + 1623 + let delay = retry_after_delay(response.buffer()).unwrap_or(FOLLOW_AUDIT_RETRY_AFTER_DEFAULT); 1624 + log::warn!( 1625 + "follow hygiene applyWrites rate-limited (attempt {retries}/{}), retrying in {}s", 1626 + FOLLOW_AUDIT_MAX_RATE_LIMIT_RETRIES, 1627 + delay.as_secs() 1628 + ); 1629 + sleep(delay).await; 1630 + continue; 1631 + } 1632 + 1633 + return response.into_output().map(IntoStatic::into_static).map_err(|error| { 1634 + log::warn!("follow hygiene applyWrites output decode failed: {error}"); 1635 + AppError::validation("Couldn't unfollow selected accounts right now.") 1636 + }); 1637 + } 1638 + } 1639 + 1640 + fn dedupe_preserve_order(values: Vec<String>) -> Vec<String> { 1641 + let mut seen = HashSet::new(); 1642 + values.into_iter().filter(|value| seen.insert(value.clone())).collect() 1643 + } 1644 + 958 1645 fn strong_ref_from_input(input: &StrongRefInput) -> Result<StrongRef<'static>> { 959 1646 Ok(StrongRef::new() 960 1647 .uri( ··· 997 1684 #[cfg(test)] 998 1685 mod tests { 999 1686 use super::{ 1000 - accepts_empty_bookmark_response, accepts_empty_put_preferences_response, merge_feed_view_preferences, 1001 - merge_saved_feeds_preferences, user_preferences_from_items, FeedViewPrefItem, SavedFeedItem, 1687 + accepts_empty_bookmark_response, accepts_empty_put_preferences_response, build_delete_writes, 1688 + follow_status_from_profile, follow_status_label, merge_feed_view_preferences, merge_saved_feeds_preferences, 1689 + parse_follow_delete_target, retry_after_delay, summarize_apply_writes_result, user_preferences_from_items, 1690 + FeedViewPrefItem, FollowDeleteTarget, SavedFeedItem, FOLLOW_STATUS_BLOCKED_BY, FOLLOW_STATUS_BLOCKING, 1691 + FOLLOW_STATUS_HIDDEN, FOLLOW_STATUS_SELF_FOLLOW, 1002 1692 }; 1693 + use jacquard::api::app_bsky::actor::ProfileViewDetailed; 1003 1694 use jacquard::api::app_bsky::actor::{AdultContentPref, FeedViewPref, PreferencesItem}; 1004 1695 use jacquard::api::app_bsky::richtext::facet::FacetFeaturesItem; 1696 + use jacquard::api::com_atproto::repo::apply_writes::{ 1697 + ApplyWritesOutput, ApplyWritesOutputResultsItem, DeleteResult, 1698 + }; 1005 1699 use jacquard::richtext; 1700 + use jacquard::types::aturi::AtUri; 1701 + use jacquard::types::did::Did; 1702 + use jacquard::types::handle::Handle; 1703 + use jacquard::IntoStatic; 1006 1704 use reqwest::StatusCode; 1705 + use std::time::Duration; 1007 1706 1008 1707 fn adult_content_pref_item() -> PreferencesItem<'static> { 1009 1708 PreferencesItem::AdultContentPref(Box::new(AdultContentPref::new().enabled(true).build())) ··· 1099 1798 assert!(accepts_empty_bookmark_response(StatusCode::OK, b"")); 1100 1799 assert!(!accepts_empty_bookmark_response(StatusCode::OK, b"{}")); 1101 1800 assert!(!accepts_empty_bookmark_response(StatusCode::BAD_REQUEST, b"")); 1801 + } 1802 + 1803 + #[test] 1804 + fn follow_status_label_collapses_mutual_block() { 1805 + let status = FOLLOW_STATUS_BLOCKED_BY | FOLLOW_STATUS_BLOCKING | FOLLOW_STATUS_HIDDEN; 1806 + assert_eq!(follow_status_label(status), "Mutual Block, Hidden"); 1807 + } 1808 + 1809 + #[test] 1810 + fn follow_status_from_profile_sets_expected_flags() { 1811 + let mut viewer = jacquard::api::app_bsky::actor::ViewerState::default(); 1812 + viewer.blocked_by = Some(true); 1813 + viewer.blocking = Some( 1814 + AtUri::new("at://did:plc:me/app.bsky.graph.block/abc123") 1815 + .expect("uri should parse") 1816 + .into_static(), 1817 + ); 1818 + 1819 + let profile = ProfileViewDetailed::new() 1820 + .did(Did::new("did:plc:alice").expect("did should parse").into_static()) 1821 + .handle(Handle::new("alice.test").expect("handle should parse").into_static()) 1822 + .viewer(Some(viewer)) 1823 + .build(); 1824 + 1825 + let status = follow_status_from_profile(&profile, "did:plc:alice"); 1826 + 1827 + assert_ne!(status & FOLLOW_STATUS_BLOCKED_BY, 0); 1828 + assert_ne!(status & FOLLOW_STATUS_BLOCKING, 0); 1829 + assert_ne!(status & FOLLOW_STATUS_SELF_FOLLOW, 0); 1830 + } 1831 + 1832 + #[test] 1833 + fn parse_follow_delete_target_rejects_invalid_inputs() { 1834 + assert!(parse_follow_delete_target("at://did:plc:alice/app.bsky.graph.follow/abc123").is_ok()); 1835 + assert!(parse_follow_delete_target("at://did:plc:alice/app.bsky.feed.like/abc123").is_err()); 1836 + assert!(parse_follow_delete_target("at://did:plc:alice/app.bsky.graph.follow").is_err()); 1837 + assert!(parse_follow_delete_target("not-a-uri").is_err()); 1838 + } 1839 + 1840 + #[test] 1841 + fn build_delete_writes_skips_invalid_rkeys() { 1842 + let targets = vec![ 1843 + FollowDeleteTarget { uri: "at://did:plc:alice/app.bsky.graph.follow/abc123".into(), rkey: "abc123".into() }, 1844 + FollowDeleteTarget { uri: "at://did:plc:alice/app.bsky.graph.follow/bad".into(), rkey: "bad key".into() }, 1845 + ]; 1846 + 1847 + let (writes, chunk_uris, failed) = build_delete_writes(&targets); 1848 + assert_eq!(writes.len(), 1); 1849 + assert_eq!(chunk_uris, vec!["at://did:plc:alice/app.bsky.graph.follow/abc123"]); 1850 + assert_eq!(failed, vec!["at://did:plc:alice/app.bsky.graph.follow/bad"]); 1851 + } 1852 + 1853 + #[test] 1854 + fn summarize_apply_writes_result_handles_missing_entries_as_failures() { 1855 + let output: ApplyWritesOutput<'_> = ApplyWritesOutput { 1856 + results: Some(vec![ApplyWritesOutputResultsItem::DeleteResult(Box::new( 1857 + DeleteResult::default(), 1858 + ))]), 1859 + ..Default::default() 1860 + }; 1861 + let chunk_uris = vec![ 1862 + "at://did:plc:a/app.bsky.graph.follow/1".to_string(), 1863 + "at://did:plc:b/app.bsky.graph.follow/2".to_string(), 1864 + ]; 1865 + 1866 + let (deleted, failed) = summarize_apply_writes_result(&chunk_uris, &output); 1867 + assert_eq!(deleted, 1); 1868 + assert_eq!(failed, vec!["at://did:plc:b/app.bsky.graph.follow/2"]); 1869 + } 1870 + 1871 + #[test] 1872 + fn summarize_apply_writes_result_treats_missing_results_as_all_successful() { 1873 + let output: ApplyWritesOutput<'_> = ApplyWritesOutput::default(); 1874 + let chunk_uris = vec![ 1875 + "at://did:plc:a/app.bsky.graph.follow/1".to_string(), 1876 + "at://did:plc:b/app.bsky.graph.follow/2".to_string(), 1877 + ]; 1878 + 1879 + let (deleted, failed) = summarize_apply_writes_result(&chunk_uris, &output); 1880 + assert_eq!(deleted, 2); 1881 + assert!(failed.is_empty()); 1882 + } 1883 + 1884 + #[test] 1885 + fn retry_after_delay_reads_numeric_seconds_from_payload() { 1886 + let body = br#"{"message":"rate limited, retry after 7 seconds"}"#; 1887 + assert_eq!(retry_after_delay(body), Some(Duration::from_secs(7))); 1888 + 1889 + assert_eq!(retry_after_delay(br#"{"message":"slow down"}"#), None); 1102 1890 } 1103 1891 1104 1892 #[test]
+2
src-tauri/src/lib.rs
··· 108 108 cmd::unfollow_actor, 109 109 cmd::get_followers, 110 110 cmd::get_follows, 111 + cmd::audit_follows, 112 + cmd::batch_unfollow, 111 113 cmd::update_saved_feeds, 112 114 cmd::update_feed_view_pref, 113 115 cmd::list_notifications,
+20 -13
src-tauri/src/moderation.rs
··· 494 494 let definitions = if !definitions_from_view.is_empty() { 495 495 definitions_from_view 496 496 } else if let Ok(parsed) = Did::new(&did) { 497 - defs.get(&parsed) 498 - .map(normalize_label_definitions) 499 - .unwrap_or_default() 497 + defs.get(&parsed).map(normalize_label_definitions).unwrap_or_default() 500 498 } else { 501 499 Vec::new() 502 500 }; 503 501 504 502 let reason_types = view.map(|value| { 505 - value 506 - .reason_types 507 - .as_ref() 508 - .map(|types| types.iter().map(|item| item.as_ref().to_string()).collect::<Vec<String>>()) 503 + value.reason_types.as_ref().map(|types| { 504 + types 505 + .iter() 506 + .map(|item| item.as_ref().to_string()) 507 + .collect::<Vec<String>>() 508 + }) 509 509 }); 510 510 let subject_types = view.map(|value| { 511 - value 512 - .subject_types 513 - .as_ref() 514 - .map(|types| types.iter().map(|item| item.as_ref().to_string()).collect::<Vec<String>>()) 511 + value.subject_types.as_ref().map(|types| { 512 + types 513 + .iter() 514 + .map(|item| item.as_ref().to_string()) 515 + .collect::<Vec<String>>() 516 + }) 515 517 }); 516 518 let subject_collections = view.map(|value| { 517 519 value.subject_collections.as_ref().map(|collections| { ··· 525 527 policies.push(ModerationLabelerPolicyDefinition { 526 528 labeler_did: did, 527 529 labeler_handle: view.map(|value| value.creator.handle.as_ref().to_string()), 528 - labeler_display_name: view 529 - .and_then(|value| value.creator.display_name.as_ref().map(|name| name.as_ref().to_string())), 530 + labeler_display_name: view.and_then(|value| { 531 + value 532 + .creator 533 + .display_name 534 + .as_ref() 535 + .map(|name| name.as_ref().to_string()) 536 + }), 530 537 reason_types: reason_types.flatten(), 531 538 subject_types: subject_types.flatten(), 532 539 subject_collections: subject_collections.flatten(),