very fast at protocol indexer with flexible filtering, xrpc queries, cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[api] separate the cursor delete endpoint

dawn 008dfd75 aec2e459

+44 -36
+7 -6
README.md
··· 215 215 (`persisted: false`), only the running task is stopped, the source will 216 216 reappear on the next restart since `CRAWLER_URLS` is re-applied at startup. 217 217 (unless you remove it manually from your configuration of course). 218 - - cursor state is not cleared. use `DELETE /cursors` separately if you want 218 + - cursor state is not cleared. use `DELETE /crawler/cursors` separately if you want 219 219 the source to restart from the beginning when re-added. 220 220 - returns `200 OK` if the source was found and removed, `404 Not Found` otherwise. 221 + - `DELETE /crawler/cursors`: reset stored cursors for a given crawler URL. body: `{ "key": "..." }` 222 + where key is a URL. clears the relay crawler cursor as well as any by-collection 223 + cursors associated with that URL. causes the next crawler pass to restart from the beginning. 221 224 222 225 ### firehose management 223 226 ··· 242 245 the database and will not reappear on restart. if it came from `RELAY_HOSTS` 243 246 (`persisted: false`), only the running task is stopped; the source reappears 244 247 on the next restart. 245 - - cursor state is not cleared. use `DELETE /cursors` separately if you want 248 + - cursor state is not cleared. use `DELETE /firehose/cursors` separately if you want 246 249 the relay to restart from the beginning when re-added. 247 250 - returns `200 OK` if the relay was found and removed, `404 Not Found` otherwise. 251 + - `DELETE /firehose/cursors`: reset the stored cursor for a given firehose relay URL. body: `{ "key": "..." }` 252 + where key is a URL. causes the next firehose connection to restart from the beginning. 248 253 249 254 ### repository management 250 255 ··· 269 274 - `POST /db/compact`: trigger a full major compaction of all database keyspaces 270 275 in parallel. the crawler, firehose, and backfill worker are paused for the 271 276 duration and restored on completion. 272 - - `DELETE /cursors`: reset all stored cursors for a given URL. body: `{ "key": "..." }` 273 - where key is a URL. clears both the firehose cursor and the relay crawler cursor, 274 - as well as any by-collection cursors associated with that URL. causes the next 275 - firehose connection and crawler pass to restart from the beginning. 276 277 277 278 ## data access (xrpc) 278 279
+18
src/api/crawler.rs
··· 15 15 .route("/crawler/sources", get(list_sources)) 16 16 .route("/crawler/sources", post(add_source)) 17 17 .route("/crawler/sources", delete(remove_source)) 18 + .route("/crawler/cursors", delete(reset_cursor)) 18 19 } 19 20 20 21 pub async fn list_sources(State(hydrant): State<Hydrant>) -> Json<Vec<CrawlerSourceInfo>> { ··· 62 63 }) 63 64 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())) 64 65 } 66 + 67 + #[derive(Deserialize)] 68 + pub struct ResetCursorBody { 69 + pub key: String, 70 + } 71 + 72 + pub async fn reset_cursor( 73 + State(hydrant): State<Hydrant>, 74 + Json(body): Json<ResetCursorBody>, 75 + ) -> Result<StatusCode, (StatusCode, String)> { 76 + hydrant 77 + .crawler 78 + .reset_cursor(&body.key) 79 + .await 80 + .map(|_| StatusCode::OK) 81 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())) 82 + }
+1 -30
src/api/db.rs
··· 1 1 use crate::control::Hydrant; 2 - use axum::{ 3 - Json, Router, 4 - extract::State, 5 - http::StatusCode, 6 - routing::{delete, post}, 7 - }; 8 - use serde::Deserialize; 2 + use axum::{Router, extract::State, http::StatusCode, routing::post}; 9 3 10 4 pub fn router() -> Router<Hydrant> { 11 5 Router::new() 12 6 .route("/db/train", post(handle_train_dict)) 13 7 .route("/db/compact", post(handle_compact)) 14 - .route("/cursors", delete(handle_reset_cursor)) 15 8 } 16 9 17 10 pub async fn handle_train_dict( ··· 20 13 hydrant 21 14 .db 22 15 .train_dicts() 23 - .await 24 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 25 - Ok(StatusCode::OK) 26 - } 27 - 28 - #[derive(Deserialize)] 29 - pub struct ResetCursorBody { 30 - pub key: String, 31 - } 32 - 33 - pub async fn handle_reset_cursor( 34 - State(hydrant): State<Hydrant>, 35 - Json(body): Json<ResetCursorBody>, 36 - ) -> Result<StatusCode, (StatusCode, String)> { 37 - hydrant 38 - .crawler 39 - .reset_cursor(&body.key) 40 - .await 41 - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 42 - hydrant 43 - .firehose 44 - .reset_cursor(&body.key) 45 16 .await 46 17 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 47 18 Ok(StatusCode::OK)
+18
src/api/firehose.rs
··· 14 14 .route("/firehose/sources", get(list_sources)) 15 15 .route("/firehose/sources", post(add_source)) 16 16 .route("/firehose/sources", delete(remove_source)) 17 + .route("/firehose/cursors", delete(reset_cursor)) 17 18 } 18 19 19 20 pub async fn list_sources(State(hydrant): State<Hydrant>) -> Json<Vec<FirehoseSourceInfo>> { ··· 57 58 }) 58 59 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())) 59 60 } 61 + 62 + #[derive(Deserialize)] 63 + pub struct ResetCursorBody { 64 + pub key: String, 65 + } 66 + 67 + pub async fn reset_cursor( 68 + State(hydrant): State<Hydrant>, 69 + Json(body): Json<ResetCursorBody>, 70 + ) -> Result<StatusCode, (StatusCode, String)> { 71 + hydrant 72 + .firehose 73 + .reset_cursor(&body.key) 74 + .await 75 + .map(|_| StatusCode::OK) 76 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())) 77 + }