very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[all] implement backlinks

dawn 1a5b4640 c737e0b5

+1276 -11
+2
Cargo.toml
··· 4 4 edition = "2024" 5 5 6 6 [features] 7 + default = ["backlinks"] 7 8 sync_all = [] 9 + backlinks = [] 8 10 9 11 [dependencies] 10 12 tokio = { version = "1.0", features = ["full"] }
+33
README.md
··· 168 168 169 169 returns `{ count }`. 170 170 171 + #### blue.microcosm.links.* 172 + 173 + hydrant implements a subset of [microcosm constellation](https://constellation.microcosm.blue/) when it's built with the `backlinks` cargo feature (`cargo build --features backlinks`). 174 + 175 + when enabled, hydrant indexes all AT URI and DID references found inside stored records into a reverse index. this lets you efficiently answer "what records link to this subject?". 176 + 177 + ##### blue.microcosm.links.getBacklinks 178 + 179 + return records that link to a given subject. 180 + 181 + | param | required | description | 182 + | :--- | :--- | :--- | 183 + | `subject` | yes | AT URI or DID to look up backlinks for. | 184 + | `source` | no | filter by source collection, e.g. `app.bsky.feed.like`. also accepts `collection:path` form to further filter by field path, e.g. `app.bsky.feed.like:subject.uri`. the path is matched against the dotted field path within the record (`.` is prepended automatically). | 185 + | `limit` | no | max results to return (default 50, max 100). | 186 + | `cursor` | no | opaque pagination cursor from a previous response. | 187 + | `reverse` | no | if `true`, return results in reverse order (default `false`). | 188 + 189 + returns `{ backlinks: [{ uri, cid }], cursor? }`. 190 + 191 + results are ordered by source record rkey (ascending by default, descending when `reverse=true`). the cursor is stable across new insertions for TID rkey records. 192 + 193 + ##### blue.microcosm.links.getBacklinksCount 194 + 195 + return the number of records that link to a given subject. 196 + 197 + | param | required | description | 198 + | :--- | :--- | :--- | 199 + | `subject` | yes | AT URI or DID to count backlinks for. | 200 + | `source` | no | filter by source collection (same format as `getBacklinks`). | 201 + 202 + returns `{ count }`. 203 + 171 204 ### event stream 172 205 173 206 - `GET /stream`: subscribe to the event stream.
+10 -2
src/api/mod.rs
··· 15 15 mod xrpc; 16 16 17 17 pub async fn serve(hydrant: Hydrant, port: u16) -> miette::Result<()> { 18 - let app = Router::new() 18 + #[allow(unused_mut)] 19 + let mut app = Router::new() 19 20 .route("/health", get(|| async { "OK" })) 20 21 .route("/stats", get(stats::get_stats)) 21 22 .nest("/stream", stream::router()) ··· 23 24 .merge(filter::router()) 24 25 .merge(repos::router()) 25 26 .merge(ingestion::router()) 26 - .merge(db::router()) 27 + .merge(db::router()); 28 + 29 + #[cfg(feature = "backlinks")] 30 + { 31 + app = app.merge(crate::backlinks::api::router()); 32 + } 33 + 34 + let app = app 27 35 .with_state(hydrant) 28 36 .layer(TraceLayer::new_for_http()) 29 37 .layer(CorsLayer::permissive());
+19
src/backfill/mod.rs
··· 627 627 if !ephemeral { 628 628 batch.insert(&app_state.db.blocks, block_key.clone(), val.as_ref()); 629 629 batch.insert(&app_state.db.records, db_key, cid_raw); 630 + #[cfg(feature = "backlinks")] 631 + if let Ok(value) = serde_ipld_dagcbor::from_slice::<jacquard_common::Data>(val.as_ref()) { 632 + crate::backlinks::store::index_record( 633 + &mut batch, 634 + &app_state.db.backlinks, 635 + did.as_str(), 636 + collection, 637 + &rkey.to_smolstr(), 638 + &value, 639 + )?; 640 + } 630 641 } 631 642 632 643 added_blocks += 1; ··· 662 673 &app_state.db.records, 663 674 keys::record_key(&did, &collection, &rkey), 664 675 ); 676 + #[cfg(feature = "backlinks")] 677 + crate::backlinks::store::delete_record( 678 + &mut batch, 679 + &app_state.db.backlinks, 680 + did.as_str(), 681 + &collection, 682 + &rkey.to_smolstr(), 683 + )?; 665 684 666 685 let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst); 667 686 let evt = StoredEvent {
+115
src/backlinks/api.rs
··· 1 + use axum::{Json, Router, extract::State, http::StatusCode, routing::get}; 2 + use serde::{Deserialize, Serialize}; 3 + use smol_str::SmolStr; 4 + 5 + use crate::control::Hydrant; 6 + 7 + pub fn router() -> Router<Hydrant> { 8 + Router::new() 9 + .route( 10 + "/xrpc/blue.microcosm.links.getBacklinks", 11 + get(handle_get_backlinks), 12 + ) 13 + .route( 14 + "/xrpc/blue.microcosm.links.getBacklinksCount", 15 + get(handle_get_backlinks_count), 16 + ) 17 + } 18 + 19 + #[derive(Deserialize)] 20 + pub struct GetBacklinksParams { 21 + pub subject: String, 22 + /// filter by source collection, optionally with a path suffix `collection:path` 23 + pub source: Option<String>, 24 + pub limit: Option<u64>, 25 + pub cursor: Option<String>, 26 + pub reverse: Option<bool>, 27 + } 28 + 29 + #[derive(Serialize)] 30 + pub struct Backlink { 31 + pub uri: SmolStr, 32 + pub cid: SmolStr, 33 + } 34 + 35 + #[derive(Serialize)] 36 + pub struct GetBacklinksOutput { 37 + pub backlinks: Vec<Backlink>, 38 + #[serde(skip_serializing_if = "Option::is_none")] 39 + pub cursor: Option<String>, 40 + } 41 + 42 + #[derive(Deserialize)] 43 + pub struct GetBacklinksCountParams { 44 + pub subject: String, 45 + pub source: Option<String>, 46 + } 47 + 48 + #[derive(Serialize)] 49 + pub struct GetBacklinksCountOutput { 50 + pub count: u64, 51 + } 52 + 53 + pub async fn handle_get_backlinks( 54 + State(hydrant): State<Hydrant>, 55 + axum::extract::Query(params): axum::extract::Query<GetBacklinksParams>, 56 + ) -> Result<Json<GetBacklinksOutput>, StatusCode> { 57 + let limit = params.limit.unwrap_or(50).min(100) as usize; 58 + let reverse = params.reverse.unwrap_or(false); 59 + 60 + let cursor_bytes = params 61 + .cursor 62 + .as_deref() 63 + .and_then(|c| data_encoding::BASE64URL_NOPAD.decode(c.as_bytes()).ok()); 64 + 65 + let mut fetch = hydrant 66 + .backlinks 67 + .fetch(params.subject) 68 + .limit(limit) 69 + .reverse(reverse); 70 + if let Some(ref src) = params.source { 71 + fetch = fetch.source(src); 72 + } 73 + if let Some(c) = cursor_bytes { 74 + fetch = fetch.cursor(c); 75 + } 76 + 77 + let page = fetch 78 + .run() 79 + .await 80 + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 81 + 82 + let next_cursor = page 83 + .next_cursor 84 + .map(|b| data_encoding::BASE64URL_NOPAD.encode(&b)); 85 + let backlinks = page 86 + .backlinks 87 + .into_iter() 88 + .map(|e| Backlink { 89 + uri: e.uri, 90 + cid: e.cid, 91 + }) 92 + .collect(); 93 + 94 + Ok(Json(GetBacklinksOutput { 95 + backlinks, 96 + cursor: next_cursor, 97 + })) 98 + } 99 + 100 + pub async fn handle_get_backlinks_count( 101 + State(hydrant): State<Hydrant>, 102 + axum::extract::Query(params): axum::extract::Query<GetBacklinksCountParams>, 103 + ) -> Result<Json<GetBacklinksCountOutput>, StatusCode> { 104 + let mut count_q = hydrant.backlinks.count(params.subject); 105 + if let Some(ref src) = params.source { 106 + count_q = count_q.source(src); 107 + } 108 + 109 + let count = count_q 110 + .run() 111 + .await 112 + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 113 + 114 + Ok(Json(GetBacklinksCountOutput { count })) 115 + }
+206
src/backlinks/links.rs
··· 1 + use jacquard_common::Data; 2 + use jacquard_common::types::string::AtprotoStr; 3 + use jacquard_common::types::uri::Uri; 4 + use smol_str::SmolStr; 5 + 6 + pub struct Link { 7 + /// dotted field path within the record, e.g. `.subject.uri` or `.facets[].features[app.bsky.richtext.facet#link].uri` 8 + pub path: SmolStr, 9 + pub target: SmolStr, 10 + } 11 + 12 + /// extract all link targets from a record, with their field paths. 13 + /// 14 + /// paths follow the same convention as constellation: 15 + /// - start with `.` (root-level field `subject` → `.subject`) 16 + /// - object keys: `{parent}.{key}` 17 + /// - plain array elements: `{parent}[]` 18 + /// - typed array elements (objects with `$type`): `{parent}[{$type}]` 19 + /// 20 + /// results are sorted by (path, target) and deduplicated. 21 + pub fn extract_links(value: &Data) -> Vec<Link> { 22 + let mut links = Vec::new(); 23 + walk(value, "", &mut links); 24 + links.sort_unstable_by(|a, b| a.path.cmp(&b.path).then(a.target.cmp(&b.target))); 25 + links.dedup_by(|a, b| a.path == b.path && a.target == b.target); 26 + links 27 + } 28 + 29 + fn walk(value: &Data<'_>, path: &str, links: &mut Vec<Link>) { 30 + match value { 31 + Data::String(s) => { 32 + let target = match s { 33 + AtprotoStr::AtUri(uri) => uri.as_str(), 34 + AtprotoStr::Did(did) => did.as_str(), 35 + AtprotoStr::Uri(Uri::At(uri)) => uri.as_str(), 36 + AtprotoStr::Uri(Uri::Did(did)) => did.as_str(), 37 + _ => return, 38 + }; 39 + links.push(Link { 40 + path: SmolStr::new(path), 41 + target: SmolStr::new(target), 42 + }); 43 + } 44 + Data::Array(arr) => { 45 + for item in &arr.0 { 46 + let child_path = match item { 47 + Data::Object(obj) => { 48 + let type_tag = obj.0.get("$type").and_then(|v| match v { 49 + Data::String(s) => Some(s.as_str()), 50 + _ => None, 51 + }); 52 + match type_tag { 53 + Some(t) => format!("{path}[{t}]"), 54 + None => format!("{path}[]"), 55 + } 56 + } 57 + _ => format!("{path}[]"), 58 + }; 59 + walk(item, &child_path, links); 60 + } 61 + } 62 + Data::Object(obj) => { 63 + for (k, v) in &obj.0 { 64 + walk(v, &format!("{path}.{k}"), links); 65 + } 66 + } 67 + _ => {} 68 + } 69 + } 70 + 71 + #[cfg(test)] 72 + mod tests { 73 + use super::*; 74 + 75 + fn make(s: &str) -> Data<'static> { 76 + Data::from_json_owned(serde_json::Value::String(s.to_string())).unwrap() 77 + } 78 + 79 + fn targets(links: &[Link]) -> Vec<&str> { 80 + links.iter().map(|l| l.target.as_str()).collect() 81 + } 82 + 83 + fn paths(links: &[Link]) -> Vec<&str> { 84 + links.iter().map(|l| l.path.as_str()).collect() 85 + } 86 + 87 + #[test] 88 + fn extracts_at_uri() { 89 + let val = make("at://did:plc:abc123/app.bsky.feed.post/tid123"); 90 + let links = extract_links(&val); 91 + assert_eq!( 92 + targets(&links), 93 + vec!["at://did:plc:abc123/app.bsky.feed.post/tid123"] 94 + ); 95 + assert_eq!(paths(&links), vec![""]); 96 + } 97 + 98 + #[test] 99 + fn extracts_did() { 100 + let val = make("did:plc:abc123"); 101 + let links = extract_links(&val); 102 + assert_eq!(targets(&links), vec!["did:plc:abc123"]); 103 + } 104 + 105 + #[test] 106 + fn skips_plain_string() { 107 + assert!(extract_links(&make("hello world")).is_empty()); 108 + } 109 + 110 + #[test] 111 + fn skips_handle() { 112 + assert!(extract_links(&make("alice.bsky.social")).is_empty()); 113 + } 114 + 115 + #[test] 116 + fn object_paths() { 117 + let json = serde_json::json!({ 118 + "subject": { 119 + "uri": "at://did:plc:abc/app.bsky.feed.post/tid", 120 + "cid": "bafyreiclp443lavogvhj3d2ob2cxbfuscni2k5jk7bebjzg7khl3esabwq" 121 + } 122 + }); 123 + let val = Data::from_json_owned(json).unwrap(); 124 + let links = extract_links(&val); 125 + assert_eq!(links.len(), 1); 126 + assert_eq!( 127 + links[0].target.as_str(), 128 + "at://did:plc:abc/app.bsky.feed.post/tid" 129 + ); 130 + assert_eq!(links[0].path.as_str(), ".subject.uri"); 131 + } 132 + 133 + #[test] 134 + fn array_paths_plain() { 135 + let json = serde_json::json!([ 136 + "at://did:plc:a/app.bsky.feed.post/tid1", 137 + "at://did:plc:b/app.bsky.feed.post/tid2" 138 + ]); 139 + let val = Data::from_json_owned(json).unwrap(); 140 + let links = extract_links(&val); 141 + assert_eq!(links.len(), 2); 142 + assert!(links.iter().all(|l| l.path.as_str() == "[]")); 143 + } 144 + 145 + #[test] 146 + fn array_paths_typed_object() { 147 + let json = serde_json::json!({ 148 + "features": [ 149 + { 150 + "$type": "app.bsky.richtext.facet#link", 151 + "uri": "at://did:plc:abc/app.bsky.feed.post/tid" 152 + } 153 + ] 154 + }); 155 + let val = Data::from_json_owned(json).unwrap(); 156 + let links = extract_links(&val); 157 + assert_eq!(links.len(), 1); 158 + assert_eq!( 159 + links[0].path.as_str(), 160 + ".features[app.bsky.richtext.facet#link].uri" 161 + ); 162 + } 163 + 164 + #[test] 165 + fn array_paths_untyped_object() { 166 + let json = serde_json::json!({ 167 + "items": [ 168 + { "subject": "did:plc:abc" } 169 + ] 170 + }); 171 + let val = Data::from_json_owned(json).unwrap(); 172 + let links = extract_links(&val); 173 + assert_eq!(links.len(), 1); 174 + assert_eq!(links[0].path.as_str(), ".items[].subject"); 175 + } 176 + 177 + #[test] 178 + fn deduplicates_same_path_and_target() { 179 + let uri = "at://did:plc:abc123/app.bsky.feed.post/tid123"; 180 + let json = serde_json::json!([uri, uri]); 181 + let val = Data::from_json_owned(json).unwrap(); 182 + assert_eq!(extract_links(&val).len(), 1); 183 + } 184 + 185 + #[test] 186 + fn keeps_same_target_at_different_paths() { 187 + let uri = "at://did:plc:abc/app.bsky.feed.post/tid"; 188 + let json = serde_json::json!({ 189 + "root": { "uri": uri }, 190 + "parent": { "uri": uri } 191 + }); 192 + let val = Data::from_json_owned(json).unwrap(); 193 + let links = extract_links(&val); 194 + assert_eq!(links.len(), 2); 195 + let ps: Vec<_> = links.iter().map(|l| l.path.as_str()).collect(); 196 + assert!(ps.contains(&".parent.uri")); 197 + assert!(ps.contains(&".root.uri")); 198 + } 199 + 200 + #[test] 201 + fn skips_cid_link() { 202 + let json = serde_json::json!({"$link": "bafyreiclp443lavogvhj3d2ob2cxbfuscni2k5jk7bebjzg7khl3esabwq"}); 203 + let val = Data::from_json_owned(json).unwrap(); 204 + assert!(extract_links(&val).is_empty()); 205 + } 206 + }
+232
src/backlinks/mod.rs
··· 1 + pub mod links; 2 + pub mod store; 3 + 4 + pub(crate) mod api; 5 + 6 + use std::ops::Bound; 7 + use std::sync::Arc; 8 + 9 + use miette::{IntoDiagnostic, Result}; 10 + use smol_str::SmolStr; 11 + use tracing::warn; 12 + 13 + use crate::state::AppState; 14 + 15 + /// handle for querying the backlinks index. 16 + #[derive(Clone)] 17 + pub struct BacklinksControl(pub(crate) Arc<AppState>); 18 + 19 + pub struct BacklinkEntry { 20 + pub uri: SmolStr, 21 + pub cid: SmolStr, 22 + } 23 + 24 + pub struct BacklinksPage { 25 + pub backlinks: Vec<BacklinkEntry>, 26 + /// raw key bytes of the last returned entry, used as cursor for next page 27 + pub next_cursor: Option<Vec<u8>>, 28 + } 29 + 30 + impl BacklinksControl { 31 + /// begin building a paginated fetch of backlinks pointing to `subject`. 32 + pub fn fetch(&self, subject: impl Into<String>) -> BacklinksFetch { 33 + BacklinksFetch { 34 + state: self.0.clone(), 35 + subject: subject.into(), 36 + collection: None, 37 + path: None, 38 + limit: 50, 39 + reverse: false, 40 + cursor: None, 41 + } 42 + } 43 + 44 + /// begin building a backlinks count query for `subject`. 45 + pub fn count(&self, subject: impl Into<String>) -> BacklinksCount { 46 + BacklinksCount { 47 + state: self.0.clone(), 48 + subject: subject.into(), 49 + collection: None, 50 + path: None, 51 + } 52 + } 53 + } 54 + 55 + /// a paginated fetch against the backlinks reverse index. 56 + /// 57 + /// obtain via [`BacklinksControl::fetch`] and execute with [`BacklinksFetch::run`]. 58 + pub struct BacklinksFetch { 59 + state: Arc<AppState>, 60 + subject: String, 61 + collection: Option<String>, 62 + path: Option<String>, 63 + limit: usize, 64 + reverse: bool, 65 + cursor: Option<Vec<u8>>, 66 + } 67 + 68 + impl BacklinksFetch { 69 + /// filter results to the given source collection NSID. 70 + pub fn collection(mut self, collection: impl Into<String>) -> Self { 71 + self.collection = Some(collection.into()); 72 + self 73 + } 74 + 75 + /// filter results to a specific dotted field path within the record (e.g. `.subject.uri`). 76 + /// has no effect unless a collection is also set. 77 + pub fn path(mut self, path: impl Into<String>) -> Self { 78 + self.path = Some(path.into()); 79 + self 80 + } 81 + 82 + /// convenience: parse a `source` parameter of the form `collection` or `collection:path` 83 + /// and apply both filters. the path component (if present) has `.` prepended automatically. 84 + pub fn source(self, source: &str) -> Self { 85 + match source.split_once(':') { 86 + Some((col, p)) => self.collection(col).path(format!(".{p}")), 87 + None => self.collection(source), 88 + } 89 + } 90 + 91 + /// maximum number of results to return (default 50). 92 + pub fn limit(mut self, limit: usize) -> Self { 93 + self.limit = limit; 94 + self 95 + } 96 + 97 + /// if `true`, return results in reverse key order (default `false`). 98 + pub fn reverse(mut self, reverse: bool) -> Self { 99 + self.reverse = reverse; 100 + self 101 + } 102 + 103 + /// resume from the raw key bytes returned as `next_cursor` from a previous page. 104 + pub fn cursor(mut self, cursor: Vec<u8>) -> Self { 105 + self.cursor = Some(cursor); 106 + self 107 + } 108 + 109 + /// execute and return a page of backlink entries. 110 + pub async fn run(self) -> Result<BacklinksPage> { 111 + tokio::task::spawn_blocking(move || { 112 + let db = &self.state.db; 113 + let scan_prefix = store::reverse_scan_prefix( 114 + &self.subject, 115 + self.collection.as_deref(), 116 + self.path.as_deref(), 117 + ); 118 + 119 + let iter: Box<dyn Iterator<Item = _>> = if !self.reverse { 120 + if let Some(ref cursor_bytes) = self.cursor { 121 + Box::new(db.backlinks.range::<&[u8], _>(( 122 + Bound::Excluded(cursor_bytes.as_slice()), 123 + Bound::Unbounded, 124 + ))) 125 + } else { 126 + Box::new(db.backlinks.range(scan_prefix.as_slice()..)) 127 + } 128 + } else { 129 + // for reverse scans, bound the end at the cursor (exclusive) or the 130 + // prefix end (increment last byte to get an exclusive upper bound) 131 + let end: Vec<u8> = if let Some(ref cursor_bytes) = self.cursor { 132 + cursor_bytes.clone() 133 + } else { 134 + let mut end = scan_prefix.clone(); 135 + if let Some(last) = end.last_mut() { 136 + *last = last.saturating_add(1); 137 + } 138 + end 139 + }; 140 + Box::new( 141 + db.backlinks 142 + .range(scan_prefix.as_slice()..end.as_slice()) 143 + .rev(), 144 + ) 145 + }; 146 + 147 + let mut backlinks = Vec::with_capacity(self.limit); 148 + let mut next_cursor = None; 149 + 150 + for item in iter { 151 + let (key, _) = item.into_inner().into_diagnostic()?; 152 + if !key.starts_with(scan_prefix.as_slice()) { 153 + break; 154 + } 155 + if backlinks.len() >= self.limit { 156 + next_cursor = Some(key.to_vec()); 157 + break; 158 + } 159 + 160 + let Some((col, _path, did, rkey)) = store::source_from_reverse_key(&key) else { 161 + warn!("backlinks: could not extract source from reverse key"); 162 + continue; 163 + }; 164 + let Some(cid) = store::lookup_cid_from_ks(&db.records, did, col, rkey) else { 165 + // record deleted after backlink was written 166 + continue; 167 + }; 168 + let at_uri = format!("at://{did}/{col}/{rkey}"); 169 + backlinks.push(BacklinkEntry { 170 + uri: SmolStr::new(at_uri), 171 + cid: SmolStr::new(cid), 172 + }); 173 + } 174 + 175 + Ok(BacklinksPage { 176 + backlinks, 177 + next_cursor, 178 + }) 179 + }) 180 + .await 181 + .into_diagnostic()? 182 + } 183 + } 184 + 185 + /// a count query against the backlinks reverse index. 186 + /// 187 + /// obtain via [`BacklinksControl::count`] and execute with [`BacklinksCount::run`]. 188 + pub struct BacklinksCount { 189 + state: Arc<AppState>, 190 + subject: String, 191 + collection: Option<String>, 192 + path: Option<String>, 193 + } 194 + 195 + impl BacklinksCount { 196 + /// filter to the given source collection NSID. 197 + pub fn collection(mut self, collection: impl Into<String>) -> Self { 198 + self.collection = Some(collection.into()); 199 + self 200 + } 201 + 202 + /// filter to a specific dotted field path within the record (e.g. `.subject.uri`). 203 + /// has no effect unless a collection is also set. 204 + pub fn path(mut self, path: impl Into<String>) -> Self { 205 + self.path = Some(path.into()); 206 + self 207 + } 208 + 209 + /// parse a `source` parameter of the form `collection` or `collection:path` 210 + /// and apply both filters. the path component (if present) has `.` prepended automatically. 211 + pub fn source(self, source: &str) -> Self { 212 + match source.split_once(':') { 213 + Some((col, p)) => self.collection(col).path(format!(".{p}")), 214 + None => self.collection(source), 215 + } 216 + } 217 + 218 + /// execute and return the total count of matching entries. 219 + pub async fn run(self) -> Result<u64> { 220 + tokio::task::spawn_blocking(move || { 221 + let db = &self.state.db; 222 + let scan_prefix = store::reverse_scan_prefix( 223 + &self.subject, 224 + self.collection.as_deref(), 225 + self.path.as_deref(), 226 + ); 227 + Ok(db.backlinks.prefix(&scan_prefix).count() as u64) 228 + }) 229 + .await 230 + .into_diagnostic()? 231 + } 232 + }
+317
src/backlinks/store.rs
··· 1 + use fjall::{Keyspace, OwnedWriteBatch}; 2 + use jacquard_common::types::string::Did; 3 + use jacquard_common::{Data, types::cid::Cid}; 4 + use miette::{IntoDiagnostic, Result}; 5 + 6 + use crate::db::keys::{self, SEP}; 7 + use crate::db::types::DbRkey; 8 + 9 + /// key format: `f|{did}|{collection}|{rkey}` 10 + /// 11 + /// value: msgpack-encoded `Vec<(String, String)>`, list of (path, target) pairs found in this record. 12 + pub fn forward_key(did: &str, collection: &str, rkey: &str) -> Vec<u8> { 13 + let mut key = Vec::with_capacity(2 + did.len() + 1 + collection.len() + 1 + rkey.len()); 14 + key.push(b'f'); 15 + key.push(SEP); 16 + key.extend_from_slice(did.as_bytes()); 17 + key.push(SEP); 18 + key.extend_from_slice(collection.as_bytes()); 19 + key.push(SEP); 20 + key.extend_from_slice(rkey.as_bytes()); 21 + key 22 + } 23 + 24 + /// key format: `r|{target}|{collection}|{path}|{did}|{rkey}` 25 + /// 26 + /// value: empty — the key itself carries all information. 27 + /// 28 + /// `|` separators are unambiguous because targets (AT URIs, DIDs), collection 29 + /// NSIDs, field paths, DIDs, and rkeys cannot contain `|`. 30 + /// prefix scans: 31 + /// - all backlinks to a target: `r|{target}|` 32 + /// - backlinks from a specific collection: `r|{target}|{collection}|` 33 + /// - backlinks from a specific collection+field path: `r|{target}|{collection}|{path}|` 34 + pub fn reverse_key(target: &str, collection: &str, path: &str, did: &str, rkey: &str) -> Vec<u8> { 35 + let mut key = Vec::with_capacity( 36 + 2 + target.len() + 1 + collection.len() + 1 + path.len() + 1 + did.len() + 1 + rkey.len(), 37 + ); 38 + key.push(b'r'); 39 + key.push(SEP); 40 + key.extend_from_slice(target.as_bytes()); 41 + key.push(SEP); 42 + key.extend_from_slice(collection.as_bytes()); 43 + key.push(SEP); 44 + key.extend_from_slice(path.as_bytes()); 45 + key.push(SEP); 46 + key.extend_from_slice(did.as_bytes()); 47 + key.push(SEP); 48 + key.extend_from_slice(rkey.as_bytes()); 49 + key 50 + } 51 + 52 + /// scan prefix for all reverse entries pointing to `target`. 53 + /// 54 + /// - no collection: `r|{target}|` 55 + /// - collection only: `r|{target}|{collection}|` (matches all paths for that collection) 56 + /// - collection + path: `r|{target}|{collection}|{path}|` (exact path match) 57 + /// 58 + /// path should include the leading `.` (e.g. `.subject.uri`). always ends with `|`. 59 + pub fn reverse_scan_prefix(target: &str, collection: Option<&str>, path: Option<&str>) -> Vec<u8> { 60 + let col_len = collection.map_or(0, |c| c.len() + 1); 61 + let path_len = if collection.is_some() { 62 + path.map_or(0, |p| p.len() + 1) 63 + } else { 64 + 0 65 + }; 66 + let mut prefix = Vec::with_capacity(2 + target.len() + 1 + col_len + path_len); 67 + prefix.push(b'r'); 68 + prefix.push(SEP); 69 + prefix.extend_from_slice(target.as_bytes()); 70 + prefix.push(SEP); 71 + if let Some(col) = collection { 72 + prefix.extend_from_slice(col.as_bytes()); 73 + prefix.push(SEP); 74 + if let Some(p) = path { 75 + prefix.extend_from_slice(p.as_bytes()); 76 + prefix.push(SEP); 77 + } 78 + } 79 + prefix 80 + } 81 + 82 + /// scan prefix for all forward entries from the given DID. 83 + pub fn forward_did_prefix(did: &Did) -> Vec<u8> { 84 + let did_str = did.as_str(); 85 + let mut prefix = Vec::with_capacity(2 + did_str.len() + 1); 86 + prefix.push(b'f'); 87 + prefix.push(SEP); 88 + prefix.extend_from_slice(did_str.as_bytes()); 89 + prefix.push(SEP); 90 + prefix 91 + } 92 + 93 + /// extract (collection, path, did, rkey) from a reverse key. 94 + /// 95 + /// reverse key format: `r|{target}|{collection}|{path}|{did}|{rkey}` 96 + /// parses the last four `|`-separated segments from the right. 97 + pub fn source_from_reverse_key(key: &[u8]) -> Option<(&str, &str, &str, &str)> { 98 + let rkey_sep = key.iter().rposition(|&b| b == SEP)?; 99 + let rkey = std::str::from_utf8(&key[rkey_sep + 1..]).ok()?; 100 + let did_sep = key[..rkey_sep].iter().rposition(|&b| b == SEP)?; 101 + let did = std::str::from_utf8(&key[did_sep + 1..rkey_sep]).ok()?; 102 + let path_sep = key[..did_sep].iter().rposition(|&b| b == SEP)?; 103 + let path = std::str::from_utf8(&key[path_sep + 1..did_sep]).ok()?; 104 + let col_sep = key[..path_sep].iter().rposition(|&b| b == SEP)?; 105 + let col = std::str::from_utf8(&key[col_sep + 1..path_sep]).ok()?; 106 + Some((col, path, did, rkey)) 107 + } 108 + 109 + /// index all links found in a record. 110 + /// 111 + /// on update, stale reverse entries from the previous version are removed first. 112 + /// on create with no links, no entries are written. 113 + pub fn index_record( 114 + batch: &mut OwnedWriteBatch, 115 + backlinks_ks: &Keyspace, 116 + did: &str, 117 + collection: &str, 118 + rkey: &str, 119 + value: &Data, 120 + ) -> Result<()> { 121 + let fwd_key = forward_key(did, collection, rkey); 122 + 123 + // remove reverse entries from the previous version of this record if any 124 + if let Some(old_bytes) = backlinks_ks.get(&fwd_key).into_diagnostic()? { 125 + let old_links: Vec<(String, String)> = 126 + rmp_serde::from_slice(&old_bytes).into_diagnostic()?; 127 + for (path, target) in &old_links { 128 + batch.remove( 129 + backlinks_ks, 130 + reverse_key(target, collection, path, did, rkey), 131 + ); 132 + } 133 + } 134 + 135 + let new_links = crate::backlinks::links::extract_links(value); 136 + if new_links.is_empty() { 137 + batch.remove(backlinks_ks, fwd_key); 138 + } else { 139 + for link in &new_links { 140 + batch.insert( 141 + backlinks_ks, 142 + reverse_key(&link.target, collection, &link.path, did, rkey), 143 + &[] as &[u8], 144 + ); 145 + } 146 + let fwd_pairs: Vec<(&str, &str)> = new_links 147 + .iter() 148 + .map(|l| (l.path.as_str(), l.target.as_str())) 149 + .collect(); 150 + let fwd_val = rmp_serde::to_vec(&fwd_pairs).into_diagnostic()?; 151 + batch.insert(backlinks_ks, fwd_key, fwd_val); 152 + } 153 + 154 + Ok(()) 155 + } 156 + 157 + /// remove all backlink index entries for a deleted record. 158 + pub fn delete_record( 159 + batch: &mut OwnedWriteBatch, 160 + backlinks_ks: &Keyspace, 161 + did: &str, 162 + collection: &str, 163 + rkey: &str, 164 + ) -> Result<()> { 165 + let fwd_key = forward_key(did, collection, rkey); 166 + let Some(old_bytes) = backlinks_ks.get(&fwd_key).into_diagnostic()? else { 167 + return Ok(()); 168 + }; 169 + let links: Vec<(String, String)> = rmp_serde::from_slice(&old_bytes).into_diagnostic()?; 170 + for (path, target) in &links { 171 + batch.remove( 172 + backlinks_ks, 173 + reverse_key(target, collection, path, did, rkey), 174 + ); 175 + } 176 + batch.remove(backlinks_ks, fwd_key); 177 + Ok(()) 178 + } 179 + 180 + /// remove all backlink index entries for every record in a deleted repo. 181 + /// 182 + /// scans the forward index by the `f|{did}|` prefix and removes both forward 183 + /// and reverse entries for each record found. 184 + pub fn delete_repo(batch: &mut OwnedWriteBatch, backlinks_ks: &Keyspace, did: &Did) -> Result<()> { 185 + let prefix = forward_did_prefix(did); 186 + let did_str = did.as_str(); 187 + 188 + for guard in backlinks_ks.prefix(&prefix) { 189 + let (fwd_key, fwd_val) = guard.into_inner().into_diagnostic()?; 190 + // fwd_key suffix after f|{did}| is {collection}|{rkey} 191 + let suffix = &fwd_key[prefix.len()..]; 192 + let Some(sep) = suffix.iter().position(|&b| b == SEP) else { 193 + tracing::warn!("backlinks: malformed forward key during delete_repo"); 194 + continue; 195 + }; 196 + let (Ok(col), Ok(rkey)) = ( 197 + std::str::from_utf8(&suffix[..sep]), 198 + std::str::from_utf8(&suffix[sep + 1..]), 199 + ) else { 200 + tracing::warn!("backlinks: invalid utf8 in forward key during delete_repo"); 201 + continue; 202 + }; 203 + 204 + let links: Vec<(String, String)> = match rmp_serde::from_slice(&fwd_val) { 205 + Ok(t) => t, 206 + Err(e) => { 207 + tracing::warn!( 208 + "backlinks: failed to decode forward value for {did_str}/{col}/{rkey}: {e}" 209 + ); 210 + continue; 211 + } 212 + }; 213 + for (path, target) in &links { 214 + batch.remove(backlinks_ks, reverse_key(target, col, path, did_str, rkey)); 215 + } 216 + batch.remove(backlinks_ks, fwd_key); 217 + } 218 + 219 + Ok(()) 220 + } 221 + 222 + /// look up the CID string for a record from the records keyspace. 223 + /// returns `None` if the record is not found (e.g. deleted after backlink was written). 224 + pub fn lookup_cid_from_ks( 225 + records: &Keyspace, 226 + did: &str, 227 + collection: &str, 228 + rkey: &str, 229 + ) -> Option<String> { 230 + let did = Did::new(did).ok()?; 231 + let db_rkey = DbRkey::new(rkey); 232 + let record_key = keys::record_key(&did, collection, &db_rkey); 233 + let cid_bytes = records.get(record_key).ok()??; 234 + Cid::new(&cid_bytes).ok().map(|c| c.as_str().to_string()) 235 + } 236 + 237 + #[cfg(test)] 238 + mod tests { 239 + use super::*; 240 + 241 + const TARGET: &str = "at://did:plc:abc/app.bsky.feed.post/tid"; 242 + const COLLECTION: &str = "app.bsky.feed.like"; 243 + const PATH: &str = ".subject.uri"; 244 + const SRC_DID: &str = "did:plc:xyz"; 245 + const SRC_RKEY: &str = "rkey1"; 246 + 247 + #[test] 248 + fn reverse_key_scan_prefix_no_collection() { 249 + let key = reverse_key(TARGET, COLLECTION, PATH, SRC_DID, SRC_RKEY); 250 + let prefix = reverse_scan_prefix(TARGET, None, None); 251 + assert!(key.starts_with(&prefix)); 252 + } 253 + 254 + #[test] 255 + fn reverse_key_scan_prefix_with_collection() { 256 + let key = reverse_key(TARGET, COLLECTION, PATH, SRC_DID, SRC_RKEY); 257 + let prefix = reverse_scan_prefix(TARGET, Some(COLLECTION), None); 258 + assert!(key.starts_with(&prefix)); 259 + } 260 + 261 + #[test] 262 + fn reverse_key_scan_prefix_with_collection_and_path() { 263 + let key = reverse_key(TARGET, COLLECTION, PATH, SRC_DID, SRC_RKEY); 264 + let prefix = reverse_scan_prefix(TARGET, Some(COLLECTION), Some(PATH)); 265 + assert!(key.starts_with(&prefix)); 266 + } 267 + 268 + #[test] 269 + fn reverse_key_collection_prefix_does_not_match_other_collection() { 270 + let collection2 = "app.bsky.feed.like2"; 271 + let key2 = reverse_key(TARGET, collection2, PATH, SRC_DID, SRC_RKEY); 272 + let prefix1 = reverse_scan_prefix(TARGET, Some(COLLECTION), None); 273 + assert!(!key2.starts_with(&prefix1)); 274 + } 275 + 276 + #[test] 277 + fn reverse_key_path_prefix_does_not_match_other_path() { 278 + let path2 = ".subject.uri2"; 279 + let key2 = reverse_key(TARGET, COLLECTION, path2, SRC_DID, SRC_RKEY); 280 + let prefix = reverse_scan_prefix(TARGET, Some(COLLECTION), Some(PATH)); 281 + assert!(!key2.starts_with(&prefix)); 282 + } 283 + 284 + #[test] 285 + fn collection_only_prefix_matches_all_paths() { 286 + let path2 = ".embed.uri"; 287 + let key2 = reverse_key(TARGET, COLLECTION, path2, SRC_DID, SRC_RKEY); 288 + let col_prefix = reverse_scan_prefix(TARGET, Some(COLLECTION), None); 289 + assert!(key2.starts_with(&col_prefix)); 290 + } 291 + 292 + #[test] 293 + fn source_from_reverse_key_roundtrip() { 294 + let key = reverse_key(TARGET, COLLECTION, PATH, SRC_DID, SRC_RKEY); 295 + let (col, path, did, rkey) = source_from_reverse_key(&key).unwrap(); 296 + assert_eq!(col, COLLECTION); 297 + assert_eq!(path, PATH); 298 + assert_eq!(did, SRC_DID); 299 + assert_eq!(rkey, SRC_RKEY); 300 + } 301 + 302 + #[test] 303 + fn forward_did_prefix_matches_key() { 304 + let did = jacquard_common::types::string::Did::new("did:plc:abc123").unwrap(); 305 + let prefix = forward_did_prefix(&did); 306 + let key = forward_key("did:plc:abc123", "app.bsky.feed.post", "tid123"); 307 + assert!(key.starts_with(&prefix)); 308 + } 309 + 310 + #[test] 311 + fn forward_did_prefix_does_not_match_other_did() { 312 + let did = jacquard_common::types::string::Did::new("did:plc:abc123").unwrap(); 313 + let prefix = forward_did_prefix(&did); 314 + let other_key = forward_key("did:plc:xyz999", "app.bsky.feed.post", "tid123"); 315 + assert!(!other_key.starts_with(&prefix)); 316 + } 317 + }
+9
src/config.rs
··· 157 157 pub filter_signals: Option<Vec<String>>, 158 158 pub filter_collections: Option<Vec<String>>, 159 159 pub filter_excludes: Option<Vec<String>>, 160 + /// enable backlinks indexing (only meaningful in non-ephemeral mode). 161 + /// set via `HYDRANT_ENABLE_BACKLINKS=true`. 162 + pub enable_backlinks: bool, 160 163 /// crawler sources: each entry pairs a URL with a discovery mode. 161 164 /// 162 165 /// set via `HYDRANT_CRAWLER_URLS` as a comma-separated list of `[mode::]url` entries, ··· 300 303 .filter(|s| !s.is_empty()) 301 304 .collect() 302 305 }); 306 + 307 + let enable_backlinks: bool = cfg!("ENABLE_BACKLINKS", false); 303 308 304 309 let default_mode = CrawlerMode::default_for(full_network); 305 310 let crawler_sources = match std::env::var("HYDRANT_CRAWLER_URLS") { ··· 353 358 filter_signals, 354 359 filter_collections, 355 360 filter_excludes, 361 + enable_backlinks, 356 362 crawler_sources, 357 363 }) 358 364 } ··· 467 473 } 468 474 if let Some(excludes) = &self.filter_excludes { 469 475 config_line!(f, "filter excludes", format_args!("{:?}", excludes))?; 476 + } 477 + if self.enable_backlinks { 478 + config_line!(f, "backlinks", "enabled")?; 470 479 } 471 480 Ok(()) 472 481 }
+4
src/control.rs
··· 74 74 pub filter: FilterControl, 75 75 pub repos: ReposControl, 76 76 pub db: DbControl, 77 + #[cfg(feature = "backlinks")] 78 + pub backlinks: crate::backlinks::BacklinksControl, 77 79 pub(crate) state: Arc<AppState>, 78 80 config: Arc<Config>, 79 81 started: Arc<AtomicBool>, ··· 148 150 filter: FilterControl(state.clone()), 149 151 repos: ReposControl(state.clone()), 150 152 db: DbControl(state.clone()), 153 + #[cfg(feature = "backlinks")] 154 + backlinks: crate::backlinks::BacklinksControl(state.clone()), 151 155 state, 152 156 config: Arc::new(config), 153 157 started: Arc::new(AtomicBool::new(false)),
+33 -9
src/db/mod.rs
··· 48 48 pub counts: Keyspace, 49 49 pub filter: Keyspace, 50 50 pub crawler: Keyspace, 51 + #[cfg(feature = "backlinks")] 52 + pub backlinks: Keyspace, 51 53 pub event_tx: broadcast::Sender<BroadcastEvent>, 52 54 pub next_event_id: Arc<AtomicU64>, 53 55 pub counts_map: HashMap<SmolStr, u64>, ··· 99 101 }}; 100 102 } 101 103 104 + const fn kb(v: u32) -> u32 { 105 + v * 1024 106 + } 107 + const fn mb(v: u64) -> u64 { 108 + v * 1024 * 1024 109 + } 110 + 102 111 impl Db { 103 112 pub fn open(cfg: &crate::config::Config) -> Result<Self> { 104 - const fn kb(v: u32) -> u32 { 105 - v * 1024 106 - } 107 - const fn mb(v: u64) -> u64 { 108 - v * 1024 * 1024 109 - } 110 - 111 113 let db = Database::builder(&cfg.database_path) 112 114 .cache_size(cfg.cache_size * 2_u64.pow(20) / 2) 113 115 .manual_journal_persist(true) ··· 152 154 } 153 155 None 154 156 }; 155 - let dicts = ["repos", "blocks", "events"].into_iter().fold( 157 + let dicts = ["repos", "blocks", "events", "backlinks"].into_iter().fold( 156 158 std::collections::HashMap::new(), 157 159 |mut acc, name| { 158 160 let Some(dict) = load_dict(name) else { ··· 258 260 // cids arent compressable, most rkeys are TIDs so they will get compressed 259 261 // by prefix truncation anyway 260 262 .data_block_compression_policy(CompressionPolicy::disabled()) 261 - .data_block_restart_interval_policy(RestartIntervalPolicy::new([9, 18])), 263 + .data_block_restart_interval_policy(RestartIntervalPolicy::new([16, 32])), 262 264 )?; 263 265 let cursors = open_ks( 264 266 "cursors", ··· 354 356 .data_block_restart_interval_policy(RestartIntervalPolicy::all(2)), 355 357 )?; 356 358 359 + #[cfg(feature = "backlinks")] 360 + let backlinks = open_ks( 361 + "backlinks", 362 + opts() 363 + // lets assume we hit backlinks, getBacklinks will use iterator anyway 364 + // so we can disable bloom filter okay 365 + .expect_point_read_hits(true) 366 + .max_memtable_size(mb(cfg.db_records_memtable_size_mb)) 367 + // same as records basically 368 + .data_block_size_policy(BlockSizePolicy::new([kb(16), kb(32)])) 369 + .data_block_compression_policy(CompressionPolicy::new([ 370 + CompressionType::None, 371 + get_compression("backlinks", 3), 372 + ])) 373 + .data_block_restart_interval_policy(RestartIntervalPolicy::new([16, 32])), 374 + )?; 375 + 357 376 // when adding new keyspaces, make sure to add them to the /stats endpoint 358 377 // and also update any relevant /debug/* endpoints 359 378 ··· 411 430 counts, 412 431 filter, 413 432 crawler, 433 + #[cfg(feature = "backlinks")] 434 + backlinks, 414 435 event_tx, 415 436 counts_map, 416 437 next_event_id: Arc::new(AtomicU64::new(last_id + 1)), ··· 422 443 "blocks" => &self.blocks, 423 444 "events" => &self.events, 424 445 "repos" => &self.repos, 446 + "backlinks" => &self.backlinks, 425 447 _ => miette::bail!("unknown keyspace for training: {ks_name}"), 426 448 }; 427 449 ··· 523 545 compact(self.filter.clone()), 524 546 compact(self.crawler.clone()), 525 547 )?; 548 + #[cfg(feature = "backlinks")] 549 + compact(self.backlinks.clone()).await?; 526 550 Ok(()) 527 551 } 528 552
+2
src/lib.rs
··· 5 5 6 6 pub(crate) mod api; 7 7 pub(crate) mod backfill; 8 + #[cfg(feature = "backlinks")] 9 + pub(crate) mod backlinks; 8 10 pub(crate) mod crawler; 9 11 pub(crate) mod db; 10 12 pub(crate) mod ingest;
+26
src/ops.rs
··· 2 2 use fjall::Slice; 3 3 4 4 use jacquard_common::CowStr; 5 + #[cfg(feature = "backlinks")] 6 + use jacquard_common::Data; 5 7 use jacquard_common::IntoStatic; 6 8 use jacquard_common::types::cid::Cid; 7 9 use jacquard_common::types::crypto::PublicKey; ··· 117 119 let k = guard.key().into_diagnostic()?; 118 120 batch.remove(&db.counts, k); 119 121 } 122 + 123 + // 5. remove backlinks for all records in this repo 124 + #[cfg(feature = "backlinks")] 125 + crate::backlinks::store::delete_repo(batch, &db.backlinks, did)?; 120 126 121 127 Ok(()) 122 128 } ··· 310 316 records_delta += 1; 311 317 *collection_deltas.entry(collection).or_default() += 1; 312 318 } 319 + #[cfg(feature = "backlinks")] 320 + if let Ok(value) = serde_ipld_dagcbor::from_slice::<Data>(bytes.as_ref()) { 321 + crate::backlinks::store::index_record( 322 + batch, 323 + &db.backlinks, 324 + did.as_str(), 325 + collection, 326 + &rkey.to_smolstr(), 327 + &value, 328 + )?; 329 + } 313 330 None 314 331 } else if action == DbAction::Create || action == DbAction::Update { 315 332 Some(bytes.clone()) ··· 324 341 // accumulate counts 325 342 records_delta -= 1; 326 343 *collection_deltas.entry(collection).or_default() -= 1; 344 + 345 + #[cfg(feature = "backlinks")] 346 + crate::backlinks::store::delete_record( 347 + batch, 348 + &db.backlinks, 349 + did.as_str(), 350 + collection, 351 + &rkey.to_smolstr(), 352 + )?; 327 353 } 328 354 329 355 None
+7
tests/common.nu
··· 56 56 "target/debug/hydrant" 57 57 } 58 58 59 + # build the hydrant binary with extra cargo features (space-separated string) 60 + export def build-hydrant-features [features: string] { 61 + print $"building hydrant with features: ($features)..." 62 + cargo build --features $features 63 + "target/debug/hydrant" 64 + } 65 + 59 66 # start hydrant in the background 60 67 export def start-hydrant [binary: string, db_path: string, port: int] { 61 68 let log_file = $"($db_path)/hydrant.log"