slop slop slop sahuuuurrr
1use axum::{
2 Json, Router,
3 extract::{Request, State},
4 http::StatusCode,
5 response::{IntoResponse, Response},
6 routing::{get, post},
7};
8use hydrant::config::Config;
9use hydrant::control::Hydrant;
10use hydrant::deps::futures::StreamExt;
11use jacquard_common::IntoStatic;
12use jacquard_common::types::ident::AtIdentifier;
13use serde::Deserialize;
14use tracing;
15use tracing_subscriber::EnvFilter;
16
17#[derive(Clone)]
18struct AppState {
19 hydrant: Hydrant,
20 bookmarks: fjall::Keyspace,
21 drafts: fjall::Keyspace,
22 preferences: fjall::Keyspace,
23 mutes: fjall::Keyspace,
24 notifications: fjall::Keyspace,
25 seen: fjall::Keyspace,
26 cdn_url: String,
27}
28
29impl AppState {
30 fn cdn(&self, category: &str, did: &str, link: &str) -> String {
31 format!(
32 "{}/img/{}/plain/{}/{}@jpeg",
33 self.cdn_url, category, did, link
34 )
35 }
36}
37
38#[tokio::main]
39async fn main() -> miette::Result<()> {
40 tracing_subscriber::fmt()
41 .with_env_filter(
42 EnvFilter::from_default_env().add_directive("appview=info".parse().unwrap()),
43 )
44 .init();
45
46 hydrant::deps::rustls::crypto::aws_lc_rs::default_provider()
47 .install_default()
48 .ok();
49
50 let db = fjall::Database::builder(std::path::Path::new("./data"))
51 .open()
52 .unwrap();
53 let bookmarks = db
54 .keyspace("bookmarks", || fjall::KeyspaceCreateOptions::default())
55 .unwrap();
56 let drafts = db
57 .keyspace("drafts", || fjall::KeyspaceCreateOptions::default())
58 .unwrap();
59 let preferences = db
60 .keyspace("preferences", || fjall::KeyspaceCreateOptions::default())
61 .unwrap();
62 let mutes = db
63 .keyspace("mutes", || fjall::KeyspaceCreateOptions::default())
64 .unwrap();
65 let notifications = db
66 .keyspace("notifications", || fjall::KeyspaceCreateOptions::default())
67 .unwrap();
68 let seen = db
69 .keyspace("seen", || fjall::KeyspaceCreateOptions::default())
70 .unwrap();
71
72 let mut cfg = Config::from_env()?;
73
74 // Enable backlinks
75 cfg.enable_backlinks = true;
76
77 // By default filter mode is used, so we only track explicitly added repos.
78 cfg.full_network = false;
79
80 cfg.filter_collections = Some(vec![
81 "app.bsky.actor.profile".to_string(),
82 "app.bsky.feed.post".to_string(),
83 "app.bsky.feed.repost".to_string(),
84 "app.bsky.feed.like".to_string(),
85 "app.bsky.feed.generator".to_string(),
86 "app.bsky.graph.follow".to_string(),
87 "app.bsky.graph.block".to_string(),
88 "app.bsky.graph.list".to_string(),
89 "app.bsky.graph.listitem".to_string(),
90 "app.bsky.labeler.service".to_string(),
91 ]);
92
93 let cdn_url = std::env::var("CDN_URL").unwrap_or_else(|_| "https://cdn.bsky.app".to_string());
94
95 let hydrant = Hydrant::new(cfg).await?;
96 let hydrant_clone = hydrant.clone();
97 let app_state = AppState {
98 hydrant: hydrant.clone(),
99 bookmarks,
100 drafts,
101 preferences,
102 mutes,
103 notifications,
104 seen,
105 cdn_url,
106 };
107
108 if let Ok(seed_account) = std::env::var("SEED_ACCOUNT") {
109 let h = hydrant.clone();
110 tokio::spawn(async move {
111 seed_account_follows(h, seed_account).await;
112 });
113 }
114
115 let indexer_state = app_state.clone();
116 tokio::spawn(async move {
117 notification_indexer(indexer_state).await;
118 });
119
120 let app = Router::new()
121 .route("/xrpc/app.bsky.feed.getPostThread", get(get_post_thread))
122 .route("/xrpc/app.bsky.actor.getProfile", get(get_profile))
123 .route("/xrpc/app.bsky.actor.getProfiles", get(get_profiles))
124 .route("/xrpc/app.bsky.actor.getPreferences", get(get_preferences))
125 .route("/xrpc/app.bsky.actor.putPreferences", post(put_preferences))
126 .route("/xrpc/app.bsky.feed.getAuthorFeed", get(get_author_feed))
127 .route("/xrpc/app.bsky.feed.getTimeline", get(get_timeline))
128 .route("/xrpc/app.bsky.feed.getActorLikes", get(get_actor_likes))
129 .route("/xrpc/app.bsky.feed.getQuotes", get(get_quotes))
130 .route(
131 "/xrpc/app.bsky.graph.getRelationships",
132 get(get_relationships),
133 )
134 .route(
135 "/xrpc/app.bsky.graph.getKnownFollowers",
136 get(get_known_followers),
137 )
138 .route("/xrpc/app.bsky.graph.getBlocks", get(get_blocks))
139 .route("/xrpc/app.bsky.graph.getMutes", get(get_mutes))
140 .route("/xrpc/app.bsky.graph.muteActor", post(mute_actor))
141 .route("/xrpc/app.bsky.graph.unmuteActor", post(unmute_actor))
142 .route("/xrpc/app.bsky.graph.getList", get(get_list))
143 .route("/xrpc/app.bsky.graph.getLists", get(get_lists))
144 .route(
145 "/xrpc/app.bsky.graph.getListsWithMembership",
146 get(get_lists_with_membership),
147 )
148 .route(
149 "/xrpc/app.bsky.notification.listNotifications",
150 get(list_notifications),
151 )
152 .route(
153 "/xrpc/app.bsky.notification.getUnreadCount",
154 get(get_unread_count),
155 )
156 .route("/xrpc/app.bsky.notification.updateSeen", post(update_seen))
157 .route("/.well-known/did.json", get(get_well_known_did))
158 .route("/xrpc/app.bsky.feed.getActorFeeds", get(get_actor_feeds))
159 .route("/xrpc/app.bsky.feed.getFeed", get(get_feed))
160 .route(
161 "/xrpc/app.bsky.feed.getFeedGenerator",
162 get(get_feed_generator),
163 )
164 .route(
165 "/xrpc/app.bsky.feed.getFeedGenerators",
166 get(get_feed_generators),
167 )
168 .route("/xrpc/app.bsky.feed.getLikes", get(get_likes))
169 .route("/xrpc/app.bsky.feed.getRepostedBy", get(get_reposted_by))
170 .route("/xrpc/app.bsky.feed.getPosts", get(get_posts))
171 .route("/xrpc/app.bsky.graph.getFollows", get(get_follows))
172 .route("/xrpc/app.bsky.graph.getFollowers", get(get_followers))
173 .route(
174 "/xrpc/app.bsky.unspecced.getAgeAssuranceState",
175 get(get_age_assurance_state),
176 )
177 .route("/xrpc/app.bsky.bookmark.getBookmarks", get(get_bookmarks))
178 .route(
179 "/xrpc/app.bsky.bookmark.createBookmark",
180 post(create_bookmark),
181 )
182 .route(
183 "/xrpc/app.bsky.bookmark.deleteBookmark",
184 post(delete_bookmark),
185 )
186 .route("/xrpc/app.bsky.draft.getDrafts", get(get_drafts))
187 .route("/xrpc/app.bsky.draft.createDraft", post(create_draft))
188 .route("/xrpc/app.bsky.draft.updateDraft", post(update_draft))
189 .route("/xrpc/app.bsky.draft.deleteDraft", post(delete_draft))
190 .fallback(proxy_request)
191 .layer(tower_http::cors::CorsLayer::permissive())
192 .layer(tower_http::trace::TraceLayer::new_for_http())
193 .with_state(app_state);
194
195 let port = std::env::var("PORT")
196 .ok()
197 .and_then(|s| s.parse().ok())
198 .unwrap_or(8000);
199 let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", port))
200 .await
201 .unwrap();
202 tracing::info!("appview listening on {}", listener.local_addr().unwrap());
203
204 tokio::select! {
205 _ = axum::serve(listener, app) => {},
206 r = hydrant_clone.run()? => { r?; },
207 }
208
209 Ok(())
210}
211
212async fn proxy_request(req: Request) -> Result<Response, StatusCode> {
213 tracing::info!("Proxying request: {} {}", req.method(), req.uri());
214 let client = reqwest::Client::new();
215 let uri = req.uri();
216 let mut url = format!("https://public.api.bsky.app{}", uri.path());
217 if let Some(query) = uri.query() {
218 url.push('?');
219 url.push_str(query);
220 }
221
222 let mut req_builder = client.request(req.method().clone(), url);
223 for (name, value) in req.headers() {
224 if name != reqwest::header::HOST {
225 req_builder = req_builder.header(name.clone(), value.clone());
226 }
227 }
228
229 // Also proxy the body if it exists
230 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX)
231 .await
232 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
233 if !body_bytes.is_empty() {
234 req_builder = req_builder.body(body_bytes);
235 }
236
237 let req = req_builder
238 .build()
239 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
240 let res = client
241 .execute(req)
242 .await
243 .map_err(|_| StatusCode::BAD_GATEWAY)?;
244
245 let mut axum_res = Response::builder().status(res.status());
246 for (name, value) in res.headers() {
247 axum_res = axum_res.header(name, value);
248 }
249 let body = axum::body::Body::from_stream(res.bytes_stream());
250 axum_res
251 .body(body)
252 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
253}
254
255async fn seed_account_follows(hydrant: Hydrant, seed_account: String) {
256 tracing::info!("Seeding account follows for {}", seed_account);
257 let ident = match AtIdentifier::new(&seed_account) {
258 Ok(i) => i,
259 Err(e) => {
260 tracing::error!("Invalid seed account identifier {}: {}", seed_account, e);
261 return;
262 }
263 };
264 let repo = match hydrant.repos.resolve(&ident).await {
265 Ok(r) => r,
266 Err(e) => {
267 tracing::error!("Could not resolve seed account {}: {}", seed_account, e);
268 return;
269 }
270 };
271 let seed_did = repo.did.clone();
272
273 // Use the now-public resolver to get the PDS endpoint
274 let doc = match hydrant.resolver().resolve_doc(&seed_did).await {
275 Ok(d) => d,
276 Err(e) => {
277 tracing::warn!(
278 "Could not resolve DID doc for seed account {}: {}",
279 seed_did,
280 e
281 );
282 return;
283 }
284 };
285
286 let pds_url = doc.pds.as_str();
287
288 let mut cursor = None;
289 let mut dids_to_track = Vec::new();
290 dids_to_track.push(seed_did.clone().into_static());
291
292 loop {
293 let mut list_url = format!(
294 "{pds_url}xrpc/com.atproto.repo.listRecords?repo={seed_did}&collection=app.bsky.graph.follow&limit=100",
295 );
296 if let Some(ref c) = cursor {
297 list_url.push_str(&format!("&cursor={}", c));
298 }
299
300 let res = match reqwest::get(&list_url).await {
301 Ok(r) => r,
302 Err(e) => {
303 tracing::error!("Failed to fetch follows from {list_url}: {e}");
304 break;
305 }
306 };
307 let json = match res.json::<serde_json::Value>().await {
308 Ok(j) => j,
309 Err(e) => {
310 tracing::error!("Failed to parse follows JSON from {list_url}: {e}");
311 break;
312 }
313 };
314
315 if let Some(records) = json.get("records").and_then(|r| r.as_array()) {
316 for record in records {
317 if let Some(did_str) = record
318 .get("value")
319 .and_then(|v| v.get("subject"))
320 .and_then(|s| s.as_str())
321 {
322 if let Ok(did) = jacquard_common::types::string::Did::new(did_str) {
323 dids_to_track.push(did.into_static());
324 }
325 }
326 }
327 }
328
329 if let Some(c) = json.get("cursor").and_then(|c| c.as_str()) {
330 cursor = Some(c.to_string());
331 } else {
332 break;
333 }
334
335 if dids_to_track.len() > 5000 {
336 break;
337 }
338 }
339
340 tracing::info!("Tracking {} repos from seed", dids_to_track.len());
341 let iter: Vec<_> = dids_to_track
342 .iter()
343 .map(|d| jacquard_common::types::string::Did::new(d.as_str()).unwrap())
344 .collect();
345 let _ = hydrant.repos.track(iter).await;
346}
347
348#[derive(Deserialize)]
349struct GetPostThreadParams {
350 uri: String,
351 #[serde(default)]
352 depth: Option<usize>,
353 #[serde(default, rename = "parentHeight")]
354 parent_height: Option<usize>,
355}
356
357async fn get_post_thread(
358 State(app_state): State<AppState>,
359 req: Request,
360) -> Result<Response, StatusCode> {
361 // let hydrant = &app_state.hydrant;
362 let query_str = req.uri().query().unwrap_or("");
363 let params = serde_urlencoded::from_str::<GetPostThreadParams>(query_str).map_err(|e| {
364 tracing::error!("failed to parse query params: {e}");
365 StatusCode::BAD_REQUEST
366 })?;
367
368 let viewer_did = get_auth_did(&req);
369 match get_thread_view_post(
370 &app_state,
371 ¶ms.uri,
372 params.depth.unwrap_or(6),
373 params.parent_height.unwrap_or(0),
374 viewer_did.as_deref(),
375 )
376 .await
377 {
378 Ok(thread) => Ok(Json(serde_json::json!({ "thread": thread })).into_response()),
379 Err(_) => proxy_request(req).await,
380 }
381}
382
383fn profile_to_basic(full: serde_json::Value) -> serde_json::Value {
384 let mut basic = serde_json::json!({
385 "did": full["did"],
386 "handle": full["handle"],
387 });
388 if let Some(v) = full.get("displayName") {
389 basic["displayName"] = v.clone();
390 }
391 if let Some(v) = full.get("avatar") {
392 basic["avatar"] = v.clone();
393 }
394 if let Some(v) = full.get("viewer") {
395 basic["viewer"] = v.clone();
396 }
397 if let Some(v) = full.get("labels") {
398 basic["labels"] = v.clone();
399 }
400 if let Some(v) = full.get("associated") {
401 basic["associated"] = v.clone();
402 }
403 if let Some(v) = full.get("createdAt") {
404 basic["createdAt"] = v.clone();
405 }
406 basic
407}
408
409async fn get_post_view(
410 app_state: &AppState,
411 uri_str: &str,
412 viewer_did: Option<&str>,
413) -> Result<serde_json::Value, StatusCode> {
414 let uri =
415 jacquard_common::types::string::AtUri::new(uri_str).map_err(|_| StatusCode::BAD_REQUEST)?;
416 let author_ident = uri.authority();
417 let collection = uri
418 .collection()
419 .ok_or(StatusCode::BAD_REQUEST)?
420 .as_str()
421 .to_string();
422 let rkey = uri
423 .rkey()
424 .ok_or(StatusCode::BAD_REQUEST)?
425 .0
426 .as_str()
427 .to_string();
428
429 let repo = app_state
430 .hydrant
431 .repos
432 .resolve(author_ident)
433 .await
434 .map_err(|_| StatusCode::NOT_FOUND)?;
435 let record = repo
436 .get_record(&collection, &rkey)
437 .await
438 .map_err(|_| StatusCode::NOT_FOUND)?
439 .ok_or(StatusCode::NOT_FOUND)?;
440
441 let author_profile = get_profile_internal(&app_state, repo.did.as_str(), viewer_did).await?;
442
443 let mut viewer_state = serde_json::json!({
444 "muted": false,
445 "blockedBy": false,
446 });
447 if let Some(viewer) = viewer_did {
448 let like = app_state
449 .hydrant
450 .backlinks
451 .fetch(uri_str.to_string())
452 .source("app.bsky.feed.like")
453 .dids(vec![viewer.to_string()])
454 .limit(1)
455 .run()
456 .await
457 .ok()
458 .and_then(|p| p.backlinks.first().map(|b| b.uri.to_string()));
459
460 let repost = app_state
461 .hydrant
462 .backlinks
463 .fetch(uri_str.to_string())
464 .source("app.bsky.feed.repost")
465 .dids(vec![viewer.to_string()])
466 .limit(1)
467 .run()
468 .await
469 .ok()
470 .and_then(|p| p.backlinks.first().map(|b| b.uri.to_string()));
471
472 if let Some(l) = like {
473 viewer_state["like"] = serde_json::json!(l);
474 }
475 if let Some(r) = repost {
476 viewer_state["repost"] = serde_json::json!(r);
477 }
478 }
479
480 let mut val_json = serde_json::to_value(&record.value).unwrap_or(serde_json::json!({}));
481 if val_json.get("$type").is_none() {
482 val_json["$type"] = serde_json::json!("app.bsky.feed.post");
483 }
484
485 let created_at = val_json
486 .get("createdAt")
487 .and_then(|v| v.as_str())
488 .unwrap_or("");
489
490 let mut post_view = serde_json::json!({
491 "$type": "app.bsky.feed.defs#postView",
492 "uri": uri_str,
493 "cid": record.cid.to_string(),
494 "author": profile_to_basic(author_profile),
495 "record": val_json,
496 "replyCount": app_state.hydrant.backlinks.count(uri_str.to_string()).source("app.bsky.feed.post").run().await.unwrap_or(0),
497 "repostCount": app_state.hydrant.backlinks.count(uri_str.to_string()).source("app.bsky.feed.repost").run().await.unwrap_or(0),
498 "likeCount": app_state.hydrant.backlinks.count(uri_str.to_string()).source("app.bsky.feed.like").run().await.unwrap_or(0),
499 "quoteCount": 0,
500 "indexedAt": if created_at.is_empty() { chrono::Utc::now().to_rfc3339() } else { created_at.to_string() },
501 "viewer": viewer_state,
502 "labels": [],
503 });
504
505 if let Some(record_embed) = val_json.get("embed") {
506 let t = record_embed
507 .get("$type")
508 .and_then(|v| v.as_str())
509 .unwrap_or("");
510 if t == "app.bsky.embed.images" {
511 if let Some(images) = record_embed.get("images").and_then(|v| v.as_array()) {
512 let mut view_images = Vec::new();
513 for img in images {
514 let link = img
515 .get("image")
516 .and_then(|v| v.get("ref"))
517 .and_then(|v| v.get("$link"))
518 .and_then(|v| v.as_str())
519 .unwrap_or("");
520 let thumb = app_state.cdn("feed_thumbnail", repo.did.as_str(), link);
521 let fullsize = app_state.cdn("feed_fullsize", repo.did.as_str(), link);
522 view_images.push(serde_json::json!({
523 "thumb": thumb,
524 "fullsize": fullsize,
525 "alt": img.get("alt").and_then(|v| v.as_str()).unwrap_or(""),
526 "aspectRatio": img.get("aspectRatio"),
527 }));
528 }
529 post_view["embed"] = serde_json::json!({
530 "$type": "app.bsky.embed.images#view",
531 "images": view_images,
532 });
533 }
534 } else if t == "app.bsky.embed.record" {
535 if let Some(quoted_uri) = record_embed
536 .get("record")
537 .and_then(|v| v.get("uri"))
538 .and_then(|v| v.as_str())
539 {
540 // For now, let's just do a shallow hydration of the quoted post to avoid infinite recursion
541 // We use None for viewer_did here to simplify and avoid auth-loops
542 if let Ok(quoted_post) = Box::pin(get_post_view(app_state, quoted_uri, None)).await
543 {
544 post_view["embed"] = serde_json::json!({
545 "$type": "app.bsky.embed.record#view",
546 "record": {
547 "$type": "app.bsky.embed.record#viewRecord",
548 "uri": quoted_post["uri"],
549 "cid": quoted_post["cid"],
550 "author": quoted_post["author"],
551 "value": quoted_post["record"],
552 "labels": quoted_post["labels"],
553 "indexedAt": quoted_post["indexedAt"],
554 "embeds": quoted_post.get("embed").map(|e| vec![e]).unwrap_or_default(),
555 "likeCount": quoted_post["likeCount"],
556 "repostCount": quoted_post["repostCount"],
557 "replyCount": quoted_post["replyCount"],
558 "quoteCount": quoted_post["quoteCount"],
559 }
560 });
561 }
562 }
563 }
564 }
565
566 Ok(post_view)
567}
568
569async fn get_thread_view_post(
570 app_state: &AppState,
571 uri_str: &str,
572 depth: usize,
573 parent_height: usize,
574 viewer_did: Option<&str>,
575) -> Result<serde_json::Value, StatusCode> {
576 let post = get_post_view(&app_state, uri_str, viewer_did).await?;
577 let mut thread = serde_json::json!({
578 "$type": "app.bsky.feed.defs#threadViewPost",
579 "post": post,
580 });
581
582 if parent_height > 0 {
583 if let Some(reply) = post.get("record").and_then(|r| r.get("reply")) {
584 if let Some(parent_uri) = reply
585 .get("parent")
586 .and_then(|p| p.get("uri"))
587 .and_then(|u| u.as_str())
588 {
589 match Box::pin(get_thread_view_post(
590 app_state,
591 parent_uri,
592 0,
593 parent_height - 1,
594 viewer_did,
595 ))
596 .await
597 {
598 Ok(parent_thread) => {
599 thread
600 .as_object_mut()
601 .unwrap()
602 .insert("parent".to_string(), parent_thread);
603 }
604 Err(e) => {
605 tracing::warn!("failed to fetch parent thread {parent_uri}: {e}");
606 }
607 }
608 }
609 }
610 }
611
612 if depth > 0 {
613 let replies_page = app_state
614 .hydrant
615 .backlinks
616 .fetch(uri_str.to_string())
617 .source("app.bsky.feed.post")
618 .limit(50)
619 .run()
620 .await
621 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
622 let mut replies = Vec::new();
623 for bl in replies_page.backlinks {
624 match Box::pin(get_thread_view_post(
625 app_state,
626 bl.uri.as_str(),
627 depth - 1,
628 0,
629 viewer_did,
630 ))
631 .await
632 {
633 Ok(reply_thread) => {
634 replies.push(reply_thread);
635 }
636 Err(e) => {
637 tracing::warn!("failed to fetch reply thread {}: {e}", bl.uri);
638 }
639 }
640 }
641 thread
642 .as_object_mut()
643 .unwrap()
644 .insert("replies".to_string(), serde_json::Value::Array(replies));
645 }
646
647 Ok(thread)
648}
649
650#[derive(Deserialize)]
651struct GetProfileParams {
652 actor: String,
653}
654
655async fn get_profile(
656 State(app_state): State<AppState>,
657 req: Request,
658) -> Result<Response, StatusCode> {
659 // let hydrant = &app_state.hydrant;
660 let query_str = req.uri().query().unwrap_or("");
661 let params = serde_urlencoded::from_str::<GetProfileParams>(query_str).map_err(|e| {
662 tracing::error!("failed to parse query params: {e}");
663 StatusCode::BAD_REQUEST
664 })?;
665
666 let ident = AtIdentifier::new(¶ms.actor).map_err(|e| {
667 tracing::error!("failed to parse actor identifier: {e}");
668 StatusCode::BAD_REQUEST
669 })?;
670
671 let viewer_did = get_auth_did(&req);
672 match app_state.hydrant.repos.resolve(&ident).await {
673 Ok(repo) => {
674 match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()).await {
675 Ok(profile) => return Ok(Json(profile).into_response()),
676 Err(e) => {
677 tracing::warn!("failed to get profile for {}: {e}", repo.did);
678 }
679 }
680 }
681 Err(e) => {
682 tracing::error!("failed to resolve actor {}: {e}", params.actor);
683 }
684 }
685
686 proxy_request(req).await
687}
688
689async fn get_profile_internal(
690 app_state: &AppState,
691 did_str: &str,
692 viewer_did: Option<&str>,
693) -> Result<serde_json::Value, StatusCode> {
694 let did =
695 jacquard_common::types::string::Did::new(did_str).map_err(|_| StatusCode::BAD_REQUEST)?;
696 let repo = app_state.hydrant.repos.get(&did);
697
698 let info = repo
699 .info()
700 .await
701 .map_err(|_| StatusCode::NOT_FOUND)?
702 .ok_or(StatusCode::NOT_FOUND)?;
703 if !info.tracked {
704 return Err(StatusCode::NOT_FOUND);
705 }
706
707 let profile_record = repo
708 .get_record("app.bsky.actor.profile", "self")
709 .await
710 .ok()
711 .flatten();
712
713 let followers_count = app_state
714 .hydrant
715 .backlinks
716 .count(did_str.to_string())
717 .source("app.bsky.graph.follow")
718 .run()
719 .await
720 .unwrap_or(0);
721 let follows_count = repo
722 .count_records("app.bsky.graph.follow")
723 .await
724 .unwrap_or(0);
725 let posts_count = repo.count_records("app.bsky.feed.post").await.unwrap_or(0);
726
727 let handle = info
728 .handle
729 .map(|h| h.to_string())
730 .unwrap_or_else(|| did_str.to_string());
731
732 let mut viewer_state = serde_json::json!({
733 "muted": false,
734 "blockedBy": false,
735 });
736 if let Some(viewer) = viewer_did {
737 if viewer != did_str {
738 // following: does viewer follow did_str?
739 let following = app_state
740 .hydrant
741 .backlinks
742 .fetch(did_str.to_string())
743 .source("app.bsky.graph.follow")
744 .dids(vec![viewer.to_string()])
745 .limit(1)
746 .run()
747 .await
748 .ok()
749 .and_then(|p| p.backlinks.first().map(|b| b.uri.to_string()));
750
751 // followedBy: does did_str follow viewer?
752 let followed_by = app_state
753 .hydrant
754 .backlinks
755 .fetch(viewer.to_string())
756 .source("app.bsky.graph.follow")
757 .dids(vec![did_str.to_string()])
758 .limit(1)
759 .run()
760 .await
761 .ok()
762 .and_then(|p| p.backlinks.first().map(|b| b.uri.to_string()));
763
764 if let Some(f) = following {
765 viewer_state["following"] = serde_json::json!(f);
766 }
767 if let Some(fb) = followed_by {
768 viewer_state["followedBy"] = serde_json::json!(fb);
769 }
770
771 // blocking: URI of viewer's block record targeting did_str
772 let blocking = app_state
773 .hydrant
774 .backlinks
775 .fetch(did_str.to_string())
776 .source("app.bsky.graph.block")
777 .dids(vec![viewer.to_string()])
778 .limit(1)
779 .run()
780 .await
781 .ok()
782 .and_then(|p| p.backlinks.first().map(|b| b.uri.to_string()));
783 if let Some(b) = blocking {
784 viewer_state["blocking"] = serde_json::json!(b);
785 }
786
787 // blockedBy: does did_str block viewer?
788 let blocked_by = app_state
789 .hydrant
790 .backlinks
791 .count(viewer.to_string())
792 .source("app.bsky.graph.block")
793 .dids(vec![did_str.to_string()])
794 .run()
795 .await
796 .unwrap_or(0)
797 > 0;
798 if blocked_by {
799 viewer_state["blockedBy"] = serde_json::json!(true);
800 }
801
802 // muted: check local fjall mutes
803 let mute_key = format!("mute:{}:{}", viewer, did_str);
804 if app_state
805 .mutes
806 .get(mute_key.as_bytes())
807 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
808 .is_some()
809 {
810 viewer_state["muted"] = serde_json::json!(true);
811 }
812 }
813 }
814
815 let mut profile = serde_json::json!({
816 "did": did_str,
817 "handle": handle,
818 "followersCount": followers_count,
819 "followsCount": follows_count,
820 "postsCount": posts_count,
821 "indexedAt": chrono::Utc::now().to_rfc3339(),
822 "viewer": viewer_state,
823 "labels": [],
824 "associated": {
825 "chat": { "allowIncoming": "all" }
826 }
827 });
828
829 if let Some(rec) = profile_record {
830 let value = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({}));
831 if let Some(obj) = profile.as_object_mut() {
832 if let Some(display_name) = value.get("displayName") {
833 obj.insert("displayName".to_string(), display_name.clone());
834 }
835 if let Some(description) = value.get("description") {
836 obj.insert("description".to_string(), description.clone());
837 }
838 if let Some(avatar) = value.get("avatar") {
839 let link = avatar
840 .get("ref")
841 .and_then(|v| v.get("$link"))
842 .and_then(|v| v.as_str())
843 .unwrap_or("");
844 obj.insert(
845 "avatar".to_string(),
846 serde_json::json!(app_state.cdn("avatar", did_str, link)),
847 );
848 }
849 if let Some(banner) = value.get("banner") {
850 let link = banner
851 .get("ref")
852 .and_then(|v| v.get("$link"))
853 .and_then(|v| v.as_str())
854 .unwrap_or("");
855 obj.insert(
856 "banner".to_string(),
857 serde_json::json!(app_state.cdn("banner", did_str, link)),
858 );
859 }
860 if let Some(created_at) = value.get("createdAt") {
861 obj.insert("createdAt".to_string(), created_at.clone());
862 }
863 }
864 }
865
866 Ok(profile)
867}
868#[derive(Deserialize)]
869struct GetAuthorFeedParams {
870 actor: String,
871 #[serde(default)]
872 limit: Option<usize>,
873 #[serde(default)]
874 cursor: Option<String>,
875 #[serde(default)]
876 filter: Option<String>,
877 #[serde(default, rename = "includePins")]
878 include_pins: Option<bool>,
879}
880
881async fn get_author_feed(
882 State(app_state): State<AppState>,
883 req: Request,
884) -> Result<Response, StatusCode> {
885 // let hydrant = &app_state.hydrant;
886 let query_str = req.uri().query().unwrap_or("");
887 let params = serde_urlencoded::from_str::<GetAuthorFeedParams>(query_str).map_err(|e| {
888 tracing::error!("failed to parse query params: {e}");
889 StatusCode::BAD_REQUEST
890 })?;
891
892 let ident = AtIdentifier::new(¶ms.actor).map_err(|e| {
893 tracing::error!("failed to parse actor identifier: {e}");
894 StatusCode::BAD_REQUEST
895 })?;
896
897 let repo = match app_state.hydrant.repos.resolve(&ident).await {
898 Ok(repo) => repo,
899 Err(e) => {
900 tracing::error!("failed to resolve actor {}: {e}", params.actor);
901 return proxy_request(req).await;
902 }
903 };
904 let did = repo.did.clone();
905
906 let limit = params.limit.unwrap_or(50).min(100);
907
908 // Note: Official AppView uses timestamp cursors.
909 // Our local index uses rkeys for list_records.
910 let rkey_cursor = if let Some(c) = ¶ms.cursor {
911 if c.len() < 20 { Some(c.as_str()) } else { None }
912 } else {
913 None
914 };
915
916 // Get posts
917 let posts_list = match repo
918 .list_records("app.bsky.feed.post", limit, false, rkey_cursor)
919 .await
920 {
921 Ok(rl) => rl,
922 Err(e) => {
923 tracing::error!("failed to list posts for {}: {e}", did);
924 return proxy_request(req).await;
925 }
926 };
927
928 // Get reposts
929 let reposts_list = match repo
930 .list_records("app.bsky.feed.repost", limit, false, rkey_cursor)
931 .await
932 {
933 Ok(rl) => rl,
934 Err(e) => {
935 tracing::error!("failed to list reposts for {}: {e}", did);
936 return proxy_request(req).await;
937 }
938 };
939
940 let viewer_did = get_auth_did(&req);
941 let author_profile =
942 match get_profile_internal(&app_state, did.as_str(), viewer_did.as_deref()).await {
943 Ok(p) => p,
944 Err(e) => {
945 tracing::error!("failed to get profile for {}: {e}", did);
946 return proxy_request(req).await;
947 }
948 };
949
950 let filter = params.filter.as_deref().unwrap_or("posts_with_replies");
951
952 let mut all_items = Vec::new();
953 for rec in posts_list.records {
954 let val_json = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({}));
955 let created_at = val_json
956 .get("createdAt")
957 .and_then(|v| v.as_str())
958 .unwrap_or("")
959 .to_string();
960
961 // Filter logic
962 if filter == "posts_no_replies" && val_json.get("reply").is_some() {
963 continue;
964 }
965
966 if filter == "posts_and_author_threads" {
967 if let Some(reply) = val_json.get("reply") {
968 let root_uri_str = reply
969 .get("root")
970 .and_then(|r| r.get("uri"))
971 .and_then(|u| u.as_str())
972 .unwrap_or("");
973 if let Ok(root_uri) = jacquard_common::types::string::AtUri::new(root_uri_str) {
974 if root_uri.authority().as_str() != did.as_str() {
975 continue;
976 }
977 } else {
978 continue;
979 }
980 }
981 }
982
983 all_items.push((created_at, "post", rec));
984 }
985 for rec in reposts_list.records {
986 let val_json = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({}));
987 let created_at = val_json
988 .get("createdAt")
989 .and_then(|v| v.as_str())
990 .unwrap_or("")
991 .to_string();
992 all_items.push((created_at, "repost", rec));
993 }
994
995 // Sort by createdAt descending
996 all_items.sort_by(|a, b| b.0.cmp(&a.0));
997 all_items.truncate(limit);
998
999 let mut feed = Vec::new();
1000
1001 // Handle Pins
1002 let mut pinned_uri_opt = None;
1003 if params.include_pins.unwrap_or(false) && params.cursor.is_none() {
1004 if let Ok(Some(profile_rec)) = repo.get_record("app.bsky.actor.profile", "self").await {
1005 let prof_val = serde_json::to_value(profile_rec.value).unwrap_or(serde_json::json!({}));
1006 if let Some(pinned_uri) = prof_val.get("pinnedPost").and_then(|v| v.as_str()) {
1007 if let Ok(post_view) =
1008 get_post_view(&app_state, pinned_uri, viewer_did.as_deref()).await
1009 {
1010 feed.push(serde_json::json!({
1011 "post": post_view,
1012 }));
1013 pinned_uri_opt = Some(pinned_uri.to_string());
1014 }
1015 }
1016 }
1017 }
1018
1019 for (created_at, kind, rec) in &all_items {
1020 if *kind == "post" {
1021 let uri = format!(
1022 "at://{}/app.bsky.feed.post/{}",
1023 did.as_str(),
1024 rec.rkey.as_str()
1025 );
1026
1027 // Skip if it was already included as a pin
1028 if Some(&uri) == pinned_uri_opt.as_ref() {
1029 continue;
1030 }
1031
1032 let post = match get_post_view(&app_state, &uri, viewer_did.as_deref()).await {
1033 Ok(mut p) => {
1034 p["author"] = profile_to_basic(author_profile.clone());
1035 p
1036 }
1037 Err(_) => {
1038 let mut val_json =
1039 serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({}));
1040 if val_json.get("$type").is_none() {
1041 val_json["$type"] = serde_json::json!("app.bsky.feed.post");
1042 }
1043 serde_json::json!({
1044 "$type": "app.bsky.feed.defs#postView",
1045 "uri": uri,
1046 "cid": rec.cid.to_string(),
1047 "author": profile_to_basic(author_profile.clone()),
1048 "record": val_json,
1049 "replyCount": app_state.hydrant.backlinks.count(uri.clone()).source("app.bsky.feed.post").run().await.unwrap_or(0),
1050 "repostCount": app_state.hydrant.backlinks.count(uri.clone()).source("app.bsky.feed.repost").run().await.unwrap_or(0),
1051 "likeCount": app_state.hydrant.backlinks.count(uri.clone()).source("app.bsky.feed.like").run().await.unwrap_or(0),
1052 "quoteCount": 0,
1053 "indexedAt": if created_at.is_empty() { chrono::Utc::now().to_rfc3339() } else { created_at.clone() },
1054 "viewer": { "muted": false, "blockedBy": false },
1055 "labels": [],
1056 })
1057 }
1058 };
1059
1060 let mut feed_item = serde_json::json!({
1061 "post": post,
1062 });
1063
1064 let val_json = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({}));
1065 // Add reply context if it's a reply
1066 if let Some(reply) = val_json.get("reply") {
1067 if let Some(parent_uri) = reply
1068 .get("parent")
1069 .and_then(|p| p.get("uri"))
1070 .and_then(|u| u.as_str())
1071 {
1072 if let Ok(parent_post) =
1073 get_post_view(&app_state, parent_uri, viewer_did.as_deref()).await
1074 {
1075 let root_uri = reply
1076 .get("root")
1077 .and_then(|p| p.get("uri"))
1078 .and_then(|u| u.as_str())
1079 .unwrap_or(parent_uri);
1080 if let Ok(root_post) =
1081 get_post_view(&app_state, root_uri, viewer_did.as_deref()).await
1082 {
1083 feed_item["reply"] = serde_json::json!({
1084 "root": root_post,
1085 "parent": parent_post,
1086 });
1087 }
1088 }
1089 }
1090 }
1091
1092 feed.push(feed_item);
1093 } else if *kind == "repost" {
1094 let val_json = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({}));
1095 if let Some(subject_uri) = val_json
1096 .get("subject")
1097 .and_then(|s| s.get("uri"))
1098 .and_then(|u| u.as_str())
1099 {
1100 if let Ok(post_view) =
1101 get_post_view(&app_state, subject_uri, viewer_did.as_deref()).await
1102 {
1103 feed.push(serde_json::json!({
1104 "post": post_view,
1105 "reason": {
1106 "$type": "app.bsky.feed.defs#reasonRepost",
1107 "by": profile_to_basic(author_profile.clone()),
1108 "indexedAt": created_at,
1109 }
1110 }));
1111 }
1112 }
1113 }
1114 }
1115
1116 let next_cursor = all_items.last().map(|i| i.2.rkey.as_str().to_string());
1117
1118 feed.reverse();
1119
1120 Ok(Json(serde_json::json!({
1121 "feed": feed,
1122 "cursor": next_cursor
1123 }))
1124 .into_response())
1125}
1126
1127#[derive(Deserialize)]
1128struct GetLikesParams {
1129 uri: String,
1130 #[serde(default)]
1131 limit: Option<usize>,
1132 #[serde(default)]
1133 cursor: Option<String>,
1134}
1135
1136async fn get_likes(
1137 State(app_state): State<AppState>,
1138 req: Request,
1139) -> Result<Response, StatusCode> {
1140 // let hydrant = &app_state.hydrant;
1141 let query_str = req.uri().query().unwrap_or("");
1142 let params = serde_urlencoded::from_str::<GetLikesParams>(query_str).map_err(|e| {
1143 tracing::error!("failed to parse query params: {e}");
1144 StatusCode::BAD_REQUEST
1145 })?;
1146
1147 let limit = params.limit.unwrap_or(50).min(100);
1148
1149 let mut fetch = app_state
1150 .hydrant
1151 .backlinks
1152 .fetch(params.uri.clone())
1153 .source("app.bsky.feed.like")
1154 .limit(limit);
1155 if let Some(cursor_str) = params.cursor {
1156 match data_encoding::BASE64URL_NOPAD.decode(cursor_str.as_bytes()) {
1157 Ok(c) => {
1158 fetch = fetch.cursor(c);
1159 }
1160 Err(e) => {
1161 tracing::warn!("failed to decode cursor {cursor_str}: {e}");
1162 }
1163 }
1164 }
1165
1166 let backlinks_page = match fetch.run().await {
1167 Ok(bp) => bp,
1168 Err(e) => {
1169 tracing::error!("failed to fetch backlinks for {}: {e}", params.uri);
1170 return proxy_request(req).await;
1171 }
1172 };
1173
1174 let mut likes = Vec::new();
1175 let viewer_did = get_auth_did(&req);
1176 for bl in backlinks_page.backlinks {
1177 let uri = match jacquard_common::types::string::AtUri::new(bl.uri.as_str()) {
1178 Ok(uri) => uri,
1179 Err(e) => {
1180 tracing::warn!("failed to parse uri {}: {e}", bl.uri);
1181 continue;
1182 }
1183 };
1184 let author_ident = uri.authority();
1185
1186 let profile =
1187 match get_profile_internal(&app_state, author_ident.as_str(), viewer_did.as_deref())
1188 .await
1189 {
1190 Ok(p) => p,
1191 Err(e) => {
1192 tracing::warn!("failed to get profile for {author_ident}: {e}");
1193 continue;
1194 }
1195 };
1196 let repo = match app_state.hydrant.repos.resolve(author_ident).await {
1197 Ok(repo) => repo,
1198 Err(e) => {
1199 tracing::warn!("failed to resolve actor {author_ident}: {e}");
1200 continue;
1201 }
1202 };
1203 let collection = uri.collection().unwrap().as_str();
1204 let rkey = uri.rkey().unwrap().0.as_str();
1205
1206 let record = match repo.get_record(collection, rkey).await {
1207 Ok(Some(record)) => record,
1208 Ok(None) => {
1209 tracing::warn!("record not found: {collection}/{rkey}");
1210 continue;
1211 }
1212 Err(e) => {
1213 tracing::warn!("failed to get record {collection}/{rkey}: {e}");
1214 continue;
1215 }
1216 };
1217 let value = serde_json::to_value(record.value).unwrap_or(serde_json::json!({}));
1218 let created_at = value
1219 .get("createdAt")
1220 .and_then(|v| v.as_str())
1221 .unwrap_or("")
1222 .to_string();
1223
1224 likes.push(serde_json::json!({
1225 "actor": profile,
1226 "createdAt": created_at,
1227 "indexedAt": chrono::Utc::now().to_rfc3339(),
1228 }));
1229 }
1230
1231 let cursor = backlinks_page
1232 .next_cursor
1233 .map(|c| data_encoding::BASE64URL_NOPAD.encode(&c));
1234
1235 Ok(Json(serde_json::json!({
1236 "uri": params.uri,
1237 "likes": likes,
1238 "cursor": cursor,
1239 }))
1240 .into_response())
1241}
1242
1243async fn get_reposted_by(
1244 State(app_state): State<AppState>,
1245 req: Request,
1246) -> Result<Response, StatusCode> {
1247 // let hydrant = &app_state.hydrant;
1248 let query_str = req.uri().query().unwrap_or("");
1249 let params = serde_urlencoded::from_str::<GetLikesParams>(query_str).map_err(|e| {
1250 tracing::error!("failed to parse query params: {e}");
1251 StatusCode::BAD_REQUEST
1252 })?;
1253
1254 let limit = params.limit.unwrap_or(50).min(100);
1255
1256 let mut fetch = app_state
1257 .hydrant
1258 .backlinks
1259 .fetch(params.uri.clone())
1260 .source("app.bsky.feed.repost")
1261 .limit(limit);
1262 if let Some(cursor_str) = params.cursor {
1263 match data_encoding::BASE64URL_NOPAD.decode(cursor_str.as_bytes()) {
1264 Ok(c) => {
1265 fetch = fetch.cursor(c);
1266 }
1267 Err(e) => {
1268 tracing::warn!("failed to decode cursor {cursor_str}: {e}");
1269 }
1270 }
1271 }
1272
1273 let backlinks_page = match fetch.run().await {
1274 Ok(bp) => bp,
1275 Err(e) => {
1276 tracing::error!("failed to fetch backlinks for {}: {e}", params.uri);
1277 return proxy_request(req).await;
1278 }
1279 };
1280
1281 let mut reposted_by = Vec::new();
1282 let viewer_did = get_auth_did(&req);
1283 for bl in backlinks_page.backlinks {
1284 let uri = match jacquard_common::types::string::AtUri::new(bl.uri.as_str()) {
1285 Ok(uri) => uri,
1286 Err(e) => {
1287 tracing::warn!("failed to parse uri {}: {e}", bl.uri);
1288 continue;
1289 }
1290 };
1291 let author_ident = uri.authority();
1292 let profile =
1293 match get_profile_internal(&app_state, author_ident.as_str(), viewer_did.as_deref())
1294 .await
1295 {
1296 Ok(p) => p,
1297 Err(e) => {
1298 tracing::warn!("failed to get profile for {author_ident}: {e}");
1299 continue;
1300 }
1301 };
1302 reposted_by.push(profile);
1303 }
1304
1305 let cursor = backlinks_page
1306 .next_cursor
1307 .map(|c| data_encoding::BASE64URL_NOPAD.encode(&c));
1308
1309 Ok(Json(serde_json::json!({
1310 "uri": params.uri,
1311 "repostedBy": reposted_by,
1312 "cursor": cursor,
1313 }))
1314 .into_response())
1315}
1316
1317fn extract_query_array(query: &str, key: &str) -> Vec<String> {
1318 let mut res = Vec::new();
1319 let prefix1 = format!("{}=", key);
1320 let prefix2 = format!("{}[]=", key);
1321 for part in query.split('&') {
1322 let val = part
1323 .strip_prefix(&prefix1)
1324 .or_else(|| part.strip_prefix(&prefix2));
1325 if let Some(v) = val {
1326 if let Ok(decoded) = urlencoding::decode(v) {
1327 res.push(decoded.into_owned());
1328 }
1329 }
1330 }
1331 res
1332}
1333
1334async fn get_profiles(
1335 State(app_state): State<AppState>,
1336 req: Request,
1337) -> Result<Response, StatusCode> {
1338 // let hydrant = &app_state.hydrant;
1339 let query_str = req.uri().query().unwrap_or("");
1340 let actors = extract_query_array(query_str, "actors");
1341 if actors.is_empty() {
1342 return proxy_request(req).await;
1343 }
1344
1345 let viewer_did = get_auth_did(&req);
1346 let mut profiles = Vec::new();
1347 for actor in actors {
1348 let ident = match AtIdentifier::new(&actor) {
1349 Ok(ident) => ident,
1350 Err(e) => {
1351 tracing::warn!("failed to parse actor identifier {actor}: {e}");
1352 continue;
1353 }
1354 };
1355 match app_state.hydrant.repos.resolve(&ident).await {
1356 Ok(repo) => {
1357 match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref())
1358 .await
1359 {
1360 Ok(profile) => profiles.push(profile),
1361 Err(e) => tracing::warn!("failed to get profile for {}: {e}", repo.did),
1362 }
1363 }
1364 Err(e) => tracing::warn!("failed to resolve actor {actor}: {e}"),
1365 }
1366 }
1367
1368 Ok(Json(serde_json::json!({ "profiles": profiles })).into_response())
1369}
1370
1371async fn get_posts(
1372 State(app_state): State<AppState>,
1373 req: Request,
1374) -> Result<Response, StatusCode> {
1375 // let hydrant = &app_state.hydrant;
1376 let query_str = req.uri().query().unwrap_or("");
1377 let uris = extract_query_array(query_str, "uris");
1378 if uris.is_empty() {
1379 return proxy_request(req).await;
1380 }
1381
1382 let viewer_did = get_auth_did(&req);
1383 let mut posts = Vec::new();
1384 for uri in uris {
1385 match get_post_view(&app_state, &uri, viewer_did.as_deref()).await {
1386 Ok(post) => posts.push(post),
1387 Err(e) => tracing::warn!("failed to get post view for {uri}: {e}"),
1388 }
1389 }
1390
1391 Ok(Json(serde_json::json!({ "posts": posts })).into_response())
1392}
1393
1394#[derive(Deserialize)]
1395struct GetFollowsParams {
1396 actor: String,
1397 #[serde(default)]
1398 limit: Option<usize>,
1399 #[serde(default)]
1400 cursor: Option<String>,
1401}
1402
1403async fn get_follows(
1404 State(app_state): State<AppState>,
1405 req: Request,
1406) -> Result<Response, StatusCode> {
1407 // let hydrant = &app_state.hydrant;
1408 let query_str = req.uri().query().unwrap_or("");
1409 let params: GetFollowsParams = serde_urlencoded::from_str(query_str).map_err(|e| {
1410 tracing::error!("failed to parse query params: {e}");
1411 StatusCode::BAD_REQUEST
1412 })?;
1413
1414 let ident = AtIdentifier::new(¶ms.actor).map_err(|e| {
1415 tracing::error!("failed to parse actor identifier: {e}");
1416 StatusCode::BAD_REQUEST
1417 })?;
1418 let repo = match app_state.hydrant.repos.resolve(&ident).await {
1419 Ok(repo) => repo,
1420 Err(e) => {
1421 tracing::error!("failed to resolve actor {}: {e}", params.actor);
1422 return proxy_request(req).await;
1423 }
1424 };
1425
1426 let limit = params.limit.unwrap_or(50).min(100);
1427 let record_list = match repo
1428 .list_records(
1429 "app.bsky.graph.follow",
1430 limit,
1431 true,
1432 params.cursor.as_deref(),
1433 )
1434 .await
1435 {
1436 Ok(rl) => rl,
1437 Err(e) => {
1438 tracing::error!("failed to list follows for {}: {e}", repo.did);
1439 return proxy_request(req).await;
1440 }
1441 };
1442
1443 let mut follows = Vec::new();
1444 let viewer_did = get_auth_did(&req);
1445 for rec in record_list.records {
1446 let value = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({}));
1447 if let Some(subject_did) = value.get("subject").and_then(|s| s.as_str()) {
1448 match get_profile_internal(&app_state, subject_did, viewer_did.as_deref()).await {
1449 Ok(profile) => follows.push(profile),
1450 Err(e) => tracing::warn!("failed to get profile for {subject_did}: {e}"),
1451 }
1452 }
1453 }
1454
1455 let subject_profile =
1456 match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()).await {
1457 Ok(p) => p,
1458 Err(e) => {
1459 tracing::error!("failed to get profile for {}: {e}", repo.did);
1460 return proxy_request(req).await;
1461 }
1462 };
1463
1464 Ok(Json(serde_json::json!({
1465 "subject": subject_profile,
1466 "follows": follows,
1467 "cursor": record_list.cursor.map(|c| c.to_string())
1468 }))
1469 .into_response())
1470}
1471
1472async fn get_followers(
1473 State(app_state): State<AppState>,
1474 req: Request,
1475) -> Result<Response, StatusCode> {
1476 // let hydrant = &app_state.hydrant;
1477 let query_str = req.uri().query().unwrap_or("");
1478 let params: GetFollowsParams = serde_urlencoded::from_str(query_str).map_err(|e| {
1479 tracing::error!("failed to parse query params: {e}");
1480 StatusCode::BAD_REQUEST
1481 })?;
1482
1483 let ident = AtIdentifier::new(¶ms.actor).map_err(|e| {
1484 tracing::error!("failed to parse actor identifier: {e}");
1485 StatusCode::BAD_REQUEST
1486 })?;
1487 let repo = match app_state.hydrant.repos.resolve(&ident).await {
1488 Ok(repo) => repo,
1489 Err(e) => {
1490 tracing::error!("failed to resolve actor {}: {e}", params.actor);
1491 return proxy_request(req).await;
1492 }
1493 };
1494
1495 let limit = params.limit.unwrap_or(50).min(100);
1496 let mut fetch = app_state
1497 .hydrant
1498 .backlinks
1499 .fetch(repo.did.as_str().to_string())
1500 .source("app.bsky.graph.follow")
1501 .limit(limit);
1502 if let Some(cursor_str) = params.cursor {
1503 match data_encoding::BASE64URL_NOPAD.decode(cursor_str.as_bytes()) {
1504 Ok(c) => fetch = fetch.cursor(c),
1505 Err(e) => tracing::warn!("failed to decode cursor {cursor_str}: {e}"),
1506 }
1507 }
1508
1509 let backlinks_page = match fetch.run().await {
1510 Ok(bp) => bp,
1511 Err(e) => {
1512 tracing::error!("failed to fetch backlinks for {}: {e}", repo.did);
1513 return proxy_request(req).await;
1514 }
1515 };
1516
1517 let mut followers = Vec::new();
1518 let viewer_did = get_auth_did(&req);
1519 for bl in backlinks_page.backlinks {
1520 match jacquard_common::types::string::AtUri::new(bl.uri.as_str()) {
1521 Ok(uri) => {
1522 let author_ident = uri.authority();
1523 match get_profile_internal(&app_state, author_ident.as_str(), viewer_did.as_deref())
1524 .await
1525 {
1526 Ok(profile) => followers.push(profile),
1527 Err(e) => tracing::warn!("failed to get profile for {author_ident}: {e}"),
1528 }
1529 }
1530 Err(e) => tracing::warn!("failed to parse uri {}: {e}", bl.uri),
1531 }
1532 }
1533
1534 let subject_profile =
1535 match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()).await {
1536 Ok(p) => p,
1537 Err(e) => {
1538 tracing::error!("failed to get profile for {}: {e}", repo.did);
1539 return proxy_request(req).await;
1540 }
1541 };
1542
1543 let cursor = backlinks_page
1544 .next_cursor
1545 .map(|c| data_encoding::BASE64URL_NOPAD.encode(&c));
1546
1547 Ok(Json(serde_json::json!({
1548 "subject": subject_profile,
1549 "followers": followers,
1550 "cursor": cursor,
1551 }))
1552 .into_response())
1553}
1554
1555async fn get_age_assurance_state() -> Result<Response, StatusCode> {
1556 Ok(Json(serde_json::json!({
1557 "status": "assured"
1558 }))
1559 .into_response())
1560}
1561
1562fn get_auth_did(req: &Request) -> Option<String> {
1563 let auth = req.headers().get("authorization")?.to_str().ok()?;
1564 let token = auth.strip_prefix("Bearer ")?;
1565 let parts: Vec<&str> = token.split('.').collect();
1566 if parts.len() != 3 {
1567 return None;
1568 }
1569 use base64::Engine;
1570 let payload = base64::engine::general_purpose::URL_SAFE_NO_PAD
1571 .decode(parts[1])
1572 .ok()?;
1573 let json: serde_json::Value = serde_json::from_slice(&payload).ok()?;
1574 json.get("sub")
1575 .and_then(|s| s.as_str())
1576 .map(|s| s.to_string())
1577}
1578
1579#[derive(Deserialize)]
1580struct CreateBookmarkReq {
1581 uri: String,
1582 cid: String,
1583}
1584
1585async fn create_bookmark(
1586 State(app_state): State<AppState>,
1587 req: Request,
1588) -> Result<Response, StatusCode> {
1589 let Some(did) = get_auth_did(&req) else {
1590 return Err(StatusCode::UNAUTHORIZED);
1591 };
1592
1593 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX)
1594 .await
1595 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1596 let payload: CreateBookmarkReq = serde_json::from_slice(&body_bytes).map_err(|e| {
1597 tracing::error!("failed to parse create bookmark request: {e}");
1598 StatusCode::BAD_REQUEST
1599 })?;
1600
1601 let key = format!("bookmark:{}:{}", did, payload.uri);
1602 app_state
1603 .bookmarks
1604 .insert(key.as_bytes(), payload.cid.as_bytes())
1605 .map_err(|e| {
1606 tracing::error!("failed to insert bookmark: {e}");
1607 StatusCode::INTERNAL_SERVER_ERROR
1608 })?;
1609
1610 Ok(Json(serde_json::json!({})).into_response())
1611}
1612
1613#[derive(Deserialize)]
1614struct DeleteBookmarkReq {
1615 uri: String,
1616}
1617
1618async fn delete_bookmark(
1619 State(app_state): State<AppState>,
1620 req: Request,
1621) -> Result<Response, StatusCode> {
1622 let Some(did) = get_auth_did(&req) else {
1623 return Err(StatusCode::UNAUTHORIZED);
1624 };
1625
1626 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX)
1627 .await
1628 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1629 let payload: DeleteBookmarkReq = serde_json::from_slice(&body_bytes).map_err(|e| {
1630 tracing::error!("failed to parse delete bookmark request: {e}");
1631 StatusCode::BAD_REQUEST
1632 })?;
1633
1634 let key = format!("bookmark:{}:{}", did, payload.uri);
1635 app_state.bookmarks.remove(key.as_bytes()).map_err(|e| {
1636 tracing::error!("failed to remove bookmark: {e}");
1637 StatusCode::INTERNAL_SERVER_ERROR
1638 })?;
1639
1640 Ok(Json(serde_json::json!({})).into_response())
1641}
1642
1643#[derive(Deserialize)]
1644struct GetBookmarksParams {
1645 #[serde(default)]
1646 limit: Option<usize>,
1647 #[serde(default)]
1648 cursor: Option<String>,
1649}
1650
1651async fn get_bookmarks(
1652 State(app_state): State<AppState>,
1653 req: Request,
1654) -> Result<Response, StatusCode> {
1655 // let hydrant = &app_state.hydrant;
1656 let Some(did) = get_auth_did(&req) else {
1657 return Err(StatusCode::UNAUTHORIZED);
1658 };
1659
1660 let query_str = req.uri().query().unwrap_or("");
1661 let params =
1662 serde_urlencoded::from_str::<GetBookmarksParams>(query_str).unwrap_or(GetBookmarksParams {
1663 limit: None,
1664 cursor: None,
1665 });
1666 let limit = params.limit.unwrap_or(50).min(100);
1667
1668 let prefix = format!("bookmark:{}:", did);
1669
1670 let iter: Box<dyn Iterator<Item = _>> = if let Some(cursor) = params.cursor {
1671 Box::new(app_state.bookmarks.range::<&[u8], _>((
1672 std::ops::Bound::Included(cursor.as_bytes()),
1673 std::ops::Bound::Unbounded,
1674 )))
1675 } else {
1676 Box::new(app_state.bookmarks.prefix(prefix.as_bytes()))
1677 };
1678
1679 let mut fetched_items = Vec::new();
1680 let mut next_cursor = None;
1681 for item in iter {
1682 let (key, cid_bytes) = match item.into_inner() {
1683 Ok(inner) => inner,
1684 Err(e) => {
1685 tracing::warn!("failed to get item from bookmarks iterator: {e}");
1686 continue;
1687 }
1688 };
1689 if !key.starts_with(prefix.as_bytes()) {
1690 break;
1691 }
1692 fetched_items.push((key.to_vec(), cid_bytes.to_vec()));
1693 if fetched_items.len() >= limit {
1694 next_cursor = Some(String::from_utf8_lossy(&key).to_string());
1695 break;
1696 }
1697 }
1698
1699 let mut bookmarks = Vec::new();
1700 for (key, cid_bytes) in fetched_items {
1701 let key_str = String::from_utf8_lossy(&key);
1702 let uri_str = key_str.strip_prefix(&prefix).unwrap_or(&key_str);
1703 let cid_str = String::from_utf8_lossy(&cid_bytes);
1704
1705 match jacquard_common::types::string::AtUri::new(uri_str) {
1706 Ok(uri) => {
1707 let author_ident = uri.authority();
1708 let collection = uri.collection().unwrap().as_str();
1709 let rkey = uri.rkey().unwrap().0.as_str();
1710
1711 match app_state.hydrant.repos.resolve(author_ident).await {
1712 Ok(repo) => match repo.get_record(collection, rkey).await {
1713 Ok(Some(record)) => {
1714 match get_profile_internal(&app_state, repo.did.as_str(), None).await {
1715 Ok(author_profile) => {
1716 bookmarks.push(serde_json::json!({
1717 "uri": uri_str,
1718 "cid": cid_str.to_string(),
1719 "author": author_profile,
1720 "record": record.value,
1721 "replyCount": app_state.hydrant.backlinks.count(uri_str.to_string()).source("app.bsky.feed.post").run().await.unwrap_or(0),
1722 "repostCount": app_state.hydrant.backlinks.count(uri_str.to_string()).source("app.bsky.feed.repost").run().await.unwrap_or(0),
1723 "likeCount": app_state.hydrant.backlinks.count(uri_str.to_string()).source("app.bsky.feed.like").run().await.unwrap_or(0),
1724 "indexedAt": chrono::Utc::now().to_rfc3339(),
1725 }));
1726 }
1727 Err(e) => {
1728 tracing::warn!("failed to get profile for {}: {e}", repo.did)
1729 }
1730 }
1731 }
1732 Ok(None) => tracing::warn!("bookmark record not found: {uri_str}"),
1733 Err(e) => tracing::warn!("failed to get bookmark record {uri_str}: {e}"),
1734 },
1735 Err(e) => tracing::warn!("failed to resolve actor {author_ident}: {e}"),
1736 }
1737 }
1738 Err(e) => tracing::warn!("failed to parse bookmark uri {uri_str}: {e}"),
1739 }
1740
1741 if bookmarks.len() >= limit {
1742 next_cursor = Some(String::from_utf8_lossy(&key).to_string());
1743 break;
1744 }
1745 }
1746
1747 Ok(Json(serde_json::json!({
1748 "bookmarks": bookmarks,
1749 "cursor": next_cursor,
1750 }))
1751 .into_response())
1752}
1753
1754#[derive(Deserialize)]
1755struct CreateDraftReq {
1756 draft: serde_json::Value,
1757}
1758
1759async fn create_draft(
1760 State(app_state): State<AppState>,
1761 req: Request,
1762) -> Result<Response, StatusCode> {
1763 let Some(did) = get_auth_did(&req) else {
1764 return Err(StatusCode::UNAUTHORIZED);
1765 };
1766
1767 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX)
1768 .await
1769 .map_err(|e| {
1770 tracing::error!("failed to read request body: {e}");
1771 StatusCode::INTERNAL_SERVER_ERROR
1772 })?;
1773 let payload = serde_json::from_slice::<CreateDraftReq>(&body_bytes).map_err(|e| {
1774 tracing::error!("failed to deserialize create_draft request: {e}");
1775 StatusCode::BAD_REQUEST
1776 })?;
1777
1778 let tid = jacquard_common::types::tid::Tid::now(1.try_into().unwrap()).to_string();
1779 let key = format!("draft:{}:{}", did, tid);
1780 let draft_obj = payload.draft.clone();
1781
1782 let now = chrono::Utc::now().to_rfc3339();
1783 let store_obj = serde_json::json!({
1784 "id": tid,
1785 "draft": draft_obj,
1786 "createdAt": now,
1787 "updatedAt": now,
1788 });
1789
1790 let val = serde_json::to_vec(&store_obj).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1791 app_state
1792 .drafts
1793 .insert(key.as_bytes(), val)
1794 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
1795
1796 Ok(Json(serde_json::json!({ "id": tid })).into_response())
1797}
1798
1799#[derive(Deserialize)]
1800struct UpdateDraftReq {
1801 draft: DraftWithId,
1802}
1803
1804#[derive(Deserialize)]
1805struct DraftWithId {
1806 id: String,
1807 draft: serde_json::Value,
1808}
1809
1810async fn update_draft(
1811 State(app_state): State<AppState>,
1812 req: Request,
1813) -> Result<Response, StatusCode> {
1814 let Some(did) = get_auth_did(&req) else {
1815 return Err(StatusCode::UNAUTHORIZED);
1816 };
1817
1818 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX)
1819 .await
1820 .map_err(|e| {
1821 tracing::error!("failed to read request body: {e}");
1822 StatusCode::INTERNAL_SERVER_ERROR
1823 })?;
1824 let payload = serde_json::from_slice::<UpdateDraftReq>(&body_bytes).map_err(|e| {
1825 tracing::error!("failed to deserialize update_draft request: {e}");
1826 StatusCode::BAD_REQUEST
1827 })?;
1828
1829 let tid = payload.draft.id;
1830 let key = format!("draft:{}:{}", did, tid);
1831
1832 let existing = app_state.drafts.get(key.as_bytes()).map_err(|e| {
1833 tracing::error!("failed to get draft from db: {e}");
1834 StatusCode::INTERNAL_SERVER_ERROR
1835 })?;
1836 if let Some(existing_bytes) = existing.as_deref() {
1837 match serde_json::from_slice::<serde_json::Value>(&existing_bytes) {
1838 Ok(mut existing_obj) => {
1839 let now = chrono::Utc::now().to_rfc3339();
1840 existing_obj["draft"] = payload.draft.draft;
1841 existing_obj["updatedAt"] = serde_json::json!(now);
1842
1843 let val = serde_json::to_vec(&existing_obj).map_err(|e| {
1844 tracing::error!("failed to serialize updated draft: {e}");
1845 StatusCode::INTERNAL_SERVER_ERROR
1846 })?;
1847 app_state.drafts.insert(key.as_bytes(), val).map_err(|e| {
1848 tracing::error!("failed to insert updated draft to db: {e}");
1849 StatusCode::INTERNAL_SERVER_ERROR
1850 })?;
1851 }
1852 Err(e) => {
1853 tracing::error!("failed to parse existing draft from db: {e}");
1854 }
1855 }
1856 }
1857
1858 Ok(Json(serde_json::json!({})).into_response())
1859}
1860
1861#[derive(Deserialize)]
1862struct DeleteDraftReq {
1863 id: String,
1864}
1865
1866async fn delete_draft(
1867 State(app_state): State<AppState>,
1868 req: Request,
1869) -> Result<Response, StatusCode> {
1870 let Some(did) = get_auth_did(&req) else {
1871 return Err(StatusCode::UNAUTHORIZED);
1872 };
1873
1874 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX)
1875 .await
1876 .map_err(|e| {
1877 tracing::error!("failed to read request body: {e}");
1878 StatusCode::INTERNAL_SERVER_ERROR
1879 })?;
1880 let payload = serde_json::from_slice::<DeleteDraftReq>(&body_bytes).map_err(|e| {
1881 tracing::error!("failed to deserialize delete_draft request: {e}");
1882 StatusCode::BAD_REQUEST
1883 })?;
1884
1885 let key = format!("draft:{}:{}", did, payload.id);
1886 app_state.drafts.remove(key.as_bytes()).map_err(|e| {
1887 tracing::error!("failed to remove draft from db: {e}");
1888 StatusCode::INTERNAL_SERVER_ERROR
1889 })?;
1890
1891 Ok(Json(serde_json::json!({})).into_response())
1892}
1893
1894#[derive(Deserialize)]
1895struct GetDraftsParams {
1896 #[serde(default)]
1897 limit: Option<usize>,
1898 #[serde(default)]
1899 cursor: Option<String>,
1900}
1901
1902async fn get_drafts(
1903 State(app_state): State<AppState>,
1904 req: Request,
1905) -> Result<Response, StatusCode> {
1906 let Some(did) = get_auth_did(&req) else {
1907 return Err(StatusCode::UNAUTHORIZED);
1908 };
1909
1910 let query_str = req.uri().query().unwrap_or("");
1911 let params =
1912 serde_urlencoded::from_str::<GetDraftsParams>(query_str).unwrap_or(GetDraftsParams {
1913 limit: None,
1914 cursor: None,
1915 });
1916 let limit = params.limit.unwrap_or(50).min(100);
1917
1918 let prefix = format!("draft:{}:", did);
1919
1920 let iter: Box<dyn Iterator<Item = _>> = if let Some(cursor) = params.cursor {
1921 Box::new(app_state.drafts.range::<&[u8], _>((
1922 std::ops::Bound::Included(cursor.as_bytes()),
1923 std::ops::Bound::Unbounded,
1924 )))
1925 } else {
1926 Box::new(app_state.drafts.prefix(prefix.as_bytes()))
1927 };
1928
1929 let mut drafts = Vec::new();
1930 let mut next_cursor = None;
1931
1932 for item in iter {
1933 let (key, val_bytes) = match item.into_inner() {
1934 Ok(v) => v,
1935 Err(e) => {
1936 tracing::error!("failed to get item from drafts iterator: {e}");
1937 continue;
1938 }
1939 };
1940 if !key.starts_with(prefix.as_bytes()) {
1941 break;
1942 }
1943
1944 match serde_json::from_slice::<serde_json::Value>(&val_bytes) {
1945 Ok(obj) => drafts.push(obj),
1946 Err(e) => tracing::error!("failed to parse draft from db: {e}"),
1947 }
1948
1949 if drafts.len() >= limit {
1950 next_cursor = Some(String::from_utf8_lossy(&key).to_string());
1951 break;
1952 }
1953 }
1954
1955 Ok(Json(serde_json::json!({
1956 "drafts": drafts,
1957 "cursor": next_cursor,
1958 }))
1959 .into_response())
1960}
1961
1962#[derive(Deserialize)]
1963struct GetTimelineParams {
1964 #[serde(default)]
1965 algorithm: Option<String>,
1966 #[serde(default)]
1967 limit: Option<usize>,
1968 #[serde(default)]
1969 cursor: Option<String>,
1970}
1971
1972async fn get_timeline(
1973 State(app_state): State<AppState>,
1974 req: Request,
1975) -> Result<Response, StatusCode> {
1976 // let hydrant = &app_state.hydrant;
1977 let query_str = req.uri().query().unwrap_or("");
1978 let params = serde_urlencoded::from_str::<GetTimelineParams>(query_str).map_err(|e| {
1979 tracing::error!("failed to parse get_timeline query params: {e}");
1980 StatusCode::BAD_REQUEST
1981 })?;
1982
1983 let Some(did_str) = get_auth_did(&req) else {
1984 return Err(StatusCode::UNAUTHORIZED);
1985 };
1986
1987 let ident = AtIdentifier::new(&did_str).map_err(|e| {
1988 tracing::error!("failed to create AtIdentifier from did_str {did_str}: {e}");
1989 StatusCode::BAD_REQUEST
1990 })?;
1991 let repo = match app_state.hydrant.repos.resolve(&ident).await {
1992 Ok(r) => r,
1993 Err(e) => {
1994 tracing::error!("failed to resolve repo for {ident}: {e}");
1995 return proxy_request(req).await;
1996 }
1997 };
1998
1999 // Get all follows of auth user
2000 let mut follows = std::collections::HashSet::new();
2001 follows.insert(repo.did.clone()); // Include self
2002
2003 let mut cursor = None;
2004 loop {
2005 let record_list = match repo
2006 .list_records("app.bsky.graph.follow", 100, true, cursor.as_deref())
2007 .await
2008 {
2009 Ok(rl) => rl,
2010 Err(e) => {
2011 tracing::error!("failed to list follows for {}: {e}", repo.did);
2012 break;
2013 }
2014 };
2015 for rec in &record_list.records {
2016 let value = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({}));
2017 if let Some(subject_did) = value.get("subject").and_then(|s| s.as_str()) {
2018 match jacquard_common::types::string::Did::new(subject_did) {
2019 Ok(did) => {
2020 follows.insert(did.into_static());
2021 }
2022 Err(e) => {
2023 tracing::error!("failed to parse subject DID {subject_did}: {e}");
2024 }
2025 }
2026 }
2027 }
2028 if record_list.cursor.is_none() {
2029 break;
2030 }
2031 cursor = record_list.cursor.map(|c| c.to_string());
2032 }
2033
2034 let limit = params.limit.unwrap_or(50).min(100);
2035
2036 let mut all_items = Vec::new();
2037
2038 for did in follows {
2039 match app_state.hydrant.repos.get(&did).info().await {
2040 Ok(Some(info)) => {
2041 if !info.tracked {
2042 continue;
2043 }
2044 }
2045 Ok(None) => continue,
2046 Err(e) => {
2047 tracing::error!("failed to get info for followed repo {did}: {e}");
2048 continue;
2049 }
2050 }
2051
2052 let followed_repo = app_state.hydrant.repos.get(&did);
2053
2054 // get posts
2055 match followed_repo
2056 .list_records("app.bsky.feed.post", limit, true, None)
2057 .await
2058 {
2059 Ok(record_list) => {
2060 for rec in record_list.records {
2061 let rkey = rec.rkey.as_str().to_string();
2062 let val_json =
2063 serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({}));
2064 all_items.push((
2065 did.clone(),
2066 "app.bsky.feed.post".to_string(),
2067 rkey,
2068 val_json,
2069 ));
2070 }
2071 }
2072 Err(e) => {
2073 tracing::warn!("failed to list posts for {did}: {e}");
2074 }
2075 }
2076
2077 // get reposts
2078 match followed_repo
2079 .list_records("app.bsky.feed.repost", limit, true, None)
2080 .await
2081 {
2082 Ok(record_list) => {
2083 for rec in record_list.records {
2084 let rkey = rec.rkey.as_str().to_string();
2085 let val_json =
2086 serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({}));
2087 all_items.push((
2088 did.clone(),
2089 "app.bsky.feed.repost".to_string(),
2090 rkey,
2091 val_json,
2092 ));
2093 }
2094 }
2095 Err(e) => {
2096 tracing::warn!("failed to list reposts for {did}: {e}");
2097 }
2098 }
2099 }
2100
2101 // Sort by createdAt descending
2102 all_items.sort_by(|a, b| {
2103 let ca = a.3.get("createdAt").and_then(|v| v.as_str()).unwrap_or("");
2104 let cb = b.3.get("createdAt").and_then(|v| v.as_str()).unwrap_or("");
2105 cb.cmp(ca)
2106 });
2107
2108 if let Some(c) = params.cursor {
2109 all_items.retain(|item| {
2110 let item_ca = item
2111 .3
2112 .get("createdAt")
2113 .and_then(|v| v.as_str())
2114 .unwrap_or("");
2115 item_ca < c.as_str()
2116 });
2117 }
2118
2119 all_items.truncate(limit);
2120
2121 let mut feed = Vec::new();
2122 let mut next_cursor = None;
2123 let viewer_did = get_auth_did(&req);
2124 for (did, col, rkey, value) in all_items {
2125 let created_at = value
2126 .get("createdAt")
2127 .and_then(|v| v.as_str())
2128 .unwrap_or("")
2129 .to_string();
2130 next_cursor = Some(created_at.clone());
2131 let uri = format!("at://{}/{}/{}", did.as_str(), col, rkey);
2132
2133 if col == "app.bsky.feed.post" {
2134 match get_post_view(&app_state, &uri, viewer_did.as_deref()).await {
2135 Ok(mut post_view) => {
2136 post_view["author"] = profile_to_basic(post_view["author"].clone());
2137 feed.push(serde_json::json!({ "post": post_view }));
2138 }
2139 Err(e) => tracing::warn!("failed to get post view for {uri}: {e}"),
2140 }
2141 } else if col == "app.bsky.feed.repost" {
2142 if let Some(subject_uri) = value
2143 .get("subject")
2144 .and_then(|s| s.get("uri"))
2145 .and_then(|u| u.as_str())
2146 {
2147 match get_post_view(&app_state, subject_uri, viewer_did.as_deref()).await {
2148 Ok(mut post_view) => {
2149 post_view["author"] = profile_to_basic(post_view["author"].clone());
2150 match get_profile_internal(&app_state, did.as_str(), viewer_did.as_deref())
2151 .await
2152 {
2153 Ok(reposter_profile) => {
2154 feed.push(serde_json::json!({
2155 "post": post_view,
2156 "reason": {
2157 "$type": "app.bsky.feed.defs#reasonRepost",
2158 "by": profile_to_basic(reposter_profile),
2159 "indexedAt": if created_at.is_empty() { chrono::Utc::now().to_rfc3339() } else { created_at },
2160 }
2161 }));
2162 }
2163 Err(e) => {
2164 tracing::warn!("failed to get profile for reposter {did}: {e}");
2165 }
2166 }
2167 }
2168 Err(e) => {
2169 tracing::warn!(
2170 "failed to get post view for reposted post {subject_uri}: {e}"
2171 );
2172 }
2173 }
2174 }
2175 }
2176 }
2177
2178 feed.reverse();
2179
2180 Ok(Json(serde_json::json!({
2181 "feed": feed,
2182 "cursor": next_cursor
2183 }))
2184 .into_response())
2185}
2186
2187#[derive(Deserialize)]
2188struct GetQuotesParams {
2189 uri: String,
2190 #[serde(default)]
2191 cid: Option<String>,
2192 #[serde(default)]
2193 limit: Option<usize>,
2194 #[serde(default)]
2195 cursor: Option<String>,
2196}
2197
2198async fn get_quotes(
2199 State(app_state): State<AppState>,
2200 req: Request,
2201) -> Result<Response, StatusCode> {
2202 // let hydrant = &app_state.hydrant;
2203 let query_str = req.uri().query().unwrap_or("");
2204 let params = serde_urlencoded::from_str::<GetQuotesParams>(query_str).map_err(|e| {
2205 tracing::error!("failed to parse get_quotes query params: {e}");
2206 StatusCode::BAD_REQUEST
2207 })?;
2208
2209 let limit = params.limit.unwrap_or(50).min(100);
2210
2211 let mut fetch = app_state
2212 .hydrant
2213 .backlinks
2214 .fetch(params.uri.clone())
2215 .source("app.bsky.feed.post")
2216 .limit(limit * 3);
2217 if let Some(cursor_str) = params.cursor {
2218 match data_encoding::BASE64URL_NOPAD.decode(cursor_str.as_bytes()) {
2219 Ok(c) => {
2220 fetch = fetch.cursor(c);
2221 }
2222 Err(e) => {
2223 tracing::error!("failed to decode cursor {cursor_str}: {e}");
2224 }
2225 }
2226 }
2227
2228 let backlinks_page = match fetch.run().await {
2229 Ok(bp) => bp,
2230 Err(e) => {
2231 tracing::error!("failed to fetch backlinks for {}: {e}", params.uri);
2232 return proxy_request(req).await;
2233 }
2234 };
2235
2236 let mut posts = Vec::new();
2237 let mut found = 0;
2238 for bl in backlinks_page.backlinks {
2239 if found >= limit {
2240 break;
2241 }
2242
2243 let uri = match jacquard_common::types::string::AtUri::new(bl.uri.as_str()) {
2244 Ok(u) => u,
2245 Err(e) => {
2246 tracing::error!("failed to parse backlink URI {}: {e}", bl.uri);
2247 continue;
2248 }
2249 };
2250 let author_ident = uri.authority();
2251 let collection = uri.collection().unwrap().as_str();
2252 let rkey = uri.rkey().unwrap().0.as_str();
2253
2254 let repo = match app_state.hydrant.repos.resolve(author_ident).await {
2255 Ok(r) => r,
2256 Err(e) => {
2257 tracing::error!("failed to resolve repo for {author_ident}: {e}");
2258 continue;
2259 }
2260 };
2261 let record = match repo.get_record(collection, rkey).await {
2262 Ok(Some(rec)) => rec,
2263 Ok(None) => continue,
2264 Err(e) => {
2265 tracing::error!(
2266 "failed to get record {collection}/{rkey} from repo {author_ident}: {e}"
2267 );
2268 continue;
2269 }
2270 };
2271 let value = serde_json::to_value(&record.value).unwrap_or(serde_json::json!({}));
2272
2273 let is_quote = value.get("embed").map_or(false, |embed| {
2274 let t = embed.get("$type").and_then(|t| t.as_str()).unwrap_or("");
2275 if t == "app.bsky.embed.record" {
2276 embed
2277 .get("record")
2278 .and_then(|r| r.get("uri"))
2279 .and_then(|u| u.as_str())
2280 == Some(params.uri.as_str())
2281 } else if t == "app.bsky.embed.recordWithMedia" {
2282 embed
2283 .get("record")
2284 .and_then(|r| r.get("record"))
2285 .and_then(|r| r.get("uri"))
2286 .and_then(|u| u.as_str())
2287 == Some(params.uri.as_str())
2288 } else {
2289 false
2290 }
2291 });
2292
2293 if is_quote {
2294 let viewer_did = get_auth_did(&req);
2295 match get_post_view(&app_state, bl.uri.as_str(), viewer_did.as_deref()).await {
2296 Ok(post_view) => {
2297 posts.push(post_view);
2298 found += 1;
2299 }
2300 Err(e) => {
2301 tracing::warn!("failed to get post view for quoted post {}: {e}", bl.uri);
2302 }
2303 }
2304 }
2305 }
2306
2307 let cursor = backlinks_page
2308 .next_cursor
2309 .map(|c| data_encoding::BASE64URL_NOPAD.encode(&c));
2310
2311 Ok(Json(serde_json::json!({
2312 "uri": params.uri,
2313 "posts": posts,
2314 "cursor": cursor,
2315 }))
2316 .into_response())
2317}
2318
2319#[derive(Deserialize)]
2320struct GetActorLikesParams {
2321 actor: String,
2322 #[serde(default)]
2323 limit: Option<usize>,
2324 #[serde(default)]
2325 cursor: Option<String>,
2326}
2327
2328async fn get_actor_likes(
2329 State(app_state): State<AppState>,
2330 req: Request,
2331) -> Result<Response, StatusCode> {
2332 // let hydrant = &app_state.hydrant;
2333 let query_str = req.uri().query().unwrap_or("");
2334 let params = serde_urlencoded::from_str::<GetActorLikesParams>(query_str).map_err(|e| {
2335 tracing::error!("failed to parse get_actor_likes query params: {e}");
2336 StatusCode::BAD_REQUEST
2337 })?;
2338
2339 let ident = AtIdentifier::new(¶ms.actor).map_err(|e| {
2340 tracing::error!(
2341 "failed to create AtIdentifier for actor {}: {e}",
2342 params.actor
2343 );
2344 StatusCode::BAD_REQUEST
2345 })?;
2346 let repo = match app_state.hydrant.repos.resolve(&ident).await {
2347 Ok(r) => r,
2348 Err(e) => {
2349 tracing::error!("failed to resolve repo for {ident}: {e}");
2350 return proxy_request(req).await;
2351 }
2352 };
2353
2354 let limit = params.limit.unwrap_or(50).min(100);
2355
2356 let record_list = match repo
2357 .list_records("app.bsky.feed.like", limit, true, params.cursor.as_deref())
2358 .await
2359 {
2360 Ok(rl) => rl,
2361 Err(e) => {
2362 tracing::error!("failed to list likes for {}: {e}", repo.did);
2363 return proxy_request(req).await;
2364 }
2365 };
2366
2367 let mut feed = Vec::new();
2368 let viewer_did = get_auth_did(&req);
2369 for rec in record_list.records {
2370 let value = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({}));
2371 if let Some(subject_uri) = value
2372 .get("subject")
2373 .and_then(|s| s.get("uri"))
2374 .and_then(|u| u.as_str())
2375 {
2376 match get_post_view(&app_state, subject_uri, viewer_did.as_deref()).await {
2377 Ok(post_view) => feed.push(serde_json::json!({ "post": post_view })),
2378 Err(e) => {
2379 tracing::warn!("failed to get post view for liked post {subject_uri}: {e}")
2380 }
2381 }
2382 }
2383 }
2384
2385 Ok(Json(serde_json::json!({
2386 "feed": feed,
2387 "cursor": record_list.cursor.map(|c| c.to_string())
2388 }))
2389 .into_response())
2390}
2391
2392#[derive(Deserialize)]
2393struct GetRelationshipsParams {
2394 actor: String,
2395}
2396
2397async fn get_relationships(
2398 State(app_state): State<AppState>,
2399 req: Request,
2400) -> Result<Response, StatusCode> {
2401 // let hydrant = &app_state.hydrant;
2402 let query_str = req.uri().query().unwrap_or("");
2403 let params = serde_urlencoded::from_str::<GetRelationshipsParams>(query_str).map_err(|e| {
2404 tracing::error!("failed to parse get_relationships query params: {e}");
2405 StatusCode::BAD_REQUEST
2406 })?;
2407
2408 let others = extract_query_array(query_str, "others");
2409 if others.is_empty() {
2410 return proxy_request(req).await;
2411 }
2412
2413 let ident = AtIdentifier::new(¶ms.actor).map_err(|e| {
2414 tracing::error!(
2415 "failed to create AtIdentifier for actor {}: {e}",
2416 params.actor
2417 );
2418 StatusCode::BAD_REQUEST
2419 })?;
2420 let repo = match app_state.hydrant.repos.resolve(&ident).await {
2421 Ok(r) => r,
2422 Err(e) => {
2423 tracing::error!("failed to resolve repo for {ident}: {e}");
2424 return proxy_request(req).await;
2425 }
2426 };
2427 let actor_did = repo.did.as_str().to_string();
2428
2429 let mut relationships = Vec::new();
2430 for other in others {
2431 let other_ident = match AtIdentifier::new(&other) {
2432 Ok(i) => i,
2433 Err(e) => {
2434 tracing::error!("failed to create AtIdentifier for other actor {other}: {e}");
2435 continue;
2436 }
2437 };
2438 let other_repo = match app_state.hydrant.repos.resolve(&other_ident).await {
2439 Ok(r) => r,
2440 Err(e) => {
2441 tracing::error!("failed to resolve repo for other actor {other_ident}: {e}");
2442 continue;
2443 }
2444 };
2445 let other_did = other_repo.did.as_str().to_string();
2446
2447 let mut following = None;
2448 let mut followed_by = None;
2449
2450 let mut cursor = None;
2451 loop {
2452 match repo
2453 .list_records("app.bsky.graph.follow", 100, true, cursor.as_deref())
2454 .await
2455 {
2456 Ok(rl) => {
2457 for rec in &rl.records {
2458 let val = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({}));
2459 if val.get("subject").and_then(|s| s.as_str()) == Some(&other_did) {
2460 following = Some(format!(
2461 "at://{}/app.bsky.graph.follow/{}",
2462 actor_did,
2463 rec.rkey.as_str()
2464 ));
2465 break;
2466 }
2467 }
2468 if following.is_some() || rl.cursor.is_none() {
2469 break;
2470 }
2471 cursor = rl.cursor.map(|c| c.to_string());
2472 }
2473 Err(e) => {
2474 tracing::error!("failed to list follows for {actor_did}: {e}");
2475 break;
2476 }
2477 }
2478 }
2479
2480 let mut cursor = None;
2481 loop {
2482 match other_repo
2483 .list_records("app.bsky.graph.follow", 100, true, cursor.as_deref())
2484 .await
2485 {
2486 Ok(rl) => {
2487 for rec in &rl.records {
2488 let val = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({}));
2489 if val.get("subject").and_then(|s| s.as_str()) == Some(&actor_did) {
2490 followed_by = Some(format!(
2491 "at://{}/app.bsky.graph.follow/{}",
2492 other_did,
2493 rec.rkey.as_str()
2494 ));
2495 break;
2496 }
2497 }
2498 if followed_by.is_some() || rl.cursor.is_none() {
2499 break;
2500 }
2501 cursor = rl.cursor.map(|c| c.to_string());
2502 }
2503 Err(e) => {
2504 tracing::error!("failed to list follows for {other_did}: {e}");
2505 break;
2506 }
2507 }
2508 }
2509
2510 relationships.push(serde_json::json!({
2511 "$type": "app.bsky.graph.defs#relationship",
2512 "did": other_did,
2513 "following": following,
2514 "followedBy": followed_by,
2515 }));
2516 }
2517
2518 Ok(Json(serde_json::json!({
2519 "actor": actor_did,
2520 "relationships": relationships
2521 }))
2522 .into_response())
2523}
2524
2525async fn get_known_followers(
2526 State(app_state): State<AppState>,
2527 req: Request,
2528) -> Result<Response, StatusCode> {
2529 // let hydrant = &app_state.hydrant;
2530 let query_str = req.uri().query().unwrap_or("");
2531 let params = serde_urlencoded::from_str::<GetFollowsParams>(query_str).map_err(|e| {
2532 tracing::error!("failed to parse get_known_followers query params: {e}");
2533 StatusCode::BAD_REQUEST
2534 })?;
2535
2536 let Some(did_str) = get_auth_did(&req) else {
2537 return Err(StatusCode::UNAUTHORIZED);
2538 };
2539 let auth_ident = AtIdentifier::new(&did_str).map_err(|e| {
2540 tracing::error!("failed to create AtIdentifier for auth user {did_str}: {e}");
2541 StatusCode::BAD_REQUEST
2542 })?;
2543 let auth_repo = match app_state.hydrant.repos.resolve(&auth_ident).await {
2544 Ok(r) => r,
2545 Err(e) => {
2546 tracing::error!("failed to resolve repo for auth user {auth_ident}: {e}");
2547 return proxy_request(req).await;
2548 }
2549 };
2550
2551 let mut auth_follows = std::collections::HashSet::new();
2552 let mut cursor = None;
2553 loop {
2554 match auth_repo
2555 .list_records("app.bsky.graph.follow", 100, true, cursor.as_deref())
2556 .await
2557 {
2558 Ok(rl) => {
2559 for rec in &rl.records {
2560 let val = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({}));
2561 if let Some(subj) = val.get("subject").and_then(|s| s.as_str()) {
2562 auth_follows.insert(subj.to_string());
2563 }
2564 }
2565 if rl.cursor.is_none() {
2566 break;
2567 }
2568 cursor = rl.cursor.map(|c| c.to_string());
2569 }
2570 Err(e) => {
2571 tracing::error!(
2572 "failed to list follows for auth user {}: {e}",
2573 auth_repo.did
2574 );
2575 break;
2576 }
2577 }
2578 }
2579
2580 let ident = AtIdentifier::new(¶ms.actor).map_err(|e| {
2581 tracing::error!(
2582 "failed to create AtIdentifier for actor {}: {e}",
2583 params.actor
2584 );
2585 StatusCode::BAD_REQUEST
2586 })?;
2587 let repo = match app_state.hydrant.repos.resolve(&ident).await {
2588 Ok(r) => r,
2589 Err(e) => {
2590 tracing::error!("failed to resolve repo for {ident}: {e}");
2591 return proxy_request(req).await;
2592 }
2593 };
2594
2595 let limit = params.limit.unwrap_or(50).min(100);
2596
2597 let mut fetch = app_state
2598 .hydrant
2599 .backlinks
2600 .fetch(repo.did.as_str().to_string())
2601 .source("app.bsky.graph.follow")
2602 .limit(limit * 3);
2603 if let Some(cursor_str) = params.cursor {
2604 match data_encoding::BASE64URL_NOPAD.decode(cursor_str.as_bytes()) {
2605 Ok(c) => {
2606 fetch = fetch.cursor(c);
2607 }
2608 Err(e) => {
2609 tracing::error!("failed to decode cursor {cursor_str}: {e}");
2610 }
2611 }
2612 }
2613
2614 let backlinks_page = match fetch.run().await {
2615 Ok(bp) => bp,
2616 Err(e) => {
2617 tracing::error!("failed to fetch backlinks for {}: {e}", repo.did);
2618 return proxy_request(req).await;
2619 }
2620 };
2621
2622 let mut followers = Vec::new();
2623 let mut found = 0;
2624 for bl in backlinks_page.backlinks {
2625 if found >= limit {
2626 break;
2627 }
2628 let uri = match jacquard_common::types::string::AtUri::new(bl.uri.as_str()) {
2629 Ok(u) => u,
2630 Err(e) => {
2631 tracing::error!("failed to parse backlink URI {}: {e}", bl.uri);
2632 continue;
2633 }
2634 };
2635 let author_ident = uri.authority().as_str().to_string();
2636
2637 if auth_follows.contains(&author_ident) {
2638 match get_profile_internal(&app_state, author_ident.as_str(), None).await {
2639 Ok(profile) => {
2640 followers.push(profile);
2641 found += 1;
2642 }
2643 Err(e) => {
2644 tracing::warn!("failed to get profile for follower {author_ident}: {e}");
2645 }
2646 }
2647 }
2648 }
2649
2650 let subject_profile = match get_profile_internal(&app_state, repo.did.as_str(), None).await {
2651 Ok(p) => p,
2652 Err(e) => {
2653 tracing::error!("failed to get subject profile for {}: {e}", repo.did);
2654 return proxy_request(req).await;
2655 }
2656 };
2657
2658 let cursor = backlinks_page
2659 .next_cursor
2660 .map(|c| data_encoding::BASE64URL_NOPAD.encode(&c));
2661
2662 Ok(Json(serde_json::json!({
2663 "subject": subject_profile,
2664 "followers": followers,
2665 "cursor": cursor,
2666 }))
2667 .into_response())
2668}
2669
2670async fn get_blocks(
2671 State(app_state): State<AppState>,
2672 req: Request,
2673) -> Result<Response, StatusCode> {
2674 // let hydrant = &app_state.hydrant;
2675 let query_str = req.uri().query().unwrap_or("");
2676 let params = serde_urlencoded::from_str::<GetFollowsParams>(query_str).map_err(|e| {
2677 tracing::error!("failed to parse get_blocks query params: {e}");
2678 StatusCode::BAD_REQUEST
2679 })?;
2680
2681 let Some(did_str) = get_auth_did(&req) else {
2682 return Err(StatusCode::UNAUTHORIZED);
2683 };
2684 let ident = AtIdentifier::new(&did_str).map_err(|e| {
2685 tracing::error!("failed to create AtIdentifier for did_str {did_str}: {e}");
2686 StatusCode::BAD_REQUEST
2687 })?;
2688 let repo = match app_state.hydrant.repos.resolve(&ident).await {
2689 Ok(r) => r,
2690 Err(e) => {
2691 tracing::error!("failed to resolve repo for {ident}: {e}");
2692 return proxy_request(req).await;
2693 }
2694 };
2695
2696 let limit = params.limit.unwrap_or(50).min(100);
2697 let record_list = match repo
2698 .list_records(
2699 "app.bsky.graph.block",
2700 limit,
2701 true,
2702 params.cursor.as_deref(),
2703 )
2704 .await
2705 {
2706 Ok(rl) => rl,
2707 Err(e) => {
2708 tracing::error!("failed to list blocks for {}: {e}", repo.did);
2709 return proxy_request(req).await;
2710 }
2711 };
2712
2713 let mut blocks = Vec::new();
2714 for rec in record_list.records {
2715 let value = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({}));
2716 if let Some(subject_did) = value.get("subject").and_then(|s| s.as_str()) {
2717 match get_profile_internal(&app_state, subject_did, None).await {
2718 Ok(profile) => blocks.push(profile),
2719 Err(e) => {
2720 tracing::warn!("failed to get profile for blocked actor {subject_did}: {e}")
2721 }
2722 }
2723 }
2724 }
2725
2726 Ok(Json(serde_json::json!({
2727 "blocks": blocks,
2728 "cursor": record_list.cursor.map(|c| c.to_string())
2729 }))
2730 .into_response())
2731}
2732
2733async fn get_list(State(app_state): State<AppState>, req: Request) -> Result<Response, StatusCode> {
2734 // let hydrant = &app_state.hydrant;
2735 let query_str = req.uri().query().unwrap_or("");
2736 #[derive(Deserialize)]
2737 struct GetListParams {
2738 list: String,
2739 #[serde(default)]
2740 limit: Option<usize>,
2741 #[serde(default)]
2742 cursor: Option<String>,
2743 }
2744 let params = serde_urlencoded::from_str::<GetListParams>(query_str).map_err(|e| {
2745 tracing::error!("failed to parse get_list query params: {e}");
2746 StatusCode::BAD_REQUEST
2747 })?;
2748
2749 let uri = jacquard_common::types::string::AtUri::new(¶ms.list).map_err(|e| {
2750 tracing::error!("failed to parse list URI {}: {e}", params.list);
2751 StatusCode::BAD_REQUEST
2752 })?;
2753 let author_ident = uri.authority();
2754 let rkey = uri
2755 .rkey()
2756 .ok_or_else(|| {
2757 tracing::error!("missing rkey in list URI {}", params.list);
2758 StatusCode::BAD_REQUEST
2759 })?
2760 .0
2761 .as_str()
2762 .to_string();
2763
2764 let repo = match app_state.hydrant.repos.resolve(author_ident).await {
2765 Ok(r) => r,
2766 Err(e) => {
2767 tracing::error!("failed to resolve repo for {author_ident}: {e}");
2768 return proxy_request(req).await;
2769 }
2770 };
2771
2772 let record = match repo.get_record("app.bsky.graph.list", &rkey).await {
2773 Ok(Some(rec)) => rec,
2774 Ok(None) => {
2775 tracing::error!("list record not found: {author_ident}/app.bsky.graph.list/{rkey}");
2776 return proxy_request(req).await;
2777 }
2778 Err(e) => {
2779 tracing::error!("failed to get list record {author_ident}/{rkey}: {e}");
2780 return proxy_request(req).await;
2781 }
2782 };
2783
2784 let viewer_did = get_auth_did(&req);
2785 let author_profile =
2786 match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()).await {
2787 Ok(p) => p,
2788 Err(e) => {
2789 tracing::error!("failed to get author profile for {}: {e}", repo.did);
2790 return proxy_request(req).await;
2791 }
2792 };
2793
2794 let val_json = serde_json::to_value(&record.value).unwrap_or(serde_json::json!({}));
2795
2796 let list_view = serde_json::json!({
2797 "uri": params.list,
2798 "cid": record.cid.to_string(),
2799 "creator": author_profile,
2800 "name": val_json.get("name").and_then(|v| v.as_str()).unwrap_or(""),
2801 "purpose": val_json.get("purpose").and_then(|v| v.as_str()).unwrap_or("app.bsky.graph.defs#modlist"),
2802 "description": val_json.get("description").and_then(|v| v.as_str()),
2803 "avatar": val_json.get("avatar").and_then(|v| v.as_str()),
2804 "indexedAt": chrono::Utc::now().to_rfc3339(),
2805 });
2806
2807 // items from backlinks
2808 let limit = params.limit.unwrap_or(50).min(100);
2809 let mut fetch = app_state
2810 .hydrant
2811 .backlinks
2812 .fetch(params.list.clone())
2813 .source("app.bsky.graph.listitem")
2814 .limit(limit);
2815 if let Some(cursor_str) = params.cursor {
2816 match data_encoding::BASE64URL_NOPAD.decode(cursor_str.as_bytes()) {
2817 Ok(c) => {
2818 fetch = fetch.cursor(c);
2819 }
2820 Err(e) => {
2821 tracing::error!("failed to decode cursor {cursor_str}: {e}");
2822 }
2823 }
2824 }
2825
2826 let backlinks_page = match fetch.run().await {
2827 Ok(bp) => bp,
2828 Err(e) => {
2829 tracing::error!("failed to fetch backlinks for list {}: {e}", params.list);
2830 return proxy_request(req).await;
2831 }
2832 };
2833
2834 let mut items = Vec::new();
2835 for bl in backlinks_page.backlinks {
2836 let item_uri = match jacquard_common::types::string::AtUri::new(bl.uri.as_str()) {
2837 Ok(u) => u,
2838 Err(e) => {
2839 tracing::error!("failed to parse listitem URI {}: {e}", bl.uri);
2840 continue;
2841 }
2842 };
2843 let item_author = item_uri.authority();
2844 let item_rkey = item_uri.rkey().unwrap().0.as_str();
2845
2846 let item_repo = match app_state.hydrant.repos.resolve(item_author).await {
2847 Ok(r) => r,
2848 Err(e) => {
2849 tracing::error!("failed to resolve repo for listitem author {item_author}: {e}");
2850 continue;
2851 }
2852 };
2853 let item_rec = match item_repo
2854 .get_record("app.bsky.graph.listitem", item_rkey)
2855 .await
2856 {
2857 Ok(Some(rec)) => rec,
2858 Ok(None) => continue,
2859 Err(e) => {
2860 tracing::error!("failed to get listitem record {item_author}/{item_rkey}: {e}");
2861 continue;
2862 }
2863 };
2864 let item_val = serde_json::to_value(&item_rec.value).unwrap_or(serde_json::json!({}));
2865
2866 if let Some(subject_did) = item_val.get("subject").and_then(|s| s.as_str()) {
2867 match get_profile_internal(&app_state, subject_did, None).await {
2868 Ok(subject_profile) => {
2869 items.push(serde_json::json!({
2870 "uri": bl.uri.as_str(),
2871 "subject": subject_profile,
2872 }));
2873 }
2874 Err(e) => {
2875 tracing::warn!("failed to get profile for listitem subject {subject_did}: {e}");
2876 }
2877 }
2878 }
2879 }
2880
2881 Ok(Json(serde_json::json!({
2882 "list": list_view,
2883 "items": items,
2884 "cursor": backlinks_page.next_cursor.map(|c| data_encoding::BASE64URL_NOPAD.encode(&c)),
2885 }))
2886 .into_response())
2887}
2888
2889async fn get_lists(
2890 State(app_state): State<AppState>,
2891 req: Request,
2892) -> Result<Response, StatusCode> {
2893 // let hydrant = &app_state.hydrant;
2894 let query_str = req.uri().query().unwrap_or("");
2895 let params = serde_urlencoded::from_str::<GetAuthorFeedParams>(query_str).map_err(|e| {
2896 tracing::error!("failed to parse get_lists query params: {e}");
2897 StatusCode::BAD_REQUEST
2898 })?;
2899
2900 let ident = AtIdentifier::new(¶ms.actor).map_err(|e| {
2901 tracing::error!(
2902 "failed to create AtIdentifier for actor {}: {e}",
2903 params.actor
2904 );
2905 StatusCode::BAD_REQUEST
2906 })?;
2907 let repo = match app_state.hydrant.repos.resolve(&ident).await {
2908 Ok(r) => r,
2909 Err(e) => {
2910 tracing::error!("failed to resolve repo for {ident}: {e}");
2911 return proxy_request(req).await;
2912 }
2913 };
2914
2915 let viewer_did = get_auth_did(&req);
2916 let author_profile =
2917 match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()).await {
2918 Ok(p) => p,
2919 Err(e) => {
2920 tracing::error!("failed to get author profile for {}: {e}", repo.did);
2921 return proxy_request(req).await;
2922 }
2923 };
2924
2925 let limit = params.limit.unwrap_or(50).min(100);
2926 let record_list = match repo
2927 .list_records("app.bsky.graph.list", limit, true, params.cursor.as_deref())
2928 .await
2929 {
2930 Ok(rl) => rl,
2931 Err(e) => {
2932 tracing::error!("failed to list lists for {}: {e}", repo.did);
2933 return proxy_request(req).await;
2934 }
2935 };
2936
2937 let mut lists = Vec::new();
2938 for rec in record_list.records {
2939 let val_json = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({}));
2940 lists.push(serde_json::json!({
2941 "uri": format!("at://{}/app.bsky.graph.list/{}", repo.did.as_str(), rec.rkey.as_str()),
2942 "cid": rec.cid.to_string(),
2943 "creator": author_profile.clone(),
2944 "name": val_json.get("name").and_then(|v| v.as_str()).unwrap_or(""),
2945 "purpose": val_json.get("purpose").and_then(|v| v.as_str()).unwrap_or("app.bsky.graph.defs#modlist"),
2946 "description": val_json.get("description").and_then(|v| v.as_str()),
2947 "avatar": val_json.get("avatar").and_then(|v| v.as_str()),
2948 "indexedAt": chrono::Utc::now().to_rfc3339(),
2949 }));
2950 }
2951
2952 Ok(Json(serde_json::json!({
2953 "lists": lists,
2954 "cursor": record_list.cursor.map(|c| c.to_string())
2955 }))
2956 .into_response())
2957}
2958
2959async fn get_lists_with_membership(
2960 State(app_state): State<AppState>,
2961 req: Request,
2962) -> Result<Response, StatusCode> {
2963 // let hydrant = &app_state.hydrant;
2964 let query_str = req.uri().query().unwrap_or("");
2965 let params = serde_urlencoded::from_str::<GetAuthorFeedParams>(query_str).map_err(|e| {
2966 tracing::error!("failed to parse get_lists_with_membership query params: {e}");
2967 StatusCode::BAD_REQUEST
2968 })?;
2969
2970 let ident = AtIdentifier::new(¶ms.actor).map_err(|e| {
2971 tracing::error!(
2972 "failed to create AtIdentifier for actor {}: {e}",
2973 params.actor
2974 );
2975 StatusCode::BAD_REQUEST
2976 })?;
2977 let repo = match app_state.hydrant.repos.resolve(&ident).await {
2978 Ok(r) => r,
2979 Err(e) => {
2980 tracing::error!("failed to resolve repo for {ident}: {e}");
2981 return proxy_request(req).await;
2982 }
2983 };
2984
2985 // All lists created by actor
2986 let record_list = match repo
2987 .list_records("app.bsky.graph.list", 100, true, None)
2988 .await
2989 {
2990 Ok(rl) => rl,
2991 Err(e) => {
2992 tracing::error!("failed to list lists for {}: {e}", repo.did);
2993 return proxy_request(req).await;
2994 }
2995 };
2996
2997 let lists: Vec<serde_json::Value> = Vec::new();
2998 for rec in record_list.records {
2999 let _uri = format!(
3000 "at://{}/app.bsky.graph.list/{}",
3001 repo.did.as_str(),
3002 rec.rkey.as_str()
3003 );
3004
3005 // Check if subject is in this list
3006 // This is tricky without reverse index on listitem subject.
3007 // We might just fallback here or implement a scan.
3008 // For now, proxy it if we can't efficiently answer.
3009 return proxy_request(req).await;
3010 }
3011
3012 Ok(Json(serde_json::json!({ "lists": lists })).into_response())
3013}
3014
3015async fn get_actor_feeds(
3016 State(app_state): State<AppState>,
3017 req: Request,
3018) -> Result<Response, StatusCode> {
3019 // let hydrant = &app_state.hydrant;
3020 let query_str = req.uri().query().unwrap_or("");
3021 let params: GetAuthorFeedParams = serde_urlencoded::from_str(query_str).map_err(|e| {
3022 tracing::error!("failed to parse get_actor_feeds query params: {e}");
3023 StatusCode::BAD_REQUEST
3024 })?;
3025
3026 let ident = AtIdentifier::new(¶ms.actor).map_err(|e| {
3027 tracing::error!(
3028 "failed to create AtIdentifier for actor {}: {e}",
3029 params.actor
3030 );
3031 StatusCode::BAD_REQUEST
3032 })?;
3033 let repo = match app_state.hydrant.repos.resolve(&ident).await {
3034 Ok(r) => r,
3035 Err(e) => {
3036 tracing::error!("failed to resolve repo for {ident}: {e}");
3037 return proxy_request(req).await;
3038 }
3039 };
3040
3041 let viewer_did = get_auth_did(&req);
3042 let author_profile =
3043 match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()).await {
3044 Ok(p) => p,
3045 Err(e) => {
3046 tracing::error!("failed to get author profile for {}: {e}", repo.did);
3047 return proxy_request(req).await;
3048 }
3049 };
3050
3051 let limit = params.limit.unwrap_or(50).min(100);
3052 let record_list = match repo
3053 .list_records(
3054 "app.bsky.feed.generator",
3055 limit,
3056 true,
3057 params.cursor.as_deref(),
3058 )
3059 .await
3060 {
3061 Ok(rl) => rl,
3062 Err(e) => {
3063 tracing::error!("failed to list feed generators for {}: {e}", repo.did);
3064 return proxy_request(req).await;
3065 }
3066 };
3067
3068 let mut feeds = Vec::new();
3069 for rec in record_list.records {
3070 let val_json = serde_json::to_value(&rec.value).unwrap_or(serde_json::json!({}));
3071 let uri = format!(
3072 "at://{}/app.bsky.feed.generator/{}",
3073 repo.did.as_str(),
3074 rec.rkey.as_str()
3075 );
3076 feeds.push(serde_json::json!({
3077 "uri": uri,
3078 "cid": rec.cid.to_string(),
3079 "did": val_json.get("did").and_then(|v| v.as_str()).unwrap_or(""),
3080 "creator": author_profile.clone(),
3081 "displayName": val_json.get("displayName").and_then(|v| v.as_str()).unwrap_or(""),
3082 "description": val_json.get("description").and_then(|v| v.as_str()),
3083 "avatar": val_json.get("avatar").and_then(|v| v.as_str()),
3084 "likeCount": app_state.hydrant.backlinks.count(uri).source("app.bsky.feed.like").run().await.unwrap_or(0),
3085 "indexedAt": chrono::Utc::now().to_rfc3339(),
3086 }));
3087 }
3088
3089 Ok(Json(serde_json::json!({
3090 "feeds": feeds,
3091 "cursor": record_list.cursor.map(|c| c.to_string())
3092 }))
3093 .into_response())
3094}
3095
3096async fn get_feed_generator(
3097 State(app_state): State<AppState>,
3098 req: Request,
3099) -> Result<Response, StatusCode> {
3100 // let hydrant = &app_state.hydrant;
3101 let query_str = req.uri().query().unwrap_or("");
3102 #[derive(Deserialize)]
3103 struct GetFeedGeneratorParams {
3104 feed: String,
3105 }
3106 let params: GetFeedGeneratorParams = serde_urlencoded::from_str(query_str).map_err(|e| {
3107 tracing::error!("failed to parse get_feed_generator query params: {e}");
3108 StatusCode::BAD_REQUEST
3109 })?;
3110
3111 let uri = jacquard_common::types::string::AtUri::new(¶ms.feed).map_err(|e| {
3112 tracing::error!("failed to parse feed uri {}: {e}", params.feed);
3113 StatusCode::BAD_REQUEST
3114 })?;
3115 let author_ident = uri.authority();
3116 let rkey = uri
3117 .rkey()
3118 .ok_or_else(|| {
3119 tracing::error!("missing rkey in feed uri {}", params.feed);
3120 StatusCode::BAD_REQUEST
3121 })?
3122 .0
3123 .as_str()
3124 .to_string();
3125
3126 let repo = match app_state.hydrant.repos.resolve(author_ident).await {
3127 Ok(r) => r,
3128 Err(e) => {
3129 tracing::error!("failed to resolve repo for {author_ident}: {e}");
3130 return proxy_request(req).await;
3131 }
3132 };
3133
3134 let record = match repo.get_record("app.bsky.feed.generator", &rkey).await {
3135 Ok(Some(r)) => r,
3136 Ok(None) => {
3137 tracing::error!("feed generator record not found: {author_ident}/{rkey}");
3138 return proxy_request(req).await;
3139 }
3140 Err(e) => {
3141 tracing::error!("failed to get feed generator record {author_ident}/{rkey}: {e}");
3142 return proxy_request(req).await;
3143 }
3144 };
3145
3146 let viewer_did = get_auth_did(&req);
3147 let author_profile =
3148 match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref()).await {
3149 Ok(p) => p,
3150 Err(e) => {
3151 tracing::error!("failed to get author profile for {}: {e}", repo.did);
3152 return proxy_request(req).await;
3153 }
3154 };
3155
3156 let val_json = serde_json::to_value(&record.value).unwrap_or(serde_json::json!({}));
3157
3158 let view = serde_json::json!({
3159 "uri": params.feed,
3160 "cid": record.cid.to_string(),
3161 "did": val_json.get("did").and_then(|v| v.as_str()).unwrap_or(""),
3162 "creator": author_profile,
3163 "displayName": val_json.get("displayName").and_then(|v| v.as_str()).unwrap_or(""),
3164 "description": val_json.get("description").and_then(|v| v.as_str()),
3165 "avatar": val_json.get("avatar").and_then(|v| v.as_str()),
3166 "likeCount": app_state.hydrant.backlinks.count(params.feed.clone()).source("app.bsky.feed.like").run().await.unwrap_or(0),
3167 "indexedAt": chrono::Utc::now().to_rfc3339(),
3168 });
3169
3170 Ok(Json(serde_json::json!({
3171 "view": view,
3172 "isOnline": true,
3173 "isValid": true,
3174 }))
3175 .into_response())
3176}
3177
3178async fn get_feed_generators(
3179 State(app_state): State<AppState>,
3180 req: Request,
3181) -> Result<Response, StatusCode> {
3182 // let hydrant = &app_state.hydrant;
3183 let query_str = req.uri().query().unwrap_or("");
3184 let feeds = extract_query_array(query_str, "feeds");
3185 if feeds.is_empty() {
3186 return proxy_request(req).await;
3187 }
3188
3189 let mut views = Vec::new();
3190 let viewer_did = get_auth_did(&req);
3191 for feed_uri in feeds {
3192 let uri = match jacquard_common::types::string::AtUri::new(&feed_uri) {
3193 Ok(u) => u,
3194 Err(e) => {
3195 tracing::warn!("failed to parse feed uri {feed_uri}: {e}");
3196 continue;
3197 }
3198 };
3199 let author_ident = uri.authority();
3200 let rkey = uri.rkey().unwrap().0.as_str().to_string();
3201
3202 match app_state.hydrant.repos.resolve(author_ident).await {
3203 Ok(repo) => match repo.get_record("app.bsky.feed.generator", &rkey).await {
3204 Ok(Some(record)) => {
3205 match get_profile_internal(&app_state, repo.did.as_str(), viewer_did.as_deref())
3206 .await
3207 {
3208 Ok(author_profile) => {
3209 let val_json = serde_json::to_value(&record.value)
3210 .unwrap_or(serde_json::json!({}));
3211 views.push(serde_json::json!({
3212 "uri": feed_uri,
3213 "cid": record.cid.to_string(),
3214 "did": val_json.get("did").and_then(|v| v.as_str()).unwrap_or(""),
3215 "creator": author_profile,
3216 "displayName": val_json.get("displayName").and_then(|v| v.as_str()).unwrap_or(""),
3217 "description": val_json.get("description").and_then(|v| v.as_str()),
3218 "avatar": val_json.get("avatar").and_then(|v| v.as_str()),
3219 "likeCount": app_state.hydrant.backlinks.count(feed_uri.clone()).source("app.bsky.feed.like").run().await.unwrap_or(0),
3220 "indexedAt": chrono::Utc::now().to_rfc3339(),
3221 }));
3222 }
3223 Err(e) => {
3224 tracing::warn!("failed to get author profile for {}: {e}", repo.did)
3225 }
3226 }
3227 }
3228 Ok(None) => {
3229 tracing::warn!("feed generator record not found: {author_ident}/{rkey}")
3230 }
3231 Err(e) => {
3232 tracing::warn!("failed to get feed generator record {author_ident}/{rkey}: {e}")
3233 }
3234 },
3235 Err(e) => tracing::warn!("failed to resolve repo for {author_ident}: {e}"),
3236 }
3237 }
3238
3239 Ok(Json(serde_json::json!({ "feeds": views })).into_response())
3240}
3241
3242#[derive(Deserialize)]
3243struct PutPreferencesReq {
3244 preferences: Vec<serde_json::Value>,
3245}
3246
3247async fn put_preferences(
3248 State(app_state): State<AppState>,
3249 req: Request,
3250) -> Result<Response, StatusCode> {
3251 let Some(did) = get_auth_did(&req) else {
3252 return Err(StatusCode::UNAUTHORIZED);
3253 };
3254
3255 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX)
3256 .await
3257 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
3258 let payload: PutPreferencesReq = serde_json::from_slice(&body_bytes).map_err(|e| {
3259 tracing::error!("failed to parse put_preferences request: {e}");
3260 StatusCode::BAD_REQUEST
3261 })?;
3262
3263 let key = format!("prefs:{}", did);
3264 let val =
3265 serde_json::to_vec(&payload.preferences).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
3266 app_state
3267 .preferences
3268 .insert(key.as_bytes(), val)
3269 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
3270
3271 Ok(Json(serde_json::json!({})).into_response())
3272}
3273
3274async fn get_preferences(
3275 State(app_state): State<AppState>,
3276 req: Request,
3277) -> Result<Response, StatusCode> {
3278 let Some(did) = get_auth_did(&req) else {
3279 return Err(StatusCode::UNAUTHORIZED);
3280 };
3281
3282 let key = format!("prefs:{}", did);
3283 let existing = app_state
3284 .preferences
3285 .get(key.as_bytes())
3286 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
3287
3288 let preferences = if let Some(bytes) = existing {
3289 serde_json::from_slice::<Vec<serde_json::Value>>(&bytes).unwrap_or_default()
3290 } else {
3291 Vec::new()
3292 };
3293
3294 Ok(Json(serde_json::json!({ "preferences": preferences })).into_response())
3295}
3296
3297#[derive(Deserialize)]
3298struct MuteActorReq {
3299 actor: String,
3300}
3301
3302async fn mute_actor(
3303 State(app_state): State<AppState>,
3304 req: Request,
3305) -> Result<Response, StatusCode> {
3306 // let hydrant = &app_state.hydrant;
3307 let Some(did) = get_auth_did(&req) else {
3308 return Err(StatusCode::UNAUTHORIZED);
3309 };
3310
3311 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX)
3312 .await
3313 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
3314 let payload: MuteActorReq = serde_json::from_slice(&body_bytes).map_err(|e| {
3315 tracing::error!("failed to parse mute_actor/unmute_actor request: {e}");
3316 StatusCode::BAD_REQUEST
3317 })?;
3318
3319 let ident = AtIdentifier::new(&payload.actor).map_err(|e| {
3320 tracing::error!("failed to parse actor identifier {}: {e}", payload.actor);
3321 StatusCode::BAD_REQUEST
3322 })?;
3323 let repo = match app_state.hydrant.repos.resolve(&ident).await {
3324 Ok(r) => r,
3325 Err(e) => {
3326 tracing::error!("failed to resolve repo for {ident}: {e}");
3327 return Err(StatusCode::NOT_FOUND);
3328 }
3329 };
3330
3331 let key = format!("mute:{}:{}", did, repo.did.as_str());
3332 app_state
3333 .mutes
3334 .insert(key.as_bytes(), b"")
3335 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
3336
3337 Ok(Json(serde_json::json!({})).into_response())
3338}
3339
3340async fn unmute_actor(
3341 State(app_state): State<AppState>,
3342 req: Request,
3343) -> Result<Response, StatusCode> {
3344 // let hydrant = &app_state.hydrant;
3345 let Some(did) = get_auth_did(&req) else {
3346 return Err(StatusCode::UNAUTHORIZED);
3347 };
3348
3349 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX)
3350 .await
3351 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
3352 let payload: MuteActorReq = serde_json::from_slice(&body_bytes).map_err(|e| {
3353 tracing::error!("failed to parse mute_actor/unmute_actor request: {e}");
3354 StatusCode::BAD_REQUEST
3355 })?;
3356
3357 let ident = AtIdentifier::new(&payload.actor).map_err(|e| {
3358 tracing::error!("failed to parse actor identifier {}: {e}", payload.actor);
3359 StatusCode::BAD_REQUEST
3360 })?;
3361 let repo = match app_state.hydrant.repos.resolve(&ident).await {
3362 Ok(r) => r,
3363 Err(e) => {
3364 tracing::error!("failed to resolve repo for {ident}: {e}");
3365 return Err(StatusCode::NOT_FOUND);
3366 }
3367 };
3368
3369 let key = format!("mute:{}:{}", did, repo.did.as_str());
3370 app_state
3371 .mutes
3372 .remove(key.as_bytes())
3373 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
3374
3375 Ok(Json(serde_json::json!({})).into_response())
3376}
3377
3378async fn get_mutes(
3379 State(app_state): State<AppState>,
3380 req: Request,
3381) -> Result<Response, StatusCode> {
3382 // let hydrant = &app_state.hydrant;
3383 let Some(did) = get_auth_did(&req) else {
3384 return Err(StatusCode::UNAUTHORIZED);
3385 };
3386
3387 let query_str = req.uri().query().unwrap_or("");
3388 let params: GetBookmarksParams = serde_urlencoded::from_str(query_str).map_err(|e| {
3389 tracing::error!("failed to parse get_mutes query params: {e}");
3390 StatusCode::BAD_REQUEST
3391 })?;
3392
3393 let limit = params.limit.unwrap_or(50).min(100);
3394 let prefix = format!("mute:{}:", did);
3395
3396 let iter: Box<dyn Iterator<Item = _>> = if let Some(cursor) = params.cursor {
3397 Box::new(app_state.mutes.range::<&[u8], _>((
3398 std::ops::Bound::Included(cursor.as_bytes()),
3399 std::ops::Bound::Unbounded,
3400 )))
3401 } else {
3402 Box::new(app_state.mutes.prefix(prefix.as_bytes()))
3403 };
3404
3405 let mut fetched_keys = Vec::new();
3406 for item in iter {
3407 let (key, _) = match item.into_inner() {
3408 Ok(inner) => inner,
3409 Err(e) => {
3410 tracing::warn!("failed to get item from mutes iterator: {e}");
3411 continue;
3412 }
3413 };
3414 if !key.starts_with(prefix.as_bytes()) {
3415 break;
3416 }
3417 fetched_keys.push(key.to_vec());
3418 if fetched_keys.len() >= limit {
3419 break;
3420 }
3421 }
3422
3423 let mut mutes = Vec::new();
3424 let mut next_cursor = None;
3425 for key in fetched_keys {
3426 let key_str = String::from_utf8_lossy(&key);
3427 let muted_did = key_str.strip_prefix(&prefix).unwrap_or(&key_str);
3428 match get_profile_internal(&app_state, muted_did, None).await {
3429 Ok(profile) => mutes.push(profile),
3430 Err(e) => tracing::warn!("failed to get profile for muted actor {muted_did}: {e}"),
3431 }
3432 next_cursor = Some(key_str.to_string());
3433 }
3434
3435 Ok(Json(serde_json::json!({
3436 "mutes": mutes,
3437 "cursor": next_cursor
3438 }))
3439 .into_response())
3440}
3441
3442async fn notification_indexer(app_state: AppState) {
3443 let mut stream = app_state.hydrant.subscribe(None);
3444 while let Some(evt) = stream.next().await {
3445 let Some(rec) = evt.record else { continue };
3446 if rec.action != "create" {
3447 continue;
3448 }
3449
3450 let author_did = rec.did.as_str();
3451 let collection = rec.collection.as_str();
3452 let rkey = rec.rkey.as_str();
3453 let uri = format!("at://{}/{}/{}", author_did, collection, rkey);
3454
3455 let Some(val) = rec.record else { continue };
3456
3457 let mut targets = Vec::new(); // (target_did, reason, reason_subject)
3458
3459 match collection {
3460 "app.bsky.feed.like" => {
3461 if let Some(subj_uri) = val
3462 .get("subject")
3463 .and_then(|s| s.get("uri"))
3464 .and_then(|u| u.as_str())
3465 {
3466 match jacquard_common::types::string::AtUri::new(subj_uri) {
3467 Ok(u) => {
3468 targets.push((
3469 u.authority().as_str().to_string(),
3470 "like",
3471 Some(subj_uri.to_string()),
3472 ));
3473 }
3474 Err(e) => {
3475 tracing::warn!("failed to parse notification like uri {subj_uri}: {e}")
3476 }
3477 }
3478 }
3479 }
3480 "app.bsky.feed.repost" => {
3481 if let Some(subj_uri) = val
3482 .get("subject")
3483 .and_then(|s| s.get("uri"))
3484 .and_then(|u| u.as_str())
3485 {
3486 match jacquard_common::types::string::AtUri::new(subj_uri) {
3487 Ok(u) => {
3488 targets.push((
3489 u.authority().as_str().to_string(),
3490 "repost",
3491 Some(subj_uri.to_string()),
3492 ));
3493 }
3494 Err(e) => tracing::warn!(
3495 "failed to parse notification repost uri {subj_uri}: {e}"
3496 ),
3497 }
3498 }
3499 }
3500 "app.bsky.graph.follow" => {
3501 if let Some(subj_did) = val.get("subject").and_then(|s| s.as_str()) {
3502 targets.push((subj_did.to_string(), "follow", None));
3503 }
3504 }
3505 "app.bsky.feed.post" => {
3506 if let Some(parent_uri) = val
3507 .get("reply")
3508 .and_then(|r| r.get("parent"))
3509 .and_then(|p| p.get("uri"))
3510 .and_then(|u| u.as_str())
3511 {
3512 match jacquard_common::types::string::AtUri::new(parent_uri) {
3513 Ok(u) => {
3514 targets.push((
3515 u.authority().as_str().to_string(),
3516 "reply",
3517 Some(parent_uri.to_string()),
3518 ));
3519 }
3520 Err(e) => tracing::warn!(
3521 "failed to parse notification reply uri {parent_uri}: {e}"
3522 ),
3523 }
3524 }
3525
3526 let is_quote = val.get("embed").map_or(false, |embed| {
3527 let t = embed.get("$type").and_then(|t| t.as_str()).unwrap_or("");
3528 if t == "app.bsky.embed.record" {
3529 embed
3530 .get("record")
3531 .and_then(|r| r.get("uri"))
3532 .and_then(|u| u.as_str())
3533 .is_some()
3534 } else if t == "app.bsky.embed.recordWithMedia" {
3535 embed
3536 .get("record")
3537 .and_then(|r| r.get("record"))
3538 .and_then(|r| r.get("uri"))
3539 .and_then(|u| u.as_str())
3540 .is_some()
3541 } else {
3542 false
3543 }
3544 });
3545
3546 if is_quote {
3547 if let Some(embed) = val.get("embed") {
3548 let t = embed.get("$type").and_then(|t| t.as_str()).unwrap_or("");
3549 let quote_uri = if t == "app.bsky.embed.record" {
3550 embed
3551 .get("record")
3552 .and_then(|r| r.get("uri"))
3553 .and_then(|u| u.as_str())
3554 } else if t == "app.bsky.embed.recordWithMedia" {
3555 embed
3556 .get("record")
3557 .and_then(|r| r.get("record"))
3558 .and_then(|r| r.get("uri"))
3559 .and_then(|u| u.as_str())
3560 } else {
3561 None
3562 };
3563
3564 if let Some(qu) = quote_uri {
3565 match jacquard_common::types::string::AtUri::new(qu) {
3566 Ok(u) => {
3567 targets.push((
3568 u.authority().as_str().to_string(),
3569 "quote",
3570 Some(qu.to_string()),
3571 ));
3572 }
3573 Err(e) => tracing::warn!(
3574 "failed to parse notification quote uri {qu}: {e}"
3575 ),
3576 }
3577 }
3578 }
3579 }
3580
3581 if let Some(facets) = val.get("facets").and_then(|f| f.as_array()) {
3582 for facet in facets {
3583 if let Some(features) = facet.get("features").and_then(|f| f.as_array()) {
3584 for feature in features {
3585 if feature.get("$type").and_then(|t| t.as_str())
3586 == Some("app.bsky.richtext.facet#mention")
3587 {
3588 if let Some(mention_did) =
3589 feature.get("did").and_then(|d| d.as_str())
3590 {
3591 targets.push((mention_did.to_string(), "mention", None));
3592 }
3593 }
3594 }
3595 }
3596 }
3597 }
3598 }
3599 _ => {}
3600 }
3601
3602 let indexed_at = chrono::Utc::now().to_rfc3339();
3603 for (target_did, reason, reason_subject) in targets {
3604 if target_did == author_did {
3605 continue;
3606 }
3607
3608 let key = format!("notif:{}:{:016x}", target_did, evt.id);
3609 let notif = serde_json::json!({
3610 "uri": uri,
3611 "cid": rec.cid.map(|c| c.to_string()).unwrap_or_default(),
3612 "author_did": author_did,
3613 "reason": reason,
3614 "reasonSubject": reason_subject,
3615 "record": val,
3616 "indexedAt": indexed_at,
3617 "id": evt.id
3618 });
3619
3620 let _ = app_state
3621 .notifications
3622 .insert(key.as_bytes(), serde_json::to_vec(¬if).unwrap());
3623 }
3624 }
3625}
3626
3627#[derive(Deserialize)]
3628struct ListNotificationsParams {
3629 #[serde(default)]
3630 limit: Option<usize>,
3631 #[serde(default)]
3632 cursor: Option<String>,
3633}
3634
3635async fn list_notifications(
3636 State(app_state): State<AppState>,
3637 req: Request,
3638) -> Result<Response, StatusCode> {
3639 // let hydrant = &app_state.hydrant;
3640 let Some(did) = get_auth_did(&req) else {
3641 return Err(StatusCode::UNAUTHORIZED);
3642 };
3643
3644 let query_str = req.uri().query().unwrap_or("");
3645 let params = serde_urlencoded::from_str::<ListNotificationsParams>(query_str).unwrap_or(
3646 ListNotificationsParams {
3647 limit: None,
3648 cursor: None,
3649 },
3650 );
3651 let limit = params.limit.unwrap_or(50).min(100);
3652
3653 let prefix = format!("notif:{}:", did);
3654
3655 // Collect all notification keys into a vector so we can iterate in reverse
3656 let mut all_keys = Vec::new();
3657 for item in app_state.notifications.prefix(prefix.as_bytes()) {
3658 match item.into_inner() {
3659 Ok((k, v)) => {
3660 all_keys.push((k, v));
3661 }
3662 Err(e) => {
3663 tracing::warn!("failed to get notification item: {e}");
3664 }
3665 }
3666 }
3667
3668 // Sort keys just in case and reverse
3669 all_keys.sort_by(|a, b| b.0.cmp(&a.0));
3670
3671 // Pagination
3672 if let Some(cursor) = params.cursor {
3673 all_keys.retain(|(k, _)| String::from_utf8_lossy(k).as_ref() < cursor.as_str());
3674 }
3675
3676 let seen_key = format!("seen:{}", did);
3677 let last_seen = app_state
3678 .seen
3679 .get(seen_key.as_bytes())
3680 .ok()
3681 .flatten()
3682 .and_then(|b| String::from_utf8(b.to_vec()).ok())
3683 .unwrap_or_else(|| "1970-01-01T00:00:00Z".to_string());
3684
3685 let mut notifications = Vec::new();
3686 let mut next_cursor = None;
3687
3688 for (key, val_bytes) in all_keys.into_iter().take(limit) {
3689 next_cursor = Some(String::from_utf8_lossy(&key).to_string());
3690
3691 let val = match serde_json::from_slice::<serde_json::Value>(&val_bytes) {
3692 Ok(v) => v,
3693 Err(e) => {
3694 tracing::warn!("failed to parse notification json: {e}");
3695 continue;
3696 }
3697 };
3698
3699 let author_did = val.get("author_did").and_then(|a| a.as_str()).unwrap_or("");
3700 let indexed_at = val.get("indexedAt").and_then(|a| a.as_str()).unwrap_or("");
3701
3702 let is_read = indexed_at <= last_seen.as_str();
3703
3704 let mut notif = val.clone();
3705 match get_profile_internal(&app_state, author_did, None).await {
3706 Ok(author_profile) => {
3707 notif["author"] = author_profile;
3708 }
3709 Err(e) => {
3710 tracing::warn!(
3711 "failed to get author profile for notification from {author_did}: {e}"
3712 );
3713 continue;
3714 }
3715 }
3716
3717 notif["isRead"] = serde_json::json!(is_read);
3718
3719 notif.as_object_mut().unwrap().remove("author_did");
3720 notif.as_object_mut().unwrap().remove("id");
3721
3722 notifications.push(notif);
3723 }
3724
3725 Ok(Json(serde_json::json!({
3726 "notifications": notifications,
3727 "cursor": if notifications.len() == limit { next_cursor } else { None }
3728 }))
3729 .into_response())
3730}
3731
3732async fn get_unread_count(
3733 State(app_state): State<AppState>,
3734 req: Request,
3735) -> Result<Response, StatusCode> {
3736 let Some(did) = get_auth_did(&req) else {
3737 return Err(StatusCode::UNAUTHORIZED);
3738 };
3739
3740 let prefix = format!("notif:{}:", did);
3741 let seen_key = format!("seen:{}", did);
3742 let last_seen = app_state
3743 .seen
3744 .get(seen_key.as_bytes())
3745 .ok()
3746 .flatten()
3747 .and_then(|b| String::from_utf8(b.to_vec()).ok())
3748 .unwrap_or_else(|| "1970-01-01T00:00:00Z".to_string());
3749
3750 let mut count = 0;
3751 for item in app_state.notifications.prefix(prefix.as_bytes()) {
3752 let (_key, val_bytes) = match item.into_inner() {
3753 Ok(inner) => inner,
3754 Err(e) => {
3755 tracing::warn!("failed to get notification item for count: {e}");
3756 continue;
3757 }
3758 };
3759 match serde_json::from_slice::<serde_json::Value>(&val_bytes) {
3760 Ok(val) => {
3761 if let Some(indexed_at) = val.get("indexedAt").and_then(|a| a.as_str()) {
3762 if indexed_at > last_seen.as_str() {
3763 count += 1;
3764 }
3765 }
3766 }
3767 Err(e) => tracing::warn!("failed to parse notification json for count: {e}"),
3768 }
3769 }
3770
3771 Ok(Json(serde_json::json!({
3772 "count": count
3773 }))
3774 .into_response())
3775}
3776
3777#[derive(Deserialize)]
3778struct UpdateSeenReq {
3779 #[serde(rename = "seenAt")]
3780 seen_at: String,
3781}
3782
3783async fn update_seen(
3784 State(app_state): State<AppState>,
3785 req: Request,
3786) -> Result<Response, StatusCode> {
3787 let Some(did) = get_auth_did(&req) else {
3788 return Err(StatusCode::UNAUTHORIZED);
3789 };
3790
3791 let body_bytes = axum::body::to_bytes(req.into_body(), usize::MAX)
3792 .await
3793 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
3794 let payload: UpdateSeenReq = serde_json::from_slice(&body_bytes).map_err(|e| {
3795 tracing::error!("failed to parse update_seen request: {e}");
3796 StatusCode::BAD_REQUEST
3797 })?;
3798
3799 let seen_key = format!("seen:{}", did);
3800 app_state
3801 .seen
3802 .insert(seen_key.as_bytes(), payload.seen_at.as_bytes())
3803 .map_err(|e| {
3804 tracing::error!("failed to insert seen: {e}");
3805 StatusCode::INTERNAL_SERVER_ERROR
3806 })?;
3807
3808 Ok(Json(serde_json::json!({})).into_response())
3809}
3810
3811#[derive(Deserialize)]
3812struct GetFeedParams {
3813 feed: String,
3814 #[serde(default)]
3815 limit: Option<usize>,
3816 #[serde(default)]
3817 cursor: Option<String>,
3818}
3819
3820async fn get_feed(State(app_state): State<AppState>, req: Request) -> Result<Response, StatusCode> {
3821 // let hydrant = &app_state.hydrant;
3822 let query_str = req.uri().query().unwrap_or("");
3823 let params: GetFeedParams = serde_urlencoded::from_str(query_str).map_err(|e| {
3824 tracing::error!("failed to parse get_feed query params: {e}");
3825 StatusCode::BAD_REQUEST
3826 })?;
3827
3828 let uri = jacquard_common::types::string::AtUri::new(¶ms.feed).map_err(|e| {
3829 tracing::error!("failed to parse feed uri {}: {e}", params.feed);
3830 StatusCode::BAD_REQUEST
3831 })?;
3832 let author_ident = uri.authority();
3833 let rkey = uri
3834 .rkey()
3835 .ok_or_else(|| {
3836 tracing::error!("missing rkey in feed uri {}", params.feed);
3837 StatusCode::BAD_REQUEST
3838 })?
3839 .0
3840 .as_str()
3841 .to_string();
3842
3843 let repo = match app_state.hydrant.repos.resolve(author_ident).await {
3844 Ok(r) => r,
3845 Err(e) => {
3846 tracing::error!("failed to resolve repo for {author_ident}: {e}");
3847 return proxy_request(req).await;
3848 }
3849 };
3850
3851 let record = match repo.get_record("app.bsky.feed.generator", &rkey).await {
3852 Ok(Some(r)) => r,
3853 Ok(None) => {
3854 tracing::error!("feed generator record not found: {author_ident}/{rkey}");
3855 return proxy_request(req).await;
3856 }
3857 Err(e) => {
3858 tracing::error!("failed to get feed generator record {author_ident}/{rkey}: {e}");
3859 return proxy_request(req).await;
3860 }
3861 };
3862
3863 let val_json = serde_json::to_value(&record.value).unwrap_or(serde_json::json!({}));
3864 let Some(service_did) = val_json.get("did").and_then(|d| d.as_str()) else {
3865 tracing::error!("missing did in feed generator record {author_ident}/{rkey}");
3866 return Err(StatusCode::BAD_REQUEST);
3867 };
3868
3869 let service_did_parsed =
3870 jacquard_common::types::string::Did::new(service_did).map_err(|e| {
3871 tracing::error!("failed to parse service did {service_did}: {e}");
3872 StatusCode::BAD_REQUEST
3873 })?;
3874
3875 let (doc_data, _) = match app_state
3876 .hydrant
3877 .resolver()
3878 .resolve_raw_doc(&service_did_parsed)
3879 .await
3880 {
3881 Ok(d) => d,
3882 Err(e) => {
3883 tracing::error!("failed to resolve service did {service_did}: {e}");
3884 return proxy_request(req).await;
3885 }
3886 };
3887
3888 let doc_json = serde_json::to_value(&doc_data).unwrap_or(serde_json::json!({}));
3889 let mut endpoint = None;
3890
3891 if let Some(services) = doc_json.get("service").and_then(|s| s.as_array()) {
3892 for srv in services {
3893 if let Some(typ) = srv.get("type") {
3894 let is_fg = if let Some(t_str) = typ.as_str() {
3895 t_str == "BskyFeedGenerator"
3896 } else if let Some(t_arr) = typ.as_array() {
3897 t_arr
3898 .iter()
3899 .any(|v| v.as_str() == Some("BskyFeedGenerator"))
3900 } else {
3901 false
3902 };
3903
3904 if is_fg {
3905 if let Some(ep) = srv.get("serviceEndpoint").and_then(|e| e.as_str()) {
3906 endpoint = Some(ep.to_string());
3907 break;
3908 }
3909 }
3910 }
3911
3912 // fallback check by ID
3913 if srv.get("id").and_then(|i| i.as_str()) == Some("#bsky_fg") {
3914 if let Some(ep) = srv.get("serviceEndpoint").and_then(|e| e.as_str()) {
3915 endpoint = Some(ep.to_string());
3916 break;
3917 }
3918 }
3919 }
3920 }
3921
3922 let Some(ep) = endpoint else {
3923 return proxy_request(req).await;
3924 };
3925
3926 let mut skeleton_url = format!(
3927 "{}/xrpc/app.bsky.feed.getFeedSkeleton?feed={}",
3928 ep, params.feed
3929 );
3930 if let Some(limit) = params.limit {
3931 skeleton_url.push_str(&format!("&limit={}", limit));
3932 }
3933 if let Some(cursor) = params.cursor {
3934 skeleton_url.push_str(&format!("&cursor={}", cursor));
3935 }
3936
3937 let client = reqwest::Client::new();
3938 let mut req_builder = client.get(&skeleton_url);
3939 if let Some(auth) = req.headers().get("authorization") {
3940 req_builder = req_builder.header("authorization", auth);
3941 }
3942
3943 let res = match req_builder.send().await {
3944 Ok(r) => r,
3945 Err(e) => {
3946 tracing::error!("failed to send request to feed generator {skeleton_url}: {e}");
3947 return proxy_request(req).await;
3948 }
3949 };
3950
3951 let skeleton = match res.json::<serde_json::Value>().await {
3952 Ok(s) => s,
3953 Err(e) => {
3954 tracing::error!("failed to parse feed skeleton json: {e}");
3955 return proxy_request(req).await;
3956 }
3957 };
3958
3959 let mut hydrated_feed = Vec::new();
3960 if let Some(feed_items) = skeleton.get("feed").and_then(|f| f.as_array()) {
3961 for item in feed_items {
3962 if let Some(post_uri) = item.get("post").and_then(|p| p.as_str()) {
3963 match get_post_view(&app_state, post_uri, None).await {
3964 Ok(post_view) => {
3965 let mut feed_item = serde_json::json!({
3966 "post": post_view
3967 });
3968 if let Some(reason) = item.get("reason") {
3969 feed_item
3970 .as_object_mut()
3971 .unwrap()
3972 .insert("reason".to_string(), reason.clone());
3973 }
3974 hydrated_feed.push(feed_item);
3975 }
3976 Err(e) => {
3977 tracing::warn!("failed to get post view for {post_uri} in feed: {e}");
3978 }
3979 }
3980 }
3981 }
3982 }
3983
3984 Ok(Json(serde_json::json!({
3985 "feed": hydrated_feed,
3986 "cursor": skeleton.get("cursor")
3987 }))
3988 .into_response())
3989}
3990
3991async fn get_well_known_did(req: Request) -> Result<Response, StatusCode> {
3992 let host = req
3993 .headers()
3994 .get("host")
3995 .and_then(|h| h.to_str().ok())
3996 .unwrap_or("localhost:8000");
3997
3998 let did = if host.contains(':') {
3999 format!("did:web:{}", host.replace(':', "%3A"))
4000 } else {
4001 format!("did:web:{}", host)
4002 };
4003
4004 let scheme = if host.starts_with("localhost") || host.starts_with("127.0.0.1") {
4005 "http"
4006 } else {
4007 "https"
4008 };
4009 let service_endpoint = format!("{}://{}", scheme, host);
4010
4011 Ok(Json(serde_json::json!({
4012 "@context": [
4013 "https://www.w3.org/ns/did/v1"
4014 ],
4015 "id": did,
4016 "service": [
4017 {
4018 "id": "#bsky_notif",
4019 "type": "BskyNotificationService",
4020 "serviceEndpoint": service_endpoint
4021 },
4022 {
4023 "id": "#bsky_appview",
4024 "type": "BskyAppView",
4025 "serviceEndpoint": service_endpoint
4026 }
4027 ]
4028 }))
4029 .into_response())
4030}