A very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust fjall at-protocol atproto indexer
60
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 210 lines 6.2 kB view raw
1use fjall::{Keyspace, OwnedWriteBatch}; 2use jacquard_common::types::string::Did; 3use miette::{IntoDiagnostic, Result}; 4 5use crate::db::types::TrimmedDid; 6use crate::filter::{FilterConfig, FilterMode}; 7use crate::patch::SetUpdate; 8 9pub const MODE_KEY: &[u8] = b"m"; 10pub const SIGNAL_PREFIX: u8 = b's'; 11pub const COLLECTION_PREFIX: u8 = b'c'; 12pub const EXCLUDE_PREFIX: u8 = b'x'; 13pub const SEP: u8 = b'|'; 14 15pub fn signal_key(val: &str) -> Result<Vec<u8>> { 16 let mut key = Vec::with_capacity(2 + val.len()); 17 key.push(SIGNAL_PREFIX); 18 key.push(SEP); 19 key.extend_from_slice(val.as_bytes()); 20 Ok(key) 21} 22 23pub fn collection_key(val: &str) -> Result<Vec<u8>> { 24 let mut key = Vec::with_capacity(2 + val.len()); 25 key.push(COLLECTION_PREFIX); 26 key.push(SEP); 27 key.extend_from_slice(val.as_bytes()); 28 Ok(key) 29} 30 31pub fn exclude_key(val: &str) -> Result<Vec<u8>> { 32 let did = Did::new(val).into_diagnostic()?; 33 let trimmed = TrimmedDid::from(&did); 34 let mut key = Vec::with_capacity(2 + trimmed.len()); 35 key.push(EXCLUDE_PREFIX); 36 key.push(SEP); 37 trimmed.write_to_vec(&mut key); 38 Ok(key) 39} 40 41pub fn apply_patch( 42 batch: &mut OwnedWriteBatch, 43 ks: &Keyspace, 44 mode: Option<FilterMode>, 45 signals: Option<SetUpdate>, 46 collections: Option<SetUpdate>, 47 excludes: Option<SetUpdate>, 48) -> Result<()> { 49 if let Some(mode) = mode { 50 batch.insert(ks, MODE_KEY, rmp_serde::to_vec(&mode).into_diagnostic()?); 51 } 52 53 apply_set_update(batch, ks, SIGNAL_PREFIX, signals)?; 54 apply_set_update(batch, ks, COLLECTION_PREFIX, collections)?; 55 apply_set_update(batch, ks, EXCLUDE_PREFIX, excludes)?; 56 57 Ok(()) 58} 59 60fn apply_set_update( 61 batch: &mut OwnedWriteBatch, 62 ks: &Keyspace, 63 prefix: u8, 64 update: Option<SetUpdate>, 65) -> Result<()> { 66 let Some(update) = update else { return Ok(()) }; 67 68 let key_fn = match prefix { 69 SIGNAL_PREFIX => signal_key, 70 COLLECTION_PREFIX => collection_key, 71 EXCLUDE_PREFIX 
=> exclude_key, 72 _ => unreachable!(), 73 }; 74 75 match update { 76 SetUpdate::Set(values) => { 77 let scan_prefix = [prefix, SEP]; 78 for guard in ks.prefix(scan_prefix) { 79 let (k, _) = guard.into_inner().into_diagnostic()?; 80 batch.remove(ks, k); 81 } 82 for val in values { 83 batch.insert(ks, key_fn(&val)?, []); 84 } 85 } 86 SetUpdate::Patch(map) => { 87 for (val, add) in map { 88 let key = key_fn(&val)?; 89 if add { 90 batch.insert(ks, key, []); 91 } else { 92 batch.remove(ks, key); 93 } 94 } 95 } 96 } 97 98 Ok(()) 99} 100 101pub fn load(ks: &Keyspace) -> Result<FilterConfig> { 102 let mode = ks 103 .get(MODE_KEY) 104 .into_diagnostic()? 105 .map(|v| rmp_serde::from_slice(&v).into_diagnostic()) 106 .transpose()? 107 .unwrap_or(FilterMode::Filter); 108 109 let mut config = FilterConfig::new(mode); 110 111 let signal_prefix = [SIGNAL_PREFIX, SEP]; 112 for guard in ks.prefix(signal_prefix) { 113 let (k, _) = guard.into_inner().into_diagnostic()?; 114 let val = std::str::from_utf8(&k[signal_prefix.len()..]).into_diagnostic()?; 115 config.signals.push(val.into()); 116 } 117 118 let col_prefix = [COLLECTION_PREFIX, SEP]; 119 for guard in ks.prefix(col_prefix) { 120 let (k, _) = guard.into_inner().into_diagnostic()?; 121 let val = std::str::from_utf8(&k[col_prefix.len()..]).into_diagnostic()?; 122 config.collections.push(val.into()); 123 } 124 125 Ok(config) 126} 127 128pub fn read_set(ks: &Keyspace, prefix: u8) -> Result<Vec<String>> { 129 let scan_prefix = [prefix, SEP]; 130 let mut out = Vec::new(); 131 for guard in ks.prefix(scan_prefix) { 132 let (k, _) = guard.into_inner().into_diagnostic()?; 133 let val_bytes = &k[2..]; 134 let val = if prefix == EXCLUDE_PREFIX { 135 TrimmedDid::try_from(val_bytes)?.to_did().to_string() 136 } else { 137 std::str::from_utf8(val_bytes).into_diagnostic()?.to_owned() 138 }; 139 out.push(val); 140 } 141 Ok(out) 142} 143 144#[cfg(test)] 145mod tests { 146 use smol_str::SmolStr; 147 148 use super::*; 149 150 #[test] 151 fn 
test_filter_keys() { 152 assert_eq!( 153 signal_key("app.bsky.feed.like").unwrap(), 154 b"s|app.bsky.feed.like" 155 ); 156 assert_eq!( 157 collection_key("app.bsky.feed.post").unwrap(), 158 b"c|app.bsky.feed.post" 159 ); 160 } 161 162 #[test] 163 fn test_exclude_key_trimmed() { 164 let did = "did:plc:yk4q3id7id6p5z3bypvshc64"; 165 let key = exclude_key(did).unwrap(); 166 assert_eq!(key[0], EXCLUDE_PREFIX); 167 assert_eq!(key[1], SEP); 168 // TAG_PLC (1) + 15 bytes 169 assert_eq!(key.len(), 2 + 1 + 15); 170 171 let parsed = TrimmedDid::try_from(&key[2..]).unwrap(); 172 assert_eq!(parsed.to_did().as_str(), did); 173 } 174 175 #[test] 176 fn test_apply_and_load() -> Result<()> { 177 let tmp = tempfile::tempdir().into_diagnostic()?; 178 let keyspace = fjall::Database::builder(tmp.path()) 179 .open() 180 .into_diagnostic()?; 181 let ks = keyspace 182 .keyspace("filter", Default::default) 183 .into_diagnostic()?; 184 185 let mut batch = keyspace.batch(); 186 let signals = SetUpdate::Set(vec!["a.b.c".to_string()]); 187 let collections = SetUpdate::Set(vec!["d.e.f".to_string()]); 188 let excludes = SetUpdate::Set(vec!["did:plc:yk4q3id7id6p5z3bypvshc64".to_string()]); 189 190 apply_patch( 191 &mut batch, 192 &ks, 193 Some(FilterMode::Filter), 194 Some(signals), 195 Some(collections), 196 Some(excludes), 197 )?; 198 batch.commit().into_diagnostic()?; 199 200 let config = load(&ks)?; 201 assert_eq!(config.mode, FilterMode::Filter); 202 assert_eq!(config.signals, vec![SmolStr::new("a.b.c")]); 203 assert_eq!(config.collections, vec![SmolStr::new("d.e.f")]); 204 205 let excludes = read_set(&ks, EXCLUDE_PREFIX)?; 206 assert_eq!(excludes, vec!["did:plc:yk4q3id7id6p5z3bypvshc64"]); 207 208 Ok(()) 209 } 210}