A very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[lib] add xrpc query methods to RepoHandle

dawn c737e0b5 6dd9a8ce

+269 -175
+2 -2
examples/statusphere.rs
··· 102 102 } 103 103 104 104 async fn handle_stream(index: Arc<StatusIndex>, repos: ReposControl, mut stream: EventStream) { 105 - // get handle of did through the hydrant api 105 + // get handle for did through the hydrant api 106 106 let get_handle = async |did: &Did<'_>| { 107 107 repos 108 - .get(did) 108 + .info(did) 109 109 .await 110 110 .ok() 111 111 .flatten()
+1 -1
src/api/repos.rs
··· 133 133 134 134 let item = hydrant 135 135 .repos 136 - .get(&did) 136 + .info(&did) 137 137 .await 138 138 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 139 139
+63 -154
src/api/xrpc.rs
··· 1 1 use crate::control::Hydrant; 2 - use crate::db::types::DbRkey; 3 - use crate::db::{self, Db, keys}; 4 - use crate::state::AppState; 5 2 use axum::extract::FromRequest; 6 3 use axum::response::IntoResponse; 7 4 use axum::{Json, Router, extract::State, http::StatusCode}; 8 - use futures::TryFutureExt; 9 5 use jacquard_api::com_atproto::repo::{ 10 6 get_record::{GetRecordError, GetRecordOutput, GetRecordRequest}, 11 7 list_records::{ListRecordsOutput, ListRecordsRequest, Record as RepoRecord}, 12 8 }; 13 - use jacquard_common::CowStr; 14 - use jacquard_common::cowstr::ToCowStr; 15 9 use jacquard_common::types::ident::AtIdentifier; 16 10 use jacquard_common::xrpc::{XrpcEndpoint, XrpcMethod}; 17 11 use jacquard_common::{IntoStatic, xrpc::XrpcRequest}; 18 12 use jacquard_common::{ 19 - types::{ 20 - string::{AtUri, Cid}, 21 - value::Data, 22 - }, 13 + types::string::AtUri, 23 14 xrpc::{GenericXrpcError, XrpcError}, 24 15 }; 25 - use miette::IntoDiagnostic; 26 16 use serde::{Deserialize, Serialize}; 27 17 use smol_str::ToSmolStr; 28 - use std::{fmt::Display, sync::Arc}; 29 - use tokio::task::spawn_blocking; 18 + use std::fmt::Display; 30 19 31 20 pub fn router() -> Router<Hydrant> { 32 21 Router::new() ··· 123 112 } 124 113 125 114 pub async fn handle_get_record( 126 - State(state): State<Arc<AppState>>, 115 + State(hydrant): State<Hydrant>, 127 116 ExtractXrpc(req): ExtractXrpc<GetRecordRequest>, 128 117 ) -> Result<Json<GetRecordOutput<'static>>, XrpcErrorResponse<GetRecordError<'static>>> { 129 - let db = &state.db; 130 - let did = state 131 - .resolver 132 - .resolve_did(&req.repo) 118 + let record = hydrant 119 + .repos 120 + .resolve(&req.repo) 133 121 .await 134 - .map_err(|e| bad_request(GetRecordRequest::PATH, e))?; 135 - 136 - let db_key = keys::record_key( 137 - &did, 138 - req.collection.as_str(), 139 - &DbRkey::new(req.rkey.0.as_str()), 140 - ); 141 - 142 - let cid_bytes = Db::get(db.records.clone(), db_key) 122 + .map_err(|e| 
internal_error(GetRecordRequest::PATH, e))? 123 + .get_record(&req.collection, &req.rkey.0) 143 124 .await 144 125 .map_err(|e| internal_error(GetRecordRequest::PATH, e))?; 145 - 146 - if let Some(cid_bytes) = cid_bytes { 147 - // lookup block using col|cid key 148 - let block_key = keys::block_key(req.collection.as_str(), &cid_bytes); 149 - let block_bytes = Db::get(db.blocks.clone(), block_key) 150 - .await 151 - .map_err(|e| internal_error(GetRecordRequest::PATH, e))? 152 - .ok_or_else(|| internal_error(GetRecordRequest::PATH, "not found"))?; 153 - 154 - let value: Data = serde_ipld_dagcbor::from_slice(&block_bytes) 155 - .map_err(|e| internal_error(GetRecordRequest::PATH, e))?; 156 - 157 - let cid = Cid::new(&cid_bytes) 158 - .map_err(|e| internal_error(GetRecordRequest::PATH, e))? 159 - .into_static(); 160 - 161 - Ok(Json(GetRecordOutput { 162 - uri: AtUri::from_parts_owned( 163 - did.as_str(), 164 - req.collection.as_str(), 165 - req.rkey.0.as_str(), 166 - ) 167 - .unwrap(), 168 - cid: Some(Cid::Str(cid.to_cowstr()).into_static()), 169 - value: value.into_static(), 170 - extra_data: Default::default(), 171 - })) 172 - } else { 173 - Err(XrpcErrorResponse { 126 + let Some(record) = record else { 127 + return Err(XrpcErrorResponse { 174 128 status: StatusCode::NOT_FOUND, 175 129 error: XrpcError::Xrpc(GetRecordError::RecordNotFound(None)), 176 - }) 177 - } 130 + }); 131 + }; 132 + 133 + Ok(Json(GetRecordOutput { 134 + uri: AtUri::from_parts_owned( 135 + record.did.as_str(), 136 + req.collection.as_str(), 137 + req.rkey.0.as_str(), 138 + ) 139 + .unwrap(), 140 + cid: Some(record.cid), 141 + value: record.value, 142 + extra_data: Default::default(), 143 + })) 178 144 } 179 145 180 146 pub async fn handle_list_records( 181 - State(state): State<Arc<AppState>>, 147 + State(hydrant): State<Hydrant>, 182 148 ExtractXrpc(req): ExtractXrpc<ListRecordsRequest>, 183 149 ) -> Result<Json<ListRecordsOutput<'static>>, XrpcErrorResponse<GenericXrpcError>> { 184 - let db = 
&state.db; 185 - let did = state 186 - .resolver 187 - .resolve_did(&req.repo) 188 - .await 189 - .map_err(|e| bad_request(ListRecordsRequest::PATH, e))?; 190 - 191 - let ks = db.records.clone(); 192 - 193 - let prefix = keys::record_prefix_collection(&did, req.collection.as_str()); 194 - 195 150 let limit = req.limit.unwrap_or(50).min(100) as usize; 196 151 let reverse = req.reverse.unwrap_or(false); 197 - let blocks_ks = db.blocks.clone(); 152 + let cursor = req.cursor.as_deref(); 198 153 199 - let (results, cursor) = tokio::task::spawn_blocking(move || { 200 - let mut results = Vec::new(); 201 - let mut cursor = None; 154 + let repo = hydrant 155 + .repos 156 + .resolve(&req.repo) 157 + .await 158 + .map_err(|e| internal_error(GetRecordRequest::PATH, e))?; 159 + let list = repo 160 + .list_records(req.collection.as_str(), limit, reverse, cursor) 161 + .await 162 + .map_err(|e| bad_request(ListRecordsRequest::PATH, e))?; 202 163 203 - let iter: Box<dyn Iterator<Item = _>> = if !reverse { 204 - let mut end_prefix = prefix.clone(); 205 - if let Some(last) = end_prefix.last_mut() { 206 - *last += 1; 207 - } 208 - 209 - let end_key = if let Some(cursor) = &req.cursor { 210 - let mut k = prefix.clone(); 211 - k.extend_from_slice(cursor.as_bytes()); 212 - k 213 - } else { 214 - end_prefix 215 - }; 216 - 217 - Box::new(ks.range(prefix.as_slice()..end_key.as_slice()).rev()) 218 - } else { 219 - let start_key = if let Some(cursor) = &req.cursor { 220 - let mut k = prefix.clone(); 221 - k.extend_from_slice(cursor.as_bytes()); 222 - k.push(0); 223 - k 224 - } else { 225 - prefix.clone() 226 - }; 227 - 228 - Box::new(ks.range(start_key.as_slice()..)) 229 - }; 230 - 231 - for item in iter { 232 - let (key, cid_bytes) = item.into_inner().into_diagnostic()?; 233 - 234 - if !key.starts_with(prefix.as_slice()) { 235 - break; 236 - } 237 - 238 - let rkey = keys::parse_rkey(&key[prefix.len()..])?; 239 - if results.len() >= limit { 240 - cursor = Some(rkey); 241 - break; 242 - } 243 
- 244 - // look up using col|cid key built from collection and binary cid bytes from the record 245 - if let Ok(Some(block_bytes)) = 246 - blocks_ks.get(&keys::block_key(req.collection.as_str(), &cid_bytes)) 247 - { 248 - let val: Data = serde_ipld_dagcbor::from_slice(&block_bytes).unwrap_or(Data::Null); 249 - let cid = 250 - Cid::Str(Cid::new(&cid_bytes).into_diagnostic()?.to_cowstr()).into_static(); 251 - results.push(RepoRecord { 252 - uri: AtUri::from_parts_owned( 253 - did.as_str(), 254 - req.collection.as_str(), 255 - rkey.to_smolstr(), 256 - ) 257 - .into_diagnostic()?, 258 - cid, 259 - value: val.into_static(), 260 - extra_data: Default::default(), 261 - }); 262 - } 263 - } 264 - Result::<_, miette::Report>::Ok((results, cursor)) 265 - }) 266 - .await 267 - .map_err(|e| internal_error(ListRecordsRequest::PATH, e))? 268 - .map_err(|e| internal_error(ListRecordsRequest::PATH, e))?; 164 + let records = list 165 + .records 166 + .into_iter() 167 + .filter_map(|r| { 168 + let uri = AtUri::from_parts_owned( 169 + repo.did.as_str(), 170 + req.collection.as_str(), 171 + r.rkey.as_str(), 172 + ) 173 + .ok()?; 174 + Some(RepoRecord { 175 + uri, 176 + cid: r.cid, 177 + value: r.value, 178 + extra_data: Default::default(), 179 + }) 180 + }) 181 + .collect(); 269 182 270 183 Ok(Json(ListRecordsOutput { 271 - records: results, 272 - cursor: cursor.map(|c| CowStr::Owned(c.to_smolstr())), 184 + records, 185 + cursor: list.cursor.map(|r| r.into()), 273 186 extra_data: Default::default(), 274 187 })) 275 188 } ··· 309 222 } 310 223 311 224 pub async fn handle_count_records( 312 - State(state): State<Arc<AppState>>, 225 + State(hydrant): State<Hydrant>, 313 226 ExtractXrpc(req): ExtractXrpc<CountRecords>, 314 227 ) -> XrpcResult<Json<CountRecordsOutput>> { 315 - let did = state 316 - .resolver 317 - .resolve_did(&req.identifier) 228 + let count = hydrant 229 + .repos 230 + .resolve(&req.identifier) 318 231 .await 319 - .map_err(|e| bad_request(CountRecords::PATH, e))?; 320 - 
321 - let count = spawn_blocking(move || { 322 - db::get_record_count(&state.db, &did, &req.collection) 323 - .map_err(|e| internal_error(CountRecords::PATH, e)) 324 - }) 325 - .map_err(|e| internal_error(CountRecords::PATH, e)) 326 - .await??; 232 + .map_err(|e| internal_error(GetRecordRequest::PATH, e))? 233 + .count_records(&req.collection) 234 + .await 235 + .map_err(|e| internal_error(CountRecords::PATH, e))?; 327 236 328 237 Ok(Json(CountRecordsOutput { count })) 329 238 }
+203 -18
src/control.rs
··· 8 8 use chrono::{DateTime, Utc}; 9 9 use futures::{FutureExt, Stream}; 10 10 use jacquard_common::cowstr::ToCowStr; 11 - use jacquard_common::types::cid::{ATP_CID_HASH, IpldCid}; 11 + use jacquard_common::types::cid::{ATP_CID_HASH, Cid, IpldCid}; 12 + use jacquard_common::types::ident::AtIdentifier; 12 13 use jacquard_common::types::nsid::Nsid; 13 14 use jacquard_common::types::string::{Did, Handle, Rkey}; 14 15 use jacquard_common::types::tid::Tid; 15 - use jacquard_common::{CowStr, IntoStatic, RawData}; 16 + use jacquard_common::{CowStr, Data, IntoStatic, RawData}; 16 17 use jacquard_repo::DAG_CBOR_CID_CODEC; 17 18 use miette::{IntoDiagnostic, Result}; 18 19 use rand::Rng; 19 20 use sha2::{Digest, Sha256}; 21 + use smol_str::ToSmolStr; 20 22 use tokio::sync::{mpsc, watch}; 21 23 use tracing::{debug, error, info}; 22 24 use url::Url; 23 25 24 26 use crate::backfill::BackfillWorker; 25 27 use crate::config::{Config, SignatureVerification}; 28 + use crate::db::types::DbRkey; 26 29 use crate::db::{self, filter as db_filter, keys, ser_repo_state}; 27 30 use crate::filter::{FilterMode, SetUpdate}; 28 31 use crate::ingest::{firehose::FirehoseIngestor, worker::FirehoseWorker}; ··· 1093 1096 pub struct ReposControl(Arc<AppState>); 1094 1097 1095 1098 impl ReposControl { 1096 - /// fetch the current state of a single repository. returns `None` if hydrant 1097 - /// has never seen this DID. 1098 - pub async fn get(&self, did: &Did<'_>) -> Result<Option<RepoInfo>> { 1099 - let did_key = keys::repo_key(did); 1100 - let db = self.0.db.clone(); 1101 - let did = did.clone().into_static(); 1099 + /// gets a handle for a repository to allow acting upon it. 
1100 + pub fn get<'i>(&self, did: &Did<'i>) -> Result<RepoHandle<'i>> { 1101 + Ok(RepoHandle { 1102 + state: self.0.clone(), 1103 + did: did.clone(), 1104 + }) 1105 + } 1102 1106 1103 - tokio::task::spawn_blocking(move || { 1104 - let bytes = db.repos.get(&did_key).into_diagnostic()?; 1105 - let state = bytes.as_deref().map(db::deser_repo_state).transpose()?; 1106 - Ok(state.map(|s| repo_state_to_info(did, s))) 1107 + /// same as [`ReposControl::get`] but allows you to pass in an identifier that can be 1108 + /// either a handle or a DID. 1109 + pub async fn resolve(&self, repo: &AtIdentifier<'_>) -> Result<RepoHandle<'static>> { 1110 + let did = self.0.resolver.resolve_did(repo).await?; 1111 + Ok(RepoHandle { 1112 + state: self.0.clone(), 1113 + did, 1107 1114 }) 1108 - .await 1109 - .into_diagnostic()? 1115 + } 1116 + 1117 + /// fetch the current state of a single repository. returns `None` if hydrant 1118 + /// has never seen this DID. 1119 + pub async fn info(&self, did: &Did<'_>) -> Result<Option<RepoInfo>> { 1120 + self.get(did)?.info().await 1110 1121 } 1111 1122 1112 1123 /// explicitly track one or more repositories, enqueuing them for backfill if needed. ··· 1228 1239 } 1229 1240 } 1230 1241 1231 - pub fn repo_state_to_info(did: Did<'static>, s: RepoState<'_>) -> RepoInfo { 1242 + pub(crate) fn repo_state_to_info(did: Did<'static>, s: RepoState<'_>) -> RepoInfo { 1232 1243 RepoInfo { 1233 1244 did, 1234 1245 status: s.status, ··· 1242 1253 last_message_at: s.last_message_time.and_then(DateTime::from_timestamp_secs), 1243 1254 } 1244 1255 } 1245 - 1246 - // --- db control --- 1247 1256 1248 1257 /// control over database maintenance operations. 
1249 1258 /// ··· 1285 1294 } 1286 1295 } 1287 1296 1288 - // --- stream thread --- 1297 + pub struct Record { 1298 + pub did: Did<'static>, 1299 + pub cid: Cid<'static>, 1300 + pub value: Data<'static>, 1301 + } 1302 + 1303 + pub struct ListedRecord { 1304 + pub rkey: Rkey<'static>, 1305 + pub cid: Cid<'static>, 1306 + pub value: Data<'static>, 1307 + } 1308 + 1309 + pub struct RecordList { 1310 + pub records: Vec<ListedRecord>, 1311 + pub cursor: Option<Rkey<'static>>, 1312 + } 1313 + 1314 + /// handle to access data related to this repository. 1315 + #[derive(Clone)] 1316 + pub struct RepoHandle<'i> { 1317 + state: Arc<AppState>, 1318 + pub did: Did<'i>, 1319 + } 1320 + 1321 + impl<'i> RepoHandle<'i> { 1322 + pub async fn info(&self) -> Result<Option<RepoInfo>> { 1323 + let did_key = keys::repo_key(&self.did); 1324 + let state = self.state.clone(); 1325 + let did = self.did.clone().into_static(); 1326 + 1327 + tokio::task::spawn_blocking(move || { 1328 + let bytes = state.db.repos.get(&did_key).into_diagnostic()?; 1329 + let state = bytes.as_deref().map(db::deser_repo_state).transpose()?; 1330 + Ok(state.map(|s| repo_state_to_info(did, s))) 1331 + }) 1332 + .await 1333 + .into_diagnostic()? 1334 + } 1335 + 1336 + pub async fn get_record(&self, collection: &str, rkey: &str) -> Result<Option<Record>> { 1337 + let did = self.did.clone().into_static(); 1338 + let db_key = keys::record_key(&did, collection, &DbRkey::new(rkey)); 1339 + 1340 + let collection = collection.to_smolstr(); 1341 + let state = self.state.clone(); 1342 + tokio::task::spawn_blocking(move || { 1343 + use miette::WrapErr; 1344 + 1345 + let cid_bytes = state.db.records.get(db_key).into_diagnostic()?; 1346 + let Some(cid_bytes) = cid_bytes else { 1347 + return Ok(None); 1348 + }; 1349 + 1350 + // lookup block using col|cid key 1351 + let block_key = keys::block_key(&collection, &cid_bytes); 1352 + let Some(block_bytes) = state.db.blocks.get(block_key).into_diagnostic()? 
else { 1353 + miette::bail!("block {cid_bytes:?} not found, this is a bug!!"); 1354 + }; 1355 + 1356 + let value = serde_ipld_dagcbor::from_slice::<Data>(&block_bytes) 1357 + .into_diagnostic() 1358 + .wrap_err("cant parse block")? 1359 + .into_static(); 1360 + let cid = Cid::new(&cid_bytes) 1361 + .into_diagnostic() 1362 + .wrap_err("cant parse block cid")? 1363 + .into_static(); 1364 + 1365 + Ok(Some(Record { did, cid, value })) 1366 + }) 1367 + .await 1368 + .into_diagnostic()? 1369 + } 1370 + 1371 + pub async fn list_records( 1372 + &self, 1373 + collection: &str, 1374 + limit: usize, 1375 + reverse: bool, 1376 + cursor: Option<&str>, 1377 + ) -> Result<RecordList> { 1378 + let did = self.did.clone().into_static(); 1379 + 1380 + let state = self.state.clone(); 1381 + let prefix = keys::record_prefix_collection(&did, collection); 1382 + let collection = collection.to_smolstr(); 1383 + let cursor = cursor.map(|c| c.to_smolstr()); 1384 + 1385 + tokio::task::spawn_blocking(move || { 1386 + let mut results = Vec::new(); 1387 + let mut next_cursor = None; 1388 + 1389 + let iter: Box<dyn Iterator<Item = _>> = if !reverse { 1390 + let mut end_prefix = prefix.clone(); 1391 + if let Some(last) = end_prefix.last_mut() { 1392 + *last += 1; 1393 + } 1394 + 1395 + let end_key = if let Some(cursor) = &cursor { 1396 + let mut k = prefix.clone(); 1397 + k.extend_from_slice(cursor.as_bytes()); 1398 + k 1399 + } else { 1400 + end_prefix 1401 + }; 1402 + 1403 + Box::new( 1404 + state 1405 + .db 1406 + .records 1407 + .range(prefix.as_slice()..end_key.as_slice()) 1408 + .rev(), 1409 + ) 1410 + } else { 1411 + let start_key = if let Some(cursor) = &cursor { 1412 + let mut k = prefix.clone(); 1413 + k.extend_from_slice(cursor.as_bytes()); 1414 + k.push(0); 1415 + k 1416 + } else { 1417 + prefix.clone() 1418 + }; 1419 + 1420 + Box::new(state.db.records.range(start_key.as_slice()..)) 1421 + }; 1422 + 1423 + for item in iter { 1424 + let (key, cid_bytes) = 
item.into_inner().into_diagnostic()?; 1425 + 1426 + if !key.starts_with(prefix.as_slice()) { 1427 + break; 1428 + } 1429 + 1430 + let rkey = keys::parse_rkey(&key[prefix.len()..])?; 1431 + if results.len() >= limit { 1432 + next_cursor = Some(rkey); 1433 + break; 1434 + } 1435 + 1436 + // look up using col|cid key built from collection and binary cid bytes 1437 + if let Ok(Some(block_bytes)) = state 1438 + .db 1439 + .blocks 1440 + .get(&keys::block_key(collection.as_str(), &cid_bytes)) 1441 + { 1442 + let value: Data = 1443 + serde_ipld_dagcbor::from_slice(&block_bytes).unwrap_or(Data::Null); 1444 + let cid = Cid::new(&cid_bytes).into_diagnostic()?.into_static(); 1445 + results.push(ListedRecord { 1446 + rkey: Rkey::new_cow(CowStr::Owned(rkey.to_smolstr())) 1447 + .expect("that rkey is validated"), 1448 + cid, 1449 + value: value.into_static(), 1450 + }); 1451 + } 1452 + } 1453 + Result::<_, miette::Report>::Ok((results, next_cursor)) 1454 + }) 1455 + .await 1456 + .into_diagnostic()? 1457 + .map(|(records, next_cursor)| RecordList { 1458 + records, 1459 + cursor: next_cursor.map(|rkey| { 1460 + Rkey::new_cow(CowStr::Owned(rkey.to_smolstr())).expect("that rkey is validated") 1461 + }), 1462 + }) 1463 + } 1464 + 1465 + pub async fn count_records(&self, collection: &str) -> Result<u64> { 1466 + let did = self.did.clone().into_static(); 1467 + let state = self.state.clone(); 1468 + let collection = collection.to_string(); 1469 + tokio::task::spawn_blocking(move || db::get_record_count(&state.db, &did, &collection)) 1470 + .await 1471 + .into_diagnostic()? 1472 + } 1473 + } 1289 1474 1290 1475 fn event_stream_thread(state: Arc<AppState>, tx: mpsc::Sender<Event>, cursor: Option<u64>) { 1291 1476 let db = &state.db;