slop slop slop sahuuuurrr
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 4030 lines 138 kB view raw
1use axum::{ 2 Json, Router, 3 extract::{Request, State}, 4 http::StatusCode, 5 response::{IntoResponse, Response}, 6 routing::{get, post}, 7}; 8use hydrant::config::Config; 9use hydrant::control::Hydrant; 10use hydrant::deps::futures::StreamExt; 11use jacquard_common::IntoStatic; 12use jacquard_common::types::ident::AtIdentifier; 13use serde::Deserialize; 14use tracing; 15use tracing_subscriber::EnvFilter; 16 17#[derive(Clone)] 18struct AppState { 19 hydrant: Hydrant, 20 bookmarks: fjall::Keyspace, 21 drafts: fjall::Keyspace, 22 preferences: fjall::Keyspace, 23 mutes: fjall::Keyspace, 24 notifications: fjall::Keyspace, 25 seen: fjall::Keyspace, 26 cdn_url: String, 27} 28 29impl AppState { 30 fn cdn(&self, category: &str, did: &str, link: &str) -> String { 31 format!( 32 "{}/img/{}/plain/{}/{}@jpeg", 33 self.cdn_url, category, did, link 34 ) 35 } 36} 37 38#[tokio::main] 39async fn main() -> miette::Result<()> { 40 tracing_subscriber::fmt() 41 .with_env_filter( 42 EnvFilter::from_default_env().add_directive("appview=info".parse().unwrap()), 43 ) 44 .init(); 45 46 hydrant::deps::rustls::crypto::aws_lc_rs::default_provider() 47 .install_default() 48 .ok(); 49 50 let db = fjall::Database::builder(std::path::Path::new("./data")) 51 .open() 52 .unwrap(); 53 let bookmarks = db 54 .keyspace("bookmarks", || fjall::KeyspaceCreateOptions::default()) 55 .unwrap(); 56 let drafts = db 57 .keyspace("drafts", || fjall::KeyspaceCreateOptions::default()) 58 .unwrap(); 59 let preferences = db 60 .keyspace("preferences", || fjall::KeyspaceCreateOptions::default()) 61 .unwrap(); 62 let mutes = db 63 .keyspace("mutes", || fjall::KeyspaceCreateOptions::default()) 64 .unwrap(); 65 let notifications = db 66 .keyspace("notifications", || fjall::KeyspaceCreateOptions::default()) 67 .unwrap(); 68 let seen = db 69 .keyspace("seen", || fjall::KeyspaceCreateOptions::default()) 70 .unwrap(); 71 72 let mut cfg = Config::from_env()?; 73 74 // Enable backlinks 75 cfg.enable_backlinks = true; 76 77 
// By default filter mode is used, so we only track explicitly added repos. 78 cfg.full_network = false; 79 80 cfg.filter_collections = Some(vec![ 81 "app.bsky.actor.profile".to_string(), 82 "app.bsky.feed.post".to_string(), 83 "app.bsky.feed.repost".to_string(), 84 "app.bsky.feed.like".to_string(), 85 "app.bsky.feed.generator".to_string(), 86 "app.bsky.graph.follow".to_string(), 87 "app.bsky.graph.block".to_string(), 88 "app.bsky.graph.list".to_string(), 89 "app.bsky.graph.listitem".to_string(), 90 "app.bsky.labeler.service".to_string(), 91 ]); 92 93 let cdn_url = std::env::var("CDN_URL").unwrap_or_else(|_| "https://cdn.bsky.app".to_string()); 94 95 let hydrant = Hydrant::new(cfg).await?; 96 let hydrant_clone = hydrant.clone(); 97 let app_state = AppState { 98 hydrant: hydrant.clone(), 99 bookmarks, 100 drafts, 101 preferences, 102 mutes, 103 notifications, 104 seen, 105 cdn_url, 106 }; 107 108 if let Ok(seed_account) = std::env::var("SEED_ACCOUNT") { 109 let h = hydrant.clone(); 110 tokio::spawn(async move { 111 seed_account_follows(h, seed_account).await; 112 }); 113 } 114 115 let indexer_state = app_state.clone(); 116 tokio::spawn(async move { 117 notification_indexer(indexer_state).await; 118 }); 119 120 let app = Router::new() 121 .route("/xrpc/app.bsky.feed.getPostThread", get(get_post_thread)) 122 .route("/xrpc/app.bsky.actor.getProfile", get(get_profile)) 123 .route("/xrpc/app.bsky.actor.getProfiles", get(get_profiles)) 124 .route("/xrpc/app.bsky.actor.getPreferences", get(get_preferences)) 125 .route("/xrpc/app.bsky.actor.putPreferences", post(put_preferences)) 126 .route("/xrpc/app.bsky.feed.getAuthorFeed", get(get_author_feed)) 127 .route("/xrpc/app.bsky.feed.getTimeline", get(get_timeline)) 128 .route("/xrpc/app.bsky.feed.getActorLikes", get(get_actor_likes)) 129 .route("/xrpc/app.bsky.feed.getQuotes", get(get_quotes)) 130 .route( 131 "/xrpc/app.bsky.graph.getRelationships", 132 get(get_relationships), 133 ) 134 .route( 135 
"/xrpc/app.bsky.graph.getKnownFollowers", 136 get(get_known_followers), 137 ) 138 .route("/xrpc/app.bsky.graph.getBlocks", get(get_blocks)) 139 .route("/xrpc/app.bsky.graph.getMutes", get(get_mutes)) 140 .route("/xrpc/app.bsky.graph.muteActor", post(mute_actor)) 141 .route("/xrpc/app.bsky.graph.unmuteActor", post(unmute_actor)) 142 .route("/xrpc/app.bsky.graph.getList", get(get_list)) 143 .route("/xrpc/app.bsky.graph.getLists", get(get_lists)) 144 .route( 145 "/xrpc/app.bsky.graph.getListsWithMembership", 146 get(get_lists_with_membership), 147 ) 148 .route( 149 "/xrpc/app.bsky.notification.listNotifications", 150 get(list_notifications), 151 ) 152 .route( 153 "/xrpc/app.bsky.notification.getUnreadCount", 154 get(get_unread_count), 155 ) 156 .route("/xrpc/app.bsky.notification.updateSeen", post(update_seen)) 157 .route("/.well-known/did.json", get(get_well_known_did)) 158 .route("/xrpc/app.bsky.feed.getActorFeeds", get(get_actor_feeds)) 159 .route("/xrpc/app.bsky.feed.getFeed", get(get_feed)) 160 .route( 161 "/xrpc/app.bsky.feed.getFeedGenerator", 162 get(get_feed_generator), 163 ) 164 .route( 165 "/xrpc/app.bsky.feed.getFeedGenerators", 166 get(get_feed_generators), 167 ) 168 .route("/xrpc/app.bsky.feed.getLikes", get(get_likes)) 169 .route("/xrpc/app.bsky.feed.getRepostedBy", get(get_reposted_by)) 170 .route("/xrpc/app.bsky.feed.getPosts", get(get_posts)) 171 .route("/xrpc/app.bsky.graph.getFollows", get(get_follows)) 172 .route("/xrpc/app.bsky.graph.getFollowers", get(get_followers)) 173 .route( 174 "/xrpc/app.bsky.unspecced.getAgeAssuranceState", 175 get(get_age_assurance_state), 176 ) 177 .route("/xrpc/app.bsky.bookmark.getBookmarks", get(get_bookmarks)) 178 .route( 179 "/xrpc/app.bsky.bookmark.createBookmark", 180 post(create_bookmark), 181 ) 182 .route( 183 "/xrpc/app.bsky.bookmark.deleteBookmark", 184 post(delete_bookmark), 185 ) 186 .route("/xrpc/app.bsky.draft.getDrafts", get(get_drafts)) 187 .route("/xrpc/app.bsky.draft.createDraft", post(create_draft)) 
188 .route("/xrpc/app.bsky.draft.updateDraft", post(update_draft)) 189 .route("/xrpc/app.bsky.draft.deleteDraft", post(delete_draft)) 190 .fallback(proxy_request) 191 .layer(tower_http::cors::CorsLayer::permissive()) 192 .layer(tower_http::trace::TraceLayer::new_for_http()) 193 .with_state(app_state); 194 195 let port = std::env::var("PORT") 196 .ok() 197 .and_then(|s| s.parse().ok()) 198 .unwrap_or(8000); 199 let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", port)) 200 .await 201 .unwrap(); 202 tracing::info!("appview listening on {}", listener.local_addr().unwrap()); 203 204 tokio::select! { 205 _ = axum::serve(listener, app) => {}, 206 r = hydrant_clone.run()? => { r?; }, 207 } 208 209 Ok(()) 210} 211 212async fn proxy_request(req: Request) -> Result<Response, StatusCode> { 213 tracing::info!("Proxying request: {} {}", req.method(), req.uri()); 214 let client = reqwest::Client::new(); 215 let uri = req.uri(); 216 let mut url = format!("https://public.api.bsky.app{}", uri.path()); 217 if let Some(query) = uri.query() { 218 url.push('?'); 219 url.push_str(query); 220 } 221 222 let mut req_builder = client.request(req.method().clone(), url); 223 for (name, value) in req.headers() { 224 if name != reqwest::header::HOST { 225 req_builder = req_builder.header(name.clone(), value.clone()); 226 } 227 } 228 229 // Also proxy the body if it exists 230 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) 231 .await 232 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 233 if !body_bytes.is_empty() { 234 req_builder = req_builder.body(body_bytes); 235 } 236 237 let req = req_builder 238 .build() 239 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 240 let res = client 241 .execute(req) 242 .await 243 .map_err(|_| StatusCode::BAD_GATEWAY)?; 244 245 let mut axum_res = Response::builder().status(res.status()); 246 for (name, value) in res.headers() { 247 axum_res = axum_res.header(name, value); 248 } 249 let body = 
// The upstream body is passed on as a stream instead of being buffered.
        axum::body::Body::from_stream(res.bytes_stream());
    axum_res
        .body(body)
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
}

/// Tracks `seed_account` together with the accounts it follows.
///
/// Resolves the account and its DID document, pages through its
/// `app.bsky.graph.follow` records via `com.atproto.repo.listRecords` on the PDS, and
/// hands the collected DIDs to `hydrant.repos.track`.
///
/// Nothing is returned: if the account or its DID document cannot be resolved the
/// function logs and returns without tracking anything; if a page fails to load,
/// paging stops and whatever was collected so far is still tracked.
async fn seed_account_follows(hydrant: Hydrant, seed_account: String) {
    /// Paging stops once more than this many DIDs have been collected.
    const MAX_SEED_DIDS: usize = 5000;

    tracing::info!("Seeding account follows for {}", seed_account);
    let ident = match AtIdentifier::new(&seed_account) {
        Ok(i) => i,
        Err(e) => {
            tracing::error!("Invalid seed account identifier {}: {}", seed_account, e);
            return;
        }
    };
    let repo = match hydrant.repos.resolve(&ident).await {
        Ok(r) => r,
        Err(e) => {
            tracing::error!("Could not resolve seed account {}: {}", seed_account, e);
            return;
        }
    };
    let seed_did = repo.did.clone();

    // Use the now-public resolver to get the PDS endpoint
    let doc = match hydrant.resolver().resolve_doc(&seed_did).await {
        Ok(d) => d,
        Err(e) => {
            tracing::warn!(
                "Could not resolve DID doc for seed account {}: {}",
                seed_did,
                e
            );
            return;
        }
    };

    // Works whether or not the PDS URL ends with a slash.
    let endpoint = format!(
        "{}/xrpc/com.atproto.repo.listRecords",
        doc.pds.as_str().trim_end_matches('/')
    );

    let mut cursor: Option<String> = None;
    let mut dids_to_track = vec![seed_did.clone().into_static()];

    loop {
        // Build the query through the URL type so the cursor and DID are percent-encoded.
        let list_url = {
            let mut query = vec![
                ("repo", seed_did.as_str()),
                ("collection", "app.bsky.graph.follow"),
                ("limit", "100"),
            ];
            if let Some(c) = cursor.as_deref() {
                query.push(("cursor", c));
            }
            reqwest::Url::parse_with_params(&endpoint, &query)
        };
        let list_url = match list_url {
            Ok(u) => u,
            Err(e) => {
                tracing::error!("Invalid listRecords URL for {endpoint}: {e}");
                break;
            }
        };

        // A non-2xx answer is treated like a transport failure instead of being parsed.
        let res = match reqwest::get(list_url.as_str())
            .await
            .and_then(reqwest::Response::error_for_status)
        {
            Ok(r) => r,
            Err(e) => {
                tracing::error!("Failed to fetch follows from {list_url}: {e}");
                break;
            }
        };
        let json = match res.json::<serde_json::Value>().await {
            Ok(j) => j,
            Err(e) => {
                tracing::error!("Failed to parse follows JSON from {list_url}: {e}");
                break;
            }
        };

        let records = json
            .get("records")
            .and_then(|r| r.as_array())
            .map(Vec::as_slice)
            .unwrap_or_default();
        // Follow records name the followed account in `value.subject`; invalid DIDs are skipped.
        dids_to_track.extend(
            records
                .iter()
                .filter_map(|record| record.get("value")?.get("subject")?.as_str())
                .filter_map(|s| jacquard_common::types::string::Did::new(s).ok())
                .map(|did| did.into_static()),
        );

        match json.get("cursor").and_then(|c| c.as_str()) {
            // An empty page with a cursor would otherwise be requested forever.
            Some(next) if !records.is_empty() => cursor = Some(next.to_string()),
            _ => break,
        }

        if dids_to_track.len() > MAX_SEED_DIDS {
            break;
        }
    }

    tracing::info!("Tracking {} repos from seed", dids_to_track.len());
    // Re-parsed into borrowed DIDs exactly as before, minus the `unwrap`.
    // NOTE(review): pass `dids_to_track` directly if `track` accepts the owned form.
    let iter: Vec<_> = dids_to_track
        .iter()
        .filter_map(|d| jacquard_common::types::string::Did::new(d.as_str()).ok())
        .collect();
    if let Err(e) = hydrant.repos.track(iter).await {
        tracing::error!("Failed to track seeded repos: {e:?}");
    }
}

/// Query parameters of `app.bsky.feed.getPostThread`.
#[derive(Deserialize)]
struct GetPostThreadParams {
    /// AT-URI of the post whose thread is requested.
    uri: String,
    /// Levels of replies to include; the handler falls back to 6.
    #[serde(default)]
    depth: Option<usize>,
    /// Levels of parents to include; the handler falls back to 0.
    #[serde(default, rename = "parentHeight")]
    parent_height: Option<usize>,
}

/// Handler for `app.bsky.feed.getPostThread`.
///
/// Answers from the local index when the thread can be built and proxies the request
/// upstream otherwise.
///
/// # Errors
///
/// `400` when the query string cannot be parsed; otherwise whatever
/// `proxy_request` returns on the fallback path.
async fn get_post_thread(
    State(app_state): State<AppState>,
    req: Request,
) -> Result<Response, StatusCode> {
    /// Cap on client-supplied `depth` / `parentHeight`, so a request cannot ask for
    /// unbounded recursion. NOTE(review): 1000 is the lexicon maximum as far as I
    /// recall; confirm against the lexicon.
    const MAX_THREAD_LEVELS: usize = 1000;

    let query_str = req.uri().query().unwrap_or("");
    let params = serde_urlencoded::from_str::<GetPostThreadParams>(query_str).map_err(|e| {
        tracing::error!("failed to parse query params: {e}");
        StatusCode::BAD_REQUEST
    })?;

    let depth = params.depth.unwrap_or(6).min(MAX_THREAD_LEVELS);
    let parent_height = params.parent_height.unwrap_or(0).min(MAX_THREAD_LEVELS);

    let viewer_did = get_auth_did(&req);
    let thread = get_thread_view_post(
        &app_state,
        &params.uri,
        depth,
        parent_height,
        viewer_did.as_deref(),
    )
    .await;

    match thread {
        Ok(thread) => Ok(Json(serde_json::json!({ "thread": thread })).into_response()),
        Err(_) => proxy_request(req).await,
    }
}

/// Reduces a full profile view to the fields of a basic profile view.
///
/// `did` and `handle` are always copied; `displayName`, `avatar`, `viewer`, `labels`,
/// `associated` and `createdAt` are copied only when present in `full`.
fn profile_to_basic(full: serde_json::Value) -> serde_json::Value {
    let mut basic = serde_json::json!({
        "did": full["did"],
        "handle": full["handle"],
    });
    if let Some(v) = full.get("displayName") {
        basic["displayName"] = v.clone();
    }
    if let Some(v) = full.get("avatar") {
        basic["avatar"] = v.clone();
    }
    if let Some(v) =
// Remaining optional fields copied into the basic view when present.
        full.get("viewer")
    {
        basic["viewer"] = v.clone();
    }
    if let Some(v) = full.get("labels") {
        basic["labels"] = v.clone();
    }
    if let Some(v) = full.get("associated") {
        basic["associated"] = v.clone();
    }
    if let Some(v) = full.get("createdAt") {
        basic["createdAt"] = v.clone();
    }
    basic
}

