A lexicon-driven AppView for ATProto. happyview.dev
backfill firehose jetstream atproto appview oauth lexicon
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: remove public record aggregation from permissioned spaces

Trezy bef43611 3af84706

-585
-116
src/spaces/db.rs
··· 662 662 created_at: r.9, 663 663 }) 664 664 } 665 - 666 - // --------------------------------------------------------------------------- 667 - // Space Sync State 668 - // --------------------------------------------------------------------------- 669 - 670 - pub async fn get_sync_state( 671 - pool: &sqlx::AnyPool, 672 - backend: DatabaseBackend, 673 - space_id: &str, 674 - member_did: &str, 675 - ) -> Result<Option<SpaceSyncState>, AppError> { 676 - let sql = adapt_sql( 677 - "SELECT id, space_id, member_did, cursor, last_synced_at, status, error FROM space_sync_state WHERE space_id = ? AND member_did = ?", 678 - backend, 679 - ); 680 - 681 - let row: Option<SyncStateRow> = sqlx::query_as(&sql) 682 - .bind(space_id) 683 - .bind(member_did) 684 - .fetch_optional(pool) 685 - .await 686 - .map_err(|e| AppError::Internal(format!("failed to get sync state: {e}")))?; 687 - 688 - row.map(parse_sync_state_row).transpose() 689 - } 690 - 691 - pub async fn upsert_sync_state( 692 - pool: &sqlx::AnyPool, 693 - backend: DatabaseBackend, 694 - state: &SpaceSyncState, 695 - ) -> Result<(), AppError> { 696 - let sql = match backend { 697 - DatabaseBackend::Sqlite => { 698 - "INSERT OR REPLACE INTO space_sync_state (id, space_id, member_did, cursor, last_synced_at, status, error) VALUES (?, ?, ?, ?, ?, ?, ?)".to_string() 699 - } 700 - DatabaseBackend::Postgres => adapt_sql( 701 - "INSERT INTO space_sync_state (id, space_id, member_did, cursor, last_synced_at, status, error) VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT (space_id, member_did) DO UPDATE SET cursor = EXCLUDED.cursor, last_synced_at = EXCLUDED.last_synced_at, status = EXCLUDED.status, error = EXCLUDED.error", 702 - backend, 703 - ), 704 - }; 705 - 706 - sqlx::query(&sql) 707 - .bind(&state.id) 708 - .bind(&state.space_id) 709 - .bind(&state.member_did) 710 - .bind(&state.cursor) 711 - .bind(&state.last_synced_at) 712 - .bind(state.status.as_str()) 713 - .bind(&state.error) 714 - .execute(pool) 715 - .await 716 - .map_err(|e| AppError::Internal(format!("failed to upsert sync state: {e}")))?; 717 - 718 - Ok(()) 719 - } 720 - 721 - pub async fn list_sync_states_for_space( 722 - pool: &sqlx::AnyPool, 723 - backend: DatabaseBackend, 724 - space_id: &str, 725 - ) -> Result<Vec<SpaceSyncState>, AppError> { 726 - let sql = adapt_sql( 727 - "SELECT id, space_id, member_did, cursor, last_synced_at, status, error FROM space_sync_state WHERE space_id = ? ORDER BY member_did ASC", 728 - backend, 729 - ); 730 - 731 - let rows: Vec<SyncStateRow> = sqlx::query_as(&sql) 732 - .bind(space_id) 733 - .fetch_all(pool) 734 - .await 735 - .map_err(|e| AppError::Internal(format!("failed to list sync states: {e}")))?; 736 - 737 - rows.into_iter().map(parse_sync_state_row).collect() 738 - } 739 - 740 - pub async fn list_pending_syncs( 741 - pool: &sqlx::AnyPool, 742 - backend: DatabaseBackend, 743 - ) -> Result<Vec<SpaceSyncState>, AppError> { 744 - let sql = adapt_sql( 745 - "SELECT id, space_id, member_did, cursor, last_synced_at, status, error FROM space_sync_state WHERE status = 'pending' OR status = 'error' ORDER BY last_synced_at ASC NULLS FIRST LIMIT 50", 746 - backend, 747 - ); 748 - 749 - let rows: Vec<SyncStateRow> = sqlx::query_as(&sql) 750 - .fetch_all(pool) 751 - .await 752 - .map_err(|e| AppError::Internal(format!("failed to list pending syncs: {e}")))?; 753 - 754 - rows.into_iter().map(parse_sync_state_row).collect() 755 - } 756 - 757 - type SyncStateRow = ( 758 - String, 759 - String, 760 - String, 761 - Option<String>, 762 - Option<String>, 763 - String, 764 - Option<String>, 765 - ); 766 - 767 - fn parse_sync_state_row(r: SyncStateRow) -> Result<SpaceSyncState, AppError> { 768 - let status = SyncStatus::parse(&r.5) 769 - .ok_or_else(|| AppError::Internal(format!("invalid sync status: {}", r.5)))?; 770 - 771 - Ok(SpaceSyncState { 772 - id: r.0, 773 - space_id: r.1, 774 - member_did: r.2, 775 - cursor: r.3, 776 - last_synced_at: r.4, 777 - status, 778 - error: r.6, 779 - }) 780 - }
-2
src/spaces/mod.rs
··· 2 2 pub mod credential; 3 3 pub mod db; 4 4 pub mod members; 5 - pub mod notifications; 6 5 pub mod routes; 7 - pub mod sync; 8 6 pub mod types; 9 7 10 8 use crate::error::AppError;
-86
src/spaces/notifications.rs
··· 1 - use serde::Deserialize; 2 - use uuid::Uuid; 3 - 4 - use crate::db::DatabaseBackend; 5 - use crate::error::AppError; 6 - use crate::spaces::db; 7 - use crate::spaces::types::*; 8 - 9 - #[derive(Debug, Deserialize)] 10 - #[serde(rename_all = "camelCase")] 11 - pub struct WriteNotification { 12 - pub space_uri: String, 13 - pub author_did: String, 14 - pub collection: String, 15 - pub rkey: String, 16 - pub action: WriteAction, 17 - } 18 - 19 - #[derive(Debug, Deserialize)] 20 - #[serde(rename_all = "lowercase")] 21 - pub enum WriteAction { 22 - Create, 23 - Update, 24 - Delete, 25 - } 26 - 27 - /// Process a write notification by queuing a sync pull for the affected member. 28 - /// 29 - /// This marks the member's sync state as pending so the next sync pass picks it up. 30 - pub async fn handle_write_notification( 31 - pool: &sqlx::AnyPool, 32 - backend: DatabaseBackend, 33 - space_id: &str, 34 - notification: &WriteNotification, 35 - ) -> Result<(), AppError> { 36 - let existing = db::get_sync_state(pool, backend, space_id, &notification.author_did).await?; 37 - 38 - let state = SpaceSyncState { 39 - id: existing 40 - .map(|s| s.id) 41 - .unwrap_or_else(|| Uuid::new_v4().to_string()), 42 - space_id: space_id.to_string(), 43 - member_did: notification.author_did.clone(), 44 - cursor: None, 45 - last_synced_at: None, 46 - status: SyncStatus::Pending, 47 - error: None, 48 - }; 49 - 50 - db::upsert_sync_state(pool, backend, &state).await?; 51 - 52 - Ok(()) 53 - } 54 - 55 - #[cfg(test)] 56 - mod tests { 57 - use super::*; 58 - 59 - #[test] 60 - fn write_action_deserializes() { 61 - let action: WriteAction = serde_json::from_str("\"create\"").unwrap(); 62 - assert!(matches!(action, WriteAction::Create)); 63 - 64 - let action: WriteAction = serde_json::from_str("\"update\"").unwrap(); 65 - assert!(matches!(action, WriteAction::Update)); 66 - 67 - let action: WriteAction = serde_json::from_str("\"delete\"").unwrap(); 68 - assert!(matches!(action, WriteAction::Delete)); 69 - } 70 - 71 - #[test] 72 - fn write_notification_deserializes() { 73 - let json = r#"{ 74 - "spaceUri": "ats://did:plc:owner/com.example.forum/main", 75 - "authorDid": "did:plc:alice", 76 - "collection": "com.example.forum.post", 77 - "rkey": "3k2abc", 78 - "action": "create" 79 - }"#; 80 - 81 - let notif: WriteNotification = serde_json::from_str(json).unwrap(); 82 - assert_eq!(notif.author_did, "did:plc:alice"); 83 - assert_eq!(notif.collection, "com.example.forum.post"); 84 - assert!(matches!(notif.action, WriteAction::Create)); 85 - } 86 - }
-47
src/spaces/routes.rs
··· 146 146 credential: String, 147 147 } 148 148 149 - #[derive(Deserialize)] 150 - #[serde(rename_all = "camelCase")] 151 - struct WriteNotificationInput { 152 - space_uri: String, 153 - author_did: String, 154 - collection: String, 155 - rkey: String, 156 - action: crate::spaces::notifications::WriteAction, 157 - } 158 - 159 149 // --------------------------------------------------------------------------- 160 150 // Route registration 161 151 // --------------------------------------------------------------------------- ··· 207 197 .route( 208 198 &format!("/xrpc/{NS}.space.refreshCredential"), 209 199 post(refresh_credential), 210 - ) 211 - // Notifications 212 - .route( 213 - &format!("/xrpc/{NS}.space.writeNotification"), 214 - post(write_notification), 215 200 ) 216 201 } 217 202 ··· 948 933 "expiresAt": issued.expires_at, 949 934 }))) 950 935 } 951 - 952 - // --------------------------------------------------------------------------- 953 - // Notification handlers 954 - // --------------------------------------------------------------------------- 955 - 956 - async fn write_notification( 957 - State(state): State<AppState>, 958 - xrpc_claims: XrpcClaims, 959 - Json(input): Json<WriteNotificationInput>, 960 - ) -> Result<Json<serde_json::Value>, AppError> { 961 - let claims = require_auth(&xrpc_claims)?; 962 - let space = resolve_space(&state, &input.space_uri).await?; 963 - require_space_admin(&state, &space, claims.did()).await?; 964 - 965 - let notification = crate::spaces::notifications::WriteNotification { 966 - space_uri: input.space_uri, 967 - author_did: input.author_did, 968 - collection: input.collection, 969 - rkey: input.rkey, 970 - action: input.action, 971 - }; 972 - 973 - crate::spaces::notifications::handle_write_notification( 974 - &state.db, 975 - state.db_backend, 976 - &space.id, 977 - &notification, 978 - ) 979 - .await?; 980 - 981 - Ok(Json(serde_json::json!({ "success": true }))) 982 - }
-293
src/spaces/sync.rs
··· 1 - use uuid::Uuid; 2 - 3 - use crate::db::DatabaseBackend; 4 - use crate::db::now_rfc3339; 5 - use crate::error::AppError; 6 - use crate::profile::resolve_pds_endpoint; 7 - use crate::spaces::types::*; 8 - use crate::spaces::{db, members}; 9 - 10 - /// Sync all members of a space by pulling records from their PDSes. 11 - pub async fn sync_space( 12 - http: &reqwest::Client, 13 - pool: &sqlx::AnyPool, 14 - backend: DatabaseBackend, 15 - plc_url: &str, 16 - space_id: &str, 17 - collections: &[String], 18 - ) -> Result<SyncSpaceResult, AppError> { 19 - let resolved = members::resolve_members(pool, backend, space_id).await?; 20 - let mut results = Vec::new(); 21 - 22 - for member in &resolved { 23 - let result = sync_member( 24 - http, 25 - pool, 26 - backend, 27 - plc_url, 28 - space_id, 29 - &member.did, 30 - collections, 31 - ) 32 - .await; 33 - 34 - results.push(MemberSyncResult { 35 - did: member.did.clone(), 36 - records_synced: result.as_ref().map(|r| r.records_synced).unwrap_or(0), 37 - error: result.err().map(|e| e.to_string()), 38 - }); 39 - } 40 - 41 - let total = results.iter().map(|r| r.records_synced).sum(); 42 - 43 - Ok(SyncSpaceResult { 44 - members_processed: results.len(), 45 - total_records_synced: total, 46 - member_results: results, 47 - }) 48 - } 49 - 50 - /// Sync records from a single member's PDS for a given space. 51 - pub async fn sync_member( 52 - http: &reqwest::Client, 53 - pool: &sqlx::AnyPool, 54 - backend: DatabaseBackend, 55 - plc_url: &str, 56 - space_id: &str, 57 - member_did: &str, 58 - collections: &[String], 59 - ) -> Result<MemberSyncSummary, AppError> { 60 - let state_id = match db::get_sync_state(pool, backend, space_id, member_did).await? { 61 - Some(s) => s.id, 62 - None => { 63 - let id = Uuid::new_v4().to_string(); 64 - let initial = SpaceSyncState { 65 - id: id.clone(), 66 - space_id: space_id.to_string(), 67 - member_did: member_did.to_string(), 68 - cursor: None, 69 - last_synced_at: None, 70 - status: SyncStatus::Pending, 71 - error: None, 72 - }; 73 - db::upsert_sync_state(pool, backend, &initial).await?; 74 - id 75 - } 76 - }; 77 - 78 - // Mark as syncing 79 - let syncing_state = SpaceSyncState { 80 - id: state_id.clone(), 81 - space_id: space_id.to_string(), 82 - member_did: member_did.to_string(), 83 - cursor: None, 84 - last_synced_at: None, 85 - status: SyncStatus::Syncing, 86 - error: None, 87 - }; 88 - db::upsert_sync_state(pool, backend, &syncing_state).await?; 89 - 90 - let result = pull_member_records( 91 - http, 92 - pool, 93 - backend, 94 - plc_url, 95 - space_id, 96 - member_did, 97 - collections, 98 - ) 99 - .await; 100 - 101 - match result { 102 - Ok(summary) => { 103 - let done = SpaceSyncState { 104 - id: state_id, 105 - space_id: space_id.to_string(), 106 - member_did: member_did.to_string(), 107 - cursor: summary.cursor.clone(), 108 - last_synced_at: Some(now_rfc3339()), 109 - status: SyncStatus::Synced, 110 - error: None, 111 - }; 112 - db::upsert_sync_state(pool, backend, &done).await?; 113 - Ok(summary) 114 - } 115 - Err(e) => { 116 - let err_state = SpaceSyncState { 117 - id: state_id, 118 - space_id: space_id.to_string(), 119 - member_did: member_did.to_string(), 120 - cursor: None, 121 - last_synced_at: Some(now_rfc3339()), 122 - status: SyncStatus::Error, 123 - error: Some(e.to_string()), 124 - }; 125 - db::upsert_sync_state(pool, backend, &err_state).await?; 126 - Err(e) 127 - } 128 - } 129 - } 130 - 131 - async fn pull_member_records( 132 - http: &reqwest::Client, 133 - pool: &sqlx::AnyPool, 134 - backend: DatabaseBackend, 135 - plc_url: &str, 136 - space_id: &str, 137 - member_did: &str, 138 - collections: &[String], 139 - ) -> Result<MemberSyncSummary, AppError> { 140 - let pds_url = resolve_pds_endpoint(http, plc_url, member_did).await?; 141 - let mut total_records = 0usize; 142 - let mut last_cursor = None; 143 - 144 - for collection in collections { 145 - let mut cursor: Option<String> = None; 146 - loop { 147 - let (records, next_cursor) = fetch_records_page( 148 - http, 149 - &pds_url, 150 - member_did, 151 - collection, 152 - cursor.as_deref(), 153 - 100, 154 - ) 155 - .await?; 156 - 157 - if records.is_empty() { 158 - break; 159 - } 160 - 161 - for record in &records { 162 - let uri = record["uri"].as_str().unwrap_or(""); 163 - let rkey = extract_rkey(uri); 164 - let cid = record["cid"].as_str().unwrap_or("").to_string(); 165 - let value = record 166 - .get("value") 167 - .cloned() 168 - .unwrap_or(serde_json::Value::Null); 169 - 170 - let space_record_uri = format!("ats://{space_id}/{member_did}/{collection}/{rkey}"); 171 - 172 - let space_record = SpaceRecord { 173 - uri: space_record_uri, 174 - space_id: space_id.to_string(), 175 - author_did: member_did.to_string(), 176 - collection: collection.clone(), 177 - rkey: rkey.to_string(), 178 - record: value, 179 - cid, 180 - indexed_at: now_rfc3339(), 181 - }; 182 - 183 - db::upsert_space_record(pool, backend, &space_record).await?; 184 - total_records += 1; 185 - } 186 - 187 - last_cursor = next_cursor.clone(); 188 - cursor = next_cursor; 189 - 190 - if cursor.is_none() { 191 - break; 192 - } 193 - } 194 - } 195 - 196 - Ok(MemberSyncSummary { 197 - records_synced: total_records, 198 - cursor: last_cursor, 199 - }) 200 - } 201 - 202 - async fn fetch_records_page( 203 - http: &reqwest::Client, 204 - pds_url: &str, 205 - repo: &str, 206 - collection: &str, 207 - cursor: Option<&str>, 208 - limit: u32, 209 - ) -> Result<(Vec<serde_json::Value>, Option<String>), AppError> { 210 - let mut url = format!( 211 - "{}/xrpc/com.atproto.repo.listRecords?repo={}&collection={}&limit={}", 212 - pds_url.trim_end_matches('/'), 213 - repo, 214 - collection, 215 - limit, 216 - ); 217 - 218 - if let Some(c) = cursor { 219 - url.push_str(&format!("&cursor={c}")); 220 - } 221 - 222 - let resp = http 223 - .get(&url) 224 - .send() 225 - .await 226 - .map_err(|e| AppError::Internal(format!("PDS request failed: {e}")))?; 227 - 228 - if !resp.status().is_success() { 229 - let status = resp.status(); 230 - return Err(AppError::Internal(format!( 231 - "PDS listRecords failed with {status} for {repo}/{collection}" 232 - ))); 233 - } 234 - 235 - let body: serde_json::Value = resp 236 - .json() 237 - .await 238 - .map_err(|e| AppError::Internal(format!("invalid PDS response: {e}")))?; 239 - 240 - let records = body["records"].as_array().cloned().unwrap_or_default(); 241 - 242 - let next_cursor = body["cursor"].as_str().map(|s| s.to_string()); 243 - 244 - Ok((records, next_cursor)) 245 - } 246 - 247 - fn extract_rkey(uri: &str) -> &str { 248 - uri.rsplit('/').next().unwrap_or("") 249 - } 250 - 251 - // --------------------------------------------------------------------------- 252 - // Result types 253 - // --------------------------------------------------------------------------- 254 - 255 - pub struct SyncSpaceResult { 256 - pub members_processed: usize, 257 - pub total_records_synced: usize, 258 - pub member_results: Vec<MemberSyncResult>, 259 - } 260 - 261 - pub struct MemberSyncResult { 262 - pub did: String, 263 - pub records_synced: usize, 264 - pub error: Option<String>, 265 - } 266 - 267 - pub struct MemberSyncSummary { 268 - pub records_synced: usize, 269 - pub cursor: Option<String>, 270 - } 271 - 272 - #[cfg(test)] 273 - mod tests { 274 - use super::*; 275 - 276 - #[test] 277 - fn extract_rkey_from_at_uri() { 278 - assert_eq!( 279 - extract_rkey("at://did:plc:abc/app.bsky.feed.post/3k2abc"), 280 - "3k2abc" 281 - ); 282 - } 283 - 284 - #[test] 285 - fn extract_rkey_from_empty() { 286 - assert_eq!(extract_rkey(""), ""); 287 - } 288 - 289 - #[test] 290 - fn extract_rkey_no_slash() { 291 - assert_eq!(extract_rkey("singlevalue"), "singlevalue"); 292 - } 293 - }
-41
src/spaces/types.rs
··· 139 139 pub created_at: String, 140 140 } 141 141 142 - #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] 143 - #[serde(rename_all = "lowercase")] 144 - pub enum SyncStatus { 145 - Pending, 146 - Syncing, 147 - Synced, 148 - Error, 149 - } 150 - 151 - impl SyncStatus { 152 - pub fn as_str(&self) -> &'static str { 153 - match self { 154 - SyncStatus::Pending => "pending", 155 - SyncStatus::Syncing => "syncing", 156 - SyncStatus::Synced => "synced", 157 - SyncStatus::Error => "error", 158 - } 159 - } 160 - 161 - pub fn parse(s: &str) -> Option<Self> { 162 - match s { 163 - "pending" => Some(SyncStatus::Pending), 164 - "syncing" => Some(SyncStatus::Syncing), 165 - "synced" => Some(SyncStatus::Synced), 166 - "error" => Some(SyncStatus::Error), 167 - _ => None, 168 - } 169 - } 170 - } 171 - 172 - #[derive(Debug, Clone, Serialize, Deserialize)] 173 - pub struct SpaceSyncState { 174 - pub id: String, 175 - pub space_id: String, 176 - pub member_did: String, 177 - pub cursor: Option<String>, 178 - pub last_synced_at: Option<String>, 179 - pub status: SyncStatus, 180 - pub error: Option<String>, 181 - } 182 - 183 142 #[cfg(test)] 184 143 mod tests { 185 144 use super::*;