A very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust
fjall
at-protocol
atproto
indexer
1use fjall::{Keyspace, OwnedWriteBatch};
2use jacquard_common::types::string::Did;
3use miette::{IntoDiagnostic, Result};
4
5use crate::db::types::TrimmedDid;
6use crate::filter::{FilterConfig, FilterMode};
7use crate::patch::SetUpdate;
8
/// Key of the single entry that holds the serialized [`FilterMode`].
pub const MODE_KEY: &[u8] = b"m";

/// Leading byte of every key built by [`signal_key`].
pub const SIGNAL_PREFIX: u8 = b's';

/// Leading byte of every key built by [`collection_key`].
pub const COLLECTION_PREFIX: u8 = b'c';

/// Leading byte of every key built by [`exclude_key`].
pub const EXCLUDE_PREFIX: u8 = b'x';

/// Byte placed between a set prefix and the stored value.
pub const SEP: u8 = b'|';
14
15pub fn signal_key(val: &str) -> Result<Vec<u8>> {
16 let mut key = Vec::with_capacity(2 + val.len());
17 key.push(SIGNAL_PREFIX);
18 key.push(SEP);
19 key.extend_from_slice(val.as_bytes());
20 Ok(key)
21}
22
23pub fn collection_key(val: &str) -> Result<Vec<u8>> {
24 let mut key = Vec::with_capacity(2 + val.len());
25 key.push(COLLECTION_PREFIX);
26 key.push(SEP);
27 key.extend_from_slice(val.as_bytes());
28 Ok(key)
29}
30
31pub fn exclude_key(val: &str) -> Result<Vec<u8>> {
32 let did = Did::new(val).into_diagnostic()?;
33 let trimmed = TrimmedDid::from(&did);
34 let mut key = Vec::with_capacity(2 + trimmed.len());
35 key.push(EXCLUDE_PREFIX);
36 key.push(SEP);
37 trimmed.write_to_vec(&mut key);
38 Ok(key)
39}
40
41pub fn apply_patch(
42 batch: &mut OwnedWriteBatch,
43 ks: &Keyspace,
44 mode: Option<FilterMode>,
45 signals: Option<SetUpdate>,
46 collections: Option<SetUpdate>,
47 excludes: Option<SetUpdate>,
48) -> Result<()> {
49 if let Some(mode) = mode {
50 batch.insert(ks, MODE_KEY, rmp_serde::to_vec(&mode).into_diagnostic()?);
51 }
52
53 apply_set_update(batch, ks, SIGNAL_PREFIX, signals)?;
54 apply_set_update(batch, ks, COLLECTION_PREFIX, collections)?;
55 apply_set_update(batch, ks, EXCLUDE_PREFIX, excludes)?;
56
57 Ok(())
58}
59
60fn apply_set_update(
61 batch: &mut OwnedWriteBatch,
62 ks: &Keyspace,
63 prefix: u8,
64 update: Option<SetUpdate>,
65) -> Result<()> {
66 let Some(update) = update else { return Ok(()) };
67
68 let key_fn = match prefix {
69 SIGNAL_PREFIX => signal_key,
70 COLLECTION_PREFIX => collection_key,
71 EXCLUDE_PREFIX => exclude_key,
72 _ => unreachable!(),
73 };
74
75 match update {
76 SetUpdate::Set(values) => {
77 let scan_prefix = [prefix, SEP];
78 for guard in ks.prefix(scan_prefix) {
79 let (k, _) = guard.into_inner().into_diagnostic()?;
80 batch.remove(ks, k);
81 }
82 for val in values {
83 batch.insert(ks, key_fn(&val)?, []);
84 }
85 }
86 SetUpdate::Patch(map) => {
87 for (val, add) in map {
88 let key = key_fn(&val)?;
89 if add {
90 batch.insert(ks, key, []);
91 } else {
92 batch.remove(ks, key);
93 }
94 }
95 }
96 }
97
98 Ok(())
99}
100
101pub fn load(ks: &Keyspace) -> Result<FilterConfig> {
102 let mode = ks
103 .get(MODE_KEY)
104 .into_diagnostic()?
105 .map(|v| rmp_serde::from_slice(&v).into_diagnostic())
106 .transpose()?
107 .unwrap_or(FilterMode::Filter);
108
109 let mut config = FilterConfig::new(mode);
110
111 let signal_prefix = [SIGNAL_PREFIX, SEP];
112 for guard in ks.prefix(signal_prefix) {
113 let (k, _) = guard.into_inner().into_diagnostic()?;
114 let val = std::str::from_utf8(&k[signal_prefix.len()..]).into_diagnostic()?;
115 config.signals.push(val.into());
116 }
117
118 let col_prefix = [COLLECTION_PREFIX, SEP];
119 for guard in ks.prefix(col_prefix) {
120 let (k, _) = guard.into_inner().into_diagnostic()?;
121 let val = std::str::from_utf8(&k[col_prefix.len()..]).into_diagnostic()?;
122 config.collections.push(val.into());
123 }
124
125 Ok(config)
126}
127
128pub fn read_set(ks: &Keyspace, prefix: u8) -> Result<Vec<String>> {
129 let scan_prefix = [prefix, SEP];
130 let mut out = Vec::new();
131 for guard in ks.prefix(scan_prefix) {
132 let (k, _) = guard.into_inner().into_diagnostic()?;
133 let val_bytes = &k[2..];
134 let val = if prefix == EXCLUDE_PREFIX {
135 TrimmedDid::try_from(val_bytes)?.to_did().to_string()
136 } else {
137 std::str::from_utf8(val_bytes).into_diagnostic()?.to_owned()
138 };
139 out.push(val);
140 }
141 Ok(out)
142}
143
#[cfg(test)]
mod tests {
    use smol_str::SmolStr;