/// Builds an `app.bsky.feed.defs#postView` JSON object for the record at `uri_str`.
///
/// The view carries the author's basic profile, the record itself, reply / repost /
/// like counts taken from backlinks, and — when `viewer_did` is given — the URIs of
/// the viewer's own like and repost. `app.bsky.embed.images` and
/// `app.bsky.embed.record` embeds are expanded into their view forms; other embed
/// types are left out of the view.
///
/// # Errors
///
/// `400` when `uri_str` is not a valid AT-URI or lacks a collection / record key,
/// `404` when the repo cannot be resolved or the record is missing, plus any status
/// returned by `get_profile_internal` for the author.
async fn get_post_view(
    app_state: &AppState,
    uri_str: &str,
    viewer_did: Option<&str>,
) -> Result<serde_json::Value, StatusCode> {
    let uri =
        jacquard_common::types::string::AtUri::new(uri_str).map_err(|_| StatusCode::BAD_REQUEST)?;
    let author_ident = uri.authority();
    let collection = uri
        .collection()
        .ok_or(StatusCode::BAD_REQUEST)?
        .as_str()
        .to_string();
    let rkey = uri
        .rkey()
        .ok_or(StatusCode::BAD_REQUEST)?
        .0
        .as_str()
        .to_string();

    let repo = app_state
        .hydrant
        .repos
        .resolve(author_ident)
        .await
        .map_err(|_| StatusCode::NOT_FOUND)?;
    let record = repo
        .get_record(&collection, &rkey)
        .await
        .map_err(|_| StatusCode::NOT_FOUND)?
        .ok_or(StatusCode::NOT_FOUND)?;

    let author_profile = get_profile_internal(app_state, repo.did.as_str(), viewer_did).await?;

    let backlinks = &app_state.hydrant.backlinks;

    let mut viewer_state = serde_json::json!({
        "muted": false,
        "blockedBy": false,
    });
    if let Some(viewer) = viewer_did {
        // The viewer's own like / repost record pointing at this post, if there is one.
        for (field, source) in [
            ("like", "app.bsky.feed.like"),
            ("repost", "app.bsky.feed.repost"),
        ] {
            let own_record = backlinks
                .fetch(uri_str.to_string())
                .source(source)
                .dids(vec![viewer.to_string()])
                .limit(1)
                .run()
                .await
                .ok()
                .and_then(|p| p.backlinks.first().map(|b| b.uri.to_string()));
            if let Some(record_uri) = own_record {
                viewer_state[field] = serde_json::json!(record_uri);
            }
        }
    }

    let mut val_json =
        serde_json::to_value(&record.value).unwrap_or_else(|_| serde_json::json!({}));
    if val_json.get("$type").is_none() {
        val_json["$type"] = serde_json::json!("app.bsky.feed.post");
    }

    // No separate index time is available here, so the record's own timestamp is used.
    let indexed_at = match val_json.get("createdAt").and_then(|v| v.as_str()) {
        Some(created_at) if !created_at.is_empty() => created_at.to_string(),
        _ => chrono::Utc::now().to_rfc3339(),
    };

    let reply_count = backlinks
        .count(uri_str.to_string())
        .source("app.bsky.feed.post")
        .run()
        .await
        .unwrap_or(0);
    let repost_count = backlinks
        .count(uri_str.to_string())
        .source("app.bsky.feed.repost")
        .run()
        .await
        .unwrap_or(0);
    let like_count = backlinks
        .count(uri_str.to_string())
        .source("app.bsky.feed.like")
        .run()
        .await
        .unwrap_or(0);

    let mut post_view = serde_json::json!({
        "$type": "app.bsky.feed.defs#postView",
        "uri": uri_str,
        "cid": record.cid.to_string(),
        "author": profile_to_basic(author_profile),
        "record": val_json,
        "replyCount": reply_count,
        "repostCount": repost_count,
        "likeCount": like_count,
        // Quotes are not counted here; the value is a fixed placeholder.
        "quoteCount": 0,
        "indexedAt": indexed_at,
        "viewer": viewer_state,
        "labels": [],
    });

    if let Some(record_embed) = val_json.get("embed") {
        let embed_type = record_embed
            .get("$type")
            .and_then(|v| v.as_str())
            .unwrap_or("");
        match embed_type {
            "app.bsky.embed.images" => {
                if let Some(images) = record_embed.get("images").and_then(|v| v.as_array()) {
                    let view_images: Vec<serde_json::Value> = images
                        .iter()
                        .map(|img| {
                            let link = img
                                .pointer("/image/ref/$link")
                                .and_then(|v| v.as_str())
                                .unwrap_or("");
                            let mut view = serde_json::json!({
                                "thumb": app_state.cdn("feed_thumbnail", repo.did.as_str(), link),
                                "fullsize": app_state.cdn("feed_fullsize", repo.did.as_str(), link),
                                "alt": img.get("alt").and_then(|v| v.as_str()).unwrap_or(""),
                            });
                            // Leave the key out instead of emitting `null` when the
                            // record has no aspect ratio.
                            if let Some(ratio) = img.get("aspectRatio") {
                                view["aspectRatio"] = ratio.clone();
                            }
                            view
                        })
                        .collect();
                    post_view["embed"] = serde_json::json!({
                        "$type": "app.bsky.embed.images#view",
                        "images": view_images,
                    });
                }
            }
            "app.bsky.embed.record" => {
                let quoted_uri = record_embed
                    .pointer("/record/uri")
                    .and_then(|v| v.as_str());
                if let Some(quoted_uri) = quoted_uri {
                    // The quoted post is hydrated without a viewer.
                    // NOTE(review): this recurses once per level of a quote chain and
                    // nothing bounds that depth.
                    if let Ok(quoted_post) =
                        Box::pin(get_post_view(app_state, quoted_uri, None)).await
                    {
                        post_view["embed"] = serde_json::json!({
                            "$type": "app.bsky.embed.record#view",
                            "record": {
                                "$type": "app.bsky.embed.record#viewRecord",
                                "uri": quoted_post["uri"],
                                "cid": quoted_post["cid"],
                                "author": quoted_post["author"],
                                "value": quoted_post["record"],
                                "labels": quoted_post["labels"],
                                "indexedAt": quoted_post["indexedAt"],
                                "embeds": quoted_post.get("embed").map(|e| vec![e]).unwrap_or_default(),
                                "likeCount": quoted_post["likeCount"],
                                "repostCount": quoted_post["repostCount"],
                                "replyCount": quoted_post["replyCount"],
                                "quoteCount": quoted_post["quoteCount"],
                            }
                        });
                    }
                }
            }
            _ => {}
        }
    }

    Ok(post_view)
}

/// Builds an `app.bsky.feed.defs#threadViewPost` for `uri_str`.
///
/// Up to `parent_height` ancestors are attached under `"parent"` and up to `depth`
/// levels of replies under `"replies"` (at most 50 backlinks are inspected per
/// level). A parent or reply that cannot be loaded is logged and left out.
///
/// Only backlinks whose `reply.parent.uri` is `uri_str` are kept as replies: posts
/// can also link to a post by quoting it or by naming it as their thread root, and
/// those are not direct replies.
///
/// # Errors
///
/// Any status from `get_post_view` for the anchor post, or `500` when the reply
/// backlinks cannot be fetched.
async fn get_thread_view_post(
    app_state: &AppState,
    uri_str: &str,
    depth: usize,
    parent_height: usize,
    viewer_did: Option<&str>,
) -> Result<serde_json::Value, StatusCode> {
    let post = get_post_view(app_state, uri_str, viewer_did).await?;
    let mut thread = serde_json::json!({
        "$type": "app.bsky.feed.defs#threadViewPost",
        "post": post,
    });

    if parent_height > 0 {
        let parent_uri = post
            .pointer("/record/reply/parent/uri")
            .and_then(|u| u.as_str());
        if let Some(parent_uri) = parent_uri {
            // Ancestors are fetched without their other replies (depth 0).
            match Box::pin(get_thread_view_post(
                app_state,
                parent_uri,
                0,
                parent_height - 1,
                viewer_did,
            ))
            .await
            {
                Ok(parent_thread) => {
                    thread["parent"] = parent_thread;
                }
                Err(e) => {
                    tracing::warn!("failed to fetch parent thread {parent_uri}: {e}");
                }
            }
        }
    }

    if depth > 0 {
        let replies_page = app_state
            .hydrant
            .backlinks
            .fetch(uri_str.to_string())
            .source("app.bsky.feed.post")
            .limit(50)
            .run()
            .await
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        let mut replies = Vec::new();
        for bl in replies_page.backlinks {
            match Box::pin(get_thread_view_post(
                app_state,
                bl.uri.as_str(),
                depth - 1,
                0,
                viewer_did,
            ))
            .await
            {
                Ok(reply_thread) => {
                    let parent_of_reply = reply_thread
                        .pointer("/post/record/reply/parent/uri")
                        .and_then(|u| u.as_str());
                    if parent_of_reply == Some(uri_str) {
                        replies.push(reply_thread);
                    }
                }
                Err(e) => {
                    tracing::warn!("failed to fetch reply thread {}: {e}", bl.uri);
                }
            }
        }
        thread["replies"] = serde_json::Value::Array(replies);
    }

    Ok(thread)
}

/// Query parameters of `app.bsky.actor.getProfile`.
#[derive(Deserialize)]
struct GetProfileParams {
    /// Handle or DID of the requested account.
    actor: String,
}

/// Handler for `app.bsky.actor.getProfile`.
///
/// Serves the profile from the local index when the actor resolves and a profile can
/// be built; in every other case the request is proxied upstream.
///
/// # Errors
///
/// `400` when the query string or the actor identifier cannot be parsed; otherwise
/// whatever `proxy_request` returns on the fallback path.
async fn get_profile(
    State(app_state): State<AppState>,
    req: Request,
) -> Result<Response, StatusCode> {
    let query_str = req.uri().query().unwrap_or("");
    let params = serde_urlencoded::from_str::<GetProfileParams>(query_str).map_err(|e| {
        tracing::error!("failed to parse query params: {e}");
        StatusCode::BAD_REQUEST
    })?;

    let ident = AtIdentifier::new(&params.actor).map_err(|e| {
        tracing::error!("failed to parse actor identifier: {e}");
        StatusCode::BAD_REQUEST
    })?;

    let viewer_did = get_auth_did(&req);
    match app_state.hydrant.repos.resolve(&ident).await {
        Ok(repo) => {
            let local =
                get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()).await;
            match local {
                Ok(profile) => return Ok(Json(profile).into_response()),
                Err(e) => tracing::warn!("failed to get profile for {}: {e}", repo.did),
            }
        }
        Err(e) => tracing::error!("failed to resolve actor {}: {e}", params.actor),
    }

    proxy_request(req).await
}

/// Builds the detailed profile view of `did_str` from locally indexed data.
///
/// Includes follower / follow / post counts, the profile record's display fields and
/// — when `viewer_did` names a different account — the viewer relationship
/// (`following`, `followedBy`, `blocking`, `blockedBy`, `muted`).
///
/// # Errors
///
/// `400` when `did_str` is not a valid DID, `404` when the repo is unknown or not
/// tracked, `500` when the local mute store cannot be read.
async fn get_profile_internal(
    app_state: &AppState,
    did_str: &str,
    viewer_did: Option<&str>,
) -> Result<serde_json::Value, StatusCode> {
    /// URI of the first `source` record from `author` that links to `subject`, if any.
    /// Lookup failures are treated as "no such record".
    async fn first_backlink_uri(
        app_state: &AppState,
        subject: &str,
        source: &'static str,
        author: &str,
    ) -> Option<String> {
        app_state
            .hydrant
            .backlinks
            .fetch(subject.to_string())
            .source(source)
            .dids(vec![author.to_string()])
            .limit(1)
            .run()
            .await
            .ok()
            .and_then(|p| p.backlinks.first().map(|b| b.uri.to_string()))
    }

    let did =
        jacquard_common::types::string::Did::new(did_str).map_err(|_| StatusCode::BAD_REQUEST)?;
    let repo = app_state.hydrant.repos.get(&did);

    let info = repo
        .info()
        .await
        .map_err(|_| StatusCode::NOT_FOUND)?
        .ok_or(StatusCode::NOT_FOUND)?;
    if !info.tracked {
        return Err(StatusCode::NOT_FOUND);
    }

    // A missing or unreadable profile record is not an error; the view just has fewer fields.
    let profile_record = repo
        .get_record("app.bsky.actor.profile", "self")
        .await
        .ok()
        .flatten();

    let followers_count = app_state
        .hydrant
        .backlinks
        .count(did_str.to_string())
        .source("app.bsky.graph.follow")
        .run()
        .await
        .unwrap_or(0);
    let follows_count = repo
        .count_records("app.bsky.graph.follow")
        .await
        .unwrap_or(0);
    let posts_count = repo.count_records("app.bsky.feed.post").await.unwrap_or(0);

    // Fall back to the DID when no handle is known.
    let handle = info
        .handle
        .map(|h| h.to_string())
        .unwrap_or_else(|| did_str.to_string());

    let mut viewer_state = serde_json::json!({
        "muted": false,
        "blockedBy": false,
    });
    // Relationship fields only make sense when the viewer looks at someone else.
    if let Some(viewer) = viewer_did.filter(|v| *v != did_str) {
        // following: does viewer follow did_str?
        let following =
            first_backlink_uri(app_state, did_str, "app.bsky.graph.follow", viewer).await;
        // followedBy: does did_str follow viewer?
        let followed_by =
            first_backlink_uri(app_state, viewer, "app.bsky.graph.follow", did_str).await;
        if let Some(f) = following {
            viewer_state["following"] = serde_json::json!(f);
        }
        if let Some(fb) = followed_by {
            viewer_state["followedBy"] = serde_json::json!(fb);
        }

        // blocking: URI of viewer's block record targeting did_str
        if let Some(b) =
            first_backlink_uri(app_state, did_str, "app.bsky.graph.block", viewer).await
        {
            viewer_state["blocking"] = serde_json::json!(b);
        }

        // blockedBy: does did_str block viewer?
        let blocked_by = app_state
            .hydrant
            .backlinks
            .count(viewer.to_string())
            .source("app.bsky.graph.block")
            .dids(vec![did_str.to_string()])
            .run()
            .await
            .unwrap_or(0)
            > 0;
        if blocked_by {
            viewer_state["blockedBy"] = serde_json::json!(true);
        }

        // muted: check local fjall mutes
        let mute_key = format!("mute:{}:{}", viewer, did_str);
        let muted = app_state
            .mutes
            .get(mute_key.as_bytes())
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
            .is_some();
        if muted {
            viewer_state["muted"] = serde_json::json!(true);
        }
    }

    let mut profile = serde_json::json!({
        "did": did_str,
        "handle": handle,
        "followersCount": followers_count,
        "followsCount": follows_count,
        "postsCount": posts_count,
        "indexedAt": chrono::Utc::now().to_rfc3339(),
        "viewer": viewer_state,
        "labels": [],
        "associated": {
            "chat": { "allowIncoming": "all" }
        }
    });

    if let Some(rec) = profile_record {
        let value = serde_json::to_value(&rec.value).unwrap_or_else(|_| serde_json::json!({}));

        // Fields copied verbatim from the profile record.
        for key in ["displayName", "description", "createdAt"] {
            if let Some(v) = value.get(key) {
                profile[key] = v.clone();
            }
        }

        // Blob fields become CDN URLs. A blob without a `ref.$link` is skipped, since
        // a URL built from an empty link cannot resolve.
        for key in ["avatar", "banner"] {
            let link = value
                .get(key)
                .and_then(|blob| blob.get("ref"))
                .and_then(|r| r.get("$link"))
                .and_then(|l| l.as_str());
            if let Some(link) = link {
                profile[key] = serde_json::json!(app_state.cdn(key, did_str, link));
            }
        }
    }

    Ok(profile)
}

/// Query parameters of `app.bsky.feed.getAuthorFeed`.
#[derive(Deserialize)]
struct GetAuthorFeedParams {
    /// Handle or DID of the feed's author.
    actor: String,
    /// Page size; the handler falls back to 50 and caps it at 100.
    #[serde(default)]
    limit: Option<usize>,
    /// Pagination cursor returned by a previous call.
    #[serde(default)]
    cursor: Option<String>,
    /// Feed filter name; the handler falls back to `posts_with_replies`.
    #[serde(default)]
    filter: Option<String>,
    /// Whether the author's pinned post should lead the first page.
    #[serde(default, rename = "includePins")]
    include_pins: Option<bool>,
}

