A very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust
fjall
at-protocol
atproto
indexer
1use std::fmt::{Debug, Display};
2
3use bytes::Bytes;
4use jacquard_common::types::cid::IpldCid;
5use jacquard_common::types::nsid::Nsid;
6use jacquard_common::types::string::{Did, Rkey};
7use jacquard_common::types::tid::Tid;
8use jacquard_common::{CowStr, IntoStatic, types::string::Handle};
9use jacquard_repo::commit::Commit as AtpCommit;
10use serde::{Deserialize, Serialize, Serializer};
11use serde_json::Value;
12use smol_str::{SmolStr, ToSmolStr};
13
14use crate::db::types::{DbTid, DidKey};
15
16#[cfg(feature = "indexer_stream")]
17use crate::db::types::{DbAction, DbRkey, TrimmedDid};
18use crate::resolver::MiniDoc;
19
20pub(crate) mod v2 {
21 use super::*;
22
23 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
24 pub enum RepoStatus {
25 Backfilling,
26 Synced,
27 Error(SmolStr),
28 Deactivated,
29 Takendown,
30 Suspended,
31 }
32
33 #[derive(Debug, Clone, Serialize, Deserialize)]
34 pub(crate) struct Commit {
35 pub version: i64,
36 pub rev: DbTid,
37 pub data: IpldCid,
38 pub prev: Option<IpldCid>,
39 #[serde(with = "jacquard_common::serde_bytes_helper")]
40 pub sig: Bytes,
41 }
42
43 #[derive(Debug, Clone, Serialize, Deserialize)]
44 #[serde(bound(deserialize = "'i: 'de"))]
45 pub(crate) struct RepoState<'i> {
46 pub status: RepoStatus,
47 pub root: Option<Commit>,
48 pub last_message_time: Option<i64>,
49 pub last_updated_at: i64,
50 pub tracked: bool,
51 pub index_id: u64,
52 #[serde(borrow)]
53 pub signing_key: Option<DidKey<'i>>,
54 #[serde(borrow)]
55 pub pds: Option<CowStr<'i>>,
56 #[serde(borrow)]
57 pub handle: Option<Handle<'i>>,
58 }
59}
60
61pub(crate) mod v4 {
62 use super::*;
63 pub(crate) use v2::Commit;
64
65 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
66 pub enum RepoStatus {
67 /// repo is synced to latest commit from what we know of
68 Synced,
69 /// some unclassified fatal error
70 Error(SmolStr),
71 /// user has temporarily paused their overall account. content should
72 /// not be displayed or redistributed, but does not need to be deleted
73 /// from infrastructure. implied time-limited. also the initial state
74 /// for an account after migrating to another pds instance.
75 Deactivated,
76 /// host or service has takendown the account. implied permanent or
77 /// long-term, though may be reverted.
78 Takendown,
79 /// host or service has temporarily paused the account. implied
80 /// time-limited.
81 Suspended,
82 /// user or host has deleted the account, and content should be removed
83 /// from the network. implied permanent or long-term, though may be
84 /// reverted (deleted accounts may reactivate on the same or another
85 /// host).
86 ///
87 /// account is deleted; kept as a tombstone so stale commits arriving from the upstream
88 /// backfill window are not forwarded. active=false per spec.
89 Deleted,
90 /// host detected a repo sync problem. active may be true or false per spec;
91 /// the `active` field on `RepoState` is authoritative.
92 Desynchronized,
93 /// resource rate-limit exceeded. active may be true or false per spec;
94 /// the `active` field on `RepoState` is authoritative.
95 Throttled,
96 }
97
98 #[derive(Debug, Clone, Serialize, Deserialize)]
99 #[serde(bound(deserialize = "'i: 'de"))]
100 pub(crate) struct RepoState<'i> {
101 /// whether the upstream considers this account active.
102 /// services should use the `active` flag to control overall account visibility
103 pub active: bool,
104 pub status: RepoStatus,
105 pub root: Option<Commit>,
106 /// ms since epoch of the last firehose message we processed for this repo.
107 /// used to deduplicate identity / account events that can arrive from multiple relays at
108 /// different wall-clock times but represent the same underlying PDS event.
109 pub last_message_time: Option<i64>,
110 /// this is when we *ingested* any last updates
111 pub last_updated_at: i64, // unix timestamp
112 #[serde(borrow)]
113 pub signing_key: Option<DidKey<'i>>,
114 #[serde(borrow)]
115 pub pds: Option<CowStr<'i>>,
116 #[serde(borrow)]
117 pub handle: Option<Handle<'i>>,
118 }
119
120 #[derive(Debug, Clone, Serialize, Deserialize)]
121 pub(crate) struct RepoMetadata {
122 /// whether we are ingesting events for this repo
123 pub tracked: bool,
124 /// index id in pending keyspace (if backfilling)
125 pub index_id: u64,
126 }
127}
128
129pub(crate) use v4::*;
130
131impl<'c> From<AtpCommit<'c>> for Commit {
132 fn from(value: AtpCommit<'c>) -> Self {
133 Self {
134 data: value.data,
135 prev: value.prev,
136 rev: DbTid::from(&value.rev),
137 sig: value.sig,
138 version: value.version,
139 }
140 }
141}
142
143impl Commit {
144 pub(crate) fn into_atp_commit<'i>(self, did: Did<'i>) -> Option<AtpCommit<'i>> {
145 // version < 0 is a sentinel used in v2 migration for repos with no commit data
146 if self.version < 0 {
147 return None;
148 }
149 Some(AtpCommit {
150 did,
151 rev: self.rev.to_tid(),
152 data: self.data,
153 prev: self.prev,
154 sig: self.sig,
155 version: self.version,
156 })
157 }
158}
159
160impl Display for RepoStatus {
161 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162 match self {
163 RepoStatus::Synced => write!(f, "synced"),
164 RepoStatus::Error(e) => write!(f, "error({e})"),
165 RepoStatus::Deactivated => write!(f, "deactivated"),
166 RepoStatus::Takendown => write!(f, "takendown"),
167 RepoStatus::Suspended => write!(f, "suspended"),
168 RepoStatus::Deleted => write!(f, "deleted"),
169 RepoStatus::Desynchronized => write!(f, "desynchronized"),
170 RepoStatus::Throttled => write!(f, "throttled"),
171 }
172 }
173}
174
175impl RepoMetadata {}
176
/// items that only exist when the `indexer` feature is enabled.
#[cfg(feature = "indexer")]
mod indexer {
    use super::*;

