A very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust fjall at-protocol atproto indexer
59
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 447 lines 14 kB view raw
1use std::fmt::{Debug, Display}; 2 3use bytes::Bytes; 4use jacquard_common::types::cid::IpldCid; 5use jacquard_common::types::nsid::Nsid; 6use jacquard_common::types::string::{Did, Rkey}; 7use jacquard_common::types::tid::Tid; 8use jacquard_common::{CowStr, IntoStatic, types::string::Handle}; 9use jacquard_repo::commit::Commit as AtpCommit; 10use serde::{Deserialize, Serialize, Serializer}; 11use serde_json::Value; 12use smol_str::{SmolStr, ToSmolStr}; 13 14use crate::db::types::{DbTid, DidKey}; 15 16#[cfg(feature = "indexer_stream")] 17use crate::db::types::{DbAction, DbRkey, TrimmedDid}; 18use crate::resolver::MiniDoc; 19 20pub(crate) mod v2 { 21 use super::*; 22 23 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 24 pub enum RepoStatus { 25 Backfilling, 26 Synced, 27 Error(SmolStr), 28 Deactivated, 29 Takendown, 30 Suspended, 31 } 32 33 #[derive(Debug, Clone, Serialize, Deserialize)] 34 pub(crate) struct Commit { 35 pub version: i64, 36 pub rev: DbTid, 37 pub data: IpldCid, 38 pub prev: Option<IpldCid>, 39 #[serde(with = "jacquard_common::serde_bytes_helper")] 40 pub sig: Bytes, 41 } 42 43 #[derive(Debug, Clone, Serialize, Deserialize)] 44 #[serde(bound(deserialize = "'i: 'de"))] 45 pub(crate) struct RepoState<'i> { 46 pub status: RepoStatus, 47 pub root: Option<Commit>, 48 pub last_message_time: Option<i64>, 49 pub last_updated_at: i64, 50 pub tracked: bool, 51 pub index_id: u64, 52 #[serde(borrow)] 53 pub signing_key: Option<DidKey<'i>>, 54 #[serde(borrow)] 55 pub pds: Option<CowStr<'i>>, 56 #[serde(borrow)] 57 pub handle: Option<Handle<'i>>, 58 } 59} 60 61pub(crate) mod v4 { 62 use super::*; 63 pub(crate) use v2::Commit; 64 65 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] 66 pub enum RepoStatus { 67 /// repo is synced to latest commit from what we know of 68 Synced, 69 /// some unclassified fatal error 70 Error(SmolStr), 71 /// user has temporarily paused their overall account. 
content should 72 /// not be displayed or redistributed, but does not need to be deleted 73 /// from infrastructure. implied time-limited. also the initial state 74 /// for an account after migrating to another pds instance. 75 Deactivated, 76 /// host or service has takendown the account. implied permanent or 77 /// long-term, though may be reverted. 78 Takendown, 79 /// host or service has temporarily paused the account. implied 80 /// time-limited. 81 Suspended, 82 /// user or host has deleted the account, and content should be removed 83 /// from the network. implied permanent or long-term, though may be 84 /// reverted (deleted accounts may reactivate on the same or another 85 /// host). 86 /// 87 /// account is deleted; kept as a tombstone so stale commits arriving from the upstream 88 /// backfill window are not forwarded. active=false per spec. 89 Deleted, 90 /// host detected a repo sync problem. active may be true or false per spec; 91 /// the `active` field on `RepoState` is authoritative. 92 Desynchronized, 93 /// resource rate-limit exceeded. active may be true or false per spec; 94 /// the `active` field on `RepoState` is authoritative. 95 Throttled, 96 } 97 98 #[derive(Debug, Clone, Serialize, Deserialize)] 99 #[serde(bound(deserialize = "'i: 'de"))] 100 pub(crate) struct RepoState<'i> { 101 /// whether the upstream considers this account active. 102 /// services should use the `active` flag to control overall account visibility 103 pub active: bool, 104 pub status: RepoStatus, 105 pub root: Option<Commit>, 106 /// ms since epoch of the last firehose message we processed for this repo. 107 /// used to deduplicate identity / account events that can arrive from multiple relays at 108 /// different wall-clock times but represent the same underlying PDS event. 
109 pub last_message_time: Option<i64>, 110 /// this is when we *ingested* any last updates 111 pub last_updated_at: i64, // unix timestamp 112 #[serde(borrow)] 113 pub signing_key: Option<DidKey<'i>>, 114 #[serde(borrow)] 115 pub pds: Option<CowStr<'i>>, 116 #[serde(borrow)] 117 pub handle: Option<Handle<'i>>, 118 } 119 120 #[derive(Debug, Clone, Serialize, Deserialize)] 121 pub(crate) struct RepoMetadata { 122 /// whether we are ingesting events for this repo 123 pub tracked: bool, 124 /// index id in pending keyspace (if backfilling) 125 pub index_id: u64, 126 } 127} 128 129pub(crate) use v4::*; 130 131impl<'c> From<AtpCommit<'c>> for Commit { 132 fn from(value: AtpCommit<'c>) -> Self { 133 Self { 134 data: value.data, 135 prev: value.prev, 136 rev: DbTid::from(&value.rev), 137 sig: value.sig, 138 version: value.version, 139 } 140 } 141} 142 143impl Commit { 144 pub(crate) fn into_atp_commit<'i>(self, did: Did<'i>) -> Option<AtpCommit<'i>> { 145 // version < 0 is a sentinel used in v2 migration for repos with no commit data 146 if self.version < 0 { 147 return None; 148 } 149 Some(AtpCommit { 150 did, 151 rev: self.rev.to_tid(), 152 data: self.data, 153 prev: self.prev, 154 sig: self.sig, 155 version: self.version, 156 }) 157 } 158} 159 160impl Display for RepoStatus { 161 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 162 match self { 163 RepoStatus::Synced => write!(f, "synced"), 164 RepoStatus::Error(e) => write!(f, "error({e})"), 165 RepoStatus::Deactivated => write!(f, "deactivated"), 166 RepoStatus::Takendown => write!(f, "takendown"), 167 RepoStatus::Suspended => write!(f, "suspended"), 168 RepoStatus::Deleted => write!(f, "deleted"), 169 RepoStatus::Desynchronized => write!(f, "desynchronized"), 170 RepoStatus::Throttled => write!(f, "throttled"), 171 } 172 } 173} 174 175impl RepoMetadata {} 176 177#[cfg(feature = "indexer")] 178mod indexer { 179 use super::*; 180 181 impl RepoMetadata { 182 pub fn backfilling(index_id: u64) -> Self 
{ 183 Self { 184 index_id, 185 tracked: true, 186 } 187 } 188 } 189 190 impl ResyncState { 191 pub fn next_backoff(retry_count: u32) -> i64 { 192 // exponential backoff: 1m, 2m, 4m, 8m... up to 1h 193 let base = 60; 194 let cap = 3600; 195 let mult = 2u64.pow(retry_count.min(10)) as i64; 196 let delay = (base * mult).min(cap); 197 198 // add +/- 10% jitter 199 let jitter = (rand::random::<f64>() * 0.2 - 0.1) * delay as f64; 200 let delay = (delay as f64 + jitter) as i64; 201 202 chrono::Utc::now().timestamp() + delay 203 } 204 } 205 206 #[cfg(feature = "indexer_stream")] 207 #[derive(Clone, Debug)] 208 pub(crate) enum BroadcastEvent { 209 Persisted(#[allow(dead_code)] u64), 210 /// a durable record event with optional inline block bytes for live tailing. 211 /// 212 /// used to avoid re-reading `events`/`blocks` from the database when tailing. 213 LiveRecord(std::sync::Arc<super::LiveRecordEvent>), 214 Ephemeral(Box<MarshallableEvt<'static>>), 215 } 216 217 #[derive(Debug, PartialEq, Eq, Clone, Copy)] 218 pub(crate) enum GaugeState { 219 Synced, 220 Pending, 221 Resync(Option<ResyncErrorKind>), 222 } 223 224 impl GaugeState { 225 pub fn is_resync(&self) -> bool { 226 matches!(self, GaugeState::Resync(_)) 227 } 228 } 229} 230 231#[cfg(feature = "indexer")] 232pub(crate) use indexer::*; 233 234impl<'i> RepoState<'i> { 235 pub fn backfilling() -> Self { 236 Self { 237 active: true, 238 status: RepoStatus::Desynchronized, 239 root: None, 240 last_updated_at: chrono::Utc::now().timestamp(), 241 handle: None, 242 pds: None, 243 signing_key: None, 244 last_message_time: None, 245 } 246 } 247 248 // advances the high-water mark to event_ms if it's newer than what we've seen 249 pub fn advance_message_time(&mut self, event_ms: i64) { 250 self.last_message_time = Some(event_ms.max(self.last_message_time.unwrap_or(0))); 251 } 252 253 // updates last_updated_at to now 254 pub fn touch(&mut self) { 255 self.last_updated_at = chrono::Utc::now().timestamp(); 256 } 257 258 pub fn 
update_from_doc(&mut self, doc: MiniDoc) -> bool { 259 let new_signing_key = doc.key.map(From::from); 260 let changed = self.pds.as_deref() != Some(doc.pds.as_str()) 261 || self.handle != doc.handle 262 || self.signing_key != new_signing_key; 263 self.pds = Some(CowStr::Owned(doc.pds.to_smolstr())); 264 self.handle = doc.handle; 265 self.signing_key = new_signing_key; 266 changed 267 } 268} 269 270impl<'i> IntoStatic for RepoState<'i> { 271 type Output = RepoState<'static>; 272 273 fn into_static(self) -> Self::Output { 274 RepoState { 275 active: self.active, 276 status: self.status, 277 root: self.root, 278 last_updated_at: self.last_updated_at, 279 handle: self.handle.map(IntoStatic::into_static), 280 pds: self.pds.map(IntoStatic::into_static), 281 signing_key: self.signing_key.map(IntoStatic::into_static), 282 last_message_time: self.last_message_time, 283 } 284 } 285} 286 287#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] 288pub(crate) enum ResyncErrorKind { 289 Ratelimited, 290 Transport, 291 Generic, 292} 293 294#[derive(Debug, Clone, Serialize, Deserialize)] 295pub(crate) enum ResyncState { 296 Error { 297 kind: ResyncErrorKind, 298 retry_count: u32, 299 next_retry: i64, // unix timestamp 300 }, 301 Gone { 302 status: RepoStatus, // deactivated, takendown, suspended 303 }, 304} 305 306#[derive(Debug, Serialize, Clone)] 307pub enum EventType { 308 Record, 309 Identity, 310 Account, 311} 312 313impl AsRef<str> for EventType { 314 fn as_ref(&self) -> &str { 315 match self { 316 Self::Record => "record", 317 Self::Identity => "identity", 318 Self::Account => "account", 319 } 320 } 321} 322 323fn event_type_ser_str<S: Serializer>(v: &EventType, s: S) -> Result<S::Ok, S::Error> { 324 s.serialize_str(v.as_ref()) 325} 326 327#[derive(Debug, Serialize, Clone)] 328pub struct MarshallableEvt<'i> { 329 pub id: u64, 330 #[serde(rename = "type")] 331 #[serde(serialize_with = "event_type_ser_str")] 332 pub kind: EventType, 333 #[serde(borrow)] 334 
#[serde(skip_serializing_if = "Option::is_none")] 335 pub record: Option<RecordEvt<'i>>, 336 #[serde(borrow)] 337 #[serde(skip_serializing_if = "Option::is_none")] 338 pub identity: Option<IdentityEvt<'i>>, 339 #[serde(borrow)] 340 #[serde(skip_serializing_if = "Option::is_none")] 341 pub account: Option<AccountEvt<'i>>, 342} 343 344#[derive(Debug, Serialize, Clone)] 345pub struct RecordEvt<'i> { 346 pub live: bool, 347 #[serde(borrow)] 348 pub did: Did<'i>, 349 pub rev: Tid, 350 pub collection: Nsid<'i>, 351 pub rkey: Rkey<'i>, 352 pub action: CowStr<'i>, 353 #[serde(skip_serializing_if = "Option::is_none")] 354 pub record: Option<Value>, 355 #[serde(skip_serializing_if = "Option::is_none")] 356 #[serde(serialize_with = "crate::util::opt_cid_serialize_str")] 357 pub cid: Option<IpldCid>, 358} 359 360#[derive(Debug, Serialize, Clone)] 361pub struct IdentityEvt<'i> { 362 #[serde(borrow)] 363 pub did: Did<'i>, 364 #[serde(skip_serializing_if = "Option::is_none")] 365 pub handle: Option<Handle<'i>>, 366} 367 368#[derive(Debug, Serialize, Clone)] 369pub struct AccountEvt<'i> { 370 #[serde(borrow)] 371 pub did: Did<'i>, 372 pub active: bool, 373 #[serde(skip_serializing_if = "Option::is_none")] 374 pub status: Option<CowStr<'i>>, 375} 376 377#[cfg(feature = "indexer_stream")] 378#[derive(Serialize, Deserialize, Clone)] 379pub(crate) enum StoredData { 380 Nothing, 381 Ptr(IpldCid), 382 #[serde(with = "jacquard_common::serde_bytes_helper")] 383 Block(Bytes), 384} 385 386#[cfg(feature = "indexer_stream")] 387impl StoredData { 388 pub fn is_nothing(&self) -> bool { 389 matches!(self, StoredData::Nothing) 390 } 391} 392 393#[cfg(feature = "indexer_stream")] 394impl Default for StoredData { 395 fn default() -> Self { 396 Self::Nothing 397 } 398} 399 400#[cfg(feature = "indexer_stream")] 401impl Debug for StoredData { 402 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 403 match self { 404 Self::Nothing => f.write_str("nothing"), 405 Self::Block(_) => 
f.write_str("<block>"), 406 Self::Ptr(cid) => write!(f, "{cid}"), 407 } 408 } 409} 410 411#[cfg(feature = "indexer_stream")] 412#[derive(Debug, Serialize, Deserialize, Clone)] 413#[serde(bound(deserialize = "'i: 'de"))] 414pub(crate) struct StoredEvent<'i> { 415 #[serde(default)] 416 pub live: bool, 417 #[serde(borrow)] 418 pub did: TrimmedDid<'i>, 419 pub rev: DbTid, 420 #[serde(borrow)] 421 pub collection: CowStr<'i>, 422 pub rkey: DbRkey, 423 pub action: DbAction, 424 #[serde(default)] 425 #[serde(skip_serializing_if = "StoredData::is_nothing")] 426 pub data: StoredData, 427} 428 429/// a durable record event that is also emitted on the in-memory stream after commit. 430/// 431/// `inline_block` is only used for live tailing to avoid loading the record block from `blocks`. 432/// cursor replay continues to read from the database. 433#[cfg(feature = "indexer_stream")] 434#[derive(Debug, Clone)] 435pub(crate) struct LiveRecordEvent { 436 pub id: u64, 437 pub stored: StoredEvent<'static>, 438 pub inline_block: Option<Bytes>, 439} 440 441#[cfg(feature = "relay")] 442#[derive(Clone)] 443pub(crate) enum RelayBroadcast { 444 Persisted(#[allow(dead_code)] u64), 445 #[allow(dead_code)] 446 Ephemeral(u64, bytes::Bytes), 447}