use axum::{ Json, Router, extract::{Request, State}, http::StatusCode, response::{IntoResponse, Response}, routing::{get, post}, }; use hydrant::config::Config; use hydrant::control::Hydrant; use hydrant::deps::futures::StreamExt; use jacquard_common::IntoStatic; use jacquard_common::types::ident::AtIdentifier; use serde::Deserialize; use tracing; use tracing_subscriber::EnvFilter; #[derive(Clone)] struct AppState { hydrant: Hydrant, bookmarks: fjall::Keyspace, drafts: fjall::Keyspace, preferences: fjall::Keyspace, mutes: fjall::Keyspace, notifications: fjall::Keyspace, seen: fjall::Keyspace, cdn_url: String, } impl AppState { fn cdn(&self, category: &str, did: &str, link: &str) -> String { format!( "{}/img/{}/plain/{}/{}@jpeg", self.cdn_url, category, did, link ) } } #[tokio::main] async fn main() -> miette::Result<()> { tracing_subscriber::fmt() .with_env_filter( EnvFilter::from_default_env().add_directive("appview=info".parse().unwrap()), ) .init(); hydrant::deps::rustls::crypto::aws_lc_rs::default_provider() .install_default() .ok(); let db = fjall::Database::builder(std::path::Path::new("./data")) .open() .unwrap(); let bookmarks = db .keyspace("bookmarks", || fjall::KeyspaceCreateOptions::default()) .unwrap(); let drafts = db .keyspace("drafts", || fjall::KeyspaceCreateOptions::default()) .unwrap(); let preferences = db .keyspace("preferences", || fjall::KeyspaceCreateOptions::default()) .unwrap(); let mutes = db .keyspace("mutes", || fjall::KeyspaceCreateOptions::default()) .unwrap(); let notifications = db .keyspace("notifications", || fjall::KeyspaceCreateOptions::default()) .unwrap(); let seen = db .keyspace("seen", || fjall::KeyspaceCreateOptions::default()) .unwrap(); let mut cfg = Config::from_env()?; // Enable backlinks cfg.enable_backlinks = true; // By default filter mode is used, so we only track explicitly added repos. cfg.full_network = false; cfg.filter_collections = Some(vec![ "app.bsky.actor.profile".to_string(), "app.bsky.feed.post".to_string(), "app.bsky.feed.repost".to_string(), "app.bsky.feed.like".to_string(), "app.bsky.feed.generator".to_string(), "app.bsky.graph.follow".to_string(), "app.bsky.graph.block".to_string(), "app.bsky.graph.list".to_string(), "app.bsky.graph.listitem".to_string(), "app.bsky.labeler.service".to_string(), ]); let cdn_url = std::env::var("CDN_URL").unwrap_or_else(|_| "https://cdn.bsky.app".to_string()); let hydrant = Hydrant::new(cfg).await?; let hydrant_clone = hydrant.clone(); let app_state = AppState { hydrant: hydrant.clone(), bookmarks, drafts, preferences, mutes, notifications, seen, cdn_url, }; if let Ok(seed_account) = std::env::var("SEED_ACCOUNT") { let h = hydrant.clone(); tokio::spawn(async move { seed_account_follows(h, seed_account).await; }); } let indexer_state = app_state.clone(); tokio::spawn(async move { notification_indexer(indexer_state).await; }); let app = Router::new() .route("/xrpc/app.bsky.feed.getPostThread", get(get_post_thread)) .route("/xrpc/app.bsky.actor.getProfile", get(get_profile)) .route("/xrpc/app.bsky.actor.getProfiles", get(get_profiles)) .route("/xrpc/app.bsky.actor.getPreferences", get(get_preferences)) .route("/xrpc/app.bsky.actor.putPreferences", post(put_preferences)) .route("/xrpc/app.bsky.feed.getAuthorFeed", get(get_author_feed)) .route("/xrpc/app.bsky.feed.getTimeline", get(get_timeline)) .route("/xrpc/app.bsky.feed.getActorLikes", get(get_actor_likes)) .route("/xrpc/app.bsky.feed.getQuotes", get(get_quotes)) .route( "/xrpc/app.bsky.graph.getRelationships", get(get_relationships), ) .route( "/xrpc/app.bsky.graph.getKnownFollowers", get(get_known_followers), ) .route("/xrpc/app.bsky.graph.getBlocks", get(get_blocks)) .route("/xrpc/app.bsky.graph.getMutes", get(get_mutes)) .route("/xrpc/app.bsky.graph.muteActor", post(mute_actor)) .route("/xrpc/app.bsky.graph.unmuteActor", post(unmute_actor)) .route("/xrpc/app.bsky.graph.getList", get(get_list)) .route("/xrpc/app.bsky.graph.getLists", get(get_lists)) .route( "/xrpc/app.bsky.graph.getListsWithMembership", get(get_lists_with_membership), ) .route( "/xrpc/app.bsky.notification.listNotifications", get(list_notifications), ) .route( "/xrpc/app.bsky.notification.getUnreadCount", get(get_unread_count), ) .route("/xrpc/app.bsky.notification.updateSeen", post(update_seen)) .route("/.well-known/did.json", get(get_well_known_did)) .route("/xrpc/app.bsky.feed.getActorFeeds", get(get_actor_feeds)) .route("/xrpc/app.bsky.feed.getFeed", get(get_feed)) .route( "/xrpc/app.bsky.feed.getFeedGenerator", get(get_feed_generator), ) .route( "/xrpc/app.bsky.feed.getFeedGenerators", get(get_feed_generators), ) .route("/xrpc/app.bsky.feed.getLikes", get(get_likes)) .route("/xrpc/app.bsky.feed.getRepostedBy", get(get_reposted_by)) .route("/xrpc/app.bsky.feed.getPosts", get(get_posts)) .route("/xrpc/app.bsky.graph.getFollows", get(get_follows)) .route("/xrpc/app.bsky.graph.getFollowers", get(get_followers)) .route( "/xrpc/app.bsky.unspecced.getAgeAssuranceState", get(get_age_assurance_state), ) .route("/xrpc/app.bsky.bookmark.getBookmarks", get(get_bookmarks)) .route( "/xrpc/app.bsky.bookmark.createBookmark", post(create_bookmark), ) .route( "/xrpc/app.bsky.bookmark.deleteBookmark", post(delete_bookmark), ) .route("/xrpc/app.bsky.draft.getDrafts", get(get_drafts)) .route("/xrpc/app.bsky.draft.createDraft", post(create_draft)) .route("/xrpc/app.bsky.draft.updateDraft", post(update_draft)) .route("/xrpc/app.bsky.draft.deleteDraft", post(delete_draft)) .fallback(proxy_request) .layer(tower_http::cors::CorsLayer::permissive()) .layer(tower_http::trace::TraceLayer::new_for_http()) .with_state(app_state); let port = std::env::var("PORT") .ok() .and_then(|s| s.parse().ok()) .unwrap_or(8000); let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", port)) .await .unwrap(); tracing::info!("appview listening on {}", listener.local_addr().unwrap()); tokio::select! { _ = axum::serve(listener, app) => {}, r = hydrant_clone.run()? => { r?; }, } Ok(()) } async fn proxy_request(req: Request) -> Result { tracing::info!("Proxying request: {} {}", req.method(), req.uri()); let client = reqwest::Client::new(); let uri = req.uri(); let mut url = format!("https://public.api.bsky.app{}", uri.path()); if let Some(query) = uri.query() { url.push('?'); url.push_str(query); } let mut req_builder = client.request(req.method().clone(), url); for (name, value) in req.headers() { if name != reqwest::header::HOST { req_builder = req_builder.header(name.clone(), value.clone()); } } // Also proxy the body if it exists let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; if !body_bytes.is_empty() { req_builder = req_builder.body(body_bytes); } let req = req_builder .build() .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let res = client .execute(req) .await .map_err(|_| StatusCode::BAD_GATEWAY)?; let mut axum_res = Response::builder().status(res.status()); for (name, value) in res.headers() { axum_res = axum_res.header(name, value); } let body = axum::body::Body::from_stream(res.bytes_stream()); axum_res .body(body) .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR) } async fn seed_account_follows(hydrant: Hydrant, seed_account: String) { tracing::info!("Seeding account follows for {}", seed_account); let ident = match AtIdentifier::new(&seed_account) { Ok(i) => i, Err(e) => { tracing::error!("Invalid seed account identifier {}: {}", seed_account, e); return; } }; let repo = match hydrant.repos.resolve(&ident).await { Ok(r) => r, Err(e) => { tracing::error!("Could not resolve seed account {}: {}", seed_account, e); return; } }; let seed_did = repo.did.clone(); // Use the now-public resolver to get the PDS endpoint let doc = match hydrant.resolver().resolve_doc(&seed_did).await { Ok(d) => d, Err(e) => { tracing::warn!( "Could not resolve DID doc for seed account {}: {}", seed_did, e ); return; } }; let pds_url = doc.pds.as_str(); let mut cursor = None; let mut dids_to_track = Vec::new(); dids_to_track.push(seed_did.clone().into_static()); loop { let mut list_url = format!( "{pds_url}xrpc/com.atproto.repo.listRecords?repo={seed_did}&collection=app.bsky.graph.follow&limit=100", ); if let Some(ref c) = cursor { list_url.push_str(&format!("&cursor={}", c)); } let res = match reqwest::get(&list_url).await { Ok(r) => r, Err(e) => { tracing::error!("Failed to fetch follows from {list_url}: {e}"); break; } }; let json = match res.json::().await { Ok(j) => j, Err(e) => { tracing::error!("Failed to parse follows JSON from {list_url}: {e}"); break; } }; if let Some(records) = json.get("records").and_then(|r| r.as_array()) { for record in records { if let Some(did_str) = record .get("value") .and_then(|v| v.get("subject")) .and_then(|s| s.as_str()) { if let Ok(did) = jacquard_common::types::string::Did::new(did_str) { dids_to_track.push(did.into_static()); } } } } if let Some(c) = json.get("cursor").and_then(|c| c.as_str()) { cursor = Some(c.to_string()); } else { break; } if dids_to_track.len() > 5000 { break; } } tracing::info!("Tracking {} repos from seed", dids_to_track.len()); let iter: Vec<_> = dids_to_track .iter() .map(|d| jacquard_common::types::string::Did::new(d.as_str()).unwrap()) .collect(); let _ = hydrant.repos.track(iter).await; } #[derive(Deserialize)] struct GetPostThreadParams { uri: String, #[serde(default)] depth: Option, #[serde(default, rename = "parentHeight")] parent_height: Option, } async fn get_post_thread( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let query_str = req.uri().query().unwrap_or(""); let params = serde_urlencoded::from_str::(query_str).map_err(|e| { tracing::error!("failed to parse query params: {e}"); StatusCode::BAD_REQUEST })?; let viewer_did = get_auth_did(&req); match get_thread_view_post( &app_state, ¶ms.uri, params.depth.unwrap_or(6), params.parent_height.unwrap_or(0), viewer_did.as_deref(), ) .await { Ok(thread) => Ok(Json(serde_json::json!({ "thread": thread })).into_response()), Err(_) => proxy_request(req).await, } } fn profile_to_basic(full: serde_json::Value) -> serde_json::Value { let mut basic = serde_json::json!({ "did": full["did"], "handle": full["handle"], }); if let Some(v) = full.get("displayName") { basic["displayName"] = v.clone(); } if let Some(v) = full.get("avatar") { basic["avatar"] = v.clone(); } if let Some(v) = full.get("viewer") { basic["viewer"] = v.clone(); } if let Some(v) = full.get("labels") { basic["labels"] = v.clone(); } if let Some(v) = full.get("associated") { basic["associated"] = v.clone(); } if let Some(v) = full.get("createdAt") { basic["createdAt"] = v.clone(); } basic } async fn get_post_view( app_state: &AppState, uri_str: &str, viewer_did: Option<&str>, ) -> Result { let uri = jacquard_common::types::string::AtUri::new(uri_str).map_err(|_| StatusCode::BAD_REQUEST)?; let author_ident = uri.authority(); let collection = uri .collection() .ok_or(StatusCode::BAD_REQUEST)? .as_str() .to_string(); let rkey = uri .rkey() .ok_or(StatusCode::BAD_REQUEST)? .0 .as_str() .to_string(); let repo = app_state .hydrant .repos .resolve(author_ident) .await .map_err(|_| StatusCode::NOT_FOUND)?; let record = repo .get_record(&collection, &rkey) .await .map_err(|_| StatusCode::NOT_FOUND)? .ok_or(StatusCode::NOT_FOUND)?; let author_profile = get_profile_internal(&app_state, repo.did.as_str(), viewer_did).await?; let mut viewer_state = serde_json::json!({ "muted": false, "blockedBy": false, }); if let Some(viewer) = viewer_did { let like = app_state .hydrant .backlinks .fetch(uri_str.to_string()) .source("app.bsky.feed.like") .dids(vec![viewer.to_string()]) .limit(1) .run() .await .ok() .and_then(|p| p.backlinks.first().map(|b| b.uri.to_string())); let repost = app_state .hydrant .backlinks .fetch(uri_str.to_string()) .source("app.bsky.feed.repost") .dids(vec![viewer.to_string()]) .limit(1) .run() .await .ok() .and_then(|p| p.backlinks.first().map(|b| b.uri.to_string())); if let Some(l) = like { viewer_state["like"] = serde_json::json!(l); } if let Some(r) = repost { viewer_state["repost"] = serde_json::json!(r); } } let mut val_json = serde_json::to_value(&record.value).unwrap_or(serde_json::json!({})); if val_json.get("$type").is_none() { val_json["$type"] = serde_json::json!("app.bsky.feed.post"); } let created_at = val_json .get("createdAt") .and_then(|v| v.as_str()) .unwrap_or(""); let mut post_view = serde_json::json!({ "$type": "app.bsky.feed.defs#postView", "uri": uri_str, "cid": record.cid.to_string(), "author": profile_to_basic(author_profile), "record": val_json, "replyCount": app_state.hydrant.backlinks.count(uri_str.to_string()).source("app.bsky.feed.post").run().await.unwrap_or(0), "repostCount": app_state.hydrant.backlinks.count(uri_str.to_string()).source("app.bsky.feed.repost").run().await.unwrap_or(0), "likeCount": app_state.hydrant.backlinks.count(uri_str.to_string()).source("app.bsky.feed.like").run().await.unwrap_or(0), "quoteCount": 0, "indexedAt": if created_at.is_empty() { chrono::Utc::now().to_rfc3339() } else { created_at.to_string() }, "viewer": viewer_state, "labels": [], }); if let Some(record_embed) = val_json.get("embed") { let t = record_embed .get("$type") .and_then(|v| v.as_str()) .unwrap_or(""); if t == "app.bsky.embed.images" { if let Some(images) = record_embed.get("images").and_then(|v| v.as_array()) { let mut view_images = Vec::new(); for img in images { let link = img .get("image") .and_then(|v| v.get("ref")) .and_then(|v| v.get("$link")) .and_then(|v| v.as_str()) .unwrap_or(""); let thumb = app_state.cdn("feed_thumbnail", repo.did.as_str(), link); let fullsize = app_state.cdn("feed_fullsize", repo.did.as_str(), link); view_images.push(serde_json::json!({ "thumb": thumb, "fullsize": fullsize, "alt": img.get("alt").and_then(|v| v.as_str()).unwrap_or(""), "aspectRatio": img.get("aspectRatio"), })); } post_view["embed"] = serde_json::json!({ "$type": "app.bsky.embed.images#view", "images": view_images, }); } } else if t == "app.bsky.embed.record" { if let Some(quoted_uri) = record_embed .get("record") .and_then(|v| v.get("uri")) .and_then(|v| v.as_str()) { // For now, let's just do a shallow hydration of the quoted post to avoid infinite recursion // We use None for viewer_did here to simplify and avoid auth-loops if let Ok(quoted_post) = Box::pin(get_post_view(app_state, quoted_uri, None)).await { post_view["embed"] = serde_json::json!({ "$type": "app.bsky.embed.record#view", "record": { "$type": "app.bsky.embed.record#viewRecord", "uri": quoted_post["uri"], "cid": quoted_post["cid"], "author": quoted_post["author"], "value": quoted_post["record"], "labels": quoted_post["labels"], "indexedAt": quoted_post["indexedAt"], "embeds": quoted_post.get("embed").map(|e| vec![e]).unwrap_or_default(), "likeCount": quoted_post["likeCount"], "repostCount": quoted_post["repostCount"], "replyCount": quoted_post["replyCount"], "quoteCount": quoted_post["quoteCount"], } }); } } } } Ok(post_view) } async fn get_thread_view_post( app_state: &AppState, uri_str: &str, depth: usize, parent_height: usize, viewer_did: Option<&str>, ) -> Result { let post = get_post_view(&app_state, uri_str, viewer_did).await?; let mut thread = serde_json::json!({ "$type": "app.bsky.feed.defs#threadViewPost", "post": post, }); if parent_height > 0 { if let Some(reply) = post.get("record").and_then(|r| r.get("reply")) { if let Some(parent_uri) = reply .get("parent") .and_then(|p| p.get("uri")) .and_then(|u| u.as_str()) { match Box::pin(get_thread_view_post( app_state, parent_uri, 0, parent_height - 1, viewer_did, )) .await { Ok(parent_thread) => { thread .as_object_mut() .unwrap() .insert("parent".to_string(), parent_thread); } Err(e) => { tracing::warn!("failed to fetch parent thread {parent_uri}: {e}"); } } } } } if depth > 0 { let replies_page = app_state .hydrant .backlinks .fetch(uri_str.to_string()) .source("app.bsky.feed.post") .limit(50) .run() .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let mut replies = Vec::new(); for bl in replies_page.backlinks { match Box::pin(get_thread_view_post( app_state, bl.uri.as_str(), depth - 1, 0, viewer_did, )) .await { Ok(reply_thread) => { replies.push(reply_thread); } Err(e) => { tracing::warn!("failed to fetch reply thread {}: {e}", bl.uri); } } } thread .as_object_mut() .unwrap() .insert("replies".to_string(), serde_json::Value::Array(replies)); } Ok(thread) } #[derive(Deserialize)] struct GetProfileParams { actor: String, } async fn get_profile( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let query_str = req.uri().query().unwrap_or(""); let params = serde_urlencoded::from_str::(query_str).map_err(|e| { tracing::error!("failed to parse query params: {e}"); StatusCode::BAD_REQUEST })?; let ident = AtIdentifier::new(¶ms.actor).map_err(|e| { tracing::error!("failed to parse actor identifier: {e}"); StatusCode::BAD_REQUEST })?; let viewer_did = get_auth_did(&req); match app_state.hydrant.repos.resolve(&ident).await { Ok(repo) => { match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()).await { Ok(profile) => return Ok(Json(profile).into_response()), Err(e) => { tracing::warn!("failed to get profile for {}: {e}", repo.did); } } } Err(e) => { tracing::error!("failed to resolve actor {}: {e}", params.actor); } } proxy_request(req).await } async fn get_profile_internal( app_state: &AppState, did_str: &str, viewer_did: Option<&str>, ) -> Result { let did = jacquard_common::types::string::Did::new(did_str).map_err(|_| StatusCode::BAD_REQUEST)?; let repo = app_state.hydrant.repos.get(&did); let info = repo .info() .await .map_err(|_| StatusCode::NOT_FOUND)? .ok_or(StatusCode::NOT_FOUND)?; if !info.tracked { return Err(StatusCode::NOT_FOUND); } let profile_record = repo .get_record("app.bsky.actor.profile", "self") .await .ok() .flatten(); let followers_count = app_state .hydrant .backlinks .count(did_str.to_string()) .source("app.bsky.graph.follow") .run() .await .unwrap_or(0); let follows_count = repo .count_records("app.bsky.graph.follow") .await .unwrap_or(0); let posts_count = repo.count_records("app.bsky.feed.post").await.unwrap_or(0); let handle = info .handle .map(|h| h.to_string()) .unwrap_or_else(|| did_str.to_string()); let mut viewer_state = serde_json::json!({ "muted": false, "blockedBy": false, }); if let Some(viewer) = viewer_did { if viewer != did_str { // following: does viewer follow did_str? let following = app_state .hydrant .backlinks .fetch(did_str.to_string()) .source("app.bsky.graph.follow") .dids(vec![viewer.to_string()]) .limit(1) .run() .await .ok() .and_then(|p| p.backlinks.first().map(|b| b.uri.to_string())); // followedBy: does did_str follow viewer? let followed_by = app_state .hydrant .backlinks .fetch(viewer.to_string()) .source("app.bsky.graph.follow") .dids(vec![did_str.to_string()]) .limit(1) .run() .await .ok() .and_then(|p| p.backlinks.first().map(|b| b.uri.to_string())); if let Some(f) = following { viewer_state["following"] = serde_json::json!(f); } if let Some(fb) = followed_by { viewer_state["followedBy"] = serde_json::json!(fb); } // blocking: URI of viewer's block record targeting did_str let blocking = app_state .hydrant .backlinks .fetch(did_str.to_string()) .source("app.bsky.graph.block") .dids(vec![viewer.to_string()]) .limit(1) .run() .await .ok() .and_then(|p| p.backlinks.first().map(|b| b.uri.to_string())); if let Some(b) = blocking { viewer_state["blocking"] = serde_json::json!(b); } // blockedBy: does did_str block viewer? let blocked_by = app_state .hydrant .backlinks .count(viewer.to_string()) .source("app.bsky.graph.block") .dids(vec![did_str.to_string()]) .run() .await .unwrap_or(0) > 0; if blocked_by { viewer_state["blockedBy"] = serde_json::json!(true); } // muted: check local fjall mutes let mute_key = format!("mute:{}:{}", viewer, did_str); if app_state .mutes .get(mute_key.as_bytes()) .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? .is_some() { viewer_state["muted"] = serde_json::json!(true); } } } let mut profile = serde_json::json!({ "did": did_str, "handle": handle, "followersCount": followers_count, "followsCount": follows_count, "postsCount": posts_count, "indexedAt": chrono::Utc::now().to_rfc3339(), "viewer": viewer_state, "labels": [], "associated": { "chat": { "allowIncoming": "all" } } }); if let Some(rec) = profile_record { let value = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); if let Some(obj) = profile.as_object_mut() { if let Some(display_name) = value.get("displayName") { obj.insert("displayName".to_string(), display_name.clone()); } if let Some(description) = value.get("description") { obj.insert("description".to_string(), description.clone()); } if let Some(avatar) = value.get("avatar") { let link = avatar .get("ref") .and_then(|v| v.get("$link")) .and_then(|v| v.as_str()) .unwrap_or(""); obj.insert( "avatar".to_string(), serde_json::json!(app_state.cdn("avatar", did_str, link)), ); } if let Some(banner) = value.get("banner") { let link = banner .get("ref") .and_then(|v| v.get("$link")) .and_then(|v| v.as_str()) .unwrap_or(""); obj.insert( "banner".to_string(), serde_json::json!(app_state.cdn("banner", did_str, link)), ); } if let Some(created_at) = value.get("createdAt") { obj.insert("createdAt".to_string(), created_at.clone()); } } } Ok(profile) } #[derive(Deserialize)] struct GetAuthorFeedParams { actor: String, #[serde(default)] limit: Option, #[serde(default)] cursor: Option, #[serde(default)] filter: Option, #[serde(default, rename = "includePins")] include_pins: Option, } async fn get_author_feed( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let query_str = req.uri().query().unwrap_or(""); let params = serde_urlencoded::from_str::(query_str).map_err(|e| { tracing::error!("failed to parse query params: {e}"); StatusCode::BAD_REQUEST })?; let ident = AtIdentifier::new(¶ms.actor).map_err(|e| { tracing::error!("failed to parse actor identifier: {e}"); StatusCode::BAD_REQUEST })?; let repo = match app_state.hydrant.repos.resolve(&ident).await { Ok(repo) => repo, Err(e) => { tracing::error!("failed to resolve actor {}: {e}", params.actor); return proxy_request(req).await; } }; let did = repo.did.clone(); let limit = params.limit.unwrap_or(50).min(100); // Note: Official AppView uses timestamp cursors. // Our local index uses rkeys for list_records. let rkey_cursor = if let Some(c) = ¶ms.cursor { if c.len() < 20 { Some(c.as_str()) } else { None } } else { None }; // Get posts let posts_list = match repo .list_records("app.bsky.feed.post", limit, false, rkey_cursor) .await { Ok(rl) => rl, Err(e) => { tracing::error!("failed to list posts for {}: {e}", did); return proxy_request(req).await; } }; // Get reposts let reposts_list = match repo .list_records("app.bsky.feed.repost", limit, false, rkey_cursor) .await { Ok(rl) => rl, Err(e) => { tracing::error!("failed to list reposts for {}: {e}", did); return proxy_request(req).await; } }; let viewer_did = get_auth_did(&req); let author_profile = match get_profile_internal(&app_state, did.as_str(), viewer_did.as_deref()).await { Ok(p) => p, Err(e) => { tracing::error!("failed to get profile for {}: {e}", did); return proxy_request(req).await; } }; let filter = params.filter.as_deref().unwrap_or("posts_with_replies"); let mut all_items = Vec::new(); for rec in posts_list.records { let val_json = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); let created_at = val_json .get("createdAt") .and_then(|v| v.as_str()) .unwrap_or("") .to_string(); // Filter logic if filter == "posts_no_replies" && val_json.get("reply").is_some() { continue; } if filter == "posts_and_author_threads" { if let Some(reply) = val_json.get("reply") { let root_uri_str = reply .get("root") .and_then(|r| r.get("uri")) .and_then(|u| u.as_str()) .unwrap_or(""); if let Ok(root_uri) = jacquard_common::types::string::AtUri::new(root_uri_str) { if root_uri.authority().as_str() != did.as_str() { continue; } } else { continue; } } } all_items.push((created_at, "post", rec)); } for rec in reposts_list.records { let val_json = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); let created_at = val_json .get("createdAt") .and_then(|v| v.as_str()) .unwrap_or("") .to_string(); all_items.push((created_at, "repost", rec)); } // Sort by createdAt descending all_items.sort_by(|a, b| b.0.cmp(&a.0)); all_items.truncate(limit); let mut feed = Vec::new(); // Handle Pins let mut pinned_uri_opt = None; if params.include_pins.unwrap_or(false) && params.cursor.is_none() { if let Ok(Some(profile_rec)) = repo.get_record("app.bsky.actor.profile", "self").await { let prof_val = serde_json::to_value(profile_rec.value).unwrap_or(serde_json::json!({})); if let Some(pinned_uri) = prof_val.get("pinnedPost").and_then(|v| v.as_str()) { if let Ok(post_view) = get_post_view(&app_state, pinned_uri, viewer_did.as_deref()).await { feed.push(serde_json::json!({ "post": post_view, })); pinned_uri_opt = Some(pinned_uri.to_string()); } } } } for (created_at, kind, rec) in &all_items { if *kind == "post" { let uri = format!( "at://{}/app.bsky.feed.post/{}", did.as_str(), rec.rkey.as_str() ); // Skip if it was already included as a pin if Some(&uri) == pinned_uri_opt.as_ref() { continue; } let post = match get_post_view(&app_state, &uri, viewer_did.as_deref()).await { Ok(mut p) => { p["author"] = profile_to_basic(author_profile.clone()); p } Err(_) => { let mut val_json = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); if val_json.get("$type").is_none() { val_json["$type"] = serde_json::json!("app.bsky.feed.post"); } serde_json::json!({ "$type": "app.bsky.feed.defs#postView", "uri": uri, "cid": rec.cid.to_string(), "author": profile_to_basic(author_profile.clone()), "record": val_json, "replyCount": app_state.hydrant.backlinks.count(uri.clone()).source("app.bsky.feed.post").run().await.unwrap_or(0), "repostCount": app_state.hydrant.backlinks.count(uri.clone()).source("app.bsky.feed.repost").run().await.unwrap_or(0), "likeCount": app_state.hydrant.backlinks.count(uri.clone()).source("app.bsky.feed.like").run().await.unwrap_or(0), "quoteCount": 0, "indexedAt": if created_at.is_empty() { chrono::Utc::now().to_rfc3339() } else { created_at.clone() }, "viewer": { "muted": false, "blockedBy": false }, "labels": [], }) } }; let mut feed_item = serde_json::json!({ "post": post, }); let val_json = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); // Add reply context if it's a reply if let Some(reply) = val_json.get("reply") { if let Some(parent_uri) = reply .get("parent") .and_then(|p| p.get("uri")) .and_then(|u| u.as_str()) { if let Ok(parent_post) = get_post_view(&app_state, parent_uri, viewer_did.as_deref()).await { let root_uri = reply .get("root") .and_then(|p| p.get("uri")) .and_then(|u| u.as_str()) .unwrap_or(parent_uri); if let Ok(root_post) = get_post_view(&app_state, root_uri, viewer_did.as_deref()).await { feed_item["reply"] = serde_json::json!({ "root": root_post, "parent": parent_post, }); } } } } feed.push(feed_item); } else if *kind == "repost" { let val_json = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); if let Some(subject_uri) = val_json .get("subject") .and_then(|s| s.get("uri")) .and_then(|u| u.as_str()) { if let Ok(post_view) = get_post_view(&app_state, subject_uri, viewer_did.as_deref()).await { feed.push(serde_json::json!({ "post": post_view, "reason": { "$type": "app.bsky.feed.defs#reasonRepost", "by": profile_to_basic(author_profile.clone()), "indexedAt": created_at, } })); } } } } let next_cursor = all_items.last().map(|i| i.2.rkey.as_str().to_string()); feed.reverse(); Ok(Json(serde_json::json!({ "feed": feed, "cursor": next_cursor })) .into_response()) } #[derive(Deserialize)] struct GetLikesParams { uri: String, #[serde(default)] limit: Option, #[serde(default)] cursor: Option, } async fn get_likes( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let query_str = req.uri().query().unwrap_or(""); let params = serde_urlencoded::from_str::(query_str).map_err(|e| { tracing::error!("failed to parse query params: {e}"); StatusCode::BAD_REQUEST })?; let limit = params.limit.unwrap_or(50).min(100); let mut fetch = app_state .hydrant .backlinks .fetch(params.uri.clone()) .source("app.bsky.feed.like") .limit(limit); if let Some(cursor_str) = params.cursor { match data_encoding::BASE64URL_NOPAD.decode(cursor_str.as_bytes()) { Ok(c) => { fetch = fetch.cursor(c); } Err(e) => { tracing::warn!("failed to decode cursor {cursor_str}: {e}"); } } } let backlinks_page = match fetch.run().await { Ok(bp) => bp, Err(e) => { tracing::error!("failed to fetch backlinks for {}: {e}", params.uri); return proxy_request(req).await; } }; let mut likes = Vec::new(); let viewer_did = get_auth_did(&req); for bl in backlinks_page.backlinks { let uri = match jacquard_common::types::string::AtUri::new(bl.uri.as_str()) { Ok(uri) => uri, Err(e) => { tracing::warn!("failed to parse uri {}: {e}", bl.uri); continue; } }; let author_ident = uri.authority(); let profile = match get_profile_internal(&app_state, author_ident.as_str(), viewer_did.as_deref()) .await { Ok(p) => p, Err(e) => { tracing::warn!("failed to get profile for {author_ident}: {e}"); continue; } }; let repo = match app_state.hydrant.repos.resolve(author_ident).await { Ok(repo) => repo, Err(e) => { tracing::warn!("failed to resolve actor {author_ident}: {e}"); continue; } }; let collection = uri.collection().unwrap().as_str(); let rkey = uri.rkey().unwrap().0.as_str(); let record = match repo.get_record(collection, rkey).await { Ok(Some(record)) => record, Ok(None) => { tracing::warn!("record not found: {collection}/{rkey}"); continue; } Err(e) => { tracing::warn!("failed to get record {collection}/{rkey}: {e}"); continue; } }; let value = serde_json::to_value(record.value).unwrap_or(serde_json::json!({})); let created_at = value .get("createdAt") .and_then(|v| v.as_str()) .unwrap_or("") .to_string(); likes.push(serde_json::json!({ "actor": profile, "createdAt": created_at, "indexedAt": chrono::Utc::now().to_rfc3339(), })); } let cursor = backlinks_page .next_cursor .map(|c| data_encoding::BASE64URL_NOPAD.encode(&c)); Ok(Json(serde_json::json!({ "uri": params.uri, "likes": likes, "cursor": cursor, })) .into_response()) } async fn get_reposted_by( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let query_str = req.uri().query().unwrap_or(""); let params = serde_urlencoded::from_str::(query_str).map_err(|e| { tracing::error!("failed to parse query params: {e}"); StatusCode::BAD_REQUEST })?; let limit = params.limit.unwrap_or(50).min(100); let mut fetch = app_state .hydrant .backlinks .fetch(params.uri.clone()) .source("app.bsky.feed.repost") .limit(limit); if let Some(cursor_str) = params.cursor { match data_encoding::BASE64URL_NOPAD.decode(cursor_str.as_bytes()) { Ok(c) => { fetch = fetch.cursor(c); } Err(e) => { tracing::warn!("failed to decode cursor {cursor_str}: {e}"); } } } let backlinks_page = match fetch.run().await { Ok(bp) => bp, Err(e) => { tracing::error!("failed to fetch backlinks for {}: {e}", params.uri); return proxy_request(req).await; } }; let mut reposted_by = Vec::new(); let viewer_did = get_auth_did(&req); for bl in backlinks_page.backlinks { let uri = match jacquard_common::types::string::AtUri::new(bl.uri.as_str()) { Ok(uri) => uri, Err(e) => { tracing::warn!("failed to parse uri {}: {e}", bl.uri); continue; } }; let author_ident = uri.authority(); let profile = match get_profile_internal(&app_state, author_ident.as_str(), viewer_did.as_deref()) .await { Ok(p) => p, Err(e) => { tracing::warn!("failed to get profile for {author_ident}: {e}"); continue; } }; reposted_by.push(profile); } let cursor = backlinks_page .next_cursor .map(|c| data_encoding::BASE64URL_NOPAD.encode(&c)); Ok(Json(serde_json::json!({ "uri": params.uri, "repostedBy": reposted_by, "cursor": cursor, })) .into_response()) } fn extract_query_array(query: &str, key: &str) -> Vec { let mut res = Vec::new(); let prefix1 = format!("{}=", key); let prefix2 = format!("{}[]=", key); for part in query.split('&') { let val = part .strip_prefix(&prefix1) .or_else(|| part.strip_prefix(&prefix2)); if let Some(v) = val { if let Ok(decoded) = urlencoding::decode(v) { res.push(decoded.into_owned()); } } } res } async fn get_profiles( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let query_str = req.uri().query().unwrap_or(""); let actors = extract_query_array(query_str, "actors"); if actors.is_empty() { return proxy_request(req).await; } let viewer_did = get_auth_did(&req); let mut profiles = Vec::new(); for actor in actors { let ident = match AtIdentifier::new(&actor) { Ok(ident) => ident, Err(e) => { tracing::warn!("failed to parse actor identifier {actor}: {e}"); continue; } }; match app_state.hydrant.repos.resolve(&ident).await { Ok(repo) => { match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()) .await { Ok(profile) => profiles.push(profile), Err(e) => tracing::warn!("failed to get profile for {}: {e}", repo.did), } } Err(e) => tracing::warn!("failed to resolve actor {actor}: {e}"), } } Ok(Json(serde_json::json!({ "profiles": profiles })).into_response()) } async fn get_posts( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let query_str = req.uri().query().unwrap_or(""); let uris = extract_query_array(query_str, "uris"); if uris.is_empty() { return proxy_request(req).await; } let viewer_did = get_auth_did(&req); let mut posts = Vec::new(); for uri in uris { match get_post_view(&app_state, &uri, viewer_did.as_deref()).await { Ok(post) => posts.push(post), Err(e) => tracing::warn!("failed to get post view for {uri}: {e}"), } } Ok(Json(serde_json::json!({ "posts": posts })).into_response()) } #[derive(Deserialize)] struct GetFollowsParams { actor: String, #[serde(default)] limit: Option, #[serde(default)] cursor: Option, } async fn get_follows( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let query_str = req.uri().query().unwrap_or(""); let params: GetFollowsParams = serde_urlencoded::from_str(query_str).map_err(|e| { tracing::error!("failed to parse query params: {e}"); StatusCode::BAD_REQUEST })?; let ident = AtIdentifier::new(¶ms.actor).map_err(|e| { tracing::error!("failed to parse actor identifier: {e}"); StatusCode::BAD_REQUEST })?; let repo = match app_state.hydrant.repos.resolve(&ident).await { Ok(repo) => repo, Err(e) => { tracing::error!("failed to resolve actor {}: {e}", params.actor); return proxy_request(req).await; } }; let limit = params.limit.unwrap_or(50).min(100); let record_list = match repo .list_records( "app.bsky.graph.follow", limit, true, params.cursor.as_deref(), ) .await { Ok(rl) => rl, Err(e) => { tracing::error!("failed to list follows for {}: {e}", repo.did); return proxy_request(req).await; } }; let mut follows = Vec::new(); let viewer_did = get_auth_did(&req); for rec in record_list.records { let value = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); if let Some(subject_did) = value.get("subject").and_then(|s| s.as_str()) { match get_profile_internal(&app_state, subject_did, viewer_did.as_deref()).await { Ok(profile) => follows.push(profile), Err(e) => tracing::warn!("failed to get profile for {subject_did}: {e}"), } } } let subject_profile = match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()).await { Ok(p) => p, Err(e) => { tracing::error!("failed to get profile for {}: {e}", repo.did); return proxy_request(req).await; } }; Ok(Json(serde_json::json!({ "subject": subject_profile, "follows": follows, "cursor": record_list.cursor.map(|c| c.to_string()) })) .into_response()) } async fn get_followers( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let query_str = req.uri().query().unwrap_or(""); let params: GetFollowsParams = serde_urlencoded::from_str(query_str).map_err(|e| { tracing::error!("failed to parse query params: {e}"); StatusCode::BAD_REQUEST })?; let ident = AtIdentifier::new(¶ms.actor).map_err(|e| { tracing::error!("failed to parse actor identifier: {e}"); StatusCode::BAD_REQUEST })?; let repo = match app_state.hydrant.repos.resolve(&ident).await { Ok(repo) => repo, Err(e) => { tracing::error!("failed to resolve actor {}: {e}", params.actor); return proxy_request(req).await; } }; let limit = params.limit.unwrap_or(50).min(100); let mut fetch = app_state .hydrant .backlinks .fetch(repo.did.as_str().to_string()) .source("app.bsky.graph.follow") .limit(limit); if let Some(cursor_str) = params.cursor { match data_encoding::BASE64URL_NOPAD.decode(cursor_str.as_bytes()) { Ok(c) => fetch = fetch.cursor(c), Err(e) => tracing::warn!("failed to decode cursor {cursor_str}: {e}"), } } let backlinks_page = match fetch.run().await { Ok(bp) => bp, Err(e) => { tracing::error!("failed to fetch backlinks for {}: {e}", repo.did); return proxy_request(req).await; } }; let mut followers = Vec::new(); let viewer_did = get_auth_did(&req); for bl in backlinks_page.backlinks { match jacquard_common::types::string::AtUri::new(bl.uri.as_str()) { Ok(uri) => { let author_ident = uri.authority(); match get_profile_internal(&app_state, author_ident.as_str(), viewer_did.as_deref()) .await { Ok(profile) => followers.push(profile), Err(e) => tracing::warn!("failed to get profile for {author_ident}: {e}"), } } Err(e) => tracing::warn!("failed to parse uri {}: {e}", bl.uri), } } let subject_profile = match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()).await { Ok(p) => p, Err(e) => { tracing::error!("failed to get profile for {}: {e}", repo.did); return proxy_request(req).await; } }; let cursor = backlinks_page .next_cursor .map(|c| data_encoding::BASE64URL_NOPAD.encode(&c)); Ok(Json(serde_json::json!({ "subject": subject_profile, "followers": followers, "cursor": cursor, })) .into_response()) } async fn get_age_assurance_state() -> Result { Ok(Json(serde_json::json!({ "status": "assured" })) .into_response()) } fn get_auth_did(req: &Request) -> Option { let auth = req.headers().get("authorization")?.to_str().ok()?; let token = auth.strip_prefix("Bearer ")?; let parts: Vec<&str> = token.split('.').collect(); if parts.len() != 3 { return None; } use base64::Engine; let payload = base64::engine::general_purpose::URL_SAFE_NO_PAD .decode(parts[1]) .ok()?; let json: serde_json::Value = serde_json::from_slice(&payload).ok()?; json.get("sub") .and_then(|s| s.as_str()) .map(|s| s.to_string()) } #[derive(Deserialize)] struct CreateBookmarkReq { uri: String, cid: String, } async fn create_bookmark( State(app_state): State, req: Request, ) -> Result { let Some(did) = get_auth_did(&req) else { return Err(StatusCode::UNAUTHORIZED); }; let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let payload: CreateBookmarkReq = serde_json::from_slice(&body_bytes).map_err(|e| { tracing::error!("failed to parse create bookmark request: {e}"); StatusCode::BAD_REQUEST })?; let key = format!("bookmark:{}:{}", did, payload.uri); app_state .bookmarks .insert(key.as_bytes(), payload.cid.as_bytes()) .map_err(|e| { tracing::error!("failed to insert bookmark: {e}"); StatusCode::INTERNAL_SERVER_ERROR })?; Ok(Json(serde_json::json!({})).into_response()) } #[derive(Deserialize)] struct DeleteBookmarkReq { uri: String, } async fn delete_bookmark( State(app_state): State, req: Request, ) -> Result { let Some(did) = get_auth_did(&req) else { return Err(StatusCode::UNAUTHORIZED); }; let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let payload: DeleteBookmarkReq = serde_json::from_slice(&body_bytes).map_err(|e| { tracing::error!("failed to parse delete bookmark request: {e}"); StatusCode::BAD_REQUEST })?; let key = format!("bookmark:{}:{}", did, payload.uri); app_state.bookmarks.remove(key.as_bytes()).map_err(|e| { tracing::error!("failed to remove bookmark: {e}"); StatusCode::INTERNAL_SERVER_ERROR })?; Ok(Json(serde_json::json!({})).into_response()) } #[derive(Deserialize)] struct GetBookmarksParams { #[serde(default)] limit: Option, #[serde(default)] cursor: Option, } async fn get_bookmarks( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let Some(did) = get_auth_did(&req) else { return Err(StatusCode::UNAUTHORIZED); }; let query_str = req.uri().query().unwrap_or(""); let params = serde_urlencoded::from_str::(query_str).unwrap_or(GetBookmarksParams { limit: None, cursor: None, }); let limit = params.limit.unwrap_or(50).min(100); let prefix = format!("bookmark:{}:", did); let iter: Box> = if let Some(cursor) = params.cursor { Box::new(app_state.bookmarks.range::<&[u8], _>(( std::ops::Bound::Included(cursor.as_bytes()), std::ops::Bound::Unbounded, ))) } else { Box::new(app_state.bookmarks.prefix(prefix.as_bytes())) }; let mut fetched_items = Vec::new(); let mut next_cursor = None; for item in iter { let (key, cid_bytes) = match item.into_inner() { Ok(inner) => inner, Err(e) => { tracing::warn!("failed to get item from bookmarks iterator: {e}"); continue; } }; if !key.starts_with(prefix.as_bytes()) { break; } fetched_items.push((key.to_vec(), cid_bytes.to_vec())); if fetched_items.len() >= limit { next_cursor = Some(String::from_utf8_lossy(&key).to_string()); break; } } let mut bookmarks = Vec::new(); for (key, cid_bytes) in fetched_items { let key_str = String::from_utf8_lossy(&key); let uri_str = key_str.strip_prefix(&prefix).unwrap_or(&key_str); let cid_str = String::from_utf8_lossy(&cid_bytes); match jacquard_common::types::string::AtUri::new(uri_str) { Ok(uri) => { let author_ident = uri.authority(); let collection = uri.collection().unwrap().as_str(); let rkey = uri.rkey().unwrap().0.as_str(); match app_state.hydrant.repos.resolve(author_ident).await { Ok(repo) => match repo.get_record(collection, rkey).await { Ok(Some(record)) => { match get_profile_internal(&app_state, repo.did.as_str(), None).await { Ok(author_profile) => { bookmarks.push(serde_json::json!({ "uri": uri_str, "cid": cid_str.to_string(), "author": author_profile, "record": record.value, "replyCount": app_state.hydrant.backlinks.count(uri_str.to_string()).source("app.bsky.feed.post").run().await.unwrap_or(0), "repostCount": app_state.hydrant.backlinks.count(uri_str.to_string()).source("app.bsky.feed.repost").run().await.unwrap_or(0), "likeCount": app_state.hydrant.backlinks.count(uri_str.to_string()).source("app.bsky.feed.like").run().await.unwrap_or(0), "indexedAt": chrono::Utc::now().to_rfc3339(), })); } Err(e) => { tracing::warn!("failed to get profile for {}: {e}", repo.did) } } } Ok(None) => tracing::warn!("bookmark record not found: {uri_str}"), Err(e) => tracing::warn!("failed to get bookmark record {uri_str}: {e}"), }, Err(e) => tracing::warn!("failed to resolve actor {author_ident}: {e}"), } } Err(e) => tracing::warn!("failed to parse bookmark uri {uri_str}: {e}"), } if bookmarks.len() >= limit { next_cursor = Some(String::from_utf8_lossy(&key).to_string()); break; } } Ok(Json(serde_json::json!({ "bookmarks": bookmarks, "cursor": next_cursor, })) .into_response()) } #[derive(Deserialize)] struct CreateDraftReq { draft: serde_json::Value, } async fn create_draft( State(app_state): State, req: Request, ) -> Result { let Some(did) = get_auth_did(&req) else { return Err(StatusCode::UNAUTHORIZED); }; let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) .await .map_err(|e| { tracing::error!("failed to read request body: {e}"); StatusCode::INTERNAL_SERVER_ERROR })?; let payload = serde_json::from_slice::(&body_bytes).map_err(|e| { tracing::error!("failed to deserialize create_draft request: {e}"); StatusCode::BAD_REQUEST })?; let tid = jacquard_common::types::tid::Tid::now(1.try_into().unwrap()).to_string(); let key = format!("draft:{}:{}", did, tid); let draft_obj = payload.draft.clone(); let now = chrono::Utc::now().to_rfc3339(); let store_obj = serde_json::json!({ "id": tid, "draft": draft_obj, "createdAt": now, "updatedAt": now, }); let val = serde_json::to_vec(&store_obj).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; app_state .drafts .insert(key.as_bytes(), val) .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; Ok(Json(serde_json::json!({ "id": tid })).into_response()) } #[derive(Deserialize)] struct UpdateDraftReq { draft: DraftWithId, } #[derive(Deserialize)] struct DraftWithId { id: String, draft: serde_json::Value, } async fn update_draft( State(app_state): State, req: Request, ) -> Result { let Some(did) = get_auth_did(&req) else { return Err(StatusCode::UNAUTHORIZED); }; let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) .await .map_err(|e| { tracing::error!("failed to read request body: {e}"); StatusCode::INTERNAL_SERVER_ERROR })?; let payload = serde_json::from_slice::(&body_bytes).map_err(|e| { tracing::error!("failed to deserialize update_draft request: {e}"); StatusCode::BAD_REQUEST })?; let tid = payload.draft.id; let key = format!("draft:{}:{}", did, tid); let existing = app_state.drafts.get(key.as_bytes()).map_err(|e| { tracing::error!("failed to get draft from db: {e}"); StatusCode::INTERNAL_SERVER_ERROR })?; if let Some(existing_bytes) = existing.as_deref() { match serde_json::from_slice::(&existing_bytes) { Ok(mut existing_obj) => { let now = chrono::Utc::now().to_rfc3339(); existing_obj["draft"] = payload.draft.draft; existing_obj["updatedAt"] = serde_json::json!(now); let val = serde_json::to_vec(&existing_obj).map_err(|e| { tracing::error!("failed to serialize updated draft: {e}"); StatusCode::INTERNAL_SERVER_ERROR })?; app_state.drafts.insert(key.as_bytes(), val).map_err(|e| { tracing::error!("failed to insert updated draft to db: {e}"); StatusCode::INTERNAL_SERVER_ERROR })?; } Err(e) => { tracing::error!("failed to parse existing draft from db: {e}"); } } } Ok(Json(serde_json::json!({})).into_response()) } #[derive(Deserialize)] struct DeleteDraftReq { id: String, } async fn delete_draft( State(app_state): State, req: Request, ) -> Result { let Some(did) = get_auth_did(&req) else { return Err(StatusCode::UNAUTHORIZED); }; let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) .await .map_err(|e| { tracing::error!("failed to read request body: {e}"); StatusCode::INTERNAL_SERVER_ERROR })?; let payload = serde_json::from_slice::(&body_bytes).map_err(|e| { tracing::error!("failed to deserialize delete_draft request: {e}"); StatusCode::BAD_REQUEST })?; let key = format!("draft:{}:{}", did, payload.id); app_state.drafts.remove(key.as_bytes()).map_err(|e| { tracing::error!("failed to remove draft from db: {e}"); StatusCode::INTERNAL_SERVER_ERROR })?; Ok(Json(serde_json::json!({})).into_response()) } #[derive(Deserialize)] struct GetDraftsParams { #[serde(default)] limit: Option, #[serde(default)] cursor: Option, } async fn get_drafts( State(app_state): State, req: Request, ) -> Result { let Some(did) = get_auth_did(&req) else { return Err(StatusCode::UNAUTHORIZED); }; let query_str = req.uri().query().unwrap_or(""); let params = serde_urlencoded::from_str::(query_str).unwrap_or(GetDraftsParams { limit: None, cursor: None, }); let limit = params.limit.unwrap_or(50).min(100); let prefix = format!("draft:{}:", did); let iter: Box> = if let Some(cursor) = params.cursor { Box::new(app_state.drafts.range::<&[u8], _>(( std::ops::Bound::Included(cursor.as_bytes()), std::ops::Bound::Unbounded, ))) } else { Box::new(app_state.drafts.prefix(prefix.as_bytes())) }; let mut drafts = Vec::new(); let mut next_cursor = None; for item in iter { let (key, val_bytes) = match item.into_inner() { Ok(v) => v, Err(e) => { tracing::error!("failed to get item from drafts iterator: {e}"); continue; } }; if !key.starts_with(prefix.as_bytes()) { break; } match serde_json::from_slice::(&val_bytes) { Ok(obj) => drafts.push(obj), Err(e) => tracing::error!("failed to parse draft from db: {e}"), } if drafts.len() >= limit { next_cursor = Some(String::from_utf8_lossy(&key).to_string()); break; } } Ok(Json(serde_json::json!({ "drafts": drafts, "cursor": next_cursor, })) .into_response()) } #[derive(Deserialize)] struct GetTimelineParams { #[serde(default)] algorithm: Option, #[serde(default)] limit: Option, #[serde(default)] cursor: Option, } async fn get_timeline( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let query_str = req.uri().query().unwrap_or(""); let params = serde_urlencoded::from_str::(query_str).map_err(|e| { tracing::error!("failed to parse get_timeline query params: {e}"); StatusCode::BAD_REQUEST })?; let Some(did_str) = get_auth_did(&req) else { return Err(StatusCode::UNAUTHORIZED); }; let ident = AtIdentifier::new(&did_str).map_err(|e| { tracing::error!("failed to create AtIdentifier from did_str {did_str}: {e}"); StatusCode::BAD_REQUEST })?; let repo = match app_state.hydrant.repos.resolve(&ident).await { Ok(r) => r, Err(e) => { tracing::error!("failed to resolve repo for {ident}: {e}"); return proxy_request(req).await; } }; // Get all follows of auth user let mut follows = std::collections::HashSet::new(); follows.insert(repo.did.clone()); // Include self let mut cursor = None; loop { let record_list = match repo .list_records("app.bsky.graph.follow", 100, true, cursor.as_deref()) .await { Ok(rl) => rl, Err(e) => { tracing::error!("failed to list follows for {}: {e}", repo.did); break; } }; for rec in &record_list.records { let value = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); if let Some(subject_did) = value.get("subject").and_then(|s| s.as_str()) { match jacquard_common::types::string::Did::new(subject_did) { Ok(did) => { follows.insert(did.into_static()); } Err(e) => { tracing::error!("failed to parse subject DID {subject_did}: {e}"); } } } } if record_list.cursor.is_none() { break; } cursor = record_list.cursor.map(|c| c.to_string()); } let limit = params.limit.unwrap_or(50).min(100); let mut all_items = Vec::new(); for did in follows { match app_state.hydrant.repos.get(&did).info().await { Ok(Some(info)) => { if !info.tracked { continue; } } Ok(None) => continue, Err(e) => { tracing::error!("failed to get info for followed repo {did}: {e}"); continue; } } let followed_repo = app_state.hydrant.repos.get(&did); // get posts match followed_repo .list_records("app.bsky.feed.post", limit, true, None) .await { Ok(record_list) => { for rec in record_list.records { let rkey = rec.rkey.as_str().to_string(); let val_json = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); all_items.push(( did.clone(), "app.bsky.feed.post".to_string(), rkey, val_json, )); } } Err(e) => { tracing::warn!("failed to list posts for {did}: {e}"); } } // get reposts match followed_repo .list_records("app.bsky.feed.repost", limit, true, None) .await { Ok(record_list) => { for rec in record_list.records { let rkey = rec.rkey.as_str().to_string(); let val_json = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); all_items.push(( did.clone(), "app.bsky.feed.repost".to_string(), rkey, val_json, )); } } Err(e) => { tracing::warn!("failed to list reposts for {did}: {e}"); } } } // Sort by createdAt descending all_items.sort_by(|a, b| { let ca = a.3.get("createdAt").and_then(|v| v.as_str()).unwrap_or(""); let cb = b.3.get("createdAt").and_then(|v| v.as_str()).unwrap_or(""); cb.cmp(ca) }); if let Some(c) = params.cursor { all_items.retain(|item| { let item_ca = item .3 .get("createdAt") .and_then(|v| v.as_str()) .unwrap_or(""); item_ca < c.as_str() }); } all_items.truncate(limit); let mut feed = Vec::new(); let mut next_cursor = None; let viewer_did = get_auth_did(&req); for (did, col, rkey, value) in all_items { let created_at = value .get("createdAt") .and_then(|v| v.as_str()) .unwrap_or("") .to_string(); next_cursor = Some(created_at.clone()); let uri = format!("at://{}/{}/{}", did.as_str(), col, rkey); if col == "app.bsky.feed.post" { match get_post_view(&app_state, &uri, viewer_did.as_deref()).await { Ok(mut post_view) => { post_view["author"] = profile_to_basic(post_view["author"].clone()); feed.push(serde_json::json!({ "post": post_view })); } Err(e) => tracing::warn!("failed to get post view for {uri}: {e}"), } } else if col == "app.bsky.feed.repost" { if let Some(subject_uri) = value .get("subject") .and_then(|s| s.get("uri")) .and_then(|u| u.as_str()) { match get_post_view(&app_state, subject_uri, viewer_did.as_deref()).await { Ok(mut post_view) => { post_view["author"] = profile_to_basic(post_view["author"].clone()); match get_profile_internal(&app_state, did.as_str(), viewer_did.as_deref()) .await { Ok(reposter_profile) => { feed.push(serde_json::json!({ "post": post_view, "reason": { "$type": "app.bsky.feed.defs#reasonRepost", "by": profile_to_basic(reposter_profile), "indexedAt": if created_at.is_empty() { chrono::Utc::now().to_rfc3339() } else { created_at }, } })); } Err(e) => { tracing::warn!("failed to get profile for reposter {did}: {e}"); } } } Err(e) => { tracing::warn!( "failed to get post view for reposted post {subject_uri}: {e}" ); } } } } } feed.reverse(); Ok(Json(serde_json::json!({ "feed": feed, "cursor": next_cursor })) .into_response()) } #[derive(Deserialize)] struct GetQuotesParams { uri: String, #[serde(default)] cid: Option, #[serde(default)] limit: Option, #[serde(default)] cursor: Option, } async fn get_quotes( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let query_str = req.uri().query().unwrap_or(""); let params = serde_urlencoded::from_str::(query_str).map_err(|e| { tracing::error!("failed to parse get_quotes query params: {e}"); StatusCode::BAD_REQUEST })?; let limit = params.limit.unwrap_or(50).min(100); let mut fetch = app_state .hydrant .backlinks .fetch(params.uri.clone()) .source("app.bsky.feed.post") .limit(limit * 3); if let Some(cursor_str) = params.cursor { match data_encoding::BASE64URL_NOPAD.decode(cursor_str.as_bytes()) { Ok(c) => { fetch = fetch.cursor(c); } Err(e) => { tracing::error!("failed to decode cursor {cursor_str}: {e}"); } } } let backlinks_page = match fetch.run().await { Ok(bp) => bp, Err(e) => { tracing::error!("failed to fetch backlinks for {}: {e}", params.uri); return proxy_request(req).await; } }; let mut posts = Vec::new(); let mut found = 0; for bl in backlinks_page.backlinks { if found >= limit { break; } let uri = match jacquard_common::types::string::AtUri::new(bl.uri.as_str()) { Ok(u) => u, Err(e) => { tracing::error!("failed to parse backlink URI {}: {e}", bl.uri); continue; } }; let author_ident = uri.authority(); let collection = uri.collection().unwrap().as_str(); let rkey = uri.rkey().unwrap().0.as_str(); let repo = match app_state.hydrant.repos.resolve(author_ident).await { Ok(r) => r, Err(e) => { tracing::error!("failed to resolve repo for {author_ident}: {e}"); continue; } }; let record = match repo.get_record(collection, rkey).await { Ok(Some(rec)) => rec, Ok(None) => continue, Err(e) => { tracing::error!( "failed to get record {collection}/{rkey} from repo {author_ident}: {e}" ); continue; } }; let value = serde_json::to_value(&record.value).unwrap_or(serde_json::json!({})); let is_quote = value.get("embed").map_or(false, |embed| { let t = embed.get("$type").and_then(|t| t.as_str()).unwrap_or(""); if t == "app.bsky.embed.record" { embed .get("record") .and_then(|r| r.get("uri")) .and_then(|u| u.as_str()) == Some(params.uri.as_str()) } else if t == "app.bsky.embed.recordWithMedia" { embed .get("record") .and_then(|r| r.get("record")) .and_then(|r| r.get("uri")) .and_then(|u| u.as_str()) == Some(params.uri.as_str()) } else { false } }); if is_quote { let viewer_did = get_auth_did(&req); match get_post_view(&app_state, bl.uri.as_str(), viewer_did.as_deref()).await { Ok(post_view) => { posts.push(post_view); found += 1; } Err(e) => { tracing::warn!("failed to get post view for quoted post {}: {e}", bl.uri); } } } } let cursor = backlinks_page .next_cursor .map(|c| data_encoding::BASE64URL_NOPAD.encode(&c)); Ok(Json(serde_json::json!({ "uri": params.uri, "posts": posts, "cursor": cursor, })) .into_response()) } #[derive(Deserialize)] struct GetActorLikesParams { actor: String, #[serde(default)] limit: Option, #[serde(default)] cursor: Option, } async fn get_actor_likes( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let query_str = req.uri().query().unwrap_or(""); let params = serde_urlencoded::from_str::(query_str).map_err(|e| { tracing::error!("failed to parse get_actor_likes query params: {e}"); StatusCode::BAD_REQUEST })?; let ident = AtIdentifier::new(¶ms.actor).map_err(|e| { tracing::error!( "failed to create AtIdentifier for actor {}: {e}", params.actor ); StatusCode::BAD_REQUEST })?; let repo = match app_state.hydrant.repos.resolve(&ident).await { Ok(r) => r, Err(e) => { tracing::error!("failed to resolve repo for {ident}: {e}"); return proxy_request(req).await; } }; let limit = params.limit.unwrap_or(50).min(100); let record_list = match repo .list_records("app.bsky.feed.like", limit, true, params.cursor.as_deref()) .await { Ok(rl) => rl, Err(e) => { tracing::error!("failed to list likes for {}: {e}", repo.did); return proxy_request(req).await; } }; let mut feed = Vec::new(); let viewer_did = get_auth_did(&req); for rec in record_list.records { let value = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); if let Some(subject_uri) = value .get("subject") .and_then(|s| s.get("uri")) .and_then(|u| u.as_str()) { match get_post_view(&app_state, subject_uri, viewer_did.as_deref()).await { Ok(post_view) => feed.push(serde_json::json!({ "post": post_view })), Err(e) => { tracing::warn!("failed to get post view for liked post {subject_uri}: {e}") } } } } Ok(Json(serde_json::json!({ "feed": feed, "cursor": record_list.cursor.map(|c| c.to_string()) })) .into_response()) } #[derive(Deserialize)] struct GetRelationshipsParams { actor: String, } async fn get_relationships( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let query_str = req.uri().query().unwrap_or(""); let params = serde_urlencoded::from_str::(query_str).map_err(|e| { tracing::error!("failed to parse get_relationships query params: {e}"); StatusCode::BAD_REQUEST })?; let others = extract_query_array(query_str, "others"); if others.is_empty() { return proxy_request(req).await; } let ident = AtIdentifier::new(¶ms.actor).map_err(|e| { tracing::error!( "failed to create AtIdentifier for actor {}: {e}", params.actor ); StatusCode::BAD_REQUEST })?; let repo = match app_state.hydrant.repos.resolve(&ident).await { Ok(r) => r, Err(e) => { tracing::error!("failed to resolve repo for {ident}: {e}"); return proxy_request(req).await; } }; let actor_did = repo.did.as_str().to_string(); let mut relationships = Vec::new(); for other in others { let other_ident = match AtIdentifier::new(&other) { Ok(i) => i, Err(e) => { tracing::error!("failed to create AtIdentifier for other actor {other}: {e}"); continue; } }; let other_repo = match app_state.hydrant.repos.resolve(&other_ident).await { Ok(r) => r, Err(e) => { tracing::error!("failed to resolve repo for other actor {other_ident}: {e}"); continue; } }; let other_did = other_repo.did.as_str().to_string(); let mut following = None; let mut followed_by = None; let mut cursor = None; loop { match repo .list_records("app.bsky.graph.follow", 100, true, cursor.as_deref()) .await { Ok(rl) => { for rec in &rl.records { let val = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); if val.get("subject").and_then(|s| s.as_str()) == Some(&other_did) { following = Some(format!( "at://{}/app.bsky.graph.follow/{}", actor_did, rec.rkey.as_str() )); break; } } if following.is_some() || rl.cursor.is_none() { break; } cursor = rl.cursor.map(|c| c.to_string()); } Err(e) => { tracing::error!("failed to list follows for {actor_did}: {e}"); break; } } } let mut cursor = None; loop { match other_repo .list_records("app.bsky.graph.follow", 100, true, cursor.as_deref()) .await { Ok(rl) => { for rec in &rl.records { let val = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); if val.get("subject").and_then(|s| s.as_str()) == Some(&actor_did) { followed_by = Some(format!( "at://{}/app.bsky.graph.follow/{}", other_did, rec.rkey.as_str() )); break; } } if followed_by.is_some() || rl.cursor.is_none() { break; } cursor = rl.cursor.map(|c| c.to_string()); } Err(e) => { tracing::error!("failed to list follows for {other_did}: {e}"); break; } } } relationships.push(serde_json::json!({ "$type": "app.bsky.graph.defs#relationship", "did": other_did, "following": following, "followedBy": followed_by, })); } Ok(Json(serde_json::json!({ "actor": actor_did, "relationships": relationships })) .into_response()) } async fn get_known_followers( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let query_str = req.uri().query().unwrap_or(""); let params = serde_urlencoded::from_str::(query_str).map_err(|e| { tracing::error!("failed to parse get_known_followers query params: {e}"); StatusCode::BAD_REQUEST })?; let Some(did_str) = get_auth_did(&req) else { return Err(StatusCode::UNAUTHORIZED); }; let auth_ident = AtIdentifier::new(&did_str).map_err(|e| { tracing::error!("failed to create AtIdentifier for auth user {did_str}: {e}"); StatusCode::BAD_REQUEST })?; let auth_repo = match app_state.hydrant.repos.resolve(&auth_ident).await { Ok(r) => r, Err(e) => { tracing::error!("failed to resolve repo for auth user {auth_ident}: {e}"); return proxy_request(req).await; } }; let mut auth_follows = std::collections::HashSet::new(); let mut cursor = None; loop { match auth_repo .list_records("app.bsky.graph.follow", 100, true, cursor.as_deref()) .await { Ok(rl) => { for rec in &rl.records { let val = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); if let Some(subj) = val.get("subject").and_then(|s| s.as_str()) { auth_follows.insert(subj.to_string()); } } if rl.cursor.is_none() { break; } cursor = rl.cursor.map(|c| c.to_string()); } Err(e) => { tracing::error!( "failed to list follows for auth user {}: {e}", auth_repo.did ); break; } } } let ident = AtIdentifier::new(¶ms.actor).map_err(|e| { tracing::error!( "failed to create AtIdentifier for actor {}: {e}", params.actor ); StatusCode::BAD_REQUEST })?; let repo = match app_state.hydrant.repos.resolve(&ident).await { Ok(r) => r, Err(e) => { tracing::error!("failed to resolve repo for {ident}: {e}"); return proxy_request(req).await; } }; let limit = params.limit.unwrap_or(50).min(100); let mut fetch = app_state .hydrant .backlinks .fetch(repo.did.as_str().to_string()) .source("app.bsky.graph.follow") .limit(limit * 3); if let Some(cursor_str) = params.cursor { match data_encoding::BASE64URL_NOPAD.decode(cursor_str.as_bytes()) { Ok(c) => { fetch = fetch.cursor(c); } Err(e) => { tracing::error!("failed to decode cursor {cursor_str}: {e}"); } } } let backlinks_page = match fetch.run().await { Ok(bp) => bp, Err(e) => { tracing::error!("failed to fetch backlinks for {}: {e}", repo.did); return proxy_request(req).await; } }; let mut followers = Vec::new(); let mut found = 0; for bl in backlinks_page.backlinks { if found >= limit { break; } let uri = match jacquard_common::types::string::AtUri::new(bl.uri.as_str()) { Ok(u) => u, Err(e) => { tracing::error!("failed to parse backlink URI {}: {e}", bl.uri); continue; } }; let author_ident = uri.authority().as_str().to_string(); if auth_follows.contains(&author_ident) { match get_profile_internal(&app_state, author_ident.as_str(), None).await { Ok(profile) => { followers.push(profile); found += 1; } Err(e) => { tracing::warn!("failed to get profile for follower {author_ident}: {e}"); } } } } let subject_profile = match get_profile_internal(&app_state, repo.did.as_str(), None).await { Ok(p) => p, Err(e) => { tracing::error!("failed to get subject profile for {}: {e}", repo.did); return proxy_request(req).await; } }; let cursor = backlinks_page .next_cursor .map(|c| data_encoding::BASE64URL_NOPAD.encode(&c)); Ok(Json(serde_json::json!({ "subject": subject_profile, "followers": followers, "cursor": cursor, })) .into_response()) } async fn get_blocks( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let query_str = req.uri().query().unwrap_or(""); let params = serde_urlencoded::from_str::(query_str).map_err(|e| { tracing::error!("failed to parse get_blocks query params: {e}"); StatusCode::BAD_REQUEST })?; let Some(did_str) = get_auth_did(&req) else { return Err(StatusCode::UNAUTHORIZED); }; let ident = AtIdentifier::new(&did_str).map_err(|e| { tracing::error!("failed to create AtIdentifier for did_str {did_str}: {e}"); StatusCode::BAD_REQUEST })?; let repo = match app_state.hydrant.repos.resolve(&ident).await { Ok(r) => r, Err(e) => { tracing::error!("failed to resolve repo for {ident}: {e}"); return proxy_request(req).await; } }; let limit = params.limit.unwrap_or(50).min(100); let record_list = match repo .list_records( "app.bsky.graph.block", limit, true, params.cursor.as_deref(), ) .await { Ok(rl) => rl, Err(e) => { tracing::error!("failed to list blocks for {}: {e}", repo.did); return proxy_request(req).await; } }; let mut blocks = Vec::new(); for rec in record_list.records { let value = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); if let Some(subject_did) = value.get("subject").and_then(|s| s.as_str()) { match get_profile_internal(&app_state, subject_did, None).await { Ok(profile) => blocks.push(profile), Err(e) => { tracing::warn!("failed to get profile for blocked actor {subject_did}: {e}") } } } } Ok(Json(serde_json::json!({ "blocks": blocks, "cursor": record_list.cursor.map(|c| c.to_string()) })) .into_response()) } async fn get_list(State(app_state): State, req: Request) -> Result { // let hydrant = &app_state.hydrant; let query_str = req.uri().query().unwrap_or(""); #[derive(Deserialize)] struct GetListParams { list: String, #[serde(default)] limit: Option, #[serde(default)] cursor: Option, } let params = serde_urlencoded::from_str::(query_str).map_err(|e| { tracing::error!("failed to parse get_list query params: {e}"); StatusCode::BAD_REQUEST })?; let uri = jacquard_common::types::string::AtUri::new(¶ms.list).map_err(|e| { tracing::error!("failed to parse list URI {}: {e}", params.list); StatusCode::BAD_REQUEST })?; let author_ident = uri.authority(); let rkey = uri .rkey() .ok_or_else(|| { tracing::error!("missing rkey in list URI {}", params.list); StatusCode::BAD_REQUEST })? .0 .as_str() .to_string(); let repo = match app_state.hydrant.repos.resolve(author_ident).await { Ok(r) => r, Err(e) => { tracing::error!("failed to resolve repo for {author_ident}: {e}"); return proxy_request(req).await; } }; let record = match repo.get_record("app.bsky.graph.list", &rkey).await { Ok(Some(rec)) => rec, Ok(None) => { tracing::error!("list record not found: {author_ident}/app.bsky.graph.list/{rkey}"); return proxy_request(req).await; } Err(e) => { tracing::error!("failed to get list record {author_ident}/{rkey}: {e}"); return proxy_request(req).await; } }; let viewer_did = get_auth_did(&req); let author_profile = match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()).await { Ok(p) => p, Err(e) => { tracing::error!("failed to get author profile for {}: {e}", repo.did); return proxy_request(req).await; } }; let val_json = serde_json::to_value(&record.value).unwrap_or(serde_json::json!({})); let list_view = serde_json::json!({ "uri": params.list, "cid": record.cid.to_string(), "creator": author_profile, "name": val_json.get("name").and_then(|v| v.as_str()).unwrap_or(""), "purpose": val_json.get("purpose").and_then(|v| v.as_str()).unwrap_or("app.bsky.graph.defs#modlist"), "description": val_json.get("description").and_then(|v| v.as_str()), "avatar": val_json.get("avatar").and_then(|v| v.as_str()), "indexedAt": chrono::Utc::now().to_rfc3339(), }); // items from backlinks let limit = params.limit.unwrap_or(50).min(100); let mut fetch = app_state .hydrant .backlinks .fetch(params.list.clone()) .source("app.bsky.graph.listitem") .limit(limit); if let Some(cursor_str) = params.cursor { match data_encoding::BASE64URL_NOPAD.decode(cursor_str.as_bytes()) { Ok(c) => { fetch = fetch.cursor(c); } Err(e) => { tracing::error!("failed to decode cursor {cursor_str}: {e}"); } } } let backlinks_page = match fetch.run().await { Ok(bp) => bp, Err(e) => { tracing::error!("failed to fetch backlinks for list {}: {e}", params.list); return proxy_request(req).await; } }; let mut items = Vec::new(); for bl in backlinks_page.backlinks { let item_uri = match jacquard_common::types::string::AtUri::new(bl.uri.as_str()) { Ok(u) => u, Err(e) => { tracing::error!("failed to parse listitem URI {}: {e}", bl.uri); continue; } }; let item_author = item_uri.authority(); let item_rkey = item_uri.rkey().unwrap().0.as_str(); let item_repo = match app_state.hydrant.repos.resolve(item_author).await { Ok(r) => r, Err(e) => { tracing::error!("failed to resolve repo for listitem author {item_author}: {e}"); continue; } }; let item_rec = match item_repo .get_record("app.bsky.graph.listitem", item_rkey) .await { Ok(Some(rec)) => rec, Ok(None) => continue, Err(e) => { tracing::error!("failed to get listitem record {item_author}/{item_rkey}: {e}"); continue; } }; let item_val = serde_json::to_value(&item_rec.value).unwrap_or(serde_json::json!({})); if let Some(subject_did) = item_val.get("subject").and_then(|s| s.as_str()) { match get_profile_internal(&app_state, subject_did, None).await { Ok(subject_profile) => { items.push(serde_json::json!({ "uri": bl.uri.as_str(), "subject": subject_profile, })); } Err(e) => { tracing::warn!("failed to get profile for listitem subject {subject_did}: {e}"); } } } } Ok(Json(serde_json::json!({ "list": list_view, "items": items, "cursor": backlinks_page.next_cursor.map(|c| data_encoding::BASE64URL_NOPAD.encode(&c)), })) .into_response()) } async fn get_lists( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let query_str = req.uri().query().unwrap_or(""); let params = serde_urlencoded::from_str::(query_str).map_err(|e| { tracing::error!("failed to parse get_lists query params: {e}"); StatusCode::BAD_REQUEST })?; let ident = AtIdentifier::new(¶ms.actor).map_err(|e| { tracing::error!( "failed to create AtIdentifier for actor {}: {e}", params.actor ); StatusCode::BAD_REQUEST })?; let repo = match app_state.hydrant.repos.resolve(&ident).await { Ok(r) => r, Err(e) => { tracing::error!("failed to resolve repo for {ident}: {e}"); return proxy_request(req).await; } }; let viewer_did = get_auth_did(&req); let author_profile = match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()).await { Ok(p) => p, Err(e) => { tracing::error!("failed to get author profile for {}: {e}", repo.did); return proxy_request(req).await; } }; let limit = params.limit.unwrap_or(50).min(100); let record_list = match repo .list_records("app.bsky.graph.list", limit, true, params.cursor.as_deref()) .await { Ok(rl) => rl, Err(e) => { tracing::error!("failed to list lists for {}: {e}", repo.did); return proxy_request(req).await; } }; let mut lists = Vec::new(); for rec in record_list.records { let val_json = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); lists.push(serde_json::json!({ "uri": format!("at://{}/app.bsky.graph.list/{}", repo.did.as_str(), rec.rkey.as_str()), "cid": rec.cid.to_string(), "creator": author_profile.clone(), "name": val_json.get("name").and_then(|v| v.as_str()).unwrap_or(""), "purpose": val_json.get("purpose").and_then(|v| v.as_str()).unwrap_or("app.bsky.graph.defs#modlist"), "description": val_json.get("description").and_then(|v| v.as_str()), "avatar": val_json.get("avatar").and_then(|v| v.as_str()), "indexedAt": chrono::Utc::now().to_rfc3339(), })); } Ok(Json(serde_json::json!({ "lists": lists, "cursor": record_list.cursor.map(|c| c.to_string()) })) .into_response()) } async fn get_lists_with_membership( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let query_str = req.uri().query().unwrap_or(""); let params = serde_urlencoded::from_str::(query_str).map_err(|e| { tracing::error!("failed to parse get_lists_with_membership query params: {e}"); StatusCode::BAD_REQUEST })?; let ident = AtIdentifier::new(¶ms.actor).map_err(|e| { tracing::error!( "failed to create AtIdentifier for actor {}: {e}", params.actor ); StatusCode::BAD_REQUEST })?; let repo = match app_state.hydrant.repos.resolve(&ident).await { Ok(r) => r, Err(e) => { tracing::error!("failed to resolve repo for {ident}: {e}"); return proxy_request(req).await; } }; // All lists created by actor let record_list = match repo .list_records("app.bsky.graph.list", 100, true, None) .await { Ok(rl) => rl, Err(e) => { tracing::error!("failed to list lists for {}: {e}", repo.did); return proxy_request(req).await; } }; let lists: Vec = Vec::new(); for rec in record_list.records { let _uri = format!( "at://{}/app.bsky.graph.list/{}", repo.did.as_str(), rec.rkey.as_str() ); // Check if subject is in this list // This is tricky without reverse index on listitem subject. // We might just fallback here or implement a scan. // For now, proxy it if we can't efficiently answer. return proxy_request(req).await; } Ok(Json(serde_json::json!({ "lists": lists })).into_response()) } async fn get_actor_feeds( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let query_str = req.uri().query().unwrap_or(""); let params: GetAuthorFeedParams = serde_urlencoded::from_str(query_str).map_err(|e| { tracing::error!("failed to parse get_actor_feeds query params: {e}"); StatusCode::BAD_REQUEST })?; let ident = AtIdentifier::new(¶ms.actor).map_err(|e| { tracing::error!( "failed to create AtIdentifier for actor {}: {e}", params.actor ); StatusCode::BAD_REQUEST })?; let repo = match app_state.hydrant.repos.resolve(&ident).await { Ok(r) => r, Err(e) => { tracing::error!("failed to resolve repo for {ident}: {e}"); return proxy_request(req).await; } }; let viewer_did = get_auth_did(&req); let author_profile = match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()).await { Ok(p) => p, Err(e) => { tracing::error!("failed to get author profile for {}: {e}", repo.did); return proxy_request(req).await; } }; let limit = params.limit.unwrap_or(50).min(100); let record_list = match repo .list_records( "app.bsky.feed.generator", limit, true, params.cursor.as_deref(), ) .await { Ok(rl) => rl, Err(e) => { tracing::error!("failed to list feed generators for {}: {e}", repo.did); return proxy_request(req).await; } }; let mut feeds = Vec::new(); for rec in record_list.records { let val_json = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({})); let uri = format!( "at://{}/app.bsky.feed.generator/{}", repo.did.as_str(), rec.rkey.as_str() ); feeds.push(serde_json::json!({ "uri": uri, "cid": rec.cid.to_string(), "did": val_json.get("did").and_then(|v| v.as_str()).unwrap_or(""), "creator": author_profile.clone(), "displayName": val_json.get("displayName").and_then(|v| v.as_str()).unwrap_or(""), "description": val_json.get("description").and_then(|v| v.as_str()), "avatar": val_json.get("avatar").and_then(|v| v.as_str()), "likeCount": app_state.hydrant.backlinks.count(uri).source("app.bsky.feed.like").run().await.unwrap_or(0), "indexedAt": chrono::Utc::now().to_rfc3339(), })); } Ok(Json(serde_json::json!({ "feeds": feeds, "cursor": record_list.cursor.map(|c| c.to_string()) })) .into_response()) } async fn get_feed_generator( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let query_str = req.uri().query().unwrap_or(""); #[derive(Deserialize)] struct GetFeedGeneratorParams { feed: String, } let params: GetFeedGeneratorParams = serde_urlencoded::from_str(query_str).map_err(|e| { tracing::error!("failed to parse get_feed_generator query params: {e}"); StatusCode::BAD_REQUEST })?; let uri = jacquard_common::types::string::AtUri::new(¶ms.feed).map_err(|e| { tracing::error!("failed to parse feed uri {}: {e}", params.feed); StatusCode::BAD_REQUEST })?; let author_ident = uri.authority(); let rkey = uri .rkey() .ok_or_else(|| { tracing::error!("missing rkey in feed uri {}", params.feed); StatusCode::BAD_REQUEST })? .0 .as_str() .to_string(); let repo = match app_state.hydrant.repos.resolve(author_ident).await { Ok(r) => r, Err(e) => { tracing::error!("failed to resolve repo for {author_ident}: {e}"); return proxy_request(req).await; } }; let record = match repo.get_record("app.bsky.feed.generator", &rkey).await { Ok(Some(r)) => r, Ok(None) => { tracing::error!("feed generator record not found: {author_ident}/{rkey}"); return proxy_request(req).await; } Err(e) => { tracing::error!("failed to get feed generator record {author_ident}/{rkey}: {e}"); return proxy_request(req).await; } }; let viewer_did = get_auth_did(&req); let author_profile = match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()).await { Ok(p) => p, Err(e) => { tracing::error!("failed to get author profile for {}: {e}", repo.did); return proxy_request(req).await; } }; let val_json = serde_json::to_value(&record.value).unwrap_or(serde_json::json!({})); let view = serde_json::json!({ "uri": params.feed, "cid": record.cid.to_string(), "did": val_json.get("did").and_then(|v| v.as_str()).unwrap_or(""), "creator": author_profile, "displayName": val_json.get("displayName").and_then(|v| v.as_str()).unwrap_or(""), "description": val_json.get("description").and_then(|v| v.as_str()), "avatar": val_json.get("avatar").and_then(|v| v.as_str()), "likeCount": app_state.hydrant.backlinks.count(params.feed.clone()).source("app.bsky.feed.like").run().await.unwrap_or(0), "indexedAt": chrono::Utc::now().to_rfc3339(), }); Ok(Json(serde_json::json!({ "view": view, "isOnline": true, "isValid": true, })) .into_response()) } async fn get_feed_generators( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let query_str = req.uri().query().unwrap_or(""); let feeds = extract_query_array(query_str, "feeds"); if feeds.is_empty() { return proxy_request(req).await; } let mut views = Vec::new(); let viewer_did = get_auth_did(&req); for feed_uri in feeds { let uri = match jacquard_common::types::string::AtUri::new(&feed_uri) { Ok(u) => u, Err(e) => { tracing::warn!("failed to parse feed uri {feed_uri}: {e}"); continue; } }; let author_ident = uri.authority(); let rkey = uri.rkey().unwrap().0.as_str().to_string(); match app_state.hydrant.repos.resolve(author_ident).await { Ok(repo) => match repo.get_record("app.bsky.feed.generator", &rkey).await { Ok(Some(record)) => { match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()) .await { Ok(author_profile) => { let val_json = serde_json::to_value(&record.value) .unwrap_or(serde_json::json!({})); views.push(serde_json::json!({ "uri": feed_uri, "cid": record.cid.to_string(), "did": val_json.get("did").and_then(|v| v.as_str()).unwrap_or(""), "creator": author_profile, "displayName": val_json.get("displayName").and_then(|v| v.as_str()).unwrap_or(""), "description": val_json.get("description").and_then(|v| v.as_str()), "avatar": val_json.get("avatar").and_then(|v| v.as_str()), "likeCount": app_state.hydrant.backlinks.count(feed_uri.clone()).source("app.bsky.feed.like").run().await.unwrap_or(0), "indexedAt": chrono::Utc::now().to_rfc3339(), })); } Err(e) => { tracing::warn!("failed to get author profile for {}: {e}", repo.did) } } } Ok(None) => { tracing::warn!("feed generator record not found: {author_ident}/{rkey}") } Err(e) => { tracing::warn!("failed to get feed generator record {author_ident}/{rkey}: {e}") } }, Err(e) => tracing::warn!("failed to resolve repo for {author_ident}: {e}"), } } Ok(Json(serde_json::json!({ "feeds": views })).into_response()) } #[derive(Deserialize)] struct PutPreferencesReq { preferences: Vec, } async fn put_preferences( State(app_state): State, req: Request, ) -> Result { let Some(did) = get_auth_did(&req) else { return Err(StatusCode::UNAUTHORIZED); }; let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let payload: PutPreferencesReq = serde_json::from_slice(&body_bytes).map_err(|e| { tracing::error!("failed to parse put_preferences request: {e}"); StatusCode::BAD_REQUEST })?; let key = format!("prefs:{}", did); let val = serde_json::to_vec(&payload.preferences).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; app_state .preferences .insert(key.as_bytes(), val) .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; Ok(Json(serde_json::json!({})).into_response()) } async fn get_preferences( State(app_state): State, req: Request, ) -> Result { let Some(did) = get_auth_did(&req) else { return Err(StatusCode::UNAUTHORIZED); }; let key = format!("prefs:{}", did); let existing = app_state .preferences .get(key.as_bytes()) .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let preferences = if let Some(bytes) = existing { serde_json::from_slice::>(&bytes).unwrap_or_default() } else { Vec::new() }; Ok(Json(serde_json::json!({ "preferences": preferences })).into_response()) } #[derive(Deserialize)] struct MuteActorReq { actor: String, } async fn mute_actor( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let Some(did) = get_auth_did(&req) else { return Err(StatusCode::UNAUTHORIZED); }; let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let payload: MuteActorReq = serde_json::from_slice(&body_bytes).map_err(|e| { tracing::error!("failed to parse mute_actor/unmute_actor request: {e}"); StatusCode::BAD_REQUEST })?; let ident = AtIdentifier::new(&payload.actor).map_err(|e| { tracing::error!("failed to parse actor identifier {}: {e}", payload.actor); StatusCode::BAD_REQUEST })?; let repo = match app_state.hydrant.repos.resolve(&ident).await { Ok(r) => r, Err(e) => { tracing::error!("failed to resolve repo for {ident}: {e}"); return Err(StatusCode::NOT_FOUND); } }; let key = format!("mute:{}:{}", did, repo.did.as_str()); app_state .mutes .insert(key.as_bytes(), b"") .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; Ok(Json(serde_json::json!({})).into_response()) } async fn unmute_actor( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let Some(did) = get_auth_did(&req) else { return Err(StatusCode::UNAUTHORIZED); }; let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let payload: MuteActorReq = serde_json::from_slice(&body_bytes).map_err(|e| { tracing::error!("failed to parse mute_actor/unmute_actor request: {e}"); StatusCode::BAD_REQUEST })?; let ident = AtIdentifier::new(&payload.actor).map_err(|e| { tracing::error!("failed to parse actor identifier {}: {e}", payload.actor); StatusCode::BAD_REQUEST })?; let repo = match app_state.hydrant.repos.resolve(&ident).await { Ok(r) => r, Err(e) => { tracing::error!("failed to resolve repo for {ident}: {e}"); return Err(StatusCode::NOT_FOUND); } }; let key = format!("mute:{}:{}", did, repo.did.as_str()); app_state .mutes .remove(key.as_bytes()) .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; Ok(Json(serde_json::json!({})).into_response()) } async fn get_mutes( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let Some(did) = get_auth_did(&req) else { return Err(StatusCode::UNAUTHORIZED); }; let query_str = req.uri().query().unwrap_or(""); let params: GetBookmarksParams = serde_urlencoded::from_str(query_str).map_err(|e| { tracing::error!("failed to parse get_mutes query params: {e}"); StatusCode::BAD_REQUEST })?; let limit = params.limit.unwrap_or(50).min(100); let prefix = format!("mute:{}:", did); let iter: Box> = if let Some(cursor) = params.cursor { Box::new(app_state.mutes.range::<&[u8], _>(( std::ops::Bound::Included(cursor.as_bytes()), std::ops::Bound::Unbounded, ))) } else { Box::new(app_state.mutes.prefix(prefix.as_bytes())) }; let mut fetched_keys = Vec::new(); for item in iter { let (key, _) = match item.into_inner() { Ok(inner) => inner, Err(e) => { tracing::warn!("failed to get item from mutes iterator: {e}"); continue; } }; if !key.starts_with(prefix.as_bytes()) { break; } fetched_keys.push(key.to_vec()); if fetched_keys.len() >= limit { break; } } let mut mutes = Vec::new(); let mut next_cursor = None; for key in fetched_keys { let key_str = String::from_utf8_lossy(&key); let muted_did = key_str.strip_prefix(&prefix).unwrap_or(&key_str); match get_profile_internal(&app_state, muted_did, None).await { Ok(profile) => mutes.push(profile), Err(e) => tracing::warn!("failed to get profile for muted actor {muted_did}: {e}"), } next_cursor = Some(key_str.to_string()); } Ok(Json(serde_json::json!({ "mutes": mutes, "cursor": next_cursor })) .into_response()) } async fn notification_indexer(app_state: AppState) { let mut stream = app_state.hydrant.subscribe(None); while let Some(evt) = stream.next().await { let Some(rec) = evt.record else { continue }; if rec.action != "create" { continue; } let author_did = rec.did.as_str(); let collection = rec.collection.as_str(); let rkey = rec.rkey.as_str(); let uri = format!("at://{}/{}/{}", author_did, collection, rkey); let Some(val) = rec.record else { continue }; let mut targets = Vec::new(); // (target_did, reason, reason_subject) match collection { "app.bsky.feed.like" => { if let Some(subj_uri) = val .get("subject") .and_then(|s| s.get("uri")) .and_then(|u| u.as_str()) { match jacquard_common::types::string::AtUri::new(subj_uri) { Ok(u) => { targets.push(( u.authority().as_str().to_string(), "like", Some(subj_uri.to_string()), )); } Err(e) => { tracing::warn!("failed to parse notification like uri {subj_uri}: {e}") } } } } "app.bsky.feed.repost" => { if let Some(subj_uri) = val .get("subject") .and_then(|s| s.get("uri")) .and_then(|u| u.as_str()) { match jacquard_common::types::string::AtUri::new(subj_uri) { Ok(u) => { targets.push(( u.authority().as_str().to_string(), "repost", Some(subj_uri.to_string()), )); } Err(e) => tracing::warn!( "failed to parse notification repost uri {subj_uri}: {e}" ), } } } "app.bsky.graph.follow" => { if let Some(subj_did) = val.get("subject").and_then(|s| s.as_str()) { targets.push((subj_did.to_string(), "follow", None)); } } "app.bsky.feed.post" => { if let Some(parent_uri) = val .get("reply") .and_then(|r| r.get("parent")) .and_then(|p| p.get("uri")) .and_then(|u| u.as_str()) { match jacquard_common::types::string::AtUri::new(parent_uri) { Ok(u) => { targets.push(( u.authority().as_str().to_string(), "reply", Some(parent_uri.to_string()), )); } Err(e) => tracing::warn!( "failed to parse notification reply uri {parent_uri}: {e}" ), } } let is_quote = val.get("embed").map_or(false, |embed| { let t = embed.get("$type").and_then(|t| t.as_str()).unwrap_or(""); if t == "app.bsky.embed.record" { embed .get("record") .and_then(|r| r.get("uri")) .and_then(|u| u.as_str()) .is_some() } else if t == "app.bsky.embed.recordWithMedia" { embed .get("record") .and_then(|r| r.get("record")) .and_then(|r| r.get("uri")) .and_then(|u| u.as_str()) .is_some() } else { false } }); if is_quote { if let Some(embed) = val.get("embed") { let t = embed.get("$type").and_then(|t| t.as_str()).unwrap_or(""); let quote_uri = if t == "app.bsky.embed.record" { embed .get("record") .and_then(|r| r.get("uri")) .and_then(|u| u.as_str()) } else if t == "app.bsky.embed.recordWithMedia" { embed .get("record") .and_then(|r| r.get("record")) .and_then(|r| r.get("uri")) .and_then(|u| u.as_str()) } else { None }; if let Some(qu) = quote_uri { match jacquard_common::types::string::AtUri::new(qu) { Ok(u) => { targets.push(( u.authority().as_str().to_string(), "quote", Some(qu.to_string()), )); } Err(e) => tracing::warn!( "failed to parse notification quote uri {qu}: {e}" ), } } } } if let Some(facets) = val.get("facets").and_then(|f| f.as_array()) { for facet in facets { if let Some(features) = facet.get("features").and_then(|f| f.as_array()) { for feature in features { if feature.get("$type").and_then(|t| t.as_str()) == Some("app.bsky.richtext.facet#mention") { if let Some(mention_did) = feature.get("did").and_then(|d| d.as_str()) { targets.push((mention_did.to_string(), "mention", None)); } } } } } } } _ => {} } let indexed_at = chrono::Utc::now().to_rfc3339(); for (target_did, reason, reason_subject) in targets { if target_did == author_did { continue; } let key = format!("notif:{}:{:016x}", target_did, evt.id); let notif = serde_json::json!({ "uri": uri, "cid": rec.cid.map(|c| c.to_string()).unwrap_or_default(), "author_did": author_did, "reason": reason, "reasonSubject": reason_subject, "record": val, "indexedAt": indexed_at, "id": evt.id }); let _ = app_state .notifications .insert(key.as_bytes(), serde_json::to_vec(¬if).unwrap()); } } } #[derive(Deserialize)] struct ListNotificationsParams { #[serde(default)] limit: Option, #[serde(default)] cursor: Option, } async fn list_notifications( State(app_state): State, req: Request, ) -> Result { // let hydrant = &app_state.hydrant; let Some(did) = get_auth_did(&req) else { return Err(StatusCode::UNAUTHORIZED); }; let query_str = req.uri().query().unwrap_or(""); let params = serde_urlencoded::from_str::(query_str).unwrap_or( ListNotificationsParams { limit: None, cursor: None, }, ); let limit = params.limit.unwrap_or(50).min(100); let prefix = format!("notif:{}:", did); // Collect all notification keys into a vector so we can iterate in reverse let mut all_keys = Vec::new(); for item in app_state.notifications.prefix(prefix.as_bytes()) { match item.into_inner() { Ok((k, v)) => { all_keys.push((k, v)); } Err(e) => { tracing::warn!("failed to get notification item: {e}"); } } } // Sort keys just in case and reverse all_keys.sort_by(|a, b| b.0.cmp(&a.0)); // Pagination if let Some(cursor) = params.cursor { all_keys.retain(|(k, _)| String::from_utf8_lossy(k).as_ref() < cursor.as_str()); } let seen_key = format!("seen:{}", did); let last_seen = app_state .seen .get(seen_key.as_bytes()) .ok() .flatten() .and_then(|b| String::from_utf8(b.to_vec()).ok()) .unwrap_or_else(|| "1970-01-01T00:00:00Z".to_string()); let mut notifications = Vec::new(); let mut next_cursor = None; for (key, val_bytes) in all_keys.into_iter().take(limit) { next_cursor = Some(String::from_utf8_lossy(&key).to_string()); let val = match serde_json::from_slice::(&val_bytes) { Ok(v) => v, Err(e) => { tracing::warn!("failed to parse notification json: {e}"); continue; } }; let author_did = val.get("author_did").and_then(|a| a.as_str()).unwrap_or(""); let indexed_at = val.get("indexedAt").and_then(|a| a.as_str()).unwrap_or(""); let is_read = indexed_at <= last_seen.as_str(); let mut notif = val.clone(); match get_profile_internal(&app_state, author_did, None).await { Ok(author_profile) => { notif["author"] = author_profile; } Err(e) => { tracing::warn!( "failed to get author profile for notification from {author_did}: {e}" ); continue; } } notif["isRead"] = serde_json::json!(is_read); notif.as_object_mut().unwrap().remove("author_did"); notif.as_object_mut().unwrap().remove("id"); notifications.push(notif); } Ok(Json(serde_json::json!({ "notifications": notifications, "cursor": if notifications.len() == limit { next_cursor } else { None } })) .into_response()) } async fn get_unread_count( State(app_state): State, req: Request, ) -> Result { let Some(did) = get_auth_did(&req) else { return Err(StatusCode::UNAUTHORIZED); }; let prefix = format!("notif:{}:", did); let seen_key = format!("seen:{}", did); let last_seen = app_state .seen .get(seen_key.as_bytes()) .ok() .flatten() .and_then(|b| String::from_utf8(b.to_vec()).ok()) .unwrap_or_else(|| "1970-01-01T00:00:00Z".to_string()); let mut count = 0; for item in app_state.notifications.prefix(prefix.as_bytes()) { let (_key, val_bytes) = match item.into_inner() { Ok(inner) => inner, Err(e) => { tracing::warn!("failed to get notification item for count: {e}"); continue; } }; match serde_json::from_slice::(&val_bytes) { Ok(val) => { if let Some(indexed_at) = val.get("indexedAt").and_then(|a| a.as_str()) { if indexed_at > last_seen.as_str() { count += 1; } } } Err(e) => tracing::warn!("failed to parse notification json for count: {e}"), } } Ok(Json(serde_json::json!({ "count": count })) .into_response()) } #[derive(Deserialize)] struct UpdateSeenReq { #[serde(rename = "seenAt")] seen_at: String, } async fn update_seen( State(app_state): State, req: Request, ) -> Result { let Some(did) = get_auth_did(&req) else { return Err(StatusCode::UNAUTHORIZED); }; let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; let payload: UpdateSeenReq = serde_json::from_slice(&body_bytes).map_err(|e| { tracing::error!("failed to parse update_seen request: {e}"); StatusCode::BAD_REQUEST })?; let seen_key = format!("seen:{}", did); app_state .seen .insert(seen_key.as_bytes(), payload.seen_at.as_bytes()) .map_err(|e| { tracing::error!("failed to insert seen: {e}"); StatusCode::INTERNAL_SERVER_ERROR })?; Ok(Json(serde_json::json!({})).into_response()) } #[derive(Deserialize)] struct GetFeedParams { feed: String, #[serde(default)] limit: Option, #[serde(default)] cursor: Option, } async fn get_feed(State(app_state): State, req: Request) -> Result { // let hydrant = &app_state.hydrant; let query_str = req.uri().query().unwrap_or(""); let params: GetFeedParams = serde_urlencoded::from_str(query_str).map_err(|e| { tracing::error!("failed to parse get_feed query params: {e}"); StatusCode::BAD_REQUEST })?; let uri = jacquard_common::types::string::AtUri::new(¶ms.feed).map_err(|e| { tracing::error!("failed to parse feed uri {}: {e}", params.feed); StatusCode::BAD_REQUEST })?; let author_ident = uri.authority(); let rkey = uri .rkey() .ok_or_else(|| { tracing::error!("missing rkey in feed uri {}", params.feed); StatusCode::BAD_REQUEST })? .0 .as_str() .to_string(); let repo = match app_state.hydrant.repos.resolve(author_ident).await { Ok(r) => r, Err(e) => { tracing::error!("failed to resolve repo for {author_ident}: {e}"); return proxy_request(req).await; } }; let record = match repo.get_record("app.bsky.feed.generator", &rkey).await { Ok(Some(r)) => r, Ok(None) => { tracing::error!("feed generator record not found: {author_ident}/{rkey}"); return proxy_request(req).await; } Err(e) => { tracing::error!("failed to get feed generator record {author_ident}/{rkey}: {e}"); return proxy_request(req).await; } }; let val_json = serde_json::to_value(&record.value).unwrap_or(serde_json::json!({})); let Some(service_did) = val_json.get("did").and_then(|d| d.as_str()) else { tracing::error!("missing did in feed generator record {author_ident}/{rkey}"); return Err(StatusCode::BAD_REQUEST); }; let service_did_parsed = jacquard_common::types::string::Did::new(service_did).map_err(|e| { tracing::error!("failed to parse service did {service_did}: {e}"); StatusCode::BAD_REQUEST })?; let (doc_data, _) = match app_state .hydrant .resolver() .resolve_raw_doc(&service_did_parsed) .await { Ok(d) => d, Err(e) => { tracing::error!("failed to resolve service did {service_did}: {e}"); return proxy_request(req).await; } }; let doc_json = serde_json::to_value(&doc_data).unwrap_or(serde_json::json!({})); let mut endpoint = None; if let Some(services) = doc_json.get("service").and_then(|s| s.as_array()) { for srv in services { if let Some(typ) = srv.get("type") { let is_fg = if let Some(t_str) = typ.as_str() { t_str == "BskyFeedGenerator" } else if let Some(t_arr) = typ.as_array() { t_arr .iter() .any(|v| v.as_str() == Some("BskyFeedGenerator")) } else { false }; if is_fg { if let Some(ep) = srv.get("serviceEndpoint").and_then(|e| e.as_str()) { endpoint = Some(ep.to_string()); break; } } } // fallback check by ID if srv.get("id").and_then(|i| i.as_str()) == Some("#bsky_fg") { if let Some(ep) = srv.get("serviceEndpoint").and_then(|e| e.as_str()) { endpoint = Some(ep.to_string()); break; } } } } let Some(ep) = endpoint else { return proxy_request(req).await; }; let mut skeleton_url = format!( "{}/xrpc/app.bsky.feed.getFeedSkeleton?feed={}", ep, params.feed ); if let Some(limit) = params.limit { skeleton_url.push_str(&format!("&limit={}", limit)); } if let Some(cursor) = params.cursor { skeleton_url.push_str(&format!("&cursor={}", cursor)); } let client = reqwest::Client::new(); let mut req_builder = client.get(&skeleton_url); if let Some(auth) = req.headers().get("authorization") { req_builder = req_builder.header("authorization", auth); } let res = match req_builder.send().await { Ok(r) => r, Err(e) => { tracing::error!("failed to send request to feed generator {skeleton_url}: {e}"); return proxy_request(req).await; } }; let skeleton = match res.json::().await { Ok(s) => s, Err(e) => { tracing::error!("failed to parse feed skeleton json: {e}"); return proxy_request(req).await; } }; let mut hydrated_feed = Vec::new(); if let Some(feed_items) = skeleton.get("feed").and_then(|f| f.as_array()) { for item in feed_items { if let Some(post_uri) = item.get("post").and_then(|p| p.as_str()) { match get_post_view(&app_state, post_uri, None).await { Ok(post_view) => { let mut feed_item = serde_json::json!({ "post": post_view }); if let Some(reason) = item.get("reason") { feed_item .as_object_mut() .unwrap() .insert("reason".to_string(), reason.clone()); } hydrated_feed.push(feed_item); } Err(e) => { tracing::warn!("failed to get post view for {post_uri} in feed: {e}"); } } } } } Ok(Json(serde_json::json!({ "feed": hydrated_feed, "cursor": skeleton.get("cursor") })) .into_response()) } async fn get_well_known_did(req: Request) -> Result { let host = req .headers() .get("host") .and_then(|h| h.to_str().ok()) .unwrap_or("localhost:8000"); let did = if host.contains(':') { format!("did:web:{}", host.replace(':', "%3A")) } else { format!("did:web:{}", host) }; let scheme = if host.starts_with("localhost") || host.starts_with("127.0.0.1") { "http" } else { "https" }; let service_endpoint = format!("{}://{}", scheme, host); Ok(Json(serde_json::json!({ "@context": [ "https://www.w3.org/ns/did/v1" ], "id": did, "service": [ { "id": "#bsky_notif", "type": "BskyNotificationService", "serviceEndpoint": service_endpoint }, { "id": "#bsky_appview", "type": "BskyAppView", "serviceEndpoint": service_endpoint } ] })) .into_response()) }