very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[lib,api] implement request resync method, return the list of repos that were acted upon in mutating endpoints

dawn 41c076cf 008dfd75

+144 -39
+10 -1
README.md
··· 263 263 also returns the handle, PDS URL and the atproto signing key (these won't be 264 264 available before the repo has been backfilled once at least). 265 265 - `PUT /repos`: explicitly track repositories. accepts an NDJSON body of `{"did": "..."}` (or JSON array of the same). 266 - - `DELETE /repos`: untrack repositories. accepts an NDJSON body of `{"did": "..."}` (or JSON array of the same). 266 + only affects repositories that are not known or are untracked. 267 + returns a JSON array of the DIDs that were tracked. 268 + - `DELETE /repos`: untrack repositories. 269 + accepts an NDJSON body of `{"did": "..."}` (or JSON array of the same). 270 + only affects repositories that are currently tracked. 271 + returns a JSON array of the DIDs that were untracked. 272 + - `POST /repos/resync`: force a new backfill for one or more repositories. 273 + accepts an NDJSON body of `{"did": "..."}` (or JSON array of the same). 274 + only affects repositories hydrant already knows about. 275 + returns a JSON array of the DIDs that were queued. 267 276 268 277 ### database operations 269 278
+28 -7
src/api/repos.rs
··· 6 6 extract::{Path, Query, State}, 7 7 http::{StatusCode, header}, 8 8 response::{IntoResponse, Response}, 9 - routing::{delete, get, put}, 9 + routing::{delete, get, post, put}, 10 10 }; 11 11 use jacquard_common::types::did::Did; 12 12 use serde::Deserialize; ··· 14 14 pub fn router() -> Router<Hydrant> { 15 15 Router::new() 16 16 .route("/repos", get(handle_get_repos)) 17 + .route("/repos/resync", post(handle_post_resync)) 17 18 .route("/repos/{did}", get(handle_get_repo)) 18 19 .route("/repos", put(handle_put_repos)) 19 20 .route("/repos", delete(handle_delete_repos)) ··· 144 145 pub async fn handle_put_repos( 145 146 State(hydrant): State<Hydrant>, 146 147 req: axum::extract::Request, 147 - ) -> Result<StatusCode, (StatusCode, String)> { 148 + ) -> Result<Json<Vec<String>>, (StatusCode, String)> { 148 149 let items = parse_body(req).await?; 149 150 150 151 let dids: Vec<Did<'static>> = items ··· 152 153 .filter_map(|item| Did::new_owned(&item.did).ok()) 153 154 .collect(); 154 155 155 - hydrant 156 + let queued = hydrant 156 157 .repos 157 158 .track(dids) 158 159 .await 159 160 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 160 161 161 - Ok(StatusCode::OK) 162 + Ok(Json(queued.into_iter().map(|d| d.to_string()).collect())) 162 163 } 163 164 164 165 pub async fn handle_delete_repos( 165 166 State(hydrant): State<Hydrant>, 166 167 req: axum::extract::Request, 167 - ) -> Result<StatusCode, (StatusCode, String)> { 168 + ) -> Result<Json<Vec<String>>, (StatusCode, String)> { 168 169 let items = parse_body(req).await?; 169 170 170 171 let dids: Vec<Did<'static>> = items ··· 172 173 .filter_map(|item| Did::new_owned(&item.did).ok()) 173 174 .collect(); 174 175 175 - hydrant 176 + let untracked = hydrant 176 177 .repos 177 178 .untrack(dids) 178 179 .await 179 180 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 180 181 181 - Ok(StatusCode::OK) 182 + Ok(Json(untracked.into_iter().map(|d| d.to_string()).collect())) 183 + } 184 + 185 + pub async fn handle_post_resync( 186 + State(hydrant): State<Hydrant>, 187 + req: axum::extract::Request, 188 + ) -> Result<Json<Vec<String>>, (StatusCode, String)> { 189 + let items = parse_body(req).await?; 190 + 191 + let dids: Vec<Did<'static>> = items 192 + .into_iter() 193 + .filter_map(|item| Did::new_owned(&item.did).ok()) 194 + .collect(); 195 + 196 + let queued = hydrant 197 + .repos 198 + .resync(dids) 199 + .await 200 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 201 + 202 + Ok(Json(queued.into_iter().map(|d| d.to_string()).collect())) 182 203 } 183 204 184 205 async fn parse_body(req: axum::extract::Request) -> Result<Vec<RepoRequest>, (StatusCode, String)> {
+106 -31
src/control/repos.rs
··· 1 1 use std::sync::Arc; 2 2 3 3 use chrono::{DateTime, Utc}; 4 + use fjall::OwnedWriteBatch; 4 5 use jacquard_common::cowstr::ToCowStr; 5 6 use jacquard_common::types::cid::{Cid, IpldCid}; 6 7 use jacquard_common::types::ident::AtIdentifier; ··· 13 14 use url::Url; 14 15 15 16 use crate::db::types::DbRkey; 16 - use crate::db::{self, keys, ser_repo_state}; 17 + use crate::db::{self, Db, keys, ser_repo_state}; 17 18 use crate::state::AppState; 18 19 use crate::types::{GaugeState, RepoState, RepoStatus}; 19 20 ··· 84 85 }) 85 86 } 86 87 87 - /// fetch the current state of a single repository. returns `None` if hydrant 88 - /// has never seen this DID. 88 + /// fetch the current state of repository. 89 + /// returns `None` if hydrant has never seen this repository. 89 90 pub async fn info(&self, did: &Did<'_>) -> Result<Option<RepoInfo>> { 90 91 self.get(did)?.info().await 91 92 } 92 93 94 + fn _resync( 95 + db: &Db, 96 + did: &Did<'_>, 97 + batch: &mut OwnedWriteBatch, 98 + transitions: &mut Vec<(GaugeState, GaugeState)>, 99 + ) -> Result<bool> { 100 + let did_key = keys::repo_key(did); 101 + let repo_bytes = db.repos.get(&did_key).into_diagnostic()?; 102 + let existing = repo_bytes 103 + .as_deref() 104 + .map(db::deser_repo_state) 105 + .transpose()?; 106 + 107 + if let Some(mut repo_state) = existing { 108 + let resync = db.resync.get(&did_key).into_diagnostic()?; 109 + let old = db::Db::repo_gauge_state(&repo_state, resync.as_deref()); 110 + repo_state.tracked = true; 111 + repo_state.status = RepoStatus::Backfilling; 112 + batch.insert(&db.repos, &did_key, ser_repo_state(&repo_state)?); 113 + batch.insert( 114 + &db.pending, 115 + keys::pending_key(repo_state.index_id), 116 + &did_key, 117 + ); 118 + batch.remove(&db.resync, &did_key); 119 + transitions.push((old, GaugeState::Pending)); 120 + return Ok(true); 121 + } 122 + 123 + Ok(false) 124 + } 125 + 126 + /// request one or more repositories to be resynced. 127 + /// 128 + /// note that they may not immediately start backfilling if: 129 + /// - other repos already filled the backfill concurrency limit, 130 + /// - or there are many repos pending already. 131 + pub async fn resync( 132 + &self, 133 + dids: impl IntoIterator<Item = Did<'_>>, 134 + ) -> Result<Vec<Did<'static>>> { 135 + let dids: Vec<Did<'static>> = dids.into_iter().map(|d| d.into_static()).collect(); 136 + let state = self.0.clone(); 137 + 138 + let (queued, transitions) = tokio::task::spawn_blocking(move || { 139 + let db = &state.db; 140 + let mut batch = db.inner.batch(); 141 + let mut queued: Vec<Did<'static>> = Vec::new(); 142 + let mut transitions: Vec<(GaugeState, GaugeState)> = Vec::new(); 143 + 144 + for did in dids { 145 + if Self::_resync(db, &did, &mut batch, &mut transitions)? { 146 + queued.push(did); 147 + } 148 + } 149 + 150 + batch.commit().into_diagnostic()?; 151 + Ok::<_, miette::Report>((queued, transitions)) 152 + }) 153 + .await 154 + .into_diagnostic()??; 155 + 156 + for (old, new) in transitions { 157 + self.0.db.update_gauge_diff_async(&old, &new).await; 158 + } 159 + if !queued.is_empty() { 160 + self.0.notify_backfill(); 161 + } 162 + 163 + Ok(queued) 164 + } 165 + 93 166 /// explicitly track one or more repositories, enqueuing them for backfill if needed. 94 167 /// 95 - /// - if a DID is new, a fresh [`RepoState`] is created and backfill is queued. 96 - /// - if a DID is already known but untracked, it is marked tracked and re-enqueued. 97 - /// - if a DID is already tracked, this is a no-op. 98 - pub async fn track(&self, dids: impl IntoIterator<Item = Did<'_>>) -> Result<()> { 168 + /// - if a repo is new, a fresh [`RepoState`] is created and backfill is queued. 169 + /// - if a repo is already known but untracked, it is marked tracked and re-enqueued. 170 + /// - if a repo is already tracked, this is a no-op. 171 + pub async fn track( 172 + &self, 173 + dids: impl IntoIterator<Item = Did<'_>>, 174 + ) -> Result<Vec<Did<'static>>> { 99 175 let dids: Vec<Did<'static>> = dids.into_iter().map(|d| d.into_static()).collect(); 100 176 let state = self.0.clone(); 101 177 102 - let (new_count, transitions) = tokio::task::spawn_blocking(move || { 178 + let (new_count, queued, transitions) = tokio::task::spawn_blocking(move || { 103 179 let db = &state.db; 104 180 let mut batch = db.inner.batch(); 105 181 let mut added = 0i64; 182 + let mut queued: Vec<Did<'static>> = Vec::new(); 106 183 let mut transitions: Vec<(GaugeState, GaugeState)> = Vec::new(); 107 184 let mut rng = rand::rng(); 108 185 109 - for did in &dids { 110 - let did_key = keys::repo_key(did); 186 + for did in dids { 187 + let did_key = keys::repo_key(&did); 111 188 let repo_bytes = db.repos.get(&did_key).into_diagnostic()?; 112 189 let existing = repo_bytes 113 190 .as_deref() 114 191 .map(db::deser_repo_state) 115 192 .transpose()?; 116 193 117 - if let Some(mut repo_state) = existing { 118 - if !repo_state.tracked { 119 - let resync = db.resync.get(&did_key).into_diagnostic()?; 120 - let old = db::Db::repo_gauge_state(&repo_state, resync.as_deref()); 121 - repo_state.tracked = true; 122 - batch.insert(&db.repos, &did_key, ser_repo_state(&repo_state)?); 123 - batch.insert( 124 - &db.pending, 125 - keys::pending_key(repo_state.index_id), 126 - &did_key, 127 - ); 128 - batch.remove(&db.resync, &did_key); 129 - transitions.push((old, GaugeState::Pending)); 194 + if let Some(repo_state) = existing { 195 + // the double read here is an ok tradeoff, the block will be in read-cache anyway 196 + if !repo_state.tracked && Self::_resync(db, &did, &mut batch, &mut transitions)? 197 + { 198 + queued.push(did); 130 199 } 131 200 } else { 132 201 let repo_state = RepoState::backfilling(rng.next_u64()); ··· 137 206 &did_key, 138 207 ); 139 208 added += 1; 209 + queued.push(did); 140 210 transitions.push((GaugeState::Synced, GaugeState::Pending)); 141 211 } 142 212 } 143 213 144 214 batch.commit().into_diagnostic()?; 145 - Ok::<_, miette::Report>((added, transitions)) 215 + Ok::<_, miette::Report>((added, queued, transitions)) 146 216 }) 147 217 .await 148 218 .into_diagnostic()??; ··· 154 224 self.0.db.update_gauge_diff_async(&old, &new).await; 155 225 } 156 226 self.0.notify_backfill(); 157 - Ok(()) 227 + Ok(queued) 158 228 } 159 229 160 230 /// stop tracking one or more repositories. hydrant will stop processing new events 161 231 /// for them and remove them from the pending/resync queues, but existing indexed 162 232 /// records are **not** deleted. 163 - pub async fn untrack(&self, dids: impl IntoIterator<Item = Did<'_>>) -> Result<()> { 233 + pub async fn untrack( 234 + &self, 235 + dids: impl IntoIterator<Item = Did<'_>>, 236 + ) -> Result<Vec<Did<'static>>> { 164 237 let dids: Vec<Did<'static>> = dids.into_iter().map(|d| d.into_static()).collect(); 165 238 let state = self.0.clone(); 166 239 167 - let gauge_decrements = tokio::task::spawn_blocking(move || { 240 + let (untracked, gauge_decrements) = tokio::task::spawn_blocking(move || { 168 241 let db = &state.db; 169 242 let mut batch = db.inner.batch(); 243 + let mut untracked: Vec<Did<'static>> = Vec::new(); 170 244 let mut gauge_decrements = Vec::new(); 171 245 172 - for did in &dids { 173 - let did_key = keys::repo_key(did); 246 + for did in dids { 247 + let did_key = keys::repo_key(&did); 174 248 let repo_bytes = db.repos.get(&did_key).into_diagnostic()?; 175 249 let existing = repo_bytes 176 250 .as_deref() ··· 189 263 if old != GaugeState::Synced { 190 264 gauge_decrements.push(old); 191 265 } 266 + untracked.push(did); 192 267 } 193 268 } 194 269 } 195 270 196 271 batch.commit().into_diagnostic()?; 197 - Ok::<_, miette::Report>(gauge_decrements) 272 + Ok::<_, miette::Report>((untracked, gauge_decrements)) 198 273 }) 199 274 .await 200 275 .into_diagnostic()??; ··· 205 280 .update_gauge_diff_async(&gauge, &GaugeState::Synced) 206 281 .await; 207 282 } 208 - Ok(()) 283 + Ok(untracked) 209 284 } 210 285 } 211 286