    use super::*;

    /// DID used by the exclude-set tests.
    const SAMPLE_DID: &str = "did:plc:yk4q3id7id6p5z3bypvshc64";

    /// Signal and collection keys are the prefix, the separator, then the name.
    #[test]
    fn test_filter_keys() {
        let signal = signal_key("app.bsky.feed.like").unwrap();
        assert_eq!(signal, b"s|app.bsky.feed.like");

        let collection = collection_key("app.bsky.feed.post").unwrap();
        assert_eq!(collection, b"c|app.bsky.feed.post");
    }

    /// Exclude keys hold the trimmed DID and decode back to the original.
    #[test]
    fn test_exclude_key_trimmed() {
        let key = exclude_key(SAMPLE_DID).unwrap();
        assert_eq!(key[0], EXCLUDE_PREFIX);
        assert_eq!(key[1], SEP);
        // TAG_PLC (1) + 15 bytes
        assert_eq!(key.len(), 2 + 1 + 15);

        let decoded = TrimmedDid::try_from(&key[2..]).unwrap();
        assert_eq!(decoded.to_did().as_str(), SAMPLE_DID);
    }

    /// A committed patch is visible through `load` and `read_set`.
    #[test]
    fn test_apply_and_load() -> Result<()> {
        let dir = tempfile::tempdir().into_diagnostic()?;
        let db = fjall::Database::builder(dir.path())
            .open()
            .into_diagnostic()?;
        let ks = db.keyspace("filter", Default::default).into_diagnostic()?;

        let mut batch = db.batch();
        apply_patch(
            &mut batch,
            &ks,
            Some(FilterMode::Filter),
            Some(SetUpdate::Set(vec!["a.b.c".to_string()])),
            Some(SetUpdate::Set(vec!["d.e.f".to_string()])),
            Some(SetUpdate::Set(vec![SAMPLE_DID.to_string()])),
        )?;
        batch.commit().into_diagnostic()?;

        let config = load(&ks)?;
        assert_eq!(config.mode, FilterMode::Filter);
        assert_eq!(config.signals, vec![SmolStr::new("a.b.c")]);
        assert_eq!(config.collections, vec![SmolStr::new("d.e.f")]);

        let stored_excludes = read_set(&ks, EXCLUDE_PREFIX)?;
        assert_eq!(stored_excludes, vec![SAMPLE_DID]);

        Ok(())
    }
}