/// Handler for `app.bsky.feed.getAuthorFeed`.
///
/// Merges the actor's locally indexed posts and reposts into one feed and proxies the
/// request upstream when the local data cannot be used.
async fn get_author_feed(
    State(app_state):
// Shared state extractor type; the raw request is kept so it can be proxied.
    State<AppState>,
    req: Request,
) -> Result<Response, StatusCode> {
    /// What a merged feed entry was built from.
    enum Kind {
        Post,
        Repost,
    }

    let query_str = req.uri().query().unwrap_or("");
    let params = serde_urlencoded::from_str::<GetAuthorFeedParams>(query_str).map_err(|e| {
        tracing::error!("failed to parse query params: {e}");
        StatusCode::BAD_REQUEST
    })?;

    let ident = AtIdentifier::new(&params.actor).map_err(|e| {
        tracing::error!("failed to parse actor identifier: {e}");
        StatusCode::BAD_REQUEST
    })?;

    let repo = match app_state.hydrant.repos.resolve(&ident).await {
        Ok(repo) => repo,
        Err(e) => {
            tracing::error!("failed to resolve actor {}: {e}", params.actor);
            return proxy_request(req).await;
        }
    };
    let did = repo.did.clone();

    let limit = params.limit.unwrap_or(50).min(100);

    // Note: Official AppView uses timestamp cursors.
    // Our local index uses rkeys for list_records.
    // Cursors of 20+ characters are assumed to be foreign and are ignored.
    let rkey_cursor = params.cursor.as_deref().filter(|c| c.len() < 20);

    // Get posts
    let posts_list = match repo
        .list_records("app.bsky.feed.post", limit, false, rkey_cursor)
        .await
    {
        Ok(rl) => rl,
        Err(e) => {
            tracing::error!("failed to list posts for {}: {e}", did);
            return proxy_request(req).await;
        }
    };

    // Get reposts
    let reposts_list = match repo
        .list_records("app.bsky.feed.repost", limit, false, rkey_cursor)
        .await
    {
        Ok(rl) => rl,
        Err(e) => {
            tracing::error!("failed to list reposts for {}: {e}", did);
            return proxy_request(req).await;
        }
    };

    let viewer_did = get_auth_did(&req);
    let author_profile =
        match get_profile_internal(&app_state, did.as_str(), viewer_did.as_deref()).await {
            Ok(p) => p,
            Err(e) => {
                tracing::error!("failed to get profile for {}: {e}", did);
                return proxy_request(req).await;
            }
        };

    let filter = params.filter.as_deref().unwrap_or("posts_with_replies");

    // `createdAt` of a record, or "" when it has none.
    let created_at_of = |value: &serde_json::Value| -> String {
        value
            .get("createdAt")
            .and_then(|v| v.as_str())
            .unwrap_or("")
            .to_string()
    };

    let mut all_items = Vec::new();
    for rec in posts_list.records {
        let val_json = serde_json::to_value(&rec.value).unwrap_or_else(|_| serde_json::json!({}));
        let reply = val_json.get("reply");

        // Filter logic
        if filter == "posts_no_replies" && reply.is_some() {
            continue;
        }
        if filter == "posts_and_author_threads" {
            if let Some(reply) = reply {
                // Keep a reply only when the thread it belongs to was started by the author.
                let root_uri_str = reply
                    .pointer("/root/uri")
                    .and_then(|u| u.as_str())
                    .unwrap_or("");
                let in_own_thread = jacquard_common::types::string::AtUri::new(root_uri_str)
                    .map(|root_uri| root_uri.authority().as_str() == did.as_str())
                    .unwrap_or(false);
                if !in_own_thread {
                    continue;
                }
            }
        }

        let created_at = created_at_of(&val_json);
        all_items.push((created_at, Kind::Post, rec));
    }
    for rec in reposts_list.records {
        let val_json = serde_json::to_value(&rec.value).unwrap_or_else(|_| serde_json::json!({}));
        all_items.push((created_at_of(&val_json), Kind::Repost, rec));
    }

    // Sort by createdAt descending
    all_items.sort_by(|a, b| b.0.cmp(&a.0));
    all_items.truncate(limit);

    let mut feed = Vec::new();

    // Handle Pins
    let mut pinned_uri_opt: Option<String> = None;
    if params.include_pins.unwrap_or(false) && params.cursor.is_none() {
        if let Ok(Some(profile_rec)) = repo.get_record("app.bsky.actor.profile", "self").await {
            let prof_val =
                serde_json::to_value(profile_rec.value).unwrap_or_else(|_| serde_json::json!({}));
            // `pinnedPost` is a strong reference object (`{ uri, cid }`); a bare string
            // is still accepted for leniency.
            let pinned = prof_val.get("pinnedPost").and_then(|p| {
                p.get("uri")
                    .and_then(|u| u.as_str())
                    .or_else(|| p.as_str())
            });
            if let Some(pinned_uri) = pinned {
                if let Ok(post_view) =
                    get_post_view(&app_state, pinned_uri, viewer_did.as_deref()).await
                {
                    feed.push(serde_json::json!({
                        "post": post_view,
                        "reason": { "$type": "app.bsky.feed.defs#reasonPin" },
                    }));
                    pinned_uri_opt = Some(pinned_uri.to_string());
                }
            }
        }
    }

    for (created_at, kind, rec) in &all_items {
        match kind {
            Kind::Post => {
                let uri = format!(
                    "at://{}/app.bsky.feed.post/{}",
                    did.as_str(),
                    rec.rkey.as_str()
                );

                // Skip if it was already included as a pin
                if pinned_uri_opt.as_deref() == Some(uri.as_str()) {
                    continue;
                }

                let record_json =
                    serde_json::to_value(&rec.value).unwrap_or_else(|_| serde_json::json!({}));

                let post = match get_post_view(&app_state, &uri, viewer_did.as_deref()).await {
                    Ok(mut p) => {
                        p["author"] = profile_to_basic(author_profile.clone());
                        p
                    }
                    // Hydration failed: build a minimal view from the listed record itself.
                    Err(_) => {
                        let mut val_json = record_json.clone();
                        if val_json.get("$type").is_none() {
                            val_json["$type"] = serde_json::json!("app.bsky.feed.post");
                        }
                        serde_json::json!({
                            "$type": "app.bsky.feed.defs#postView",
                            "uri": uri,
                            "cid": rec.cid.to_string(),
                            "author": profile_to_basic(author_profile.clone()),
                            "record": val_json,
                            "replyCount": app_state.hydrant.backlinks.count(uri.clone()).source("app.bsky.feed.post").run().await.unwrap_or(0),
                            "repostCount": app_state.hydrant.backlinks.count(uri.clone()).source("app.bsky.feed.repost").run().await.unwrap_or(0),
                            "likeCount": app_state.hydrant.backlinks.count(uri.clone()).source("app.bsky.feed.like").run().await.unwrap_or(0),
                            "quoteCount": 0,
                            "indexedAt": if created_at.is_empty() { chrono::Utc::now().to_rfc3339() } else { created_at.clone() },
                            "viewer": { "muted": false, "blockedBy": false },
                            "labels": [],
                        })
                    }
                };

                let mut feed_item = serde_json::json!({
                    "post": post,
                });

                // Add reply context if it's a reply; it is attached only when both the
                // parent and the root could be hydrated.
                if let Some(reply) = record_json.get("reply") {
                    let parent_uri = reply.pointer("/parent/uri").and_then(|u| u.as_str());
                    if let Some(parent_uri) = parent_uri {
                        if let Ok(parent_post) =
                            get_post_view(&app_state, parent_uri, viewer_did.as_deref()).await
                        {
                            let root_uri = reply
                                .pointer("/root/uri")
                                .and_then(|u| u.as_str())
                                .unwrap_or(parent_uri);
                            if let Ok(root_post) =
                                get_post_view(&app_state, root_uri, viewer_did.as_deref()).await
                            {
                                feed_item["reply"] = serde_json::json!({
                                    "root": root_post,
                                    "parent": parent_post,
                                });
                            }
                        }
                    }
                }

                feed.push(feed_item);
            }
            Kind::Repost => {
                let val_json =
                    serde_json::to_value(&rec.value).unwrap_or_else(|_| serde_json::json!({}));
                let subject_uri = val_json.pointer("/subject/uri").and_then(|u| u.as_str());
                if let Some(subject_uri) = subject_uri {
                    if let Ok(post_view) =
                        get_post_view(&app_state, subject_uri, viewer_did.as_deref()).await
                    {
                        feed.push(serde_json::json!({
                            "post": post_view,
                            "reason": {
                                "$type": "app.bsky.feed.defs#reasonRepost",
                                "by": profile_to_basic(author_profile.clone()),
                                "indexedAt": created_at,
                            }
                        }));
                    }
                }
            }
        }
    }

    // The oldest returned item marks where the next page starts.
    let next_cursor = all_items.last().map(|i| i.2.rkey.as_str().to_string());

    // The feed is returned as built: pinned post first, then newest to oldest. (It
    // used to be reversed here, which put the pin last and contradicted the cursor.)
    Ok(Json(serde_json::json!({
        "feed": feed,
        "cursor": next_cursor
    }))
    .into_response())
}

/// Query parameters shared by `app.bsky.feed.getLikes` and
/// `app.bsky.feed.getRepostedBy`.
#[derive(Deserialize)]
struct GetLikesParams {
    /// AT-URI of the subject post.
    uri: String,
    /// Page size; the handlers fall back to 50 and cap it at 100.
    #[serde(default)]
    limit: Option<usize>,
    /// Pagination cursor returned by a previous call.
    #[serde(default)]
    cursor: Option<String>,
}