    impl RepoMetadata {
        /// metadata for a tracked repo with the given `index_id`.
        pub fn backfilling(index_id: u64) -> Self {
            Self {
                tracked: true,
                index_id,
            }
        }
    }

    impl ResyncState {
        /// unix timestamp (seconds) of the next retry after `retry_count` failures.
        ///
        /// the delay doubles per retry starting at one minute (1m, 2m, 4m, 8m, ...)
        /// and is capped at one hour before +/- 10% jitter is applied, so the
        /// jittered delay itself can end up slightly above the cap.
        pub fn next_backoff(retry_count: u32) -> i64 {
            const BASE_SECS: i64 = 60;
            const CAP_SECS: i64 = 3600;

            // the exponent is clamped to 10 before the power is taken
            let exponent = retry_count.min(10);
            let backoff = (BASE_SECS * 2u64.pow(exponent) as i64).min(CAP_SECS);

            // add +/- 10% jitter
            let jitter = (rand::random::<f64>() * 0.2 - 0.1) * backoff as f64;
            let jittered = (backoff as f64 + jitter) as i64;

            chrono::Utc::now().timestamp() + jittered
        }
    }

    /// messages sent on the in-memory event stream.
    #[cfg(feature = "indexer_stream")]
    #[derive(Debug, Clone)]
    pub(crate) enum BroadcastEvent {
        Persisted(#[allow(dead_code)] u64),
        /// a durable record event, optionally carrying the block bytes inline for
        /// live tailing.
        ///
        /// lets tailing consumers skip re-reading `events`/`blocks` from the database.
        LiveRecord(std::sync::Arc<super::LiveRecordEvent>),
        Ephemeral(Box<MarshallableEvt<'static>>),
    }

    /// gauge bucket for a repo; `Resync` optionally carries the error kind.
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    pub(crate) enum GaugeState {
        Synced,
        Pending,
        Resync(Option<ResyncErrorKind>),
    }

    impl GaugeState {
        /// true for `Resync`, whatever error kind it carries.
        pub fn is_resync(&self) -> bool {
            matches!(*self, Self::Resync(_))
        }
    }
}
230
231#[cfg(feature = "indexer")]
232pub(crate) use indexer::*;
233
234impl<'i> RepoState<'i> {
235 pub fn backfilling() -> Self {
236 Self {
237 active: true,
238 status: RepoStatus::Desynchronized,
239 root: None,
240 last_updated_at: chrono::Utc::now().timestamp(),
241 handle: None,
242 pds: None,
243 signing_key: None,
244 last_message_time: None,
245 }
246 }
247
248 // advances the high-water mark to event_ms if it's newer than what we've seen
249 pub fn advance_message_time(&mut self, event_ms: i64) {
250 self.last_message_time = Some(event_ms.max(self.last_message_time.unwrap_or(0)));
251 }
252
253 // updates last_updated_at to now
254 pub fn touch(&mut self) {
255 self.last_updated_at = chrono::Utc::now().timestamp();
256 }
257
258 pub fn update_from_doc(&mut self, doc: MiniDoc) -> bool {
259 let new_signing_key = doc.key.map(From::from);
260 let changed = self.pds.as_deref() != Some(doc.pds.as_str())
261 || self.handle != doc.handle
262 || self.signing_key != new_signing_key;
263 self.pds = Some(CowStr::Owned(doc.pds.to_smolstr()));
264 self.handle = doc.handle;
265 self.signing_key = new_signing_key;
266 changed
267 }
268}
269
270impl<'i> IntoStatic for RepoState<'i> {
271 type Output = RepoState<'static>;
272
273 fn into_static(self) -> Self::Output {
274 RepoState {
275 active: self.active,
276 status: self.status,
277 root: self.root,
278 last_updated_at: self.last_updated_at,
279 handle: self.handle.map(IntoStatic::into_static),
280 pds: self.pds.map(IntoStatic::into_static),
281 signing_key: self.signing_key.map(IntoStatic::into_static),
282 last_message_time: self.last_message_time,
283 }
284 }
285}
286
287#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
288pub(crate) enum ResyncErrorKind {
289 Ratelimited,
290 Transport,
291 Generic,
292}
293
294#[derive(Debug, Clone, Serialize, Deserialize)]
295pub(crate) enum ResyncState {
296 Error {
297 kind: ResyncErrorKind,
298 retry_count: u32,
299 next_retry: i64, // unix timestamp
300 },
301 Gone {
302 status: RepoStatus, // deactivated, takendown, suspended
303 },
304}
305
306#[derive(Debug, Serialize, Clone)]
307pub enum EventType {
308 Record,
309 Identity,
310 Account,
311}
312
313impl AsRef<str> for EventType {
314 fn as_ref(&self) -> &str {
315 match self {
316 Self::Record => "record",
317 Self::Identity => "identity",
318 Self::Account => "account",
319 }
320 }
321}
322
323fn event_type_ser_str<S: Serializer>(v: &EventType, s: S) -> Result<S::Ok, S::Error> {
324 s.serialize_str(v.as_ref())
325}
326
327#[derive(Debug, Serialize, Clone)]
328pub struct MarshallableEvt<'i> {
329 pub id: u64,
330 #[serde(rename = "type")]
331 #[serde(serialize_with = "event_type_ser_str")]
332 pub kind: EventType,
333 #[serde(borrow)]
334 #[serde(skip_serializing_if = "Option::is_none")]
335 pub record: Option<RecordEvt<'i>>,
336 #[serde(borrow)]
337 #[serde(skip_serializing_if = "Option::is_none")]
338 pub identity: Option<IdentityEvt<'i>>,
339 #[serde(borrow)]
340 #[serde(skip_serializing_if = "Option::is_none")]
341 pub account: Option<AccountEvt<'i>>,
342}
343
344#[derive(Debug, Serialize, Clone)]
345pub struct RecordEvt<'i> {
346 pub live: bool,
347 #[serde(borrow)]
348 pub did: Did<'i>,
349 pub rev: Tid,
350 pub collection: Nsid<'i>,
351 pub rkey: Rkey<'i>,
352 pub action: CowStr<'i>,
353 #[serde(skip_serializing_if = "Option::is_none")]
354 pub record: Option<Value>,
355 #[serde(skip_serializing_if = "Option::is_none")]
356 #[serde(serialize_with = "crate::util::opt_cid_serialize_str")]
357 pub cid: Option<IpldCid>,
358}
359
360#[derive(Debug, Serialize, Clone)]
361pub struct IdentityEvt<'i> {
362 #[serde(borrow)]
363 pub did: Did<'i>,
364 #[serde(skip_serializing_if = "Option::is_none")]
365 pub handle: Option<Handle<'i>>,
366}
367
368#[derive(Debug, Serialize, Clone)]
369pub struct AccountEvt<'i> {
370 #[serde(borrow)]
371 pub did: Did<'i>,
372 pub active: bool,
373 #[serde(skip_serializing_if = "Option::is_none")]
374 pub status: Option<CowStr<'i>>,
375}
376
/// data attached to a stored event: nothing, a cid, or block bytes.
#[cfg(feature = "indexer_stream")]
#[derive(Clone, Serialize, Deserialize)]
pub(crate) enum StoredData {
    /// no data attached
    Nothing,
    /// a cid instead of the bytes themselves
    Ptr(IpldCid),
    /// the block bytes, stored through the `jacquard_common` bytes helper
    #[serde(with = "jacquard_common::serde_bytes_helper")]
    Block(Bytes),
}
385
#[cfg(feature = "indexer_stream")]
impl StoredData {
    /// true only for `StoredData::Nothing`; referenced by name from the
    /// `skip_serializing_if` attribute on `StoredEvent::data`.
    pub fn is_nothing(&self) -> bool {
        matches!(*self, Self::Nothing)
    }
}
392
/// the default is `Nothing`; `StoredEvent::data` is marked `#[serde(default)]`
/// and relies on this impl when the field is missing.
#[cfg(feature = "indexer_stream")]
impl Default for StoredData {
    fn default() -> Self {
        StoredData::Nothing
    }
}
399
/// compact debug form: never prints block bytes, only a `<block>` placeholder.
#[cfg(feature = "indexer_stream")]
impl Debug for StoredData {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let placeholder = match self {
            // a pointer is shown as the cid itself
            StoredData::Ptr(cid) => return write!(f, "{cid}"),
            StoredData::Nothing => "nothing",
            StoredData::Block(_) => "<block>",
        };
        f.write_str(placeholder)
    }
}
410
/// record event in its persisted form.
#[cfg(feature = "indexer_stream")]
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(bound(deserialize = "'i: 'de"))]
pub(crate) struct StoredEvent<'i> {
    /// falls back to `false` when missing from the stored value
    #[serde(default)]
    pub live: bool,
    #[serde(borrow)]
    pub did: TrimmedDid<'i>,
    pub rev: DbTid,
    #[serde(borrow)]
    pub collection: CowStr<'i>,
    pub rkey: DbRkey,
    pub action: DbAction,
    /// not written when it is `StoredData::Nothing`, and read back as
    /// `StoredData::Nothing` when missing
    #[serde(default, skip_serializing_if = "StoredData::is_nothing")]
    pub data: StoredData,
}
428
/// a durable record event which, once committed, is also pushed onto the
/// in-memory stream.
///
/// `inline_block` exists only for live tailing, so the record block does not have
/// to be loaded from `blocks`. cursor replay still reads from the database.
#[cfg(feature = "indexer_stream")]
#[derive(Clone, Debug)]
pub(crate) struct LiveRecordEvent {
    pub id: u64,
    pub stored: StoredEvent<'static>,
    pub inline_block: Option<Bytes>,
}
440
/// broadcast message type compiled in with the `relay` feature.
///
/// NOTE(review): the meaning of the `u64` payloads is not visible in this file —
/// presumably a sequence / event id; confirm with the relay code.
#[cfg(feature = "relay")]
#[derive(Clone)]
pub(crate) enum RelayBroadcast {
    Persisted(#[allow(dead_code)] u64),
    /// carries a `u64` together with raw bytes
    #[allow(dead_code)]
    Ephemeral(u64, bytes::Bytes),
}