···1010iroh-car = "0.5.1"
1111jacquard-api = { version = "0.9.5", default-features = false, features = ["com_atproto"] }
1212jacquard-axum = { version = "0.9.6", default-features = false, features = ["tracing"] }
1313-jacquard-common = { version = "0.9.5", features = ["websocket"] }
1313+jacquard-common = { version = "0.9.5", features = ["websocket", "reqwest-client"] }
1414jacquard-repo = "0.9.6"
1515metrics = "0.24.3"
1616metrics-exporter-prometheus = { version = "0.18.1", features = ["http-listener"] }
1717+bytes = "1"
1818+reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }
1719serde = { version = "1", features = ["derive"] }
1820thiserror = "2.0.18"
1921tokio = { version = "1.49.0", features = ["full"] }
+146-10
src/backfill/probe.rs
···11//! `getRecord`-probing for large-repo backfill.
22//!
33//! MST keys have the form `<collection>/<rkey>`, where `collection` is an NSID
44-//! and `rkey` is a Record Key, both subject to format restrictions and a total
55-//! byte-length cap defined in the AT Protocol specs.
44+//! and `rkey` is a Record Key, both subject to format restrictions defined in
55+//! the AT Protocol specs.
66//!
77-//! `getRecord` always includes the keys adjacent to the queried key in its CAR
88-//! slice response, even when the record does not exist. The probing algorithm
99-//! exploits this to enumerate every collection with one request per collection:
77+//! `getRecord` usually includes the keys adjacent to the queried key in its CAR
88+//! slice response, and always does when the requested key does not exist. The
99+//! probing algorithm exploits this to enumerate every collection with one
1010+//! request per collection:
1011//!
1112//! 1. Query `getRecord` with the **minimum legal MST key** — the
1213//! lexicographically lowest string that is a valid `<collection>/<rkey>`.
···1617//! 2. For that collection, compute the **maximum legal rkey** and query
1718//! `getRecord` with `<collection>/<max_rkey>`. The right-adjacent key in
1819//! the response is the first key of the *next* collection in the repo.
1919-//! 3. Repeat step 2 for each newly discovered collection until no right-adjacent
2020-//! key is returned, signalling that all collections have been found.
2020+//! 3. Repeat step 2 for each newly discovered collection until no right-
2121+//! adjacent key is returned, signalling that all collections have been
2222+//! found.
2323+//!
2424+//! It is *possible* for the maximum legal rkey of a collection to be present in
2525+//! a repo, and for its immediate right-adjacent key *not* to be present in the
2626+//! CAR slice response. In such cases, we can compute the very next legal
2727+//! collection and request it with the minimum legal rkey.
2828+//!
2929+//! Repositories can update while being probed. This is detectable because every
3030+//! probe response includes at least one parent block on any changed key's path.
3131+//! TODO: investigate whether maintaining a sparse tree from the probes would
3232+//! also let us tell if an update added or removed any collections within the
3333+//! area we have already covered.
3434+//!
3535+//! TODO also: there is a `getBlocks` endpoint, which might be an alternative
3636+//! to probing: we could do one probe to get the root, then walk down the tree
3737+//! (on parallel paths even) to build out the sparse collection-boundary
3838+//! skeleton tree. Is this more efficient than probing with min/max rkeys?
3939+//!
4040+//! In either case, if handling repo updates leads to too many re-fetches, we
4141+//! should fall back to `getRepo` and full MST walking.
2142//!
2243//! Each discovered `(did, collection)` pair is written to the rbc/cbr index
2344//! via `db::index::insert`.
24454646+use bytes::Bytes;
4747+use jacquard_api::com_atproto::sync::get_record::{GetRecord, GetRecordError};
4848+use jacquard_common::{
4949+ error::ClientErrorKind,
5050+ types::string::{Did, Nsid, RecordKey, Rkey},
5151+ xrpc::{XrpcError, XrpcExt},
5252+};
5353+2554use crate::db::DbRef;
2626-use crate::error::Result;
5555+use crate::error::{Error, Result};
5656+5757+/// minimum legal NSID
5858+///
5959+/// - whole domain authority must be lowercase
6060+/// - top level domain must start with an alphabetic character
6161+/// - other domain segments cannot begin or end with hyphens
6262+/// - max 253 chars of domain authority before the last name segment
6363+/// - name segment accepts uppercase and must begin with an alphabetic
6464+const MIN_COLLECTION: &str = "a-------------------------------------------------------------0.0-------------------------------------------------------------0.0-------------------------------------------------------------0.0-----------------------------------------------------------0.A";
6565+6666+/// minimum legal rkey: `-` = ordinal 45 (.:_~ are 46, 58, 95, 126 respectively)
6767+const MIN_RKEY: &str = "-";
6868+6969+/// maximum legal rkey: 512 of the max legal character
7070+const MAX_RKEY: &str = "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~";
/// Extract the collection segment from an MST key of the form
/// `<collection>/<rkey>`.
///
/// The key is split at the first `/`; everything before it is returned.
/// Returns `None` when `key` contains no `/` at all.
fn collection_from_key(key: &str) -> Option<&str> {
    let (collection, _rkey) = key.split_once('/')?;
    Some(collection)
}
27772878/// Probe `did` to enumerate its collections via sequential `getRecord` requests.
2979///
···3181/// collection at a time until the end of the repo is reached. One XRPC request
3282/// is issued per collection present in the repo.
3383pub async fn probe_repo(host: &str, did: &str, db: DbRef) -> Result<()> {
3434- let _ = (host, did, db);
3535- todo!("sequential getRecord probing: walk right-adjacent keys to enumerate collections")
8484+ // MAX_RKEY is 512 '~' characters — the lexicographically largest valid rkey.
8585+ let max_rkey: String = "~".repeat(512);
8686+8787+ let client = reqwest::Client::new();
8888+ let base: jacquard_common::url::Url = format!("https://{}", host)
8989+ .parse()
9090+ .map_err(|e: jacquard_common::url::ParseError| Error::Other(e.to_string()))?;
9191+9292+ // Step 1: probe the minimum legal MST key to discover the first collection.
9393+ let probe_key = format!("{}/{}", MIN_COLLECTION, MIN_RKEY);
9494+ let car = match fetch_car(&client, &base, did, MIN_COLLECTION, MIN_RKEY).await? {
9595+ Some(bytes) => bytes,
9696+ None => return Ok(()), // repo is inaccessible or does not exist
9797+ };
9898+ let adjacent = crate::mst::adjacent::extract_adjacent(&car, &probe_key).await?;
9999+ let mut current_collection = match adjacent.next.as_deref().and_then(collection_from_key) {
100100+ Some(col) => col.to_owned(),
101101+ None => return Ok(()), // repo has no records
102102+ };
103103+104104+ // Steps 2+: for each discovered collection, insert it and walk to the next.
105105+ loop {
106106+ crate::db::index::insert(&db, did, ¤t_collection)?;
107107+108108+ let probe_key = format!("{}/{}", current_collection, max_rkey);
109109+ let car = match fetch_car(&client, &base, did, ¤t_collection, &max_rkey).await? {
110110+ Some(bytes) => bytes,
111111+ None => break, // repo became inaccessible mid-probe
112112+ };
113113+ let adjacent = crate::mst::adjacent::extract_adjacent(&car, &probe_key).await?;
114114+ let next_collection = match adjacent.next.as_deref().and_then(collection_from_key) {
115115+ Some(col) => col.to_owned(),
116116+ None => break, // no more collections
117117+ };
118118+119119+ if next_collection == current_collection {
120120+ // Safety guard: the adjacent key should always be in the next
121121+ // collection, but avoid an infinite loop if it is not.
122122+ break;
123123+ }
124124+ current_collection = next_collection;
125125+ }
126126+127127+ Ok(())
128128+}
129129+130130+/// Make one `com.atproto.sync.getRecord` request and return the raw CAR bytes.
131131+///
132132+/// Returns `None` if the repository is inaccessible (taken down, suspended,
133133+/// deactivated, or not found) or if an unexpected HTTP error occurs.
134134+async fn fetch_car(
135135+ client: &reqwest::Client,
136136+ base: &jacquard_common::url::Url,
137137+ did: &str,
138138+ collection: &str,
139139+ rkey: &str,
140140+) -> Result<Option<Bytes>> {
141141+ let req = GetRecord {
142142+ collection: Nsid::new_owned(collection).map_err(|e| Error::Other(e.to_string()))?,
143143+ did: Did::new_owned(did).map_err(|e| Error::Other(e.to_string()))?,
144144+ rkey: RecordKey(Rkey::new_owned(rkey).map_err(|e| Error::Other(e.to_string()))?),
145145+ };
146146+147147+ let resp = match client.xrpc(base.clone()).send(&req).await {
148148+ Ok(resp) => resp,
149149+ Err(e) => {
150150+ return match e.kind() {
151151+ // Network or unexpected HTTP-level errors: skip this repo.
152152+ ClientErrorKind::Transport | ClientErrorKind::Http { .. } => Ok(None),
153153+ _ => Err(Error::Other(e.to_string())),
154154+ };
155155+ }
156156+ };
157157+158158+ // resp is HTTP 200 or 400 at this point (401 with WWW-Authenticate is
159159+ // already surfaced as Err by send()).
160160+ match resp.parse() {
161161+ Ok(output) => Ok(Some(output.body)),
162162+ Err(XrpcError::Xrpc(err)) => match err {
163163+ GetRecordError::RepoNotFound(_)
164164+ | GetRecordError::RepoTakendown(_)
165165+ | GetRecordError::RepoSuspended(_)
166166+ | GetRecordError::RepoDeactivated(_)
167167+ | GetRecordError::RecordNotFound(_)
168168+ | GetRecordError::Unknown(_) => Ok(None),
169169+ },
170170+ Err(e) => Err(Error::Other(e.to_string())),
171171+ }
36172}
+94-7
src/mst/adjacent.rs
···11//! Extract adjacent MST keys from a CAR slice.
22//!
33-//! Given the raw blocks included in a `getRecord` or firehose commit CAR slice,
44-//! this module uses `jacquard-repo` primitives to find neighbouring record keys
55-//! and detect collection boundaries.
33+//! Given the raw blocks included in a `getRecord` CAR slice, this module
44+//! uses `jacquard-repo` primitives to find neighbouring record keys and detect
55+//! collection boundaries.
66+//!
77+//! `getRecord` CAR slices are *partial* snapshots of the MST: they include only
88+//! the blocks on the path from the root to the queried key. Sibling subtrees
99+//! are referenced by CID but their blocks are absent. The cursor walks only
1010+//! the blocks that are present, skipping any subtree whose block is missing.
1111+1212+use std::sync::Arc;
1313+1414+use jacquard_repo::{
1515+ MemoryBlockStore, Mst,
1616+ car::parse_car_bytes,
1717+ commit::Commit,
1818+ mst::{CursorPosition, MstCursor},
1919+};
2020+2121+use crate::error::{Error, Result};
622723/// The adjacent keys returned by the MST for a given probe key.
824#[derive(Debug, Clone)]
···15311632/// Extract adjacent keys for `probe_key` from the given CAR block bytes.
1733///
1818-/// Loads the blocks into an in-memory block store and uses `MstCursor` from
1919-/// `jacquard-repo` to walk the MST and find neighbours.
2020-pub fn extract_adjacent(_car_bytes: &[u8], _probe_key: &str) -> crate::error::Result<AdjacentKeys> {
2121- todo!("load CAR blocks into MemoryBlockStore, use MstCursor to find adjacent keys")
3434+/// Parses the MST from the CAR, then walks the tree in sorted order using
3535+/// `MstCursor`, collecting the largest visible key less than `probe_key`
3636+/// (prev) and the smallest visible key greater than `probe_key` (next).
3737+///
3838+/// Subtrees whose blocks are absent from the CAR are silently skipped; this
3939+/// is expected for `getRecord` slices, which contain only the proof path.
4040+pub async fn extract_adjacent(car_bytes: &[u8], probe_key: &str) -> Result<AdjacentKeys> {
4141+ // Parse the CAR bytes into a root CID + block map.
4242+ let parsed = parse_car_bytes(car_bytes)
4343+ .await
4444+ .map_err(|e| Error::Other(e.to_string()))?;
4545+4646+ // The CAR root is the signed commit; its `data` field is the MST root CID.
4747+ let mst_root = {
4848+ let commit_bytes = parsed
4949+ .blocks
5050+ .get(&parsed.root)
5151+ .ok_or_else(|| Error::Other("getRecord CAR has no commit block".into()))?;
5252+ let commit = Commit::from_cbor(commit_bytes.as_ref())
5353+ .map_err(|e| Error::Other(format!("bad commit in getRecord CAR: {}", e)))?;
5454+ *commit.data()
5555+ };
5656+5757+ // Load all CAR blocks into an in-memory store and mount the MST lazily.
5858+ let storage = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks));
5959+ let mst = Mst::load(storage, mst_root, None);
6060+6161+ // Walk the tree with a cursor in sorted key order.
6262+ // The cursor starts pointing at the MST root (a Tree position); we process
6363+ // each position as we encounter it.
6464+ let mut cursor = MstCursor::new(mst);
6565+ let mut prev: Option<String> = None;
6666+ let mut next: Option<String> = None;
6767+6868+ loop {
6969+ match cursor.current() {
7070+ CursorPosition::End => break,
7171+7272+ CursorPosition::Leaf { key, .. } => {
7373+ let k = key.as_str();
7474+ if k < probe_key {
7575+ // This is a candidate for prev; keep the latest one seen.
7676+ prev = Some(k.to_owned());
7777+ // Advance past this leaf (step_over; never fails).
7878+ if cursor.advance().await.is_err() {
7979+ break;
8080+ }
8181+ } else if k > probe_key {
8282+ // First key greater than probe_key in sorted order.
8383+ next = Some(k.to_owned());
8484+ break;
8585+ } else {
8686+ // k == probe_key: the record actually exists; skip it.
8787+ if cursor.advance().await.is_err() {
8888+ break;
8989+ }
9090+ }
9191+ }
9292+9393+ CursorPosition::Tree { .. } => {
9494+ // Descend into the subtree. If the block is absent from the
9595+ // CAR (the cursor returns Err), skip the subtree instead.
9696+ match cursor.advance().await {
9797+ Ok(()) => {}
9898+ Err(_) => {
9999+ if cursor.skip_subtree().await.is_err() {
100100+ break;
101101+ }
102102+ }
103103+ }
104104+ }
105105+ }
106106+ }
107107+108108+ Ok(AdjacentKeys { prev, next })
22109}
2311024111/// Estimate whether this is a small repo by inspecting the MST level of the