very fast at-protocol indexer with flexible filtering, xrpc queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
60
fork

Configure Feed

Select the types of activity you want to include in your feed.

[lib,api] implement com.atproto.sync.getRepo

dawn 558934f5 29a25c48

+340 -18
+2
Cargo.lock
··· 1481 1481 "glob", 1482 1482 "hex", 1483 1483 "humantime", 1484 + "iroh-car", 1484 1485 "jacquard-api", 1485 1486 "jacquard-common", 1486 1487 "jacquard-derive", ··· 1506 1507 "thiserror 2.0.18", 1507 1508 "tokio", 1508 1509 "tokio-tungstenite 0.28.0", 1510 + "tokio-util", 1509 1511 "tower-http", 1510 1512 "tracing", 1511 1513 "tracing-subscriber",
+2
Cargo.toml
··· 10 10 11 11 [dependencies] 12 12 tokio = { version = "1.0", features = ["full"] } 13 + tokio-util = { version = "0.7", features = ["io"] } 13 14 14 15 tracing = "0.1" 15 16 tracing-subscriber = { version = "0.3", features = ["env-filter"] } ··· 46 47 scc = "3.6.6" 47 48 data-encoding = "2.10.0" 48 49 cid = "0.11.1" 50 + iroh-car = "0.5.1" 49 51 thiserror = "2.0.18" 50 52 rand = "0.10.0" 51 53 glob = "0.3"
+1
README.md
··· 307 307 the following are implemented currently: 308 308 - `com.atproto.repo.getRecord` 309 309 - `com.atproto.repo.listRecords` 310 + - `com.atproto.sync.getRepo` (`since` parameter not implemented!) 310 311 - `com.atproto.sync.getHostStatus` 311 312 - `com.atproto.sync.listHosts` 312 313 - `com.atproto.sync.getRepoStatus`
+9 -3
src/api/xrpc/get_latest_commit.rs
··· 1 1 use jacquard_api::com_atproto::sync::get_latest_commit::{ 2 2 GetLatestCommitError, GetLatestCommitOutput, GetLatestCommitRequest, GetLatestCommitResponse, 3 3 }; 4 - use jacquard_common::{CowStr, types::cid::Cid}; 4 + use jacquard_common::{CowStr, cowstr::ToCowStr, types::cid::Cid}; 5 5 6 6 use crate::types::RepoStatus; 7 7 ··· 50 50 }); 51 51 }; 52 52 53 + let Some(atp_commit) = commit.into_atp_commit(req.did) else { 54 + return Err(internal_error(nsid, "repo needs migration")); 55 + }; 56 + 57 + let commit_cid = atp_commit.to_cid().map_err(|e| internal_error(nsid, e))?; 58 + 53 59 Ok(Json(GetLatestCommitOutput { 54 - cid: Cid::from(commit.data), 55 - rev: commit.rev.to_tid(), 60 + cid: Cid::Str(commit_cid.to_cowstr().into_static()), 61 + rev: atp_commit.rev, 56 62 extra_data: None, 57 63 })) 58 64 }
+3 -1
src/api/xrpc/get_record.rs
··· 1 + use jacquard_common::{cowstr::ToCowStr, types::cid::Cid}; 2 + 1 3 use super::*; 2 4 3 5 pub async fn handle( ··· 26 28 req.rkey.0.as_str(), 27 29 ) 28 30 .unwrap(), 29 - cid: Some(record.cid), 31 + cid: Some(Cid::Str(record.cid.to_cowstr().into_static())), 30 32 value: record.value, 31 33 extra_data: Default::default(), 32 34 }))
+61
src/api/xrpc/get_repo.rs
··· 1 + use axum::{http::header, response::IntoResponse}; 2 + use jacquard_api::com_atproto::sync::get_repo::{GetRepoError, GetRepoRequest}; 3 + use jacquard_common::CowStr; 4 + 5 + use crate::types::RepoStatus; 6 + 7 + use super::*; 8 + 9 + pub async fn handle( 10 + State(hydrant): State<Hydrant>, 11 + ExtractXrpc(req): ExtractXrpc<GetRepoRequest>, 12 + ) -> Result<impl IntoResponse, XrpcErrorResponse<GetRepoError<'static>>> { 13 + let nsid = GetRepoRequest::PATH; 14 + 15 + if req.since.is_some() { 16 + return Err(bad_request( 17 + nsid, 18 + "hydrant does not support since paramater", 19 + )); 20 + } 21 + 22 + let repo = hydrant.repos.get(&req.did); 23 + 24 + let Some(state) = repo.state().await.map_err(|e| internal_error(nsid, e))? else { 25 + return Err(XrpcErrorResponse { 26 + status: StatusCode::NOT_FOUND, 27 + error: XrpcError::Xrpc(GetRepoError::RepoNotFound(None)), 28 + }); 29 + }; 30 + 31 + let xrpc_err = match &state.status { 32 + RepoStatus::Takendown => Some(GetRepoError::RepoTakendown(None)), 33 + RepoStatus::Suspended => Some(GetRepoError::RepoSuspended(None)), 34 + RepoStatus::Deactivated => Some(GetRepoError::RepoDeactivated(None)), 35 + _ => None, 36 + }; 37 + if let Some(err) = xrpc_err { 38 + return Err(XrpcErrorResponse { 39 + status: StatusCode::FORBIDDEN, 40 + error: XrpcError::Xrpc(err), 41 + }); 42 + } 43 + 44 + let Some(car_bytes) = repo 45 + .generate_car() 46 + .await 47 + .map_err(|e| internal_error(nsid, e))? 48 + else { 49 + return Err(XrpcErrorResponse { 50 + status: StatusCode::NOT_FOUND, 51 + error: XrpcError::Xrpc(GetRepoError::RepoNotFound(Some(CowStr::Borrowed( 52 + "repo still backfilling", 53 + )))), 54 + }); 55 + }; 56 + 57 + Ok(( 58 + [(header::CONTENT_TYPE, "application/vnd.ipld.car")], 59 + axum::body::Body::from_stream(car_bytes), 60 + )) 61 + }
+12 -2
src/api/xrpc/list_repos.rs
··· 38 38 continue; 39 39 }; 40 40 41 + let Some(atp_commit) = commit.into_atp_commit(did.clone()) else { 42 + tracing::warn!(did = %did, "repo needs migration"); 43 + continue; 44 + }; 45 + 46 + let Ok(commit_cid) = atp_commit.to_cid() else { 47 + tracing::warn!(did = %did, "failed to compute commit CID"); 48 + continue; 49 + }; 50 + 41 51 let (active, status) = repo_status_to_api(state.status); 42 52 repos.push(Repo { 43 53 active: Some(active), 44 54 did: did.clone(), 45 - head: Cid::from(commit.data), 46 - rev: commit.rev.to_tid(), 55 + head: Cid::from(commit_cid), 56 + rev: atp_commit.rev, 47 57 status, 48 58 extra_data: None, 49 59 });
+3
src/api/xrpc/mod.rs
··· 11 11 }; 12 12 use jacquard_api::com_atproto::sync::get_host_status::GetHostStatusRequest; 13 13 use jacquard_api::com_atproto::sync::get_latest_commit::GetLatestCommitRequest; 14 + use jacquard_api::com_atproto::sync::get_repo::GetRepoRequest; 14 15 use jacquard_api::com_atproto::sync::get_repo_status::GetRepoStatusRequest; 15 16 use jacquard_api::com_atproto::sync::list_hosts::ListHostsRequest; 16 17 use jacquard_api::com_atproto::sync::list_repos::ListReposRequest; ··· 31 32 mod get_host_status; 32 33 mod get_latest_commit; 33 34 mod get_record; 35 + mod get_repo; 34 36 mod get_repo_status; 35 37 mod list_hosts; 36 38 mod list_records; ··· 45 47 .route(GetHostStatusRequest::PATH, get(get_host_status::handle)) 46 48 .route(ListHostsRequest::PATH, get(list_hosts::handle)) 47 49 .route(GetLatestCommitRequest::PATH, get(get_latest_commit::handle)) 50 + .route(GetRepoRequest::PATH, get(get_repo::handle)) 48 51 .route(GetRepoStatusRequest::PATH, get(get_repo_status::handle)) 49 52 .route(ListReposRequest::PATH, get(list_repos::handle)) 50 53 }
+140 -1
src/control/repos.rs
··· 3 3 4 4 use chrono::{DateTime, Utc}; 5 5 use fjall::OwnedWriteBatch; 6 + use futures::TryFutureExt; 6 7 use jacquard_common::cowstr::ToCowStr; 7 8 use jacquard_common::types::cid::{Cid, IpldCid}; 8 9 use jacquard_common::types::ident::AtIdentifier; ··· 34 35 /// the revision of the root commit of this repository. 35 36 #[serde(skip_serializing_if = "Option::is_none")] 36 37 pub rev: Option<Tid>, 37 - /// the CID of the root commit of this repository. 38 + /// the CID of the MST root of this repository. 38 39 #[serde(serialize_with = "crate::util::opt_cid_serialize_str")] 39 40 #[serde(skip_serializing_if = "Option::is_none")] 40 41 pub data: Option<IpldCid>, ··· 690 691 Rkey::new_cow(CowStr::Owned(rkey.to_smolstr())).expect("that rkey is validated") 691 692 }), 692 693 }) 694 + } 695 + 696 + /// generates a streaming CAR v1 response body for this repository. 697 + /// 698 + /// returns `None` if the repo has no commit yet (still backfilling) or is an 699 + /// unmigrated repo that does not have the necessary data to reconstruct the 700 + /// root commit from. 701 + /// 702 + /// ## notes 703 + /// - calling this if you are using collection allowlist will always result 704 + /// in an error since the commit root won't match the reconstructed CID. 705 + /// - calling this for big repositories will incur more resource cost due to 706 + /// hydrant's structure, the whole MST is always reconstructed. 707 + pub async fn generate_car( 708 + &self, 709 + ) -> Result<Option<impl futures::Stream<Item = std::io::Result<bytes::Bytes>> + Send + 'static>> 710 + { 711 + use iroh_car::{CarHeader, CarWriter}; 712 + use jacquard_repo::{BlockStore, MemoryBlockStore, Mst}; 713 + use miette::WrapErr; 714 + use std::sync::Arc; 715 + 716 + let commit = match self.state().await? 
{ 717 + Some(state) => match state.root { 718 + Some(c) => c, 719 + None => return Ok(None), 720 + }, 721 + None => return Ok(None), 722 + }; 723 + 724 + let atp_commit = match commit.into_atp_commit(self.did.clone().into_static()) { 725 + Some(c) => c, 726 + None => return Ok(None), 727 + }; 728 + let commit_cid = atp_commit.to_cid().into_diagnostic()?; 729 + let commit_cbor = atp_commit.to_cbor().into_diagnostic()?; 730 + 731 + let did = self.did.clone().into_static(); 732 + let app_state = self.state.clone(); 733 + 734 + // build mst and populate the block store in a single blocking pass 735 + let store = Arc::new(MemoryBlockStore::new()); 736 + let mst = Mst::new(store.clone()); 737 + let handle = tokio::runtime::Handle::current(); 738 + 739 + let mst = tokio::task::spawn_blocking(move || -> Result<_> { 740 + let mut mst = mst; 741 + let prefix = keys::record_prefix_did(&did); 742 + 743 + for guard in app_state.db.records.prefix(&prefix) { 744 + let (key, cid_bytes) = guard.into_inner().into_diagnostic()?; 745 + 746 + let rest = &key[prefix.len()..]; 747 + let mut parts = rest.splitn(2, |b: &u8| *b == keys::SEP); 748 + let collection_raw = parts 749 + .next() 750 + .ok_or_else(|| miette::miette!("missing collection in record key"))?; 751 + let rkey_raw = parts 752 + .next() 753 + .ok_or_else(|| miette::miette!("missing rkey in record key"))?; 754 + 755 + let collection = std::str::from_utf8(collection_raw) 756 + .into_diagnostic() 757 + .wrap_err("collection is not valid utf8")?; 758 + let rkey = keys::parse_rkey(rkey_raw)?; 759 + let mst_key = format!("{collection}/{rkey}"); 760 + 761 + let ipld_cid = cid::Cid::read_bytes(cid_bytes.as_ref()) 762 + .into_diagnostic() 763 + .wrap_err_with(|| format!("invalid cid bytes for record {mst_key}"))?; 764 + 765 + let block_key = keys::block_key(collection, cid_bytes.as_ref()); 766 + let block_bytes = app_state 767 + .db 768 + .blocks 769 + .get(&block_key) 770 + .into_diagnostic()? 
771 + .ok_or_else(|| miette::miette!("block missing for record {mst_key}"))?; 772 + 773 + handle 774 + .block_on(mst.add_mut(&mst_key, ipld_cid)) 775 + .into_diagnostic()?; 776 + // we use put_many here to skip calculating the CID again 777 + handle 778 + .block_on(mst.storage().put_many([( 779 + ipld_cid, 780 + bytes::Bytes::copy_from_slice(block_bytes.as_ref()), 781 + )])) 782 + .into_diagnostic()?; 783 + } 784 + 785 + handle.block_on(mst.persist()).into_diagnostic()?; 786 + 787 + Result::<_>::Ok(mst) 788 + }) 789 + .await 790 + .into_diagnostic()??; 791 + 792 + // sanity check: rebuilt root should match stored commit data in full-index mode 793 + let computed_root = mst.get_pointer().await.into_diagnostic()?; 794 + if computed_root != atp_commit.data { 795 + tracing::warn!( 796 + computed = %computed_root, 797 + stored = %atp_commit.data, 798 + did = %self.did, 799 + "mst root mismatch (expected in filter mode)", 800 + ); 801 + } 802 + 803 + store 804 + .put_many([(commit_cid, bytes::Bytes::from(commit_cbor))]) 805 + .await 806 + .into_diagnostic()?; 807 + 808 + // stream the car directly to the response 809 + let (reader, writer) = tokio::io::duplex(64 * 1024); 810 + tokio::spawn( 811 + async move { 812 + let header = CarHeader::new_v1(vec![commit_cid]); 813 + let mut car_writer = CarWriter::new(header, writer); 814 + 815 + // write commit first, then mst nodes + leaf blocks 816 + let commit_data = store.get(&commit_cid).await?; 817 + if let Some(data) = commit_data { 818 + car_writer 819 + .write(commit_cid, &data) 820 + .await 821 + .into_diagnostic()?; 822 + } 823 + mst.write_blocks_to_car(&mut car_writer).await?; 824 + car_writer.finish().await.into_diagnostic()?; 825 + 826 + Result::<_, miette::Report>::Ok(()) 827 + } 828 + .inspect_err(|e| tracing::error!("can't generate car: {e}")), 829 + ); 830 + 831 + Ok(Some(tokio_util::io::ReaderStream::new(reader))) 693 832 } 694 833 695 834 /// gets how many records of a collection this repository has.
+20 -2
src/types.rs
··· 5 5 use jacquard_common::types::string::{Did, Rkey}; 6 6 use jacquard_common::types::tid::Tid; 7 7 use jacquard_common::{CowStr, IntoStatic, types::string::Handle}; 8 + use jacquard_repo::commit::Commit as AtpCommit; 8 9 use serde::{Deserialize, Serialize, Serializer}; 9 10 use serde_json::Value; 10 11 use smol_str::{SmolStr, ToSmolStr}; ··· 76 77 } 77 78 } 78 79 79 - impl<'c> From<jacquard_repo::commit::Commit<'c>> for Commit { 80 - fn from(value: jacquard_repo::commit::Commit<'c>) -> Self { 80 + impl<'c> From<AtpCommit<'c>> for Commit { 81 + fn from(value: AtpCommit<'c>) -> Self { 81 82 Self { 82 83 data: value.data, 83 84 prev: value.prev, ··· 85 86 sig: value.sig, 86 87 version: value.version, 87 88 } 89 + } 90 + } 91 + 92 + impl Commit { 93 + pub(crate) fn into_atp_commit<'i>(self, did: Did<'i>) -> Option<AtpCommit<'i>> { 94 + // from a migration 95 + if self.version < 0 { 96 + return None; 97 + } 98 + Some(AtpCommit { 99 + did, 100 + rev: self.rev.to_tid(), 101 + data: self.data, 102 + prev: self.prev, 103 + sig: self.sig, 104 + version: self.version, 105 + }) 88 106 } 89 107 } 90 108
+87 -9
tests/repo_sync_integrity.nu
··· 39 39 for i in 0..($hydrant_count - 1) { 40 40 let h_record = ($hydrant_records | get $i) 41 41 let p_record = ($pds_records | get $i) 42 - 42 + 43 43 if $h_record.cid != $p_record.cid { 44 44 let h_rkey = ($h_record.uri | split row "/" | last) 45 45 let p_rkey = ($p_record.uri | split row "/" | last) ··· 61 61 "app.bsky.feed.post" 62 62 "app.bsky.actor.profile" 63 63 ] 64 - 64 + 65 65 mut success = true 66 66 67 67 for coll in $collections { 68 68 print $" checking count for ($coll)..." 69 - 69 + 70 70 # 1. get cached count from API 71 71 let api_count = try { 72 72 (http get $"($hydrant_url)/xrpc/systems.gaze.hydrant.countRecords?identifier=($did)&collection=($coll)").count ··· 93 93 $success 94 94 } 95 95 96 + # sanity check: compare getLatestCommit cid and rev between hydrant and pds 97 + def check-latest-commit [hydrant_url: string, pds_url: string, did: string] { 98 + print "checking getLatestCommit..." 99 + 100 + let hydrant_commit = try { 101 + http get $"($hydrant_url)/xrpc/com.atproto.sync.getLatestCommit?did=($did)" 102 + } catch { 103 + print " error fetching getLatestCommit from hydrant" 104 + return false 105 + } 106 + 107 + let pds_commit = try { 108 + http get $"($pds_url)/xrpc/com.atproto.sync.getLatestCommit?did=($did)" 109 + } catch { 110 + print " error fetching getLatestCommit from pds" 111 + return false 112 + } 113 + 114 + print $" hydrant: cid=($hydrant_commit.cid) rev=($hydrant_commit.rev)" 115 + print $" pds: cid=($pds_commit.cid) rev=($pds_commit.rev)" 116 + 117 + if $hydrant_commit.cid != $pds_commit.cid or $hydrant_commit.rev != $pds_commit.rev { 118 + print " MISMATCH: commit differs!" 
119 + return false 120 + } 121 + 122 + print " ok" 123 + true 124 + } 125 + 126 + # parse `goat repo inspect` output into a record 127 + def parse-goat-inspect []: string -> record { 128 + $in | lines | parse "{key}: {value}" | transpose -r -d 129 + } 130 + 131 + # fetch getRepo CARs from hydrant and pds and compare via `goat repo inspect` 132 + def check-car [hydrant_url: string, pds_url: string, did: string] { 133 + print "checking getRepo CAR..." 134 + 135 + let hydrant_car = try { 136 + http get $"($hydrant_url)/xrpc/com.atproto.sync.getRepo?did=($did)" 137 + } catch { 138 + print " error fetching CAR from hydrant" 139 + return false 140 + } 141 + 142 + let pds_car = try { 143 + http get $"($pds_url)/xrpc/com.atproto.sync.getRepo?did=($did)" 144 + } catch { 145 + print " error fetching CAR from pds" 146 + return false 147 + } 148 + 149 + let h_tmp = (mktemp --suffix ".car") 150 + let p_tmp = (mktemp --suffix ".car") 151 + $hydrant_car | save --force $h_tmp 152 + $pds_car | save --force $p_tmp 153 + 154 + print $"hydrant car: ($h_tmp)" 155 + print $"pds car: ($p_tmp)" 156 + 157 + let h_info = (nix-shell -p atproto-goat --run $"goat repo inspect ($h_tmp)" | parse-goat-inspect) 158 + let p_info = (nix-shell -p atproto-goat --run $"goat repo inspect ($p_tmp)" | parse-goat-inspect) 159 + rm $h_tmp $p_tmp 160 + 161 + print $" hydrant: data=($h_info.'Data CID') rev=($h_info.Revision)" 162 + print $" pds: data=($p_info.'Data CID') rev=($p_info.Revision)" 163 + 164 + if $h_info.'Data CID' != $p_info.'Data CID' or $h_info.Revision != $p_info.Revision { 165 + print " MISMATCH: CARs differ!" 
166 + return false 167 + } 168 + 169 + print " ok" 170 + true 171 + } 172 + 96 173 def main [] { 97 174 let did = "did:plc:dfl62fgb7wtjj3fcbb72naae" 98 175 let pds = "https://zwsp.xyz" ··· 109 186 let instance = start-hydrant $binary $db_path $port 110 187 111 188 mut success = false 112 - 189 + 113 190 if (wait-for-api $url) { 114 191 # track the repo via API 115 192 print $"adding repo ($did) to tracking..." 116 193 http put -t application/json $"($url)/repos" [ { did: ($did) } ] 117 194 118 195 if (wait-for-backfill $url) { 119 - # Run both consistency checks 120 196 let integrity_passed = (check-consistency $url $pds $did) 121 197 let count_passed = (check-count $url $debug_url $did) 198 + let commit_passed = (check-latest-commit $url $pds $did) 199 + let car_passed = if $commit_passed { check-car $url $pds $did } else { false } 122 200 123 - if $integrity_passed and $count_passed { 201 + if $integrity_passed and $count_passed and $commit_passed and $car_passed { 124 202 print "all integrity checks passed!" 125 203 $success = true 126 204 } else { 127 - print $"integrity checks failed. consistency: ($integrity_passed), count: ($count_passed)" 205 + print $"integrity checks failed. consistency: ($integrity_passed), count: ($count_passed), commit: ($commit_passed), car: ($car_passed)" 128 206 } 129 207 } else { 130 208 print "backfill failed or timed out." ··· 136 214 let hydrant_pid = $instance.pid 137 215 print $"stopping hydrant - pid: ($hydrant_pid)..." 138 216 try { kill $hydrant_pid } 139 - 217 + 140 218 if not $success { exit 1 } 141 - } 219 + }