lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

sketch out

phil fce3c3c0 a65ff1d8

+734 -2
+1
Cargo.lock
··· 2193 2193 "jacquard-repo", 2194 2194 "metrics", 2195 2195 "metrics-exporter-prometheus", 2196 + "serde", 2196 2197 "thiserror 2.0.18", 2197 2198 "tokio", 2198 2199 ]
+1
Cargo.toml
··· 14 14 jacquard-repo = "0.9.6" 15 15 metrics = "0.24.3" 16 16 metrics-exporter-prometheus = { version = "0.18.1", features = ["http-listener"] } 17 + serde = { version = "1", features = ["derive"] } 17 18 thiserror = "2.0.18" 18 19 tokio = { version = "1.49.0", features = ["full"] }
+41
src/api/handler.rs
··· 1 + //! XRPC handlers. 2 + //! 3 + //! Uses `jacquard-axum`'s `ExtractXrpc` extractor to deserialise query parameters 4 + //! directly into the lexicon-generated request types. 5 + 6 + use axum::{extract::State, http::StatusCode, Json}; 7 + use jacquard_api::com_atproto::sync::{ 8 + get_repo_status::{GetRepoStatusOutput, GetRepoStatusRequest}, 9 + list_repos_by_collection::{ListReposByCollectionOutput, ListReposByCollectionRequest}, 10 + }; 11 + use jacquard_axum::ExtractXrpc; 12 + 13 + use crate::db::DbRef; 14 + 15 + /// Handler for `GET /xrpc/com.atproto.sync.listReposByCollection`. 16 + /// 17 + /// Performs a cursor-paginated prefix scan over the rbc keyspace via 18 + /// `db::index::scan_rbc`, returning up to `limit` (default 500, max 2000) 19 + /// DIDs that have at least one record in the requested collection. 20 + pub async fn list_repos_by_collection( 21 + State(_db): State<DbRef>, 22 + ExtractXrpc(_req): ExtractXrpc<ListReposByCollectionRequest>, 23 + ) -> Result<Json<ListReposByCollectionOutput<'static>>, StatusCode> { 24 + // req.collection — Nsid<'static> 25 + // req.cursor — Option<CowStr<'static>> 26 + // req.limit — Option<i64> 27 + todo!("scan rbc index and return paginated repo list") 28 + } 29 + 30 + /// Handler for `GET /xrpc/com.atproto.sync.getRepoStatus`. 31 + /// 32 + /// Looks up the per-repo state from `db::repo::get` and returns whether the 33 + /// repo is active. Returns a `RepoNotFound` XRPC error (404) if the DID has 34 + /// never been indexed. 35 + pub async fn get_repo_status( 36 + State(_db): State<DbRef>, 37 + ExtractXrpc(_req): ExtractXrpc<GetRepoStatusRequest>, 38 + ) -> Result<Json<GetRepoStatusOutput<'static>>, StatusCode> { 39 + // req.did — Did<'static> 40 + todo!("look up RepoRecord in db and return active/status") 41 + }
+32
src/api/mod.rs
··· 1 + //! XRPC API server. 2 + //! 3 + //! Serves XRPC endpoints via axum routers built with `jacquard-axum`'s 4 + //! `IntoRouter` helper. 5 + 6 + mod handler; 7 + 8 + use std::net::SocketAddr; 9 + 10 + use jacquard_api::com_atproto::sync::{ 11 + get_repo_status::GetRepoStatusRequest, 12 + list_repos_by_collection::ListReposByCollectionRequest, 13 + }; 14 + use jacquard_axum::IntoRouter; 15 + 16 + use crate::db::DbRef; 17 + use crate::error::Result; 18 + 19 + /// Build and serve the axum application on `addr`. 20 + /// 21 + /// Routes: 22 + /// GET /xrpc/com.atproto.sync.listReposByCollection 23 + /// GET /xrpc/com.atproto.sync.getRepoStatus 24 + pub async fn serve(addr: SocketAddr, db: DbRef) -> Result<()> { 25 + let app = ListReposByCollectionRequest::into_router(handler::list_repos_by_collection) 26 + .merge(GetRepoStatusRequest::into_router(handler::get_repo_status)) 27 + .with_state(db); 28 + 29 + let listener = tokio::net::TcpListener::bind(addr).await?; 30 + axum::serve(listener, app).await?; 31 + Ok(()) 32 + }
+17
src/backfill/list_repos.rs
··· 1 + //! Walk `com.atproto.sync.listRepos` with cursor pagination. 2 + //! 3 + //! For each DID encountered, either enqueues it for probing (large repos) or 4 + //! dispatches it to the small-repo fast path. 5 + 6 + use crate::db::DbRef; 7 + use crate::error::Result; 8 + 9 + /// Walk the full `listRepos` feed for `host`, persisting progress after each 10 + /// page so it can be resumed on restart. 11 + /// 12 + /// Uses `jacquard-api`'s `com_atproto::sync::list_repos` XRPC call with cursor 13 + /// pagination. Per-page progress is written via `db::cursor::set_list_repos_progress`. 14 + pub async fn run(host: &str, db: DbRef) -> Result<()> { 15 + let _ = (host, db); 16 + todo!("paginate listRepos, probe each DID, persist cursor after each page") 17 + }
+23
src/backfill/mod.rs
··· 1 + //! Backfill subsystem. 2 + //! 3 + //! Walks `com.atproto.sync.listRepos` and probes each repository to populate 4 + //! the rbc/cbr index before or alongside the live firehose feed. 5 + //! 6 + //! Large repos are enumerated via binary-search `getRecord` probing (`probe`). 7 + //! Small repos take the fast path of fetching the full repo CAR (`small_repo`). 8 + 9 + pub mod list_repos; 10 + pub mod probe; 11 + pub mod small_repo; 12 + 13 + use crate::db::DbRef; 14 + use crate::error::Result; 15 + 16 + /// Run the backfill subsystem for `host`. 17 + /// 18 + /// Resumes from the last-saved listRepos cursor if one exists, then pages 19 + /// through all repos and probes each one. Runs indefinitely until an error 20 + /// occurs (fatal errors) or until the full backfill completes. 21 + pub async fn run(host: String, db: DbRef) -> Result<()> { 22 + list_repos::run(&host, db).await 23 + }
+21
src/backfill/probe.rs
··· 1 + //! Binary-search `getRecord` probing for large-repo backfill. 2 + //! 3 + //! Since every ATProto collection has a known minimum and maximum possible rkey, 4 + //! `getRecord` returns adjacent keys even when the requested key does not exist. 5 + //! This lets us binary-search the MST to enumerate all collections without 6 + //! fetching the full repo CAR. 7 + 8 + use crate::db::DbRef; 9 + use crate::error::Result; 10 + 11 + /// Probe `did` to enumerate its collections via `getRecord` binary search. 12 + /// 13 + /// 1. Issue a `getRecord` for the midpoint of the NSID key space. 14 + /// 2. Feed the returned CAR slice to `mst::adjacent::extract_adjacent`. 15 + /// 3. Use adjacent keys to narrow the search and recurse until all collection 16 + /// boundaries are discovered. 17 + /// 4. Write results to the rbc/cbr index via `db::index::insert`. 18 + pub async fn probe_repo(host: &str, did: &str, db: DbRef) -> Result<()> { 19 + let _ = (host, did, db); 20 + todo!("binary-search getRecord probing to enumerate collections") 21 + }
+20
src/backfill/small_repo.rs
··· 1 + //! Fast path for small repositories: fetch the full repo CAR and extract all 2 + //! collections in a single pass. 3 + //! 4 + //! Small repos are detected by inspecting MST node levels from an initial 5 + //! `getRecord` probe — a high-level node present in a partial CAR slice means 6 + //! the repo is large; absence of high-level nodes suggests a small repo. 7 + 8 + use crate::db::DbRef; 9 + use crate::error::Result; 10 + 11 + /// Fetch the entire repo CAR for `did` via `com.atproto.sync.getRepo` and 12 + /// extract all collection NSIDs in one pass. 13 + /// 14 + /// Streams the CAR response (via `iroh-car` or `jacquard-repo`'s CAR reader), 15 + /// walks every MST leaf to collect all record keys, groups them by collection, 16 + /// and writes the result to the rbc/cbr index. 17 + pub async fn index_small_repo(host: &str, did: &str, db: DbRef) -> Result<()> { 18 + let _ = (host, did, db); 19 + todo!("stream getRepo CAR, parse all MST leaves, index collections") 20 + }
+46
src/db/cursor.rs
··· 1 + //! Firehose cursor and listRepos backfill progress tracking. 2 + 3 + use crate::db::{keys, DbRef}; 4 + use crate::error::Result; 5 + 6 + /// Read the last-saved firehose cursor for `host`. 7 + /// 8 + /// Returns `None` if no cursor has been persisted yet (start from live). 9 + pub fn get_subscribe_cursor(db: &DbRef, host: &str) -> Result<Option<u64>> { 10 + let key = keys::subscribe_cursor(host); 11 + match db.cursors.get(key)? { 12 + None => Ok(None), 13 + Some(v) => { 14 + let bytes: [u8; 8] = v.as_ref().try_into().map_err(|_| { 15 + crate::error::Error::Other("corrupt cursor value".into()) 16 + })?; 17 + Ok(Some(u64::from_be_bytes(bytes))) 18 + } 19 + } 20 + } 21 + 22 + /// Persist the firehose cursor for `host`. 23 + pub fn set_subscribe_cursor(db: &DbRef, host: &str, cursor: u64) -> Result<()> { 24 + let key = keys::subscribe_cursor(host); 25 + db.cursors.insert(key, cursor.to_be_bytes())?; 26 + Ok(()) 27 + } 28 + 29 + /// Progress record for the listRepos backfill walk. 30 + #[derive(Debug, Clone)] 31 + pub struct ListReposProgress { 32 + /// The current pagination cursor (empty string = start from beginning). 33 + pub cursor: String, 34 + /// Set when the full walk has completed at least once. 35 + pub completed_at: Option<String>, 36 + } 37 + 38 + /// Read the listRepos backfill progress for `host`. 39 + pub fn get_list_repos_progress(_db: &DbRef, _host: &str) -> Result<Option<ListReposProgress>> { 40 + todo!("deserialize ListReposProgress") 41 + } 42 + 43 + /// Write the listRepos backfill progress for `host`. 44 + pub fn set_list_repos_progress(_db: &DbRef, _host: &str, _progress: &ListReposProgress) -> Result<()> { 45 + todo!("serialize and write ListReposProgress") 46 + }
+86
src/db/index.rs
··· 1 + //! rbc / cbr index operations. 2 + 3 + use crate::db::{keys, DbRef}; 4 + use crate::error::Result; 5 + 6 + /// Insert a `(collection, did)` pair into both the rbc and cbr indexes. 7 + pub fn insert(db: &DbRef, did: &str, collection: &str) -> Result<()> { 8 + let mut batch = db.database.batch(); 9 + batch.insert(&db.rbc, keys::rbc(collection, did), b""); 10 + batch.insert(&db.cbr, keys::cbr(did, collection), b""); 11 + batch.commit()?; 12 + Ok(()) 13 + } 14 + 15 + /// Remove a `(collection, did)` pair from both indexes. 16 + pub fn remove(db: &DbRef, did: &str, collection: &str) -> Result<()> { 17 + let mut batch = db.database.batch(); 18 + batch.remove(&db.rbc, keys::rbc(collection, did)); 19 + batch.remove(&db.cbr, keys::cbr(did, collection)); 20 + batch.commit()?; 21 + Ok(()) 22 + } 23 + 24 + /// Remove all index entries for `did` (account deletion / full resync). 25 + /// 26 + /// Reads all collections from the cbr index, then deletes both sides in a batch. 27 + pub fn remove_all(db: &DbRef, did: &str) -> Result<()> { 28 + let prefix = keys::cbr_prefix(did); 29 + // fjall::Iter yields Guard; call into_inner() to get the KvPair. 30 + let collections: Vec<Vec<u8>> = db 31 + .cbr 32 + .prefix(prefix) 33 + .map(|guard| guard.into_inner().map(|(k, _v)| k.to_vec())) 34 + .collect::<fjall::Result<_>>()?; 35 + 36 + if collections.is_empty() { 37 + return Ok(()); 38 + } 39 + 40 + let mut batch = db.database.batch(); 41 + let prefix_len = keys::cbr_prefix(did).len(); 42 + for cbr_key in &collections { 43 + // Extract the collection suffix from the cbr key. 44 + // Key layout: "cbr"\0<did>\0<collection> 45 + let col_bytes: &[u8] = &cbr_key[prefix_len..]; 46 + if let Ok(collection) = std::str::from_utf8(col_bytes) { 47 + batch.remove(&db.rbc, keys::rbc(collection, did)); 48 + } 49 + batch.remove(&db.cbr, cbr_key.as_slice()); 50 + } 51 + batch.commit()?; 52 + Ok(()) 53 + } 54 + 55 + /// Iterate over DIDs in the rbc index for `collection`, starting after `cursor`. 56 + /// 57 + /// Returns at most `limit` DID strings. 58 + pub fn scan_rbc(db: &DbRef, collection: &str, cursor: Option<&str>, limit: usize) -> Result<Vec<String>> { 59 + let prefix = keys::rbc_prefix(collection); 60 + let prefix_len = prefix.len(); 61 + 62 + let start_key: Vec<u8> = match cursor { 63 + None => prefix.clone(), 64 + Some(did) => keys::rbc(collection, did), 65 + }; 66 + 67 + let mut dids = Vec::with_capacity(limit); 68 + for guard in db.rbc.range(start_key..) { 69 + let (k, _v) = guard.into_inner()?; 70 + if !k.starts_with(&prefix) { 71 + break; 72 + } 73 + if dids.len() >= limit { 74 + break; 75 + } 76 + // Skip the cursor key itself. 77 + if cursor.is_some_and(|c| k.as_ref() == keys::rbc(collection, c).as_slice()) { 78 + continue; 79 + } 80 + let col_bytes: &[u8] = &k[prefix_len..]; 81 + if let Ok(did) = std::str::from_utf8(col_bytes) { 82 + dids.push(did.to_owned()); 83 + } 84 + } 85 + Ok(dids) 86 + }
+123
src/db/keys.rs
··· 1 + //! Key encoding for all fjall partitions. 2 + //! 3 + //! Separator byte `\0` is used between key components since it cannot appear 4 + //! in NSIDs, DIDs, or hostnames. 5 + 6 + // Partition key prefixes 7 + pub const PREFIX_RBC: &[u8] = b"rbc"; 8 + pub const PREFIX_CBR: &[u8] = b"cbr"; 9 + pub const PREFIX_REPO: &[u8] = b"repo"; 10 + pub const PREFIX_REPO_PREV: &[u8] = b"repoPrev"; 11 + pub const PREFIX_SUBSCRIBE_REPOS: &[u8] = b"subscribeRepos"; 12 + pub const PREFIX_LIST_REPOS: &[u8] = b"listRepos"; 13 + pub const PREFIX_RESYNC_QUEUE: &[u8] = b"repoResyncQueue"; 14 + 15 + const SEP: u8 = b'\0'; 16 + 17 + /// `"rbc"\0<collection>\0<did>` — main collection index. 18 + /// 19 + /// Prefix-scan with `"rbc"\0<collection>\0` to enumerate all repos in a collection. 20 + pub fn rbc(collection: &str, did: &str) -> Vec<u8> { 21 + let mut key = Vec::with_capacity(PREFIX_RBC.len() + 1 + collection.len() + 1 + did.len()); 22 + key.extend_from_slice(PREFIX_RBC); 23 + key.push(SEP); 24 + key.extend_from_slice(collection.as_bytes()); 25 + key.push(SEP); 26 + key.extend_from_slice(did.as_bytes()); 27 + key 28 + } 29 + 30 + /// `"rbc"\0<collection>\0` — prefix for scanning all repos in a collection. 31 + pub fn rbc_prefix(collection: &str) -> Vec<u8> { 32 + let mut key = Vec::with_capacity(PREFIX_RBC.len() + 1 + collection.len() + 1); 33 + key.extend_from_slice(PREFIX_RBC); 34 + key.push(SEP); 35 + key.extend_from_slice(collection.as_bytes()); 36 + key.push(SEP); 37 + key 38 + } 39 + 40 + /// `"cbr"\0<did>\0<collection>` — reversed index for per-repo collection lookup. 41 + /// 42 + /// Prefix-scan with `"cbr"\0<did>\0` to enumerate all collections for a DID. 43 + pub fn cbr(did: &str, collection: &str) -> Vec<u8> { 44 + let mut key = Vec::with_capacity(PREFIX_CBR.len() + 1 + did.len() + 1 + collection.len()); 45 + key.extend_from_slice(PREFIX_CBR); 46 + key.push(SEP); 47 + key.extend_from_slice(did.as_bytes()); 48 + key.push(SEP); 49 + key.extend_from_slice(collection.as_bytes()); 50 + key 51 + } 52 + 53 + /// `"cbr"\0<did>\0` — prefix for scanning all collections for a repo. 54 + pub fn cbr_prefix(did: &str) -> Vec<u8> { 55 + let mut key = Vec::with_capacity(PREFIX_CBR.len() + 1 + did.len() + 1); 56 + key.extend_from_slice(PREFIX_CBR); 57 + key.push(SEP); 58 + key.extend_from_slice(did.as_bytes()); 59 + key.push(SEP); 60 + key 61 + } 62 + 63 + /// `"repo"\0<did>` — per-repo state entry. 64 + pub fn repo(did: &str) -> Vec<u8> { 65 + let mut key = Vec::with_capacity(PREFIX_REPO.len() + 1 + did.len()); 66 + key.extend_from_slice(PREFIX_REPO); 67 + key.push(SEP); 68 + key.extend_from_slice(did.as_bytes()); 69 + key 70 + } 71 + 72 + /// `"repoPrev"\0<did>` — per-repo transient sync state (rev + prevData CID). 73 + pub fn repo_prev(did: &str) -> Vec<u8> { 74 + let mut key = Vec::with_capacity(PREFIX_REPO_PREV.len() + 1 + did.len()); 75 + key.extend_from_slice(PREFIX_REPO_PREV); 76 + key.push(SEP); 77 + key.extend_from_slice(did.as_bytes()); 78 + key 79 + } 80 + 81 + /// `"subscribeRepos"\0<host>\0"cursor"` — firehose cursor for a relay host. 82 + pub fn subscribe_cursor(host: &str) -> Vec<u8> { 83 + const SUFFIX: &[u8] = b"cursor"; 84 + let mut key = Vec::with_capacity(PREFIX_SUBSCRIBE_REPOS.len() + 1 + host.len() + 1 + SUFFIX.len()); 85 + key.extend_from_slice(PREFIX_SUBSCRIBE_REPOS); 86 + key.push(SEP); 87 + key.extend_from_slice(host.as_bytes()); 88 + key.push(SEP); 89 + key.extend_from_slice(SUFFIX); 90 + key 91 + } 92 + 93 + /// `"listRepos"\0<host>` — listRepos backfill progress for a relay host. 94 + pub fn list_repos_progress(host: &str) -> Vec<u8> { 95 + let mut key = Vec::with_capacity(PREFIX_LIST_REPOS.len() + 1 + host.len()); 96 + key.extend_from_slice(PREFIX_LIST_REPOS); 97 + key.push(SEP); 98 + key.extend_from_slice(host.as_bytes()); 99 + key 100 + } 101 + 102 + /// `"repoResyncQueue"\0<ts_be:u64>\0<did>` — timestamp-ordered resync queue. 103 + /// 104 + /// Big-endian timestamp gives natural chronological ordering. 105 + pub fn resync_queue(ts_be: u64, did: &str) -> Vec<u8> { 106 + let mut key = Vec::with_capacity(PREFIX_RESYNC_QUEUE.len() + 1 + 8 + 1 + did.len()); 107 + key.extend_from_slice(PREFIX_RESYNC_QUEUE); 108 + key.push(SEP); 109 + key.extend_from_slice(&ts_be.to_be_bytes()); 110 + key.push(SEP); 111 + key.extend_from_slice(did.as_bytes()); 112 + key 113 + } 114 + 115 + /// `"repoResyncQueue"\0<ts_be:u64>\0` — prefix for scanning resync queue up to a timestamp. 116 + pub fn resync_queue_prefix(ts_be: u64) -> Vec<u8> { 117 + let mut key = Vec::with_capacity(PREFIX_RESYNC_QUEUE.len() + 1 + 8 + 1); 118 + key.extend_from_slice(PREFIX_RESYNC_QUEUE); 119 + key.push(SEP); 120 + key.extend_from_slice(&ts_be.to_be_bytes()); 121 + key.push(SEP); 122 + key 123 + }
+51
src/db/mod.rs
··· 1 + pub mod cursor; 2 + pub mod index; 3 + pub mod keys; 4 + pub mod repo; 5 + pub mod resync; 6 + 7 + use std::path::Path; 8 + use std::sync::Arc; 9 + 10 + use crate::error::Result; 11 + 12 + /// Shared handle to the fjall database and its per-concern keyspaces. 13 + /// 14 + /// In fjall 3.x, `Database` is the top-level multi-keyspace container and 15 + /// `Keyspace` is an individual column-family (the old `PartitionHandle`). 16 + pub struct Db { 17 + pub(crate) database: fjall::Database, 18 + /// Main collection index: `rbc\0<collection>\0<did>`. 19 + pub(crate) rbc: fjall::Keyspace, 20 + /// Reversed index: `cbr\0<did>\0<collection>`. 21 + pub(crate) cbr: fjall::Keyspace, 22 + /// Per-repo state + metadata. 23 + pub(crate) repos: fjall::Keyspace, 24 + /// Firehose cursor + listRepos progress. 25 + pub(crate) cursors: fjall::Keyspace, 26 + /// Timestamp-ordered resync queue. 27 + pub(crate) resync: fjall::Keyspace, 28 + } 29 + 30 + /// Cheaply-cloneable reference to the shared database. 31 + pub type DbRef = Arc<Db>; 32 + 33 + /// Open (or create) the fjall database at `path` and return a shared handle. 34 + pub fn open(path: &Path) -> Result<DbRef> { 35 + let database = fjall::Database::builder(path).open()?; 36 + 37 + let rbc = database.keyspace("rbc", fjall::KeyspaceCreateOptions::default)?; 38 + let cbr = database.keyspace("cbr", fjall::KeyspaceCreateOptions::default)?; 39 + let repos = database.keyspace("repos", fjall::KeyspaceCreateOptions::default)?; 40 + let cursors = database.keyspace("cursors", fjall::KeyspaceCreateOptions::default)?; 41 + let resync = database.keyspace("resync", fjall::KeyspaceCreateOptions::default)?; 42 + 43 + Ok(Arc::new(Db { 44 + database, 45 + rbc, 46 + cbr, 47 + repos, 48 + cursors, 49 + resync, 50 + })) 51 + }
+72
src/db/repo.rs
··· 1 + //! Per-repo state storage. 2 + 3 + use crate::db::{keys, DbRef}; 4 + use crate::error::Result; 5 + 6 + /// High-level lifecycle state of a repo. 7 + #[derive(Debug, Clone, PartialEq, Eq)] 8 + pub enum RepoState { 9 + /// Repo is being indexed for the first time. 10 + Indexing, 11 + /// Repo is fully indexed and up to date. 12 + Active, 13 + /// Repo has been tombstoned / deactivated. 14 + Tombstoned, 15 + } 16 + 17 + /// Processing status used to drive backfill / resync decisions. 18 + #[derive(Debug, Clone, PartialEq, Eq)] 19 + pub enum RepoStatus { 20 + /// No record yet. 21 + Unknown, 22 + /// Queued for backfill probing. 23 + Pending, 24 + /// Backfill complete. 25 + Complete, 26 + /// An error occurred; `error` field has details. 27 + Error, 28 + } 29 + 30 + /// Stored record for a repository. 31 + #[derive(Debug, Clone)] 32 + pub struct RepoRecord { 33 + pub state: RepoState, 34 + pub status: RepoStatus, 35 + pub error: Option<String>, 36 + } 37 + 38 + /// Read the [`RepoRecord`] for `did`, returning `None` if no record exists. 39 + pub fn get(_db: &DbRef, _did: &str) -> Result<Option<RepoRecord>> { 40 + todo!("deserialize RepoRecord from fjall value") 41 + } 42 + 43 + /// Write a [`RepoRecord`] for `did`. 44 + pub fn put(_db: &DbRef, _did: &str, _record: &RepoRecord) -> Result<()> { 45 + todo!("serialize RepoRecord and write to fjall") 46 + } 47 + 48 + /// Transient sync state for proof validation. 49 + #[derive(Debug, Clone)] 50 + pub struct RepoPrev { 51 + /// The last-seen `rev` string. 52 + pub rev: String, 53 + /// The last-seen `prevData` CID (as raw bytes). 54 + pub prev_data: Vec<u8>, 55 + } 56 + 57 + /// Read the transient [`RepoPrev`] for `did`. 58 + pub fn get_prev(_db: &DbRef, _did: &str) -> Result<Option<RepoPrev>> { 59 + todo!("deserialize RepoPrev") 60 + } 61 + 62 + /// Write the transient [`RepoPrev`] for `did`. 63 + pub fn put_prev(_db: &DbRef, _did: &str, _prev: &RepoPrev) -> Result<()> { 64 + todo!("serialize and write RepoPrev") 65 + } 66 + 67 + /// Delete the transient [`RepoPrev`] for `did`. 68 + pub fn delete_prev(db: &DbRef, did: &str) -> Result<()> { 69 + let key = keys::repo_prev(did); 70 + db.repos.remove(key)?; 71 + Ok(()) 72 + }
+34
src/db/resync.rs
··· 1 + //! Timestamp-ordered resync queue. 2 + //! 3 + //! Keys: `"repoResyncQueue"\0<ts_be:u64>\0<did>` 4 + //! Values: CBOR payload with the triggering commit, retry count, and retry reason. 5 + 6 + use crate::db::DbRef; 7 + use crate::error::Result; 8 + 9 + /// An item waiting in the resync queue. 10 + #[derive(Debug, Clone)] 11 + pub struct ResyncItem { 12 + pub did: String, 13 + pub retry_count: u16, 14 + pub retry_reason: String, 15 + /// Raw CBOR of the triggering firehose commit. 16 + pub commit_cbor: Vec<u8>, 17 + } 18 + 19 + /// Enqueue a repo for resync at the given Unix timestamp (seconds). 20 + pub fn enqueue(_db: &DbRef, _ts: u64, _item: &ResyncItem) -> Result<()> { 21 + todo!("serialize ResyncItem to CBOR and insert into resync partition") 22 + } 23 + 24 + /// Dequeue and return the next item whose timestamp is ≤ `now`. 25 + /// 26 + /// Removes the entry from the queue atomically before returning it. 27 + pub fn dequeue_ready(_db: &DbRef, _now: u64) -> Result<Option<ResyncItem>> { 28 + todo!("scan resync partition up to `now`, remove and return the first entry") 29 + } 30 + 31 + /// Remove all queue entries for `did` (e.g., after a successful resync). 32 + pub fn remove_did(_db: &DbRef, _did: &str) -> Result<()> { 33 + todo!("scan resync partition and remove all entries matching did") 34 + }
+13
src/error.rs
··· 1 + use thiserror::Error; 2 + 3 + #[derive(Debug, Error)] 4 + pub enum Error { 5 + #[error("database: {0}")] 6 + Db(#[from] fjall::Error), 7 + #[error("I/O: {0}")] 8 + Io(#[from] std::io::Error), 9 + #[error("{0}")] 10 + Other(String), 11 + } 12 + 13 + pub type Result<T> = std::result::Result<T, Error>;
+20
src/firehose/mod.rs
··· 1 + //! Firehose subsystem. 2 + //! 3 + //! Connects to an ATProto relay, validates incoming commits via sync1.1 inductive 4 + //! proofs, and updates the rbc/cbr index on collection additions/removals. 5 + 6 + mod subscriber; 7 + 8 + pub use subscriber::Subscriber; 9 + 10 + use crate::db::DbRef; 11 + use crate::error::Result; 12 + 13 + /// Spawn the firehose subscriber task for `host` and run until it returns. 14 + /// 15 + /// This is the top-level entry point called from `main`. The subscriber handles 16 + /// reconnection internally, so this future only resolves on a fatal error. 17 + pub async fn run(host: String, db: DbRef) -> Result<()> { 18 + let mut sub = Subscriber::new(host, db); 19 + sub.run().await 20 + }
+51
src/firehose/subscriber.rs
··· 1 + //! Firehose WebSocket subscriber. 2 + //! 3 + //! Connects to an ATProto relay using `jacquard-common`'s `SubscriptionExt` + 4 + //! `TungsteniteClient`, persists/restores the sequence cursor via `db::cursor`, 5 + //! and dispatches decoded events to the appropriate handlers. 6 + 7 + use crate::db::DbRef; 8 + use crate::error::Result; 9 + 10 + /// Manages a single WebSocket connection to a relay firehose. 11 + pub struct Subscriber { 12 + host: String, 13 + db: DbRef, 14 + } 15 + 16 + impl Subscriber { 17 + pub fn new(host: String, db: DbRef) -> Self { 18 + Self { host, db } 19 + } 20 + 21 + /// Connect and run the subscriber loop, reconnecting on disconnect. 22 + /// 23 + /// Restores the last cursor from `db::cursor::get_subscribe_cursor` before 24 + /// connecting, and persists it after each successfully processed event. 25 + pub async fn run(&mut self) -> Result<()> { 26 + todo!( 27 + "connect via jacquard_common::TungsteniteClient + SubscriptionExt; \ 28 + decode events with decode_cbor_msg; dispatch #commit / #sync / #account" 29 + ) 30 + } 31 + 32 + /// Process a single `#commit` firehose event. 33 + /// 34 + /// Validates the inductive proof via `jacquard_repo::commit::proof::verify_proofs`, 35 + /// then diffs the MST to determine collection additions/removals. 36 + async fn handle_commit(&self, _event_bytes: &[u8]) -> Result<()> { 37 + todo!("validate sync1.1 proof and update rbc/cbr index") 38 + } 39 + 40 + /// Process a single `#sync` firehose event. 41 + /// 42 + /// Probes the repo to diff against recorded collections. 43 + async fn handle_sync(&self, _event_bytes: &[u8]) -> Result<()> { 44 + todo!("probe repo to detect collection changes after #sync") 45 + } 46 + 47 + /// Process an `#account` (tombstone / reactivation) event. 48 + async fn handle_account(&self, _event_bytes: &[u8]) -> Result<()> { 49 + todo!("update RepoState and remove index entries if tombstoned") 50 + } 51 + }
+41 -2
src/main.rs
··· 1 - fn main() { 2 - println!("Hello, world!"); 1 + mod api; 2 + mod backfill; 3 + mod db; 4 + mod error; 5 + mod firehose; 6 + mod mst; 7 + 8 + use clap::Parser; 9 + use std::net::SocketAddr; 10 + use std::path::PathBuf; 11 + 12 + use error::Result; 13 + 14 + #[derive(Parser, Debug)] 15 + #[command(name = "lightrail", about = "listReposByCollection indexing service")] 16 + struct Args { 17 + /// ATProto relay or PDS host to subscribe to. 18 + #[arg(long, env = "LIGHTRAIL_SUBSCRIBE")] 19 + subscribe: String, 20 + 21 + /// Path to the fjall database directory. 22 + #[arg(long, env = "LIGHTRAIL_DB_PATH", default_value = "lightrail.db")] 23 + db_path: PathBuf, 24 + 25 + /// TCP address for the XRPC API server. 26 + #[arg(long, env = "LIGHTRAIL_LISTEN", default_value = "0.0.0.0:3000")] 27 + listen: SocketAddr, 28 + } 29 + 30 + #[tokio::main] 31 + async fn main() -> Result<()> { 32 + let args = Args::parse(); 33 + let db = db::open(&args.db_path)?; 34 + 35 + tokio::select! { 36 + result = firehose::run(args.subscribe.clone(), db.clone()) => result?, 37 + result = backfill::run(args.subscribe, db.clone()) => result?, 38 + result = api::serve(args.listen, db) => result?, 39 + } 40 + 41 + Ok(()) 3 42 }
+33
src/mst/adjacent.rs
··· 1 + //! Extract adjacent MST keys from a CAR slice. 2 + //! 3 + //! Given the raw blocks included in a `getRecord` or firehose commit CAR slice, 4 + //! this module uses `jacquard-repo` primitives to find neighbouring record keys 5 + //! and detect collection boundaries. 6 + 7 + /// The adjacent keys returned by the MST for a given probe key. 8 + #[derive(Debug, Clone)] 9 + pub struct AdjacentKeys { 10 + /// The key immediately before the probe key in the MST, if any. 11 + pub prev: Option<String>, 12 + /// The key immediately after the probe key in the MST, if any. 13 + pub next: Option<String>, 14 + } 15 + 16 + /// Extract adjacent keys for `probe_key` from the given CAR block bytes. 17 + /// 18 + /// Loads the blocks into an in-memory block store and uses `MstCursor` from 19 + /// `jacquard-repo` to walk the MST and find neighbours. 20 + pub fn extract_adjacent(_car_bytes: &[u8], _probe_key: &str) -> crate::error::Result<AdjacentKeys> { 21 + todo!("load CAR blocks into MemoryBlockStore, use MstCursor to find adjacent keys") 22 + } 23 + 24 + /// Estimate whether this is a small repo by inspecting the MST level of the 25 + /// highest-level node present in `car_bytes`. 26 + /// 27 + /// Uses `jacquard_repo::mst::util::layer_for_key()` on the highest-level key 28 + /// found in the partial MST slice. Small repos are statistically unlikely to 29 + /// contain high-level keys, since every CAR slice must include all maximum-level 30 + /// keys of the repository. 31 + pub fn is_small_repo(_car_bytes: &[u8], _threshold_level: u8) -> crate::error::Result<bool> { 32 + todo!("parse MST nodes from CAR blocks and check maximum key level") 33 + }
+8
src/mst/mod.rs
··· 1 + //! MST (Merkle Search Tree) utilities. 2 + //! 3 + //! Thin wrappers around `jacquard-repo` primitives for: 4 + //! - extracting adjacent keys from CAR slices 5 + //! - detecting collection boundaries 6 + //! - small-repo heuristic via MST level inspection 7 + 8 + pub mod adjacent;