/// Handler for `app.bsky.feed.getLikes`.
///
/// Lists the accounts that liked `uri`, based on locally indexed like backlinks, and
/// proxies the request upstream when the backlinks cannot be fetched.
///
/// # Errors
///
/// `400` when the query string cannot be parsed; otherwise whatever
/// `proxy_request` returns on the fallback path.
async fn get_likes(
    State(app_state): State<AppState>,
    req: Request,
) -> Result<Response, StatusCode> {
    let query_str = req.uri().query().unwrap_or("");
    let params = serde_urlencoded::from_str::<GetLikesParams>(query_str).map_err(|e| {
        tracing::error!("failed to parse query params: {e}");
        StatusCode::BAD_REQUEST
    })?;

    let limit = params.limit.unwrap_or(50).min(100);

1149 let mut fetch = app_state 1150 .hydrant 1151 .backlinks 1152 .fetch(params.uri.clone()) 1153 .source("app.bsky.feed.like") 1154 .limit(limit); 1155 if let Some(cursor_str) = params.cursor { 1156 match data_encoding::BASE64URL_NOPAD.decode(cursor_str.as_bytes()) { 1157 Ok(c) => { 1158 fetch = fetch.cursor(c); 1159 } 1160 Err(e) => { 1161 tracing::warn!("failed to decode cursor {cursor_str}: {e}"); 1162 } 1163 } 1164 } 1165 1166 let backlinks_page = match fetch.run().await { 1167 Ok(bp) => bp, 1168 Err(e) => { 1169 tracing::error!("failed to fetch backlinks for {}: {e}", params.uri); 1170 return proxy_request(req).await; 1171 } 1172 }; 1173 1174 let mut likes = Vec::new(); 1175 let viewer_did = get_auth_did(&req); 1176 for bl in backlinks_page.backlinks { 1177 let uri = match jacquard_common::types::string::AtUri::new(bl.uri.as_str()) { 1178 Ok(uri) => uri, 1179 Err(e) => { 1180 tracing::warn!("failed to parse uri {}: {e}", bl.uri); 1181 continue; 1182 } 1183 }; 1184 let author_ident = uri.authority(); 1185 1186 let profile = 1187 match get_profile_internal(&app_state, author_ident.as_str(), viewer_did.as_deref()) 1188 .await 1189 { 1190 Ok(p) => p, 1191 Err(e) => { 1192 tracing::warn!("failed to get profile for {author_ident}: {e}"); 1193 continue; 1194 } 1195 }; 1196 let repo = match app_state.hydrant.repos.resolve(author_ident).await { 1197 Ok(repo) => repo, 1198 Err(e) => { 1199 tracing::warn!("failed to resolve actor {author_ident}: {e}"); 1200 continue; 1201 } 1202 }; 1203 let collection = uri.collection().unwrap().as_str(); 1204 let rkey = uri.rkey().unwrap().0.as_str(); 1205 1206 let record = match repo.get_record(collection, rkey).await { 1207 Ok(Some(record)) => record, 1208 Ok(None) => { 1209 tracing::warn!("record not found: {collection}/{rkey}"); 1210 continue; 1211 } 1212 Err(e) => { 1213 tracing::warn!("failed to get record {collection}/{rkey}: {e}"); 1214 continue; 1215 } 1216 }; 1217 let value = 
serde_json::to_value(record.value).unwrap_or(serde_json::json!({})); 1218 let created_at = value 1219 .get("createdAt") 1220 .and_then(|v| v.as_str()) 1221 .unwrap_or("") 1222 .to_string(); 1223 1224 likes.push(serde_json::json!({ 1225 "actor": profile, 1226 "createdAt": created_at, 1227 "indexedAt": chrono::Utc::now().to_rfc3339(), 1228 })); 1229 } 1230 1231 let cursor = backlinks_page 1232 .next_cursor 1233 .map(|c| data_encoding::BASE64URL_NOPAD.encode(&c)); 1234 1235 Ok(Json(serde_json::json!({ 1236 "uri": params.uri, 1237 "likes": likes, 1238 "cursor": cursor, 1239 })) 1240 .into_response()) 1241} 1242 1243async fn get_reposted_by( 1244 State(app_state): State<AppState>, 1245 req: Request, 1246) -> Result<Response, StatusCode> { 1247 // let hydrant = &app_state.hydrant; 1248 let query_str = req.uri().query().unwrap_or(""); 1249 let params = serde_urlencoded::from_str::<GetLikesParams>(query_str).map_err(|e| { 1250 tracing::error!("failed to parse query params: {e}"); 1251 StatusCode::BAD_REQUEST 1252 })?; 1253 1254 let limit = params.limit.unwrap_or(50).min(100); 1255 1256 let mut fetch = app_state 1257 .hydrant 1258 .backlinks 1259 .fetch(params.uri.clone()) 1260 .source("app.bsky.feed.repost") 1261 .limit(limit); 1262 if let Some(cursor_str) = params.cursor { 1263 match data_encoding::BASE64URL_NOPAD.decode(cursor_str.as_bytes()) { 1264 Ok(c) => { 1265 fetch = fetch.cursor(c); 1266 } 1267 Err(e) => { 1268 tracing::warn!("failed to decode cursor {cursor_str}: {e}"); 1269 } 1270 } 1271 } 1272 1273 let backlinks_page = match fetch.run().await { 1274 Ok(bp) => bp, 1275 Err(e) => { 1276 tracing::error!("failed to fetch backlinks for {}: {e}", params.uri); 1277 return proxy_request(req).await; 1278 } 1279 }; 1280 1281 let mut reposted_by = Vec::new(); 1282 let viewer_did = get_auth_did(&req); 1283 for bl in backlinks_page.backlinks { 1284 let uri = match jacquard_common::types::string::AtUri::new(bl.uri.as_str()) { 1285 Ok(uri) => uri, 1286 Err(e) => { 1287 
tracing::warn!("failed to parse uri {}: {e}", bl.uri); 1288 continue; 1289 } 1290 }; 1291 let author_ident = uri.authority(); 1292 let profile = 1293 match get_profile_internal(&app_state, author_ident.as_str(), viewer_did.as_deref()) 1294 .await 1295 { 1296 Ok(p) => p, 1297 Err(e) => { 1298 tracing::warn!("failed to get profile for {author_ident}: {e}"); 1299 continue; 1300 } 1301 }; 1302 reposted_by.push(profile); 1303 } 1304 1305 let cursor = backlinks_page 1306 .next_cursor 1307 .map(|c| data_encoding::BASE64URL_NOPAD.encode(&c)); 1308 1309 Ok(Json(serde_json::json!({ 1310 "uri": params.uri, 1311 "repostedBy": reposted_by, 1312 "cursor": cursor, 1313 })) 1314 .into_response()) 1315} 1316 1317fn extract_query_array(query: &str, key: &str) -> Vec<String> { 1318 let mut res = Vec::new(); 1319 let prefix1 = format!("{}=", key); 1320 let prefix2 = format!("{}[]=", key); 1321 for part in query.split('&') { 1322 let val = part 1323 .strip_prefix(&prefix1) 1324 .or_else(|| part.strip_prefix(&prefix2)); 1325 if let Some(v) = val { 1326 if let Ok(decoded) = urlencoding::decode(v) { 1327 res.push(decoded.into_owned()); 1328 } 1329 } 1330 } 1331 res 1332} 1333 1334async fn get_profiles( 1335 State(app_state): State<AppState>, 1336 req: Request, 1337) -> Result<Response, StatusCode> { 1338 // let hydrant = &app_state.hydrant; 1339 let query_str = req.uri().query().unwrap_or(""); 1340 let actors = extract_query_array(query_str, "actors"); 1341 if actors.is_empty() { 1342 return proxy_request(req).await; 1343 } 1344 1345 let viewer_did = get_auth_did(&req); 1346 let mut profiles = Vec::new(); 1347 for actor in actors { 1348 let ident = match AtIdentifier::new(&actor) { 1349 Ok(ident) => ident, 1350 Err(e) => { 1351 tracing::warn!("failed to parse actor identifier {actor}: {e}"); 1352 continue; 1353 } 1354 }; 1355 match app_state.hydrant.repos.resolve(&ident).await { 1356 Ok(repo) => { 1357 match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()) 1358 
.await 1359 { 1360 Ok(profile) => profiles.push(profile), 1361 Err(e) => tracing::warn!("failed to get profile for {}: {e}", repo.did), 1362 } 1363 } 1364 Err(e) => tracing::warn!("failed to resolve actor {actor}: {e}"), 1365 } 1366 } 1367 1368 Ok(Json(serde_json::json!({ "profiles": profiles })).into_response()) 1369} 1370 1371async fn get_posts( 1372 State(app_state): State<AppState>, 1373 req: Request, 1374) -> Result<Response, StatusCode> { 1375 // let hydrant = &app_state.hydrant; 1376 let query_str = req.uri().query().unwrap_or(""); 1377 let uris = extract_query_array(query_str, "uris"); 1378 if uris.is_empty() { 1379 return proxy_request(req).await; 1380 } 1381 1382 let viewer_did = get_auth_did(&req); 1383 let mut posts = Vec::new(); 1384 for uri in uris { 1385 match get_post_view(&app_state, &uri, viewer_did.as_deref()).await { 1386 Ok(post) => posts.push(post), 1387 Err(e) => tracing::warn!("failed to get post view for {uri}: {e}"), 1388 } 1389 } 1390 1391 Ok(Json(serde_json::json!({ "posts": posts })).into_response()) 1392} 1393 1394#[derive(Deserialize)] 1395struct GetFollowsParams { 1396 actor: String, 1397 #[serde(default)] 1398 limit: Option<usize>, 1399 #[serde(default)] 1400 cursor: Option<String>, 1401} 1402 1403async fn get_follows( 1404 State(app_state): State<AppState>, 1405 req: Request, 1406) -> Result<Response, StatusCode> { 1407 // let hydrant = &app_state.hydrant; 1408 let query_str = req.uri().query().unwrap_or(""); 1409 let params: GetFollowsParams = serde_urlencoded::from_str(query_str).map_err(|e| { 1410 tracing::error!("failed to parse query params: {e}"); 1411 StatusCode::BAD_REQUEST 1412 })?; 1413 1414 let ident = AtIdentifier::new(&params.actor).map_err(|e| { 1415 tracing::error!("failed to parse actor identifier: {e}"); 1416 StatusCode::BAD_REQUEST 1417 })?; 1418 let repo = match app_state.hydrant.repos.resolve(&ident).await { 1419 Ok(repo) => repo, 1420 Err(e) => { 1421 tracing::error!("failed to resolve actor {}: {e}", 
params.actor); 1422 return proxy_request(req).await; 1423 } 1424 }; 1425 1426 let limit = params.limit.unwrap_or(50).min(100); 1427 let record_list = match repo 1428 .list_records( 1429 "app.bsky.graph.follow", 1430 limit, 1431 true, 1432 params.cursor.as_deref(), 1433 ) 1434 .await 1435 { 1436 Ok(rl) => rl, 1437 Err(e) => { 1438 tracing::error!("failed to list follows for {}: {e}", repo.did); 1439 return proxy_request(req).await; 1440 } 1441 }; 1442 1443 let mut follows = Vec::new(); 1444 let viewer_did = get_auth_did(&req); 1445 for rec in record_list.records { 1446 let value = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); 1447 if let Some(subject_did) = value.get("subject").and_then(|s| s.as_str()) { 1448 match get_profile_internal(&app_state, subject_did, viewer_did.as_deref()).await { 1449 Ok(profile) => follows.push(profile), 1450 Err(e) => tracing::warn!("failed to get profile for {subject_did}: {e}"), 1451 } 1452 } 1453 } 1454 1455 let subject_profile = 1456 match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()).await { 1457 Ok(p) => p, 1458 Err(e) => { 1459 tracing::error!("failed to get profile for {}: {e}", repo.did); 1460 return proxy_request(req).await; 1461 } 1462 }; 1463 1464 Ok(Json(serde_json::json!({ 1465 "subject": subject_profile, 1466 "follows": follows, 1467 "cursor": record_list.cursor.map(|c| c.to_string()) 1468 })) 1469 .into_response()) 1470} 1471 1472async fn get_followers( 1473 State(app_state): State<AppState>, 1474 req: Request, 1475) -> Result<Response, StatusCode> { 1476 // let hydrant = &app_state.hydrant; 1477 let query_str = req.uri().query().unwrap_or(""); 1478 let params: GetFollowsParams = serde_urlencoded::from_str(query_str).map_err(|e| { 1479 tracing::error!("failed to parse query params: {e}"); 1480 StatusCode::BAD_REQUEST 1481 })?; 1482 1483 let ident = AtIdentifier::new(&params.actor).map_err(|e| { 1484 tracing::error!("failed to parse actor identifier: {e}"); 1485 
StatusCode::BAD_REQUEST 1486 })?; 1487 let repo = match app_state.hydrant.repos.resolve(&ident).await { 1488 Ok(repo) => repo, 1489 Err(e) => { 1490 tracing::error!("failed to resolve actor {}: {e}", params.actor); 1491 return proxy_request(req).await; 1492 } 1493 }; 1494 1495 let limit = params.limit.unwrap_or(50).min(100); 1496 let mut fetch = app_state 1497 .hydrant 1498 .backlinks 1499 .fetch(repo.did.as_str().to_string()) 1500 .source("app.bsky.graph.follow") 1501 .limit(limit); 1502 if let Some(cursor_str) = params.cursor { 1503 match data_encoding::BASE64URL_NOPAD.decode(cursor_str.as_bytes()) { 1504 Ok(c) => fetch = fetch.cursor(c), 1505 Err(e) => tracing::warn!("failed to decode cursor {cursor_str}: {e}"), 1506 } 1507 } 1508 1509 let backlinks_page = match fetch.run().await { 1510 Ok(bp) => bp, 1511 Err(e) => { 1512 tracing::error!("failed to fetch backlinks for {}: {e}", repo.did); 1513 return proxy_request(req).await; 1514 } 1515 }; 1516 1517 let mut followers = Vec::new(); 1518 let viewer_did = get_auth_did(&req); 1519 for bl in backlinks_page.backlinks { 1520 match jacquard_common::types::string::AtUri::new(bl.uri.as_str()) { 1521 Ok(uri) => { 1522 let author_ident = uri.authority(); 1523 match get_profile_internal(&app_state, author_ident.as_str(), viewer_did.as_deref()) 1524 .await 1525 { 1526 Ok(profile) => followers.push(profile), 1527 Err(e) => tracing::warn!("failed to get profile for {author_ident}: {e}"), 1528 } 1529 } 1530 Err(e) => tracing::warn!("failed to parse uri {}: {e}", bl.uri), 1531 } 1532 } 1533 1534 let subject_profile = 1535 match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()).await { 1536 Ok(p) => p, 1537 Err(e) => { 1538 tracing::error!("failed to get profile for {}: {e}", repo.did); 1539 return proxy_request(req).await; 1540 } 1541 }; 1542 1543 let cursor = backlinks_page 1544 .next_cursor 1545 .map(|c| data_encoding::BASE64URL_NOPAD.encode(&c)); 1546 1547 Ok(Json(serde_json::json!({ 1548 "subject": 
subject_profile, 1549 "followers": followers, 1550 "cursor": cursor, 1551 })) 1552 .into_response()) 1553} 1554 1555async fn get_age_assurance_state() -> Result<Response, StatusCode> { 1556 Ok(Json(serde_json::json!({ 1557 "status": "assured" 1558 })) 1559 .into_response()) 1560} 1561 1562fn get_auth_did(req: &Request) -> Option<String> { 1563 let auth = req.headers().get("authorization")?.to_str().ok()?; 1564 let token = auth.strip_prefix("Bearer ")?; 1565 let parts: Vec<&str> = token.split('.').collect(); 1566 if parts.len() != 3 { 1567 return None; 1568 } 1569 use base64::Engine; 1570 let payload = base64::engine::general_purpose::URL_SAFE_NO_PAD 1571 .decode(parts[1]) 1572 .ok()?; 1573 let json: serde_json::Value = serde_json::from_slice(&payload).ok()?; 1574 json.get("sub") 1575 .and_then(|s| s.as_str()) 1576 .map(|s| s.to_string()) 1577} 1578 1579#[derive(Deserialize)] 1580struct CreateBookmarkReq { 1581 uri: String, 1582 cid: String, 1583} 1584 1585async fn create_bookmark( 1586 State(app_state): State<AppState>, 1587 req: Request, 1588) -> Result<Response, StatusCode> { 1589 let Some(did) = get_auth_did(&req) else { 1590 return Err(StatusCode::UNAUTHORIZED); 1591 }; 1592 1593 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) 1594 .await 1595 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 1596 let payload: CreateBookmarkReq = serde_json::from_slice(&body_bytes).map_err(|e| { 1597 tracing::error!("failed to parse create bookmark request: {e}"); 1598 StatusCode::BAD_REQUEST 1599 })?; 1600 1601 let key = format!("bookmark:{}:{}", did, payload.uri); 1602 app_state 1603 .bookmarks 1604 .insert(key.as_bytes(), payload.cid.as_bytes()) 1605 .map_err(|e| { 1606 tracing::error!("failed to insert bookmark: {e}"); 1607 StatusCode::INTERNAL_SERVER_ERROR 1608 })?; 1609 1610 Ok(Json(serde_json::json!({})).into_response()) 1611} 1612 1613#[derive(Deserialize)] 1614struct DeleteBookmarkReq { 1615 uri: String, 1616} 1617 1618async fn delete_bookmark( 1619 
State(app_state): State<AppState>, 1620 req: Request, 1621) -> Result<Response, StatusCode> { 1622 let Some(did) = get_auth_did(&req) else { 1623 return Err(StatusCode::UNAUTHORIZED); 1624 }; 1625 1626 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) 1627 .await 1628 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 1629 let payload: DeleteBookmarkReq = serde_json::from_slice(&body_bytes).map_err(|e| { 1630 tracing::error!("failed to parse delete bookmark request: {e}"); 1631 StatusCode::BAD_REQUEST 1632 })?; 1633 1634 let key = format!("bookmark:{}:{}", did, payload.uri); 1635 app_state.bookmarks.remove(key.as_bytes()).map_err(|e| { 1636 tracing::error!("failed to remove bookmark: {e}"); 1637 StatusCode::INTERNAL_SERVER_ERROR 1638 })?; 1639 1640 Ok(Json(serde_json::json!({})).into_response()) 1641} 1642 1643#[derive(Deserialize)] 1644struct GetBookmarksParams { 1645 #[serde(default)] 1646 limit: Option<usize>, 1647 #[serde(default)] 1648 cursor: Option<String>, 1649} 1650 1651async fn get_bookmarks( 1652 State(app_state): State<AppState>, 1653 req: Request, 1654) -> Result<Response, StatusCode> { 1655 // let hydrant = &app_state.hydrant; 1656 let Some(did) = get_auth_did(&req) else { 1657 return Err(StatusCode::UNAUTHORIZED); 1658 }; 1659 1660 let query_str = req.uri().query().unwrap_or(""); 1661 let params = 1662 serde_urlencoded::from_str::<GetBookmarksParams>(query_str).unwrap_or(GetBookmarksParams { 1663 limit: None, 1664 cursor: None, 1665 }); 1666 let limit = params.limit.unwrap_or(50).min(100); 1667 1668 let prefix = format!("bookmark:{}:", did); 1669 1670 let iter: Box<dyn Iterator<Item = _>> = if let Some(cursor) = params.cursor { 1671 Box::new(app_state.bookmarks.range::<&[u8], _>(( 1672 std::ops::Bound::Included(cursor.as_bytes()), 1673 std::ops::Bound::Unbounded, 1674 ))) 1675 } else { 1676 Box::new(app_state.bookmarks.prefix(prefix.as_bytes())) 1677 }; 1678 1679 let mut fetched_items = Vec::new(); 1680 let mut next_cursor = None; 
1681 for item in iter { 1682 let (key, cid_bytes) = match item.into_inner() { 1683 Ok(inner) => inner, 1684 Err(e) => { 1685 tracing::warn!("failed to get item from bookmarks iterator: {e}"); 1686 continue; 1687 } 1688 }; 1689 if !key.starts_with(prefix.as_bytes()) { 1690 break; 1691 } 1692 fetched_items.push((key.to_vec(), cid_bytes.to_vec())); 1693 if fetched_items.len() >= limit { 1694 next_cursor = Some(String::from_utf8_lossy(&key).to_string()); 1695 break; 1696 } 1697 } 1698 1699 let mut bookmarks = Vec::new(); 1700 for (key, cid_bytes) in fetched_items { 1701 let key_str = String::from_utf8_lossy(&key); 1702 let uri_str = key_str.strip_prefix(&prefix).unwrap_or(&key_str); 1703 let cid_str = String::from_utf8_lossy(&cid_bytes); 1704 1705 match jacquard_common::types::string::AtUri::new(uri_str) { 1706 Ok(uri) => { 1707 let author_ident = uri.authority(); 1708 let collection = uri.collection().unwrap().as_str(); 1709 let rkey = uri.rkey().unwrap().0.as_str(); 1710 1711 match app_state.hydrant.repos.resolve(author_ident).await { 1712 Ok(repo) => match repo.get_record(collection, rkey).await { 1713 Ok(Some(record)) => { 1714 match get_profile_internal(&app_state, repo.did.as_str(), None).await { 1715 Ok(author_profile) => { 1716 bookmarks.push(serde_json::json!({ 1717 "uri": uri_str, 1718 "cid": cid_str.to_string(), 1719 "author": author_profile, 1720 "record": record.value, 1721 "replyCount": app_state.hydrant.backlinks.count(uri_str.to_string()).source("app.bsky.feed.post").run().await.unwrap_or(0), 1722 "repostCount": app_state.hydrant.backlinks.count(uri_str.to_string()).source("app.bsky.feed.repost").run().await.unwrap_or(0), 1723 "likeCount": app_state.hydrant.backlinks.count(uri_str.to_string()).source("app.bsky.feed.like").run().await.unwrap_or(0), 1724 "indexedAt": chrono::Utc::now().to_rfc3339(), 1725 })); 1726 } 1727 Err(e) => { 1728 tracing::warn!("failed to get profile for {}: {e}", repo.did) 1729 } 1730 } 1731 } 1732 Ok(None) => 
tracing::warn!("bookmark record not found: {uri_str}"), 1733 Err(e) => tracing::warn!("failed to get bookmark record {uri_str}: {e}"), 1734 }, 1735 Err(e) => tracing::warn!("failed to resolve actor {author_ident}: {e}"), 1736 } 1737 } 1738 Err(e) => tracing::warn!("failed to parse bookmark uri {uri_str}: {e}"), 1739 } 1740 1741 if bookmarks.len() >= limit { 1742 next_cursor = Some(String::from_utf8_lossy(&key).to_string()); 1743 break; 1744 } 1745 } 1746 1747 Ok(Json(serde_json::json!({ 1748 "bookmarks": bookmarks, 1749 "cursor": next_cursor, 1750 })) 1751 .into_response()) 1752} 1753 1754#[derive(Deserialize)] 1755struct CreateDraftReq { 1756 draft: serde_json::Value, 1757} 1758 1759async fn create_draft( 1760 State(app_state): State<AppState>, 1761 req: Request, 1762) -> Result<Response, StatusCode> { 1763 let Some(did) = get_auth_did(&req) else { 1764 return Err(StatusCode::UNAUTHORIZED); 1765 }; 1766 1767 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) 1768 .await 1769 .map_err(|e| { 1770 tracing::error!("failed to read request body: {e}"); 1771 StatusCode::INTERNAL_SERVER_ERROR 1772 })?; 1773 let payload = serde_json::from_slice::<CreateDraftReq>(&body_bytes).map_err(|e| { 1774 tracing::error!("failed to deserialize create_draft request: {e}"); 1775 StatusCode::BAD_REQUEST 1776 })?; 1777 1778 let tid = jacquard_common::types::tid::Tid::now(1.try_into().unwrap()).to_string(); 1779 let key = format!("draft:{}:{}", did, tid); 1780 let draft_obj = payload.draft.clone(); 1781 1782 let now = chrono::Utc::now().to_rfc3339(); 1783 let store_obj = serde_json::json!({ 1784 "id": tid, 1785 "draft": draft_obj, 1786 "createdAt": now, 1787 "updatedAt": now, 1788 }); 1789 1790 let val = serde_json::to_vec(&store_obj).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 1791 app_state 1792 .drafts 1793 .insert(key.as_bytes(), val) 1794 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 1795 1796 Ok(Json(serde_json::json!({ "id": tid })).into_response()) 1797} 
1798 1799#[derive(Deserialize)] 1800struct UpdateDraftReq { 1801 draft: DraftWithId, 1802} 1803 1804#[derive(Deserialize)] 1805struct DraftWithId { 1806 id: String, 1807 draft: serde_json::Value, 1808} 1809 1810async fn update_draft( 1811 State(app_state): State<AppState>, 1812 req: Request, 1813) -> Result<Response, StatusCode> { 1814 let Some(did) = get_auth_did(&req) else { 1815 return Err(StatusCode::UNAUTHORIZED); 1816 }; 1817 1818 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) 1819 .await 1820 .map_err(|e| { 1821 tracing::error!("failed to read request body: {e}"); 1822 StatusCode::INTERNAL_SERVER_ERROR 1823 })?; 1824 let payload = serde_json::from_slice::<UpdateDraftReq>(&body_bytes).map_err(|e| { 1825 tracing::error!("failed to deserialize update_draft request: {e}"); 1826 StatusCode::BAD_REQUEST 1827 })?; 1828 1829 let tid = payload.draft.id; 1830 let key = format!("draft:{}:{}", did, tid); 1831 1832 let existing = app_state.drafts.get(key.as_bytes()).map_err(|e| { 1833 tracing::error!("failed to get draft from db: {e}"); 1834 StatusCode::INTERNAL_SERVER_ERROR 1835 })?; 1836 if let Some(existing_bytes) = existing.as_deref() { 1837 match serde_json::from_slice::<serde_json::Value>(&existing_bytes) { 1838 Ok(mut existing_obj) => { 1839 let now = chrono::Utc::now().to_rfc3339(); 1840 existing_obj["draft"] = payload.draft.draft; 1841 existing_obj["updatedAt"] = serde_json::json!(now); 1842 1843 let val = serde_json::to_vec(&existing_obj).map_err(|e| { 1844 tracing::error!("failed to serialize updated draft: {e}"); 1845 StatusCode::INTERNAL_SERVER_ERROR 1846 })?; 1847 app_state.drafts.insert(key.as_bytes(), val).map_err(|e| { 1848 tracing::error!("failed to insert updated draft to db: {e}"); 1849 StatusCode::INTERNAL_SERVER_ERROR 1850 })?; 1851 } 1852 Err(e) => { 1853 tracing::error!("failed to parse existing draft from db: {e}"); 1854 } 1855 } 1856 } 1857 1858 Ok(Json(serde_json::json!({})).into_response()) 1859} 1860 
1861#[derive(Deserialize)] 1862struct DeleteDraftReq { 1863 id: String, 1864} 1865 1866async fn delete_draft( 1867 State(app_state): State<AppState>, 1868 req: Request, 1869) -> Result<Response, StatusCode> { 1870 let Some(did) = get_auth_did(&req) else { 1871 return Err(StatusCode::UNAUTHORIZED); 1872 }; 1873 1874 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) 1875 .await 1876 .map_err(|e| { 1877 tracing::error!("failed to read request body: {e}"); 1878 StatusCode::INTERNAL_SERVER_ERROR 1879 })?; 1880 let payload = serde_json::from_slice::<DeleteDraftReq>(&body_bytes).map_err(|e| { 1881 tracing::error!("failed to deserialize delete_draft request: {e}"); 1882 StatusCode::BAD_REQUEST 1883 })?; 1884 1885 let key = format!("draft:{}:{}", did, payload.id); 1886 app_state.drafts.remove(key.as_bytes()).map_err(|e| { 1887 tracing::error!("failed to remove draft from db: {e}"); 1888 StatusCode::INTERNAL_SERVER_ERROR 1889 })?; 1890 1891 Ok(Json(serde_json::json!({})).into_response()) 1892} 1893 1894#[derive(Deserialize)] 1895struct GetDraftsParams { 1896 #[serde(default)] 1897 limit: Option<usize>, 1898 #[serde(default)] 1899 cursor: Option<String>, 1900} 1901 1902async fn get_drafts( 1903 State(app_state): State<AppState>, 1904 req: Request, 1905) -> Result<Response, StatusCode> { 1906 let Some(did) = get_auth_did(&req) else { 1907 return Err(StatusCode::UNAUTHORIZED); 1908 }; 1909 1910 let query_str = req.uri().query().unwrap_or(""); 1911 let params = 1912 serde_urlencoded::from_str::<GetDraftsParams>(query_str).unwrap_or(GetDraftsParams { 1913 limit: None, 1914 cursor: None, 1915 }); 1916 let limit = params.limit.unwrap_or(50).min(100); 1917 1918 let prefix = format!("draft:{}:", did); 1919 1920 let iter: Box<dyn Iterator<Item = _>> = if let Some(cursor) = params.cursor { 1921 Box::new(app_state.drafts.range::<&[u8], _>(( 1922 std::ops::Bound::Included(cursor.as_bytes()), 1923 std::ops::Bound::Unbounded, 1924 ))) 1925 } else { 1926 
Box::new(app_state.drafts.prefix(prefix.as_bytes())) 1927 }; 1928 1929 let mut drafts = Vec::new(); 1930 let mut next_cursor = None; 1931 1932 for item in iter { 1933 let (key, val_bytes) = match item.into_inner() { 1934 Ok(v) => v, 1935 Err(e) => { 1936 tracing::error!("failed to get item from drafts iterator: {e}"); 1937 continue; 1938 } 1939 }; 1940 if !key.starts_with(prefix.as_bytes()) { 1941 break; 1942 } 1943 1944 match serde_json::from_slice::<serde_json::Value>(&val_bytes) { 1945 Ok(obj) => drafts.push(obj), 1946 Err(e) => tracing::error!("failed to parse draft from db: {e}"), 1947 } 1948 1949 if drafts.len() >= limit { 1950 next_cursor = Some(String::from_utf8_lossy(&key).to_string()); 1951 break; 1952 } 1953 } 1954 1955 Ok(Json(serde_json::json!({ 1956 "drafts": drafts, 1957 "cursor": next_cursor, 1958 })) 1959 .into_response()) 1960} 1961 1962#[derive(Deserialize)] 1963struct GetTimelineParams { 1964 #[serde(default)] 1965 algorithm: Option<String>, 1966 #[serde(default)] 1967 limit: Option<usize>, 1968 #[serde(default)] 1969 cursor: Option<String>, 1970} 1971 1972async fn get_timeline( 1973 State(app_state): State<AppState>, 1974 req: Request, 1975) -> Result<Response, StatusCode> { 1976 // let hydrant = &app_state.hydrant; 1977 let query_str = req.uri().query().unwrap_or(""); 1978 let params = serde_urlencoded::from_str::<GetTimelineParams>(query_str).map_err(|e| { 1979 tracing::error!("failed to parse get_timeline query params: {e}"); 1980 StatusCode::BAD_REQUEST 1981 })?; 1982 1983 let Some(did_str) = get_auth_did(&req) else { 1984 return Err(StatusCode::UNAUTHORIZED); 1985 }; 1986 1987 let ident = AtIdentifier::new(&did_str).map_err(|e| { 1988 tracing::error!("failed to create AtIdentifier from did_str {did_str}: {e}"); 1989 StatusCode::BAD_REQUEST 1990 })?; 1991 let repo = match app_state.hydrant.repos.resolve(&ident).await { 1992 Ok(r) => r, 1993 Err(e) => { 1994 tracing::error!("failed to resolve repo for {ident}: {e}"); 1995 return 
proxy_request(req).await; 1996 } 1997 }; 1998 1999 // Get all follows of auth user 2000 let mut follows = std::collections::HashSet::new(); 2001 follows.insert(repo.did.clone()); // Include self 2002 2003 let mut cursor = None; 2004 loop { 2005 let record_list = match repo 2006 .list_records("app.bsky.graph.follow", 100, true, cursor.as_deref()) 2007 .await 2008 { 2009 Ok(rl) => rl, 2010 Err(e) => { 2011 tracing::error!("failed to list follows for {}: {e}", repo.did); 2012 break; 2013 } 2014 }; 2015 for rec in &record_list.records { 2016 let value = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); 2017 if let Some(subject_did) = value.get("subject").and_then(|s| s.as_str()) { 2018 match jacquard_common::types::string::Did::new(subject_did) { 2019 Ok(did) => { 2020 follows.insert(did.into_static()); 2021 } 2022 Err(e) => { 2023 tracing::error!("failed to parse subject DID {subject_did}: {e}"); 2024 } 2025 } 2026 } 2027 } 2028 if record_list.cursor.is_none() { 2029 break; 2030 } 2031 cursor = record_list.cursor.map(|c| c.to_string()); 2032 } 2033 2034 let limit = params.limit.unwrap_or(50).min(100); 2035 2036 let mut all_items = Vec::new(); 2037 2038 for did in follows { 2039 match app_state.hydrant.repos.get(&did).info().await { 2040 Ok(Some(info)) => { 2041 if !info.tracked { 2042 continue; 2043 } 2044 } 2045 Ok(None) => continue, 2046 Err(e) => { 2047 tracing::error!("failed to get info for followed repo {did}: {e}"); 2048 continue; 2049 } 2050 } 2051 2052 let followed_repo = app_state.hydrant.repos.get(&did); 2053 2054 // get posts 2055 match followed_repo 2056 .list_records("app.bsky.feed.post", limit, true, None) 2057 .await 2058 { 2059 Ok(record_list) => { 2060 for rec in record_list.records { 2061 let rkey = rec.rkey.as_str().to_string(); 2062 let val_json = 2063 serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); 2064 all_items.push(( 2065 did.clone(), 2066 "app.bsky.feed.post".to_string(), 2067 rkey, 2068 val_json, 2069 )); 
2070 } 2071 } 2072 Err(e) => { 2073 tracing::warn!("failed to list posts for {did}: {e}"); 2074 } 2075 } 2076 2077 // get reposts 2078 match followed_repo 2079 .list_records("app.bsky.feed.repost", limit, true, None) 2080 .await 2081 { 2082 Ok(record_list) => { 2083 for rec in record_list.records { 2084 let rkey = rec.rkey.as_str().to_string(); 2085 let val_json = 2086 serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); 2087 all_items.push(( 2088 did.clone(), 2089 "app.bsky.feed.repost".to_string(), 2090 rkey, 2091 val_json, 2092 )); 2093 } 2094 } 2095 Err(e) => { 2096 tracing::warn!("failed to list reposts for {did}: {e}"); 2097 } 2098 } 2099 } 2100 2101 // Sort by createdAt descending 2102 all_items.sort_by(|a, b| { 2103 let ca = a.3.get("createdAt").and_then(|v| v.as_str()).unwrap_or(""); 2104 let cb = b.3.get("createdAt").and_then(|v| v.as_str()).unwrap_or(""); 2105 cb.cmp(ca) 2106 }); 2107 2108 if let Some(c) = params.cursor { 2109 all_items.retain(|item| { 2110 let item_ca = item 2111 .3 2112 .get("createdAt") 2113 .and_then(|v| v.as_str()) 2114 .unwrap_or(""); 2115 item_ca < c.as_str() 2116 }); 2117 } 2118 2119 all_items.truncate(limit); 2120 2121 let mut feed = Vec::new(); 2122 let mut next_cursor = None; 2123 let viewer_did = get_auth_did(&req); 2124 for (did, col, rkey, value) in all_items { 2125 let created_at = value 2126 .get("createdAt") 2127 .and_then(|v| v.as_str()) 2128 .unwrap_or("") 2129 .to_string(); 2130 next_cursor = Some(created_at.clone()); 2131 let uri = format!("at://{}/{}/{}", did.as_str(), col, rkey); 2132 2133 if col == "app.bsky.feed.post" { 2134 match get_post_view(&app_state, &uri, viewer_did.as_deref()).await { 2135 Ok(mut post_view) => { 2136 post_view["author"] = profile_to_basic(post_view["author"].clone()); 2137 feed.push(serde_json::json!({ "post": post_view })); 2138 } 2139 Err(e) => tracing::warn!("failed to get post view for {uri}: {e}"), 2140 } 2141 } else if col == "app.bsky.feed.repost" { 2142 if let 
Some(subject_uri) = value 2143 .get("subject") 2144 .and_then(|s| s.get("uri")) 2145 .and_then(|u| u.as_str()) 2146 { 2147 match get_post_view(&app_state, subject_uri, viewer_did.as_deref()).await { 2148 Ok(mut post_view) => { 2149 post_view["author"] = profile_to_basic(post_view["author"].clone()); 2150 match get_profile_internal(&app_state, did.as_str(), viewer_did.as_deref()) 2151 .await 2152 { 2153 Ok(reposter_profile) => { 2154 feed.push(serde_json::json!({ 2155 "post": post_view, 2156 "reason": { 2157 "$type": "app.bsky.feed.defs#reasonRepost", 2158 "by": profile_to_basic(reposter_profile), 2159 "indexedAt": if created_at.is_empty() { chrono::Utc::now().to_rfc3339() } else { created_at }, 2160 } 2161 })); 2162 } 2163 Err(e) => { 2164 tracing::warn!("failed to get profile for reposter {did}: {e}"); 2165 } 2166 } 2167 } 2168 Err(e) => { 2169 tracing::warn!( 2170 "failed to get post view for reposted post {subject_uri}: {e}" 2171 ); 2172 } 2173 } 2174 } 2175 } 2176 } 2177 2178 feed.reverse(); 2179 2180 Ok(Json(serde_json::json!({ 2181 "feed": feed, 2182 "cursor": next_cursor 2183 })) 2184 .into_response()) 2185} 2186 2187#[derive(Deserialize)] 2188struct GetQuotesParams { 2189 uri: String, 2190 #[serde(default)] 2191 cid: Option<String>, 2192 #[serde(default)] 2193 limit: Option<usize>, 2194 #[serde(default)] 2195 cursor: Option<String>, 2196} 2197 2198async fn get_quotes( 2199 State(app_state): State<AppState>, 2200 req: Request, 2201) -> Result<Response, StatusCode> { 2202 // let hydrant = &app_state.hydrant; 2203 let query_str = req.uri().query().unwrap_or(""); 2204 let params = serde_urlencoded::from_str::<GetQuotesParams>(query_str).map_err(|e| { 2205 tracing::error!("failed to parse get_quotes query params: {e}"); 2206 StatusCode::BAD_REQUEST 2207 })?; 2208 2209 let limit = params.limit.unwrap_or(50).min(100); 2210 2211 let mut fetch = app_state 2212 .hydrant 2213 .backlinks 2214 .fetch(params.uri.clone()) 2215 .source("app.bsky.feed.post") 2216 
.limit(limit * 3); 2217 if let Some(cursor_str) = params.cursor { 2218 match data_encoding::BASE64URL_NOPAD.decode(cursor_str.as_bytes()) { 2219 Ok(c) => { 2220 fetch = fetch.cursor(c); 2221 } 2222 Err(e) => { 2223 tracing::error!("failed to decode cursor {cursor_str}: {e}"); 2224 } 2225 } 2226 } 2227 2228 let backlinks_page = match fetch.run().await { 2229 Ok(bp) => bp, 2230 Err(e) => { 2231 tracing::error!("failed to fetch backlinks for {}: {e}", params.uri); 2232 return proxy_request(req).await; 2233 } 2234 }; 2235 2236 let mut posts = Vec::new(); 2237 let mut found = 0; 2238 for bl in backlinks_page.backlinks { 2239 if found >= limit { 2240 break; 2241 } 2242 2243 let uri = match jacquard_common::types::string::AtUri::new(bl.uri.as_str()) { 2244 Ok(u) => u, 2245 Err(e) => { 2246 tracing::error!("failed to parse backlink URI {}: {e}", bl.uri); 2247 continue; 2248 } 2249 }; 2250 let author_ident = uri.authority(); 2251 let collection = uri.collection().unwrap().as_str(); 2252 let rkey = uri.rkey().unwrap().0.as_str(); 2253 2254 let repo = match app_state.hydrant.repos.resolve(author_ident).await { 2255 Ok(r) => r, 2256 Err(e) => { 2257 tracing::error!("failed to resolve repo for {author_ident}: {e}"); 2258 continue; 2259 } 2260 }; 2261 let record = match repo.get_record(collection, rkey).await { 2262 Ok(Some(rec)) => rec, 2263 Ok(None) => continue, 2264 Err(e) => { 2265 tracing::error!( 2266 "failed to get record {collection}/{rkey} from repo {author_ident}: {e}" 2267 ); 2268 continue; 2269 } 2270 }; 2271 let value = serde_json::to_value(&record.value).unwrap_or(serde_json::json!({})); 2272 2273 let is_quote = value.get("embed").map_or(false, |embed| { 2274 let t = embed.get("$type").and_then(|t| t.as_str()).unwrap_or(""); 2275 if t == "app.bsky.embed.record" { 2276 embed 2277 .get("record") 2278 .and_then(|r| r.get("uri")) 2279 .and_then(|u| u.as_str()) 2280 == Some(params.uri.as_str()) 2281 } else if t == "app.bsky.embed.recordWithMedia" { 2282 embed 2283 
.get("record") 2284 .and_then(|r| r.get("record")) 2285 .and_then(|r| r.get("uri")) 2286 .and_then(|u| u.as_str()) 2287 == Some(params.uri.as_str()) 2288 } else { 2289 false 2290 } 2291 }); 2292 2293 if is_quote { 2294 let viewer_did = get_auth_did(&req); 2295 match get_post_view(&app_state, bl.uri.as_str(), viewer_did.as_deref()).await { 2296 Ok(post_view) => { 2297 posts.push(post_view); 2298 found += 1; 2299 } 2300 Err(e) => { 2301 tracing::warn!("failed to get post view for quoted post {}: {e}", bl.uri); 2302 } 2303 } 2304 } 2305 } 2306 2307 let cursor = backlinks_page 2308 .next_cursor 2309 .map(|c| data_encoding::BASE64URL_NOPAD.encode(&c)); 2310 2311 Ok(Json(serde_json::json!({ 2312 "uri": params.uri, 2313 "posts": posts, 2314 "cursor": cursor, 2315 })) 2316 .into_response()) 2317} 2318 2319#[derive(Deserialize)] 2320struct GetActorLikesParams { 2321 actor: String, 2322 #[serde(default)] 2323 limit: Option<usize>, 2324 #[serde(default)] 2325 cursor: Option<String>, 2326} 2327 2328async fn get_actor_likes( 2329 State(app_state): State<AppState>, 2330 req: Request, 2331) -> Result<Response, StatusCode> { 2332 // let hydrant = &app_state.hydrant; 2333 let query_str = req.uri().query().unwrap_or(""); 2334 let params = serde_urlencoded::from_str::<GetActorLikesParams>(query_str).map_err(|e| { 2335 tracing::error!("failed to parse get_actor_likes query params: {e}"); 2336 StatusCode::BAD_REQUEST 2337 })?; 2338 2339 let ident = AtIdentifier::new(&params.actor).map_err(|e| { 2340 tracing::error!( 2341 "failed to create AtIdentifier for actor {}: {e}", 2342 params.actor 2343 ); 2344 StatusCode::BAD_REQUEST 2345 })?; 2346 let repo = match app_state.hydrant.repos.resolve(&ident).await { 2347 Ok(r) => r, 2348 Err(e) => { 2349 tracing::error!("failed to resolve repo for {ident}: {e}"); 2350 return proxy_request(req).await; 2351 } 2352 }; 2353 2354 let limit = params.limit.unwrap_or(50).min(100); 2355 2356 let record_list = match repo 2357 
.list_records("app.bsky.feed.like", limit, true, params.cursor.as_deref()) 2358 .await 2359 { 2360 Ok(rl) => rl, 2361 Err(e) => { 2362 tracing::error!("failed to list likes for {}: {e}", repo.did); 2363 return proxy_request(req).await; 2364 } 2365 }; 2366 2367 let mut feed = Vec::new(); 2368 let viewer_did = get_auth_did(&req); 2369 for rec in record_list.records { 2370 let value = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); 2371 if let Some(subject_uri) = value 2372 .get("subject") 2373 .and_then(|s| s.get("uri")) 2374 .and_then(|u| u.as_str()) 2375 { 2376 match get_post_view(&app_state, subject_uri, viewer_did.as_deref()).await { 2377 Ok(post_view) => feed.push(serde_json::json!({ "post": post_view })), 2378 Err(e) => { 2379 tracing::warn!("failed to get post view for liked post {subject_uri}: {e}") 2380 } 2381 } 2382 } 2383 } 2384 2385 Ok(Json(serde_json::json!({ 2386 "feed": feed, 2387 "cursor": record_list.cursor.map(|c| c.to_string()) 2388 })) 2389 .into_response()) 2390} 2391 2392#[derive(Deserialize)] 2393struct GetRelationshipsParams { 2394 actor: String, 2395} 2396 2397async fn get_relationships( 2398 State(app_state): State<AppState>, 2399 req: Request, 2400) -> Result<Response, StatusCode> { 2401 // let hydrant = &app_state.hydrant; 2402 let query_str = req.uri().query().unwrap_or(""); 2403 let params = serde_urlencoded::from_str::<GetRelationshipsParams>(query_str).map_err(|e| { 2404 tracing::error!("failed to parse get_relationships query params: {e}"); 2405 StatusCode::BAD_REQUEST 2406 })?; 2407 2408 let others = extract_query_array(query_str, "others"); 2409 if others.is_empty() { 2410 return proxy_request(req).await; 2411 } 2412 2413 let ident = AtIdentifier::new(&params.actor).map_err(|e| { 2414 tracing::error!( 2415 "failed to create AtIdentifier for actor {}: {e}", 2416 params.actor 2417 ); 2418 StatusCode::BAD_REQUEST 2419 })?; 2420 let repo = match app_state.hydrant.repos.resolve(&ident).await { 2421 Ok(r) => r, 2422 
Err(e) => { 2423 tracing::error!("failed to resolve repo for {ident}: {e}"); 2424 return proxy_request(req).await; 2425 } 2426 }; 2427 let actor_did = repo.did.as_str().to_string(); 2428 2429 let mut relationships = Vec::new(); 2430 for other in others { 2431 let other_ident = match AtIdentifier::new(&other) { 2432 Ok(i) => i, 2433 Err(e) => { 2434 tracing::error!("failed to create AtIdentifier for other actor {other}: {e}"); 2435 continue; 2436 } 2437 }; 2438 let other_repo = match app_state.hydrant.repos.resolve(&other_ident).await { 2439 Ok(r) => r, 2440 Err(e) => { 2441 tracing::error!("failed to resolve repo for other actor {other_ident}: {e}"); 2442 continue; 2443 } 2444 }; 2445 let other_did = other_repo.did.as_str().to_string(); 2446 2447 let mut following = None; 2448 let mut followed_by = None; 2449 2450 let mut cursor = None; 2451 loop { 2452 match repo 2453 .list_records("app.bsky.graph.follow", 100, true, cursor.as_deref()) 2454 .await 2455 { 2456 Ok(rl) => { 2457 for rec in &rl.records { 2458 let val = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); 2459 if val.get("subject").and_then(|s| s.as_str()) == Some(&other_did) { 2460 following = Some(format!( 2461 "at://{}/app.bsky.graph.follow/{}", 2462 actor_did, 2463 rec.rkey.as_str() 2464 )); 2465 break; 2466 } 2467 } 2468 if following.is_some() || rl.cursor.is_none() { 2469 break; 2470 } 2471 cursor = rl.cursor.map(|c| c.to_string()); 2472 } 2473 Err(e) => { 2474 tracing::error!("failed to list follows for {actor_did}: {e}"); 2475 break; 2476 } 2477 } 2478 } 2479 2480 let mut cursor = None; 2481 loop { 2482 match other_repo 2483 .list_records("app.bsky.graph.follow", 100, true, cursor.as_deref()) 2484 .await 2485 { 2486 Ok(rl) => { 2487 for rec in &rl.records { 2488 let val = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); 2489 if val.get("subject").and_then(|s| s.as_str()) == Some(&actor_did) { 2490 followed_by = Some(format!( 2491 
"at://{}/app.bsky.graph.follow/{}", 2492 other_did, 2493 rec.rkey.as_str() 2494 )); 2495 break; 2496 } 2497 } 2498 if followed_by.is_some() || rl.cursor.is_none() { 2499 break; 2500 } 2501 cursor = rl.cursor.map(|c| c.to_string()); 2502 } 2503 Err(e) => { 2504 tracing::error!("failed to list follows for {other_did}: {e}"); 2505 break; 2506 } 2507 } 2508 } 2509 2510 relationships.push(serde_json::json!({ 2511 "$type": "app.bsky.graph.defs#relationship", 2512 "did": other_did, 2513 "following": following, 2514 "followedBy": followed_by, 2515 })); 2516 } 2517 2518 Ok(Json(serde_json::json!({ 2519 "actor": actor_did, 2520 "relationships": relationships 2521 })) 2522 .into_response()) 2523} 2524 2525async fn get_known_followers( 2526 State(app_state): State<AppState>, 2527 req: Request, 2528) -> Result<Response, StatusCode> { 2529 // let hydrant = &app_state.hydrant; 2530 let query_str = req.uri().query().unwrap_or(""); 2531 let params = serde_urlencoded::from_str::<GetFollowsParams>(query_str).map_err(|e| { 2532 tracing::error!("failed to parse get_known_followers query params: {e}"); 2533 StatusCode::BAD_REQUEST 2534 })?; 2535 2536 let Some(did_str) = get_auth_did(&req) else { 2537 return Err(StatusCode::UNAUTHORIZED); 2538 }; 2539 let auth_ident = AtIdentifier::new(&did_str).map_err(|e| { 2540 tracing::error!("failed to create AtIdentifier for auth user {did_str}: {e}"); 2541 StatusCode::BAD_REQUEST 2542 })?; 2543 let auth_repo = match app_state.hydrant.repos.resolve(&auth_ident).await { 2544 Ok(r) => r, 2545 Err(e) => { 2546 tracing::error!("failed to resolve repo for auth user {auth_ident}: {e}"); 2547 return proxy_request(req).await; 2548 } 2549 }; 2550 2551 let mut auth_follows = std::collections::HashSet::new(); 2552 let mut cursor = None; 2553 loop { 2554 match auth_repo 2555 .list_records("app.bsky.graph.follow", 100, true, cursor.as_deref()) 2556 .await 2557 { 2558 Ok(rl) => { 2559 for rec in &rl.records { 2560 let val = 
serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); 2561 if let Some(subj) = val.get("subject").and_then(|s| s.as_str()) { 2562 auth_follows.insert(subj.to_string()); 2563 } 2564 } 2565 if rl.cursor.is_none() { 2566 break; 2567 } 2568 cursor = rl.cursor.map(|c| c.to_string()); 2569 } 2570 Err(e) => { 2571 tracing::error!( 2572 "failed to list follows for auth user {}: {e}", 2573 auth_repo.did 2574 ); 2575 break; 2576 } 2577 } 2578 } 2579 2580 let ident = AtIdentifier::new(&params.actor).map_err(|e| { 2581 tracing::error!( 2582 "failed to create AtIdentifier for actor {}: {e}", 2583 params.actor 2584 ); 2585 StatusCode::BAD_REQUEST 2586 })?; 2587 let repo = match app_state.hydrant.repos.resolve(&ident).await { 2588 Ok(r) => r, 2589 Err(e) => { 2590 tracing::error!("failed to resolve repo for {ident}: {e}"); 2591 return proxy_request(req).await; 2592 } 2593 }; 2594 2595 let limit = params.limit.unwrap_or(50).min(100); 2596 2597 let mut fetch = app_state 2598 .hydrant 2599 .backlinks 2600 .fetch(repo.did.as_str().to_string()) 2601 .source("app.bsky.graph.follow") 2602 .limit(limit * 3); 2603 if let Some(cursor_str) = params.cursor { 2604 match data_encoding::BASE64URL_NOPAD.decode(cursor_str.as_bytes()) { 2605 Ok(c) => { 2606 fetch = fetch.cursor(c); 2607 } 2608 Err(e) => { 2609 tracing::error!("failed to decode cursor {cursor_str}: {e}"); 2610 } 2611 } 2612 } 2613 2614 let backlinks_page = match fetch.run().await { 2615 Ok(bp) => bp, 2616 Err(e) => { 2617 tracing::error!("failed to fetch backlinks for {}: {e}", repo.did); 2618 return proxy_request(req).await; 2619 } 2620 }; 2621 2622 let mut followers = Vec::new(); 2623 let mut found = 0; 2624 for bl in backlinks_page.backlinks { 2625 if found >= limit { 2626 break; 2627 } 2628 let uri = match jacquard_common::types::string::AtUri::new(bl.uri.as_str()) { 2629 Ok(u) => u, 2630 Err(e) => { 2631 tracing::error!("failed to parse backlink URI {}: {e}", bl.uri); 2632 continue; 2633 } 2634 }; 2635 let 
author_ident = uri.authority().as_str().to_string(); 2636 2637 if auth_follows.contains(&author_ident) { 2638 match get_profile_internal(&app_state, author_ident.as_str(), None).await { 2639 Ok(profile) => { 2640 followers.push(profile); 2641 found += 1; 2642 } 2643 Err(e) => { 2644 tracing::warn!("failed to get profile for follower {author_ident}: {e}"); 2645 } 2646 } 2647 } 2648 } 2649 2650 let subject_profile = match get_profile_internal(&app_state, repo.did.as_str(), None).await { 2651 Ok(p) => p, 2652 Err(e) => { 2653 tracing::error!("failed to get subject profile for {}: {e}", repo.did); 2654 return proxy_request(req).await; 2655 } 2656 }; 2657 2658 let cursor = backlinks_page 2659 .next_cursor 2660 .map(|c| data_encoding::BASE64URL_NOPAD.encode(&c)); 2661 2662 Ok(Json(serde_json::json!({ 2663 "subject": subject_profile, 2664 "followers": followers, 2665 "cursor": cursor, 2666 })) 2667 .into_response()) 2668} 2669 2670async fn get_blocks( 2671 State(app_state): State<AppState>, 2672 req: Request, 2673) -> Result<Response, StatusCode> { 2674 // let hydrant = &app_state.hydrant; 2675 let query_str = req.uri().query().unwrap_or(""); 2676 let params = serde_urlencoded::from_str::<GetFollowsParams>(query_str).map_err(|e| { 2677 tracing::error!("failed to parse get_blocks query params: {e}"); 2678 StatusCode::BAD_REQUEST 2679 })?; 2680 2681 let Some(did_str) = get_auth_did(&req) else { 2682 return Err(StatusCode::UNAUTHORIZED); 2683 }; 2684 let ident = AtIdentifier::new(&did_str).map_err(|e| { 2685 tracing::error!("failed to create AtIdentifier for did_str {did_str}: {e}"); 2686 StatusCode::BAD_REQUEST 2687 })?; 2688 let repo = match app_state.hydrant.repos.resolve(&ident).await { 2689 Ok(r) => r, 2690 Err(e) => { 2691 tracing::error!("failed to resolve repo for {ident}: {e}"); 2692 return proxy_request(req).await; 2693 } 2694 }; 2695 2696 let limit = params.limit.unwrap_or(50).min(100); 2697 let record_list = match repo 2698 .list_records( 2699 
"app.bsky.graph.block", 2700 limit, 2701 true, 2702 params.cursor.as_deref(), 2703 ) 2704 .await 2705 { 2706 Ok(rl) => rl, 2707 Err(e) => { 2708 tracing::error!("failed to list blocks for {}: {e}", repo.did); 2709 return proxy_request(req).await; 2710 } 2711 }; 2712 2713 let mut blocks = Vec::new(); 2714 for rec in record_list.records { 2715 let value = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); 2716 if let Some(subject_did) = value.get("subject").and_then(|s| s.as_str()) { 2717 match get_profile_internal(&app_state, subject_did, None).await { 2718 Ok(profile) => blocks.push(profile), 2719 Err(e) => { 2720 tracing::warn!("failed to get profile for blocked actor {subject_did}: {e}") 2721 } 2722 } 2723 } 2724 } 2725 2726 Ok(Json(serde_json::json!({ 2727 "blocks": blocks, 2728 "cursor": record_list.cursor.map(|c| c.to_string()) 2729 })) 2730 .into_response()) 2731} 2732 2733async fn get_list(State(app_state): State<AppState>, req: Request) -> Result<Response, StatusCode> { 2734 // let hydrant = &app_state.hydrant; 2735 let query_str = req.uri().query().unwrap_or(""); 2736 #[derive(Deserialize)] 2737 struct GetListParams { 2738 list: String, 2739 #[serde(default)] 2740 limit: Option<usize>, 2741 #[serde(default)] 2742 cursor: Option<String>, 2743 } 2744 let params = serde_urlencoded::from_str::<GetListParams>(query_str).map_err(|e| { 2745 tracing::error!("failed to parse get_list query params: {e}"); 2746 StatusCode::BAD_REQUEST 2747 })?; 2748 2749 let uri = jacquard_common::types::string::AtUri::new(&params.list).map_err(|e| { 2750 tracing::error!("failed to parse list URI {}: {e}", params.list); 2751 StatusCode::BAD_REQUEST 2752 })?; 2753 let author_ident = uri.authority(); 2754 let rkey = uri 2755 .rkey() 2756 .ok_or_else(|| { 2757 tracing::error!("missing rkey in list URI {}", params.list); 2758 StatusCode::BAD_REQUEST 2759 })? 
2760 .0 2761 .as_str() 2762 .to_string(); 2763 2764 let repo = match app_state.hydrant.repos.resolve(author_ident).await { 2765 Ok(r) => r, 2766 Err(e) => { 2767 tracing::error!("failed to resolve repo for {author_ident}: {e}"); 2768 return proxy_request(req).await; 2769 } 2770 }; 2771 2772 let record = match repo.get_record("app.bsky.graph.list", &rkey).await { 2773 Ok(Some(rec)) => rec, 2774 Ok(None) => { 2775 tracing::error!("list record not found: {author_ident}/app.bsky.graph.list/{rkey}"); 2776 return proxy_request(req).await; 2777 } 2778 Err(e) => { 2779 tracing::error!("failed to get list record {author_ident}/{rkey}: {e}"); 2780 return proxy_request(req).await; 2781 } 2782 }; 2783 2784 let viewer_did = get_auth_did(&req); 2785 let author_profile = 2786 match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()).await { 2787 Ok(p) => p, 2788 Err(e) => { 2789 tracing::error!("failed to get author profile for {}: {e}", repo.did); 2790 return proxy_request(req).await; 2791 } 2792 }; 2793 2794 let val_json = serde_json::to_value(&record.value).unwrap_or(serde_json::json!({})); 2795 2796 let list_view = serde_json::json!({ 2797 "uri": params.list, 2798 "cid": record.cid.to_string(), 2799 "creator": author_profile, 2800 "name": val_json.get("name").and_then(|v| v.as_str()).unwrap_or(""), 2801 "purpose": val_json.get("purpose").and_then(|v| v.as_str()).unwrap_or("app.bsky.graph.defs#modlist"), 2802 "description": val_json.get("description").and_then(|v| v.as_str()), 2803 "avatar": val_json.get("avatar").and_then(|v| v.as_str()), 2804 "indexedAt": chrono::Utc::now().to_rfc3339(), 2805 }); 2806 2807 // items from backlinks 2808 let limit = params.limit.unwrap_or(50).min(100); 2809 let mut fetch = app_state 2810 .hydrant 2811 .backlinks 2812 .fetch(params.list.clone()) 2813 .source("app.bsky.graph.listitem") 2814 .limit(limit); 2815 if let Some(cursor_str) = params.cursor { 2816 match data_encoding::BASE64URL_NOPAD.decode(cursor_str.as_bytes()) { 
2817 Ok(c) => { 2818 fetch = fetch.cursor(c); 2819 } 2820 Err(e) => { 2821 tracing::error!("failed to decode cursor {cursor_str}: {e}"); 2822 } 2823 } 2824 } 2825 2826 let backlinks_page = match fetch.run().await { 2827 Ok(bp) => bp, 2828 Err(e) => { 2829 tracing::error!("failed to fetch backlinks for list {}: {e}", params.list); 2830 return proxy_request(req).await; 2831 } 2832 }; 2833 2834 let mut items = Vec::new(); 2835 for bl in backlinks_page.backlinks { 2836 let item_uri = match jacquard_common::types::string::AtUri::new(bl.uri.as_str()) { 2837 Ok(u) => u, 2838 Err(e) => { 2839 tracing::error!("failed to parse listitem URI {}: {e}", bl.uri); 2840 continue; 2841 } 2842 }; 2843 let item_author = item_uri.authority(); 2844 let item_rkey = item_uri.rkey().unwrap().0.as_str(); 2845 2846 let item_repo = match app_state.hydrant.repos.resolve(item_author).await { 2847 Ok(r) => r, 2848 Err(e) => { 2849 tracing::error!("failed to resolve repo for listitem author {item_author}: {e}"); 2850 continue; 2851 } 2852 }; 2853 let item_rec = match item_repo 2854 .get_record("app.bsky.graph.listitem", item_rkey) 2855 .await 2856 { 2857 Ok(Some(rec)) => rec, 2858 Ok(None) => continue, 2859 Err(e) => { 2860 tracing::error!("failed to get listitem record {item_author}/{item_rkey}: {e}"); 2861 continue; 2862 } 2863 }; 2864 let item_val = serde_json::to_value(&item_rec.value).unwrap_or(serde_json::json!({})); 2865 2866 if let Some(subject_did) = item_val.get("subject").and_then(|s| s.as_str()) { 2867 match get_profile_internal(&app_state, subject_did, None).await { 2868 Ok(subject_profile) => { 2869 items.push(serde_json::json!({ 2870 "uri": bl.uri.as_str(), 2871 "subject": subject_profile, 2872 })); 2873 } 2874 Err(e) => { 2875 tracing::warn!("failed to get profile for listitem subject {subject_did}: {e}"); 2876 } 2877 } 2878 } 2879 } 2880 2881 Ok(Json(serde_json::json!({ 2882 "list": list_view, 2883 "items": items, 2884 "cursor": backlinks_page.next_cursor.map(|c| 
data_encoding::BASE64URL_NOPAD.encode(&c)), 2885 })) 2886 .into_response()) 2887} 2888 2889async fn get_lists( 2890 State(app_state): State<AppState>, 2891 req: Request, 2892) -> Result<Response, StatusCode> { 2893 // let hydrant = &app_state.hydrant; 2894 let query_str = req.uri().query().unwrap_or(""); 2895 let params = serde_urlencoded::from_str::<GetAuthorFeedParams>(query_str).map_err(|e| { 2896 tracing::error!("failed to parse get_lists query params: {e}"); 2897 StatusCode::BAD_REQUEST 2898 })?; 2899 2900 let ident = AtIdentifier::new(&params.actor).map_err(|e| { 2901 tracing::error!( 2902 "failed to create AtIdentifier for actor {}: {e}", 2903 params.actor 2904 ); 2905 StatusCode::BAD_REQUEST 2906 })?; 2907 let repo = match app_state.hydrant.repos.resolve(&ident).await { 2908 Ok(r) => r, 2909 Err(e) => { 2910 tracing::error!("failed to resolve repo for {ident}: {e}"); 2911 return proxy_request(req).await; 2912 } 2913 }; 2914 2915 let viewer_did = get_auth_did(&req); 2916 let author_profile = 2917 match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()).await { 2918 Ok(p) => p, 2919 Err(e) => { 2920 tracing::error!("failed to get author profile for {}: {e}", repo.did); 2921 return proxy_request(req).await; 2922 } 2923 }; 2924 2925 let limit = params.limit.unwrap_or(50).min(100); 2926 let record_list = match repo 2927 .list_records("app.bsky.graph.list", limit, true, params.cursor.as_deref()) 2928 .await 2929 { 2930 Ok(rl) => rl, 2931 Err(e) => { 2932 tracing::error!("failed to list lists for {}: {e}", repo.did); 2933 return proxy_request(req).await; 2934 } 2935 }; 2936 2937 let mut lists = Vec::new(); 2938 for rec in record_list.records { 2939 let val_json = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); 2940 lists.push(serde_json::json!({ 2941 "uri": format!("at://{}/app.bsky.graph.list/{}", repo.did.as_str(), rec.rkey.as_str()), 2942 "cid": rec.cid.to_string(), 2943 "creator": author_profile.clone(), 2944 "name": 
val_json.get("name").and_then(|v| v.as_str()).unwrap_or(""), 2945 "purpose": val_json.get("purpose").and_then(|v| v.as_str()).unwrap_or("app.bsky.graph.defs#modlist"), 2946 "description": val_json.get("description").and_then(|v| v.as_str()), 2947 "avatar": val_json.get("avatar").and_then(|v| v.as_str()), 2948 "indexedAt": chrono::Utc::now().to_rfc3339(), 2949 })); 2950 } 2951 2952 Ok(Json(serde_json::json!({ 2953 "lists": lists, 2954 "cursor": record_list.cursor.map(|c| c.to_string()) 2955 })) 2956 .into_response()) 2957} 2958 2959async fn get_lists_with_membership( 2960 State(app_state): State<AppState>, 2961 req: Request, 2962) -> Result<Response, StatusCode> { 2963 // let hydrant = &app_state.hydrant; 2964 let query_str = req.uri().query().unwrap_or(""); 2965 let params = serde_urlencoded::from_str::<GetAuthorFeedParams>(query_str).map_err(|e| { 2966 tracing::error!("failed to parse get_lists_with_membership query params: {e}"); 2967 StatusCode::BAD_REQUEST 2968 })?; 2969 2970 let ident = AtIdentifier::new(&params.actor).map_err(|e| { 2971 tracing::error!( 2972 "failed to create AtIdentifier for actor {}: {e}", 2973 params.actor 2974 ); 2975 StatusCode::BAD_REQUEST 2976 })?; 2977 let repo = match app_state.hydrant.repos.resolve(&ident).await { 2978 Ok(r) => r, 2979 Err(e) => { 2980 tracing::error!("failed to resolve repo for {ident}: {e}"); 2981 return proxy_request(req).await; 2982 } 2983 }; 2984 2985 // All lists created by actor 2986 let record_list = match repo 2987 .list_records("app.bsky.graph.list", 100, true, None) 2988 .await 2989 { 2990 Ok(rl) => rl, 2991 Err(e) => { 2992 tracing::error!("failed to list lists for {}: {e}", repo.did); 2993 return proxy_request(req).await; 2994 } 2995 }; 2996 2997 let lists: Vec<serde_json::Value> = Vec::new(); 2998 for rec in record_list.records { 2999 let _uri = format!( 3000 "at://{}/app.bsky.graph.list/{}", 3001 repo.did.as_str(), 3002 rec.rkey.as_str() 3003 ); 3004 3005 // Check if subject is in this list 3006 // 
This is tricky without reverse index on listitem subject. 3007 // We might just fallback here or implement a scan. 3008 // For now, proxy it if we can't efficiently answer. 3009 return proxy_request(req).await; 3010 } 3011 3012 Ok(Json(serde_json::json!({ "lists": lists })).into_response()) 3013} 3014 3015async fn get_actor_feeds( 3016 State(app_state): State<AppState>, 3017 req: Request, 3018) -> Result<Response, StatusCode> { 3019 // let hydrant = &app_state.hydrant; 3020 let query_str = req.uri().query().unwrap_or(""); 3021 let params: GetAuthorFeedParams = serde_urlencoded::from_str(query_str).map_err(|e| { 3022 tracing::error!("failed to parse get_actor_feeds query params: {e}"); 3023 StatusCode::BAD_REQUEST 3024 })?; 3025 3026 let ident = AtIdentifier::new(&params.actor).map_err(|e| { 3027 tracing::error!( 3028 "failed to create AtIdentifier for actor {}: {e}", 3029 params.actor 3030 ); 3031 StatusCode::BAD_REQUEST 3032 })?; 3033 let repo = match app_state.hydrant.repos.resolve(&ident).await { 3034 Ok(r) => r, 3035 Err(e) => { 3036 tracing::error!("failed to resolve repo for {ident}: {e}"); 3037 return proxy_request(req).await; 3038 } 3039 }; 3040 3041 let viewer_did = get_auth_did(&req); 3042 let author_profile = 3043 match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()).await { 3044 Ok(p) => p, 3045 Err(e) => { 3046 tracing::error!("failed to get author profile for {}: {e}", repo.did); 3047 return proxy_request(req).await; 3048 } 3049 }; 3050 3051 let limit = params.limit.unwrap_or(50).min(100); 3052 let record_list = match repo 3053 .list_records( 3054 "app.bsky.feed.generator", 3055 limit, 3056 true, 3057 params.cursor.as_deref(), 3058 ) 3059 .await 3060 { 3061 Ok(rl) => rl, 3062 Err(e) => { 3063 tracing::error!("failed to list feed generators for {}: {e}", repo.did); 3064 return proxy_request(req).await; 3065 } 3066 }; 3067 3068 let mut feeds = Vec::new(); 3069 for rec in record_list.records { 3070 let val_json = 
serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); 3071 let uri = format!( 3072 "at://{}/app.bsky.feed.generator/{}", 3073 repo.did.as_str(), 3074 rec.rkey.as_str() 3075 ); 3076 feeds.push(serde_json::json!({ 3077 "uri": uri, 3078 "cid": rec.cid.to_string(), 3079 "did": val_json.get("did").and_then(|v| v.as_str()).unwrap_or(""), 3080 "creator": author_profile.clone(), 3081 "displayName": val_json.get("displayName").and_then(|v| v.as_str()).unwrap_or(""), 3082 "description": val_json.get("description").and_then(|v| v.as_str()), 3083 "avatar": val_json.get("avatar").and_then(|v| v.as_str()), 3084 "likeCount": app_state.hydrant.backlinks.count(uri).source("app.bsky.feed.like").run().await.unwrap_or(0), 3085 "indexedAt": chrono::Utc::now().to_rfc3339(), 3086 })); 3087 } 3088 3089 Ok(Json(serde_json::json!({ 3090 "feeds": feeds, 3091 "cursor": record_list.cursor.map(|c| c.to_string()) 3092 })) 3093 .into_response()) 3094} 3095 3096async fn get_feed_generator( 3097 State(app_state): State<AppState>, 3098 req: Request, 3099) -> Result<Response, StatusCode> { 3100 // let hydrant = &app_state.hydrant; 3101 let query_str = req.uri().query().unwrap_or(""); 3102 #[derive(Deserialize)] 3103 struct GetFeedGeneratorParams { 3104 feed: String, 3105 } 3106 let params: GetFeedGeneratorParams = serde_urlencoded::from_str(query_str).map_err(|e| { 3107 tracing::error!("failed to parse get_feed_generator query params: {e}"); 3108 StatusCode::BAD_REQUEST 3109 })?; 3110 3111 let uri = jacquard_common::types::string::AtUri::new(&params.feed).map_err(|e| { 3112 tracing::error!("failed to parse feed uri {}: {e}", params.feed); 3113 StatusCode::BAD_REQUEST 3114 })?; 3115 let author_ident = uri.authority(); 3116 let rkey = uri 3117 .rkey() 3118 .ok_or_else(|| { 3119 tracing::error!("missing rkey in feed uri {}", params.feed); 3120 StatusCode::BAD_REQUEST 3121 })? 
3122 .0 3123 .as_str() 3124 .to_string(); 3125 3126 let repo = match app_state.hydrant.repos.resolve(author_ident).await { 3127 Ok(r) => r, 3128 Err(e) => { 3129 tracing::error!("failed to resolve repo for {author_ident}: {e}"); 3130 return proxy_request(req).await; 3131 } 3132 }; 3133 3134 let record = match repo.get_record("app.bsky.feed.generator", &rkey).await { 3135 Ok(Some(r)) => r, 3136 Ok(None) => { 3137 tracing::error!("feed generator record not found: {author_ident}/{rkey}"); 3138 return proxy_request(req).await; 3139 } 3140 Err(e) => { 3141 tracing::error!("failed to get feed generator record {author_ident}/{rkey}: {e}"); 3142 return proxy_request(req).await; 3143 } 3144 }; 3145 3146 let viewer_did = get_auth_did(&req); 3147 let author_profile = 3148 match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()).await { 3149 Ok(p) => p, 3150 Err(e) => { 3151 tracing::error!("failed to get author profile for {}: {e}", repo.did); 3152 return proxy_request(req).await; 3153 } 3154 }; 3155 3156 let val_json = serde_json::to_value(&record.value).unwrap_or(serde_json::json!({})); 3157 3158 let view = serde_json::json!({ 3159 "uri": params.feed, 3160 "cid": record.cid.to_string(), 3161 "did": val_json.get("did").and_then(|v| v.as_str()).unwrap_or(""), 3162 "creator": author_profile, 3163 "displayName": val_json.get("displayName").and_then(|v| v.as_str()).unwrap_or(""), 3164 "description": val_json.get("description").and_then(|v| v.as_str()), 3165 "avatar": val_json.get("avatar").and_then(|v| v.as_str()), 3166 "likeCount": app_state.hydrant.backlinks.count(params.feed.clone()).source("app.bsky.feed.like").run().await.unwrap_or(0), 3167 "indexedAt": chrono::Utc::now().to_rfc3339(), 3168 }); 3169 3170 Ok(Json(serde_json::json!({ 3171 "view": view, 3172 "isOnline": true, 3173 "isValid": true, 3174 })) 3175 .into_response()) 3176} 3177 3178async fn get_feed_generators( 3179 State(app_state): State<AppState>, 3180 req: Request, 3181) -> 
Result<Response, StatusCode> { 3182 // let hydrant = &app_state.hydrant; 3183 let query_str = req.uri().query().unwrap_or(""); 3184 let feeds = extract_query_array(query_str, "feeds"); 3185 if feeds.is_empty() { 3186 return proxy_request(req).await; 3187 } 3188 3189 let mut views = Vec::new(); 3190 let viewer_did = get_auth_did(&req); 3191 for feed_uri in feeds { 3192 let uri = match jacquard_common::types::string::AtUri::new(&feed_uri) { 3193 Ok(u) => u, 3194 Err(e) => { 3195 tracing::warn!("failed to parse feed uri {feed_uri}: {e}"); 3196 continue; 3197 } 3198 }; 3199 let author_ident = uri.authority(); 3200 let rkey = uri.rkey().unwrap().0.as_str().to_string(); 3201 3202 match app_state.hydrant.repos.resolve(author_ident).await { 3203 Ok(repo) => match repo.get_record("app.bsky.feed.generator", &rkey).await { 3204 Ok(Some(record)) => { 3205 match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()) 3206 .await 3207 { 3208 Ok(author_profile) => { 3209 let val_json = serde_json::to_value(&record.value) 3210 .unwrap_or(serde_json::json!({})); 3211 views.push(serde_json::json!({ 3212 "uri": feed_uri, 3213 "cid": record.cid.to_string(), 3214 "did": val_json.get("did").and_then(|v| v.as_str()).unwrap_or(""), 3215 "creator": author_profile, 3216 "displayName": val_json.get("displayName").and_then(|v| v.as_str()).unwrap_or(""), 3217 "description": val_json.get("description").and_then(|v| v.as_str()), 3218 "avatar": val_json.get("avatar").and_then(|v| v.as_str()), 3219 "likeCount": app_state.hydrant.backlinks.count(feed_uri.clone()).source("app.bsky.feed.like").run().await.unwrap_or(0), 3220 "indexedAt": chrono::Utc::now().to_rfc3339(), 3221 })); 3222 } 3223 Err(e) => { 3224 tracing::warn!("failed to get author profile for {}: {e}", repo.did) 3225 } 3226 } 3227 } 3228 Ok(None) => { 3229 tracing::warn!("feed generator record not found: {author_ident}/{rkey}") 3230 } 3231 Err(e) => { 3232 tracing::warn!("failed to get feed generator record 
{author_ident}/{rkey}: {e}") 3233 } 3234 }, 3235 Err(e) => tracing::warn!("failed to resolve repo for {author_ident}: {e}"), 3236 } 3237 } 3238 3239 Ok(Json(serde_json::json!({ "feeds": views })).into_response()) 3240} 3241 3242#[derive(Deserialize)] 3243struct PutPreferencesReq { 3244 preferences: Vec<serde_json::Value>, 3245} 3246 3247async fn put_preferences( 3248 State(app_state): State<AppState>, 3249 req: Request, 3250) -> Result<Response, StatusCode> { 3251 let Some(did) = get_auth_did(&req) else { 3252 return Err(StatusCode::UNAUTHORIZED); 3253 }; 3254 3255 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) 3256 .await 3257 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 3258 let payload: PutPreferencesReq = serde_json::from_slice(&body_bytes).map_err(|e| { 3259 tracing::error!("failed to parse put_preferences request: {e}"); 3260 StatusCode::BAD_REQUEST 3261 })?; 3262 3263 let key = format!("prefs:{}", did); 3264 let val = 3265 serde_json::to_vec(&payload.preferences).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 3266 app_state 3267 .preferences 3268 .insert(key.as_bytes(), val) 3269 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 3270 3271 Ok(Json(serde_json::json!({})).into_response()) 3272} 3273 3274async fn get_preferences( 3275 State(app_state): State<AppState>, 3276 req: Request, 3277) -> Result<Response, StatusCode> { 3278 let Some(did) = get_auth_did(&req) else { 3279 return Err(StatusCode::UNAUTHORIZED); 3280 }; 3281 3282 let key = format!("prefs:{}", did); 3283 let existing = app_state 3284 .preferences 3285 .get(key.as_bytes()) 3286 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 3287 3288 let preferences = if let Some(bytes) = existing { 3289 serde_json::from_slice::<Vec<serde_json::Value>>(&bytes).unwrap_or_default() 3290 } else { 3291 Vec::new() 3292 }; 3293 3294 Ok(Json(serde_json::json!({ "preferences": preferences })).into_response()) 3295} 3296 3297#[derive(Deserialize)] 3298struct MuteActorReq { 3299 actor: 
String, 3300} 3301 3302async fn mute_actor( 3303 State(app_state): State<AppState>, 3304 req: Request, 3305) -> Result<Response, StatusCode> { 3306 // let hydrant = &app_state.hydrant; 3307 let Some(did) = get_auth_did(&req) else { 3308 return Err(StatusCode::UNAUTHORIZED); 3309 }; 3310 3311 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) 3312 .await 3313 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 3314 let payload: MuteActorReq = serde_json::from_slice(&body_bytes).map_err(|e| { 3315 tracing::error!("failed to parse mute_actor/unmute_actor request: {e}"); 3316 StatusCode::BAD_REQUEST 3317 })?; 3318 3319 let ident = AtIdentifier::new(&payload.actor).map_err(|e| { 3320 tracing::error!("failed to parse actor identifier {}: {e}", payload.actor); 3321 StatusCode::BAD_REQUEST 3322 })?; 3323 let repo = match app_state.hydrant.repos.resolve(&ident).await { 3324 Ok(r) => r, 3325 Err(e) => { 3326 tracing::error!("failed to resolve repo for {ident}: {e}"); 3327 return Err(StatusCode::NOT_FOUND); 3328 } 3329 }; 3330 3331 let key = format!("mute:{}:{}", did, repo.did.as_str()); 3332 app_state 3333 .mutes 3334 .insert(key.as_bytes(), b"") 3335 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 3336 3337 Ok(Json(serde_json::json!({})).into_response()) 3338} 3339 3340async fn unmute_actor( 3341 State(app_state): State<AppState>, 3342 req: Request, 3343) -> Result<Response, StatusCode> { 3344 // let hydrant = &app_state.hydrant; 3345 let Some(did) = get_auth_did(&req) else { 3346 return Err(StatusCode::UNAUTHORIZED); 3347 }; 3348 3349 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) 3350 .await 3351 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 3352 let payload: MuteActorReq = serde_json::from_slice(&body_bytes).map_err(|e| { 3353 tracing::error!("failed to parse mute_actor/unmute_actor request: {e}"); 3354 StatusCode::BAD_REQUEST 3355 })?; 3356 3357 let ident = AtIdentifier::new(&payload.actor).map_err(|e| { 3358 
tracing::error!("failed to parse actor identifier {}: {e}", payload.actor); 3359 StatusCode::BAD_REQUEST 3360 })?; 3361 let repo = match app_state.hydrant.repos.resolve(&ident).await { 3362 Ok(r) => r, 3363 Err(e) => { 3364 tracing::error!("failed to resolve repo for {ident}: {e}"); 3365 return Err(StatusCode::NOT_FOUND); 3366 } 3367 }; 3368 3369 let key = format!("mute:{}:{}", did, repo.did.as_str()); 3370 app_state 3371 .mutes 3372 .remove(key.as_bytes()) 3373 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 3374 3375 Ok(Json(serde_json::json!({})).into_response()) 3376} 3377 3378async fn get_mutes( 3379 State(app_state): State<AppState>, 3380 req: Request, 3381) -> Result<Response, StatusCode> { 3382 // let hydrant = &app_state.hydrant; 3383 let Some(did) = get_auth_did(&req) else { 3384 return Err(StatusCode::UNAUTHORIZED); 3385 }; 3386 3387 let query_str = req.uri().query().unwrap_or(""); 3388 let params: GetBookmarksParams = serde_urlencoded::from_str(query_str).map_err(|e| { 3389 tracing::error!("failed to parse get_mutes query params: {e}"); 3390 StatusCode::BAD_REQUEST 3391 })?; 3392 3393 let limit = params.limit.unwrap_or(50).min(100); 3394 let prefix = format!("mute:{}:", did); 3395 3396 let iter: Box<dyn Iterator<Item = _>> = if let Some(cursor) = params.cursor { 3397 Box::new(app_state.mutes.range::<&[u8], _>(( 3398 std::ops::Bound::Included(cursor.as_bytes()), 3399 std::ops::Bound::Unbounded, 3400 ))) 3401 } else { 3402 Box::new(app_state.mutes.prefix(prefix.as_bytes())) 3403 }; 3404 3405 let mut fetched_keys = Vec::new(); 3406 for item in iter { 3407 let (key, _) = match item.into_inner() { 3408 Ok(inner) => inner, 3409 Err(e) => { 3410 tracing::warn!("failed to get item from mutes iterator: {e}"); 3411 continue; 3412 } 3413 }; 3414 if !key.starts_with(prefix.as_bytes()) { 3415 break; 3416 } 3417 fetched_keys.push(key.to_vec()); 3418 if fetched_keys.len() >= limit { 3419 break; 3420 } 3421 } 3422 3423 let mut mutes = Vec::new(); 3424 let mut 
next_cursor = None; 3425 for key in fetched_keys { 3426 let key_str = String::from_utf8_lossy(&key); 3427 let muted_did = key_str.strip_prefix(&prefix).unwrap_or(&key_str); 3428 match get_profile_internal(&app_state, muted_did, None).await { 3429 Ok(profile) => mutes.push(profile), 3430 Err(e) => tracing::warn!("failed to get profile for muted actor {muted_did}: {e}"), 3431 } 3432 next_cursor = Some(key_str.to_string()); 3433 } 3434 3435 Ok(Json(serde_json::json!({ 3436 "mutes": mutes, 3437 "cursor": next_cursor 3438 })) 3439 .into_response()) 3440} 3441 3442async fn notification_indexer(app_state: AppState) { 3443 let mut stream = app_state.hydrant.subscribe(None); 3444 while let Some(evt) = stream.next().await { 3445 let Some(rec) = evt.record else { continue }; 3446 if rec.action != "create" { 3447 continue; 3448 } 3449 3450 let author_did = rec.did.as_str(); 3451 let collection = rec.collection.as_str(); 3452 let rkey = rec.rkey.as_str(); 3453 let uri = format!("at://{}/{}/{}", author_did, collection, rkey); 3454 3455 let Some(val) = rec.record else { continue }; 3456 3457 let mut targets = Vec::new(); // (target_did, reason, reason_subject) 3458 3459 match collection { 3460 "app.bsky.feed.like" => { 3461 if let Some(subj_uri) = val 3462 .get("subject") 3463 .and_then(|s| s.get("uri")) 3464 .and_then(|u| u.as_str()) 3465 { 3466 match jacquard_common::types::string::AtUri::new(subj_uri) { 3467 Ok(u) => { 3468 targets.push(( 3469 u.authority().as_str().to_string(), 3470 "like", 3471 Some(subj_uri.to_string()), 3472 )); 3473 } 3474 Err(e) => { 3475 tracing::warn!("failed to parse notification like uri {subj_uri}: {e}") 3476 } 3477 } 3478 } 3479 } 3480 "app.bsky.feed.repost" => { 3481 if let Some(subj_uri) = val 3482 .get("subject") 3483 .and_then(|s| s.get("uri")) 3484 .and_then(|u| u.as_str()) 3485 { 3486 match jacquard_common::types::string::AtUri::new(subj_uri) { 3487 Ok(u) => { 3488 targets.push(( 3489 u.authority().as_str().to_string(), 3490 "repost", 3491 
Some(subj_uri.to_string()), 3492 )); 3493 } 3494 Err(e) => tracing::warn!( 3495 "failed to parse notification repost uri {subj_uri}: {e}" 3496 ), 3497 } 3498 } 3499 } 3500 "app.bsky.graph.follow" => { 3501 if let Some(subj_did) = val.get("subject").and_then(|s| s.as_str()) { 3502 targets.push((subj_did.to_string(), "follow", None)); 3503 } 3504 } 3505 "app.bsky.feed.post" => { 3506 if let Some(parent_uri) = val 3507 .get("reply") 3508 .and_then(|r| r.get("parent")) 3509 .and_then(|p| p.get("uri")) 3510 .and_then(|u| u.as_str()) 3511 { 3512 match jacquard_common::types::string::AtUri::new(parent_uri) { 3513 Ok(u) => { 3514 targets.push(( 3515 u.authority().as_str().to_string(), 3516 "reply", 3517 Some(parent_uri.to_string()), 3518 )); 3519 } 3520 Err(e) => tracing::warn!( 3521 "failed to parse notification reply uri {parent_uri}: {e}" 3522 ), 3523 } 3524 } 3525 3526 let is_quote = val.get("embed").map_or(false, |embed| { 3527 let t = embed.get("$type").and_then(|t| t.as_str()).unwrap_or(""); 3528 if t == "app.bsky.embed.record" { 3529 embed 3530 .get("record") 3531 .and_then(|r| r.get("uri")) 3532 .and_then(|u| u.as_str()) 3533 .is_some() 3534 } else if t == "app.bsky.embed.recordWithMedia" { 3535 embed 3536 .get("record") 3537 .and_then(|r| r.get("record")) 3538 .and_then(|r| r.get("uri")) 3539 .and_then(|u| u.as_str()) 3540 .is_some() 3541 } else { 3542 false 3543 } 3544 }); 3545 3546 if is_quote { 3547 if let Some(embed) = val.get("embed") { 3548 let t = embed.get("$type").and_then(|t| t.as_str()).unwrap_or(""); 3549 let quote_uri = if t == "app.bsky.embed.record" { 3550 embed 3551 .get("record") 3552 .and_then(|r| r.get("uri")) 3553 .and_then(|u| u.as_str()) 3554 } else if t == "app.bsky.embed.recordWithMedia" { 3555 embed 3556 .get("record") 3557 .and_then(|r| r.get("record")) 3558 .and_then(|r| r.get("uri")) 3559 .and_then(|u| u.as_str()) 3560 } else { 3561 None 3562 }; 3563 3564 if let Some(qu) = quote_uri { 3565 match 
jacquard_common::types::string::AtUri::new(qu) { 3566 Ok(u) => { 3567 targets.push(( 3568 u.authority().as_str().to_string(), 3569 "quote", 3570 Some(qu.to_string()), 3571 )); 3572 } 3573 Err(e) => tracing::warn!( 3574 "failed to parse notification quote uri {qu}: {e}" 3575 ), 3576 } 3577 } 3578 } 3579 } 3580 3581 if let Some(facets) = val.get("facets").and_then(|f| f.as_array()) { 3582 for facet in facets { 3583 if let Some(features) = facet.get("features").and_then(|f| f.as_array()) { 3584 for feature in features { 3585 if feature.get("$type").and_then(|t| t.as_str()) 3586 == Some("app.bsky.richtext.facet#mention") 3587 { 3588 if let Some(mention_did) = 3589 feature.get("did").and_then(|d| d.as_str()) 3590 { 3591 targets.push((mention_did.to_string(), "mention", None)); 3592 } 3593 } 3594 } 3595 } 3596 } 3597 } 3598 } 3599 _ => {} 3600 } 3601 3602 let indexed_at = chrono::Utc::now().to_rfc3339(); 3603 for (target_did, reason, reason_subject) in targets { 3604 if target_did == author_did { 3605 continue; 3606 } 3607 3608 let key = format!("notif:{}:{:016x}", target_did, evt.id); 3609 let notif = serde_json::json!({ 3610 "uri": uri, 3611 "cid": rec.cid.map(|c| c.to_string()).unwrap_or_default(), 3612 "author_did": author_did, 3613 "reason": reason, 3614 "reasonSubject": reason_subject, 3615 "record": val, 3616 "indexedAt": indexed_at, 3617 "id": evt.id 3618 }); 3619 3620 let _ = app_state 3621 .notifications 3622 .insert(key.as_bytes(), serde_json::to_vec(&notif).unwrap()); 3623 } 3624 } 3625} 3626 3627#[derive(Deserialize)] 3628struct ListNotificationsParams { 3629 #[serde(default)] 3630 limit: Option<usize>, 3631 #[serde(default)] 3632 cursor: Option<String>, 3633} 3634 3635async fn list_notifications( 3636 State(app_state): State<AppState>, 3637 req: Request, 3638) -> Result<Response, StatusCode> { 3639 // let hydrant = &app_state.hydrant; 3640 let Some(did) = get_auth_did(&req) else { 3641 return Err(StatusCode::UNAUTHORIZED); 3642 }; 3643 3644 let query_str = 
req.uri().query().unwrap_or(""); 3645 let params = serde_urlencoded::from_str::<ListNotificationsParams>(query_str).unwrap_or( 3646 ListNotificationsParams { 3647 limit: None, 3648 cursor: None, 3649 }, 3650 ); 3651 let limit = params.limit.unwrap_or(50).min(100); 3652 3653 let prefix = format!("notif:{}:", did); 3654 3655 // Collect all notification keys into a vector so we can iterate in reverse 3656 let mut all_keys = Vec::new(); 3657 for item in app_state.notifications.prefix(prefix.as_bytes()) { 3658 match item.into_inner() { 3659 Ok((k, v)) => { 3660 all_keys.push((k, v)); 3661 } 3662 Err(e) => { 3663 tracing::warn!("failed to get notification item: {e}"); 3664 } 3665 } 3666 } 3667 3668 // Sort keys just in case and reverse 3669 all_keys.sort_by(|a, b| b.0.cmp(&a.0)); 3670 3671 // Pagination 3672 if let Some(cursor) = params.cursor { 3673 all_keys.retain(|(k, _)| String::from_utf8_lossy(k).as_ref() < cursor.as_str()); 3674 } 3675 3676 let seen_key = format!("seen:{}", did); 3677 let last_seen = app_state 3678 .seen 3679 .get(seen_key.as_bytes()) 3680 .ok() 3681 .flatten() 3682 .and_then(|b| String::from_utf8(b.to_vec()).ok()) 3683 .unwrap_or_else(|| "1970-01-01T00:00:00Z".to_string()); 3684 3685 let mut notifications = Vec::new(); 3686 let mut next_cursor = None; 3687 3688 for (key, val_bytes) in all_keys.into_iter().take(limit) { 3689 next_cursor = Some(String::from_utf8_lossy(&key).to_string()); 3690 3691 let val = match serde_json::from_slice::<serde_json::Value>(&val_bytes) { 3692 Ok(v) => v, 3693 Err(e) => { 3694 tracing::warn!("failed to parse notification json: {e}"); 3695 continue; 3696 } 3697 }; 3698 3699 let author_did = val.get("author_did").and_then(|a| a.as_str()).unwrap_or(""); 3700 let indexed_at = val.get("indexedAt").and_then(|a| a.as_str()).unwrap_or(""); 3701 3702 let is_read = indexed_at <= last_seen.as_str(); 3703 3704 let mut notif = val.clone(); 3705 match get_profile_internal(&app_state, author_did, None).await { 3706 
Ok(author_profile) => { 3707 notif["author"] = author_profile; 3708 } 3709 Err(e) => { 3710 tracing::warn!( 3711 "failed to get author profile for notification from {author_did}: {e}" 3712 ); 3713 continue; 3714 } 3715 } 3716 3717 notif["isRead"] = serde_json::json!(is_read); 3718 3719 notif.as_object_mut().unwrap().remove("author_did"); 3720 notif.as_object_mut().unwrap().remove("id"); 3721 3722 notifications.push(notif); 3723 } 3724 3725 Ok(Json(serde_json::json!({ 3726 "notifications": notifications, 3727 "cursor": if notifications.len() == limit { next_cursor } else { None } 3728 })) 3729 .into_response()) 3730} 3731 3732async fn get_unread_count( 3733 State(app_state): State<AppState>, 3734 req: Request, 3735) -> Result<Response, StatusCode> { 3736 let Some(did) = get_auth_did(&req) else { 3737 return Err(StatusCode::UNAUTHORIZED); 3738 }; 3739 3740 let prefix = format!("notif:{}:", did); 3741 let seen_key = format!("seen:{}", did); 3742 let last_seen = app_state 3743 .seen 3744 .get(seen_key.as_bytes()) 3745 .ok() 3746 .flatten() 3747 .and_then(|b| String::from_utf8(b.to_vec()).ok()) 3748 .unwrap_or_else(|| "1970-01-01T00:00:00Z".to_string()); 3749 3750 let mut count = 0; 3751 for item in app_state.notifications.prefix(prefix.as_bytes()) { 3752 let (_key, val_bytes) = match item.into_inner() { 3753 Ok(inner) => inner, 3754 Err(e) => { 3755 tracing::warn!("failed to get notification item for count: {e}"); 3756 continue; 3757 } 3758 }; 3759 match serde_json::from_slice::<serde_json::Value>(&val_bytes) { 3760 Ok(val) => { 3761 if let Some(indexed_at) = val.get("indexedAt").and_then(|a| a.as_str()) { 3762 if indexed_at > last_seen.as_str() { 3763 count += 1; 3764 } 3765 } 3766 } 3767 Err(e) => tracing::warn!("failed to parse notification json for count: {e}"), 3768 } 3769 } 3770 3771 Ok(Json(serde_json::json!({ 3772 "count": count 3773 })) 3774 .into_response()) 3775} 3776 3777#[derive(Deserialize)] 3778struct UpdateSeenReq { 3779 #[serde(rename = "seenAt")] 3780 
seen_at: String, 3781} 3782 3783async fn update_seen( 3784 State(app_state): State<AppState>, 3785 req: Request, 3786) -> Result<Response, StatusCode> { 3787 let Some(did) = get_auth_did(&req) else { 3788 return Err(StatusCode::UNAUTHORIZED); 3789 }; 3790 3791 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) 3792 .await 3793 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 3794 let payload: UpdateSeenReq = serde_json::from_slice(&body_bytes).map_err(|e| { 3795 tracing::error!("failed to parse update_seen request: {e}"); 3796 StatusCode::BAD_REQUEST 3797 })?; 3798 3799 let seen_key = format!("seen:{}", did); 3800 app_state 3801 .seen 3802 .insert(seen_key.as_bytes(), payload.seen_at.as_bytes()) 3803 .map_err(|e| { 3804 tracing::error!("failed to insert seen: {e}"); 3805 StatusCode::INTERNAL_SERVER_ERROR 3806 })?; 3807 3808 Ok(Json(serde_json::json!({})).into_response()) 3809} 3810 3811#[derive(Deserialize)] 3812struct GetFeedParams { 3813 feed: String, 3814 #[serde(default)] 3815 limit: Option<usize>, 3816 #[serde(default)] 3817 cursor: Option<String>, 3818} 3819 3820async fn get_feed(State(app_state): State<AppState>, req: Request) -> Result<Response, StatusCode> { 3821 // let hydrant = &app_state.hydrant; 3822 let query_str = req.uri().query().unwrap_or(""); 3823 let params: GetFeedParams = serde_urlencoded::from_str(query_str).map_err(|e| { 3824 tracing::error!("failed to parse get_feed query params: {e}"); 3825 StatusCode::BAD_REQUEST 3826 })?; 3827 3828 let uri = jacquard_common::types::string::AtUri::new(&params.feed).map_err(|e| { 3829 tracing::error!("failed to parse feed uri {}: {e}", params.feed); 3830 StatusCode::BAD_REQUEST 3831 })?; 3832 let author_ident = uri.authority(); 3833 let rkey = uri 3834 .rkey() 3835 .ok_or_else(|| { 3836 tracing::error!("missing rkey in feed uri {}", params.feed); 3837 StatusCode::BAD_REQUEST 3838 })? 
3839 .0 3840 .as_str() 3841 .to_string(); 3842 3843 let repo = match app_state.hydrant.repos.resolve(author_ident).await { 3844 Ok(r) => r, 3845 Err(e) => { 3846 tracing::error!("failed to resolve repo for {author_ident}: {e}"); 3847 return proxy_request(req).await; 3848 } 3849 }; 3850 3851 let record = match repo.get_record("app.bsky.feed.generator", &rkey).await { 3852 Ok(Some(r)) => r, 3853 Ok(None) => { 3854 tracing::error!("feed generator record not found: {author_ident}/{rkey}"); 3855 return proxy_request(req).await; 3856 } 3857 Err(e) => { 3858 tracing::error!("failed to get feed generator record {author_ident}/{rkey}: {e}"); 3859 return proxy_request(req).await; 3860 } 3861 }; 3862 3863 let val_json = serde_json::to_value(&record.value).unwrap_or(serde_json::json!({})); 3864 let Some(service_did) = val_json.get("did").and_then(|d| d.as_str()) else { 3865 tracing::error!("missing did in feed generator record {author_ident}/{rkey}"); 3866 return Err(StatusCode::BAD_REQUEST); 3867 }; 3868 3869 let service_did_parsed = 3870 jacquard_common::types::string::Did::new(service_did).map_err(|e| { 3871 tracing::error!("failed to parse service did {service_did}: {e}"); 3872 StatusCode::BAD_REQUEST 3873 })?; 3874 3875 let (doc_data, _) = match app_state 3876 .hydrant 3877 .resolver() 3878 .resolve_raw_doc(&service_did_parsed) 3879 .await 3880 { 3881 Ok(d) => d, 3882 Err(e) => { 3883 tracing::error!("failed to resolve service did {service_did}: {e}"); 3884 return proxy_request(req).await; 3885 } 3886 }; 3887 3888 let doc_json = serde_json::to_value(&doc_data).unwrap_or(serde_json::json!({})); 3889 let mut endpoint = None; 3890 3891 if let Some(services) = doc_json.get("service").and_then(|s| s.as_array()) { 3892 for srv in services { 3893 if let Some(typ) = srv.get("type") { 3894 let is_fg = if let Some(t_str) = typ.as_str() { 3895 t_str == "BskyFeedGenerator" 3896 } else if let Some(t_arr) = typ.as_array() { 3897 t_arr 3898 .iter() 3899 .any(|v| v.as_str() == 
Some("BskyFeedGenerator")) 3900 } else { 3901 false 3902 }; 3903 3904 if is_fg { 3905 if let Some(ep) = srv.get("serviceEndpoint").and_then(|e| e.as_str()) { 3906 endpoint = Some(ep.to_string()); 3907 break; 3908 } 3909 } 3910 } 3911 3912 // fallback check by ID 3913 if srv.get("id").and_then(|i| i.as_str()) == Some("#bsky_fg") { 3914 if let Some(ep) = srv.get("serviceEndpoint").and_then(|e| e.as_str()) { 3915 endpoint = Some(ep.to_string()); 3916 break; 3917 } 3918 } 3919 } 3920 } 3921 3922 let Some(ep) = endpoint else { 3923 return proxy_request(req).await; 3924 }; 3925 3926 let mut skeleton_url = format!( 3927 "{}/xrpc/app.bsky.feed.getFeedSkeleton?feed={}", 3928 ep, params.feed 3929 ); 3930 if let Some(limit) = params.limit { 3931 skeleton_url.push_str(&format!("&limit={}", limit)); 3932 } 3933 if let Some(cursor) = params.cursor { 3934 skeleton_url.push_str(&format!("&cursor={}", cursor)); 3935 } 3936 3937 let client = reqwest::Client::new(); 3938 let mut req_builder = client.get(&skeleton_url); 3939 if let Some(auth) = req.headers().get("authorization") { 3940 req_builder = req_builder.header("authorization", auth); 3941 } 3942 3943 let res = match req_builder.send().await { 3944 Ok(r) => r, 3945 Err(e) => { 3946 tracing::error!("failed to send request to feed generator {skeleton_url}: {e}"); 3947 return proxy_request(req).await; 3948 } 3949 }; 3950 3951 let skeleton = match res.json::<serde_json::Value>().await { 3952 Ok(s) => s, 3953 Err(e) => { 3954 tracing::error!("failed to parse feed skeleton json: {e}"); 3955 return proxy_request(req).await; 3956 } 3957 }; 3958 3959 let mut hydrated_feed = Vec::new(); 3960 if let Some(feed_items) = skeleton.get("feed").and_then(|f| f.as_array()) { 3961 for item in feed_items { 3962 if let Some(post_uri) = item.get("post").and_then(|p| p.as_str()) { 3963 match get_post_view(&app_state, post_uri, None).await { 3964 Ok(post_view) => { 3965 let mut feed_item = serde_json::json!({ 3966 "post": post_view 3967 }); 3968 if let 
Some(reason) = item.get("reason") { 3969 feed_item 3970 .as_object_mut() 3971 .unwrap() 3972 .insert("reason".to_string(), reason.clone()); 3973 } 3974 hydrated_feed.push(feed_item); 3975 } 3976 Err(e) => { 3977 tracing::warn!("failed to get post view for {post_uri} in feed: {e}"); 3978 } 3979 } 3980 } 3981 } 3982 } 3983 3984 Ok(Json(serde_json::json!({ 3985 "feed": hydrated_feed, 3986 "cursor": skeleton.get("cursor") 3987 })) 3988 .into_response()) 3989} 3990 3991async fn get_well_known_did(req: Request) -> Result<Response, StatusCode> { 3992 let host = req 3993 .headers() 3994 .get("host") 3995 .and_then(|h| h.to_str().ok()) 3996 .unwrap_or("localhost:8000"); 3997 3998 let did = if host.contains(':') { 3999 format!("did:web:{}", host.replace(':', "%3A")) 4000 } else { 4001 format!("did:web:{}", host) 4002 }; 4003 4004 let scheme = if host.starts_with("localhost") || host.starts_with("127.0.0.1") { 4005 "http" 4006 } else { 4007 "https" 4008 }; 4009 let service_endpoint = format!("{}://{}", scheme, host); 4010 4011 Ok(Json(serde_json::json!({ 4012 "@context": [ 4013 "https://www.w3.org/ns/did/v1" 4014 ], 4015 "id": did, 4016 "service": [ 4017 { 4018 "id": "#bsky_notif", 4019 "type": "BskyNotificationService", 4020 "serviceEndpoint": service_endpoint 4021 }, 4022 { 4023 "id": "#bsky_appview", 4024 "type": "BskyAppView", 4025 "serviceEndpoint": service_endpoint 4026 } 4027 ] 4028 })) 4029 .into_response()) 4030}