very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[lib,api] implement runtime firehose management

dawn 71a42799 a3658e24

+492 -97
+28 -2
README.md
··· 155 155 the source to restart from the beginning when re-added. 156 156 - returns `200 OK` if the source was found and removed, `404 Not Found` otherwise. 157 157 158 + #### firehose source management 159 + 160 + - `GET /firehose/sources`: list all currently active firehose relay sources. 161 + - returns a JSON array of `{ "url": string, "persisted": bool }`. 162 + - `persisted: true` means the source was added via the API and is stored in the 163 + database, so it will survive a restart. `persisted: false` means the source 164 + came from `RELAY_HOSTS` and is not written to the database. 165 + - `POST /firehose/sources`: add a firehose relay at runtime. 166 + - body: `{ "url": string }`. 167 + - the source is persisted to the database before the ingestor task is started. 168 + - if a relay with the same URL already exists, it is replaced: the running 169 + task is stopped and a new one is started. any existing cursor state for that 170 + URL is preserved. 171 + - returns `201 Created` on success. 172 + - `DELETE /firehose/sources`: remove a firehose relay at runtime. 173 + - body: `{ "url": string }`. 174 + - the ingestor task is stopped immediately. 175 + - if the source was added via the API (`persisted: true`), it is removed from 176 + the database and will not reappear on restart. if it came from `RELAY_HOSTS` 177 + (`persisted: false`), only the running task is stopped; the source reappears 178 + on the next restart. 179 + - cursor state is not cleared. use `DELETE /cursors` separately if you want 180 + the relay to restart from the beginning when re-added. 181 + - returns `200 OK` if the relay was found and removed, `404 Not Found` otherwise. 182 + 158 183 #### database operations 159 184 160 185 - `POST /db/train`: train zstd compression dictionaries for the `repos`, ··· 165 190 in parallel. the crawler, firehose, and backfill worker are paused for the 166 191 duration and restored on completion. 
167 192 - `DELETE /cursors`: reset all stored cursors for a given URL. body: `{ "key": "..." }` 168 - where key is a URL. clears the relay crawler cursor, and any by-collection cursors 169 - associated with that URL. causes the next crawler pass to restart from the beginning. 193 + where key is a URL. clears both the firehose cursor and the relay crawler cursor, 194 + as well as any by-collection cursors associated with that URL. causes the next 195 + firehose connection and crawler pass to restart from the beginning. 170 196 171 197 #### filter mode 172 198
+5
src/api/db.rs
··· 39 39 .reset_cursor(&body.key) 40 40 .await 41 41 .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 42 + hydrant 43 + .firehose 44 + .reset_cursor(&body.key) 45 + .await 46 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; 42 47 Ok(StatusCode::OK) 43 48 } 44 49
+59
src/api/firehose.rs
··· 1 + use axum::{ 2 + Json, Router, 3 + extract::State, 4 + http::StatusCode, 5 + routing::{delete, get, post}, 6 + }; 7 + use serde::Deserialize; 8 + use url::Url; 9 + 10 + use crate::control::{FirehoseSourceInfo, Hydrant}; 11 + 12 + pub fn router() -> Router<Hydrant> { 13 + Router::new() 14 + .route("/firehose/sources", get(list_sources)) 15 + .route("/firehose/sources", post(add_source)) 16 + .route("/firehose/sources", delete(remove_source)) 17 + } 18 + 19 + pub async fn list_sources(State(hydrant): State<Hydrant>) -> Json<Vec<FirehoseSourceInfo>> { 20 + Json(hydrant.firehose.list_sources().await) 21 + } 22 + 23 + #[derive(Deserialize)] 24 + pub struct AddSourceRequest { 25 + pub url: Url, 26 + } 27 + 28 + pub async fn add_source( 29 + State(hydrant): State<Hydrant>, 30 + Json(body): Json<AddSourceRequest>, 31 + ) -> Result<StatusCode, (StatusCode, String)> { 32 + hydrant 33 + .firehose 34 + .add_source(body.url) 35 + .await 36 + .map(|_| StatusCode::CREATED) 37 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())) 38 + } 39 + 40 + #[derive(Deserialize)] 41 + pub struct RemoveSourceRequest { 42 + pub url: Url, 43 + } 44 + 45 + pub async fn remove_source( 46 + State(hydrant): State<Hydrant>, 47 + Json(body): Json<RemoveSourceRequest>, 48 + ) -> Result<StatusCode, (StatusCode, String)> { 49 + hydrant 50 + .firehose 51 + .remove_source(&body.url) 52 + .await 53 + .map(|found| { 54 + found 55 + .then_some(StatusCode::OK) 56 + .unwrap_or(StatusCode::NOT_FOUND) 57 + }) 58 + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())) 59 + }
+2
src/api/mod.rs
··· 9 9 mod db; 10 10 mod debug; 11 11 mod filter; 12 + mod firehose; 12 13 mod ingestion; 13 14 mod repos; 14 15 mod stats; ··· 26 27 .merge(repos::router()) 27 28 .merge(ingestion::router()) 28 29 .merge(crawler::router()) 30 + .merge(firehose::router()) 29 31 .merge(db::router()); 30 32 31 33 #[cfg(feature = "backlinks")]
+20 -17
src/config.rs
··· 99 99 } 100 100 101 101 impl CrawlerSource { 102 - /// parse `[mode::]url` — mode prefix is optional, falls back to `default_mode`. 102 + /// parse `[mode::]url`. mode prefix is optional, falls back to `default_mode`. 103 103 fn parse(s: &str, default_mode: CrawlerMode) -> Option<Self> { 104 104 if let Some((prefix, rest)) = s.split_once("::") { 105 105 let mode = prefix.parse().ok()?; ··· 391 391 .then(Self::full_network) 392 392 .unwrap_or_else(Self::default); 393 393 394 - let relay_host: Url = cfg!("RELAY_HOST", defaults.relays[0].clone()); 395 - let relay_hosts = std::env::var("HYDRANT_RELAY_HOSTS") 396 - .ok() 397 - .and_then(|hosts| { 398 - hosts 399 - .split(',') 400 - .map(|s| Url::parse(s.trim())) 401 - .collect::<Result<Vec<_>, _>>() 402 - .inspect_err(|e| tracing::warn!("invalid relay host URL: {e}")) 403 - .ok() 404 - }) 405 - .unwrap_or_default(); 406 - let relay_hosts = relay_hosts 407 - .is_empty() 408 - .then(|| vec![relay_host]) 409 - .unwrap_or(relay_hosts); 394 + let relay_hosts = match std::env::var("HYDRANT_RELAY_HOSTS") { 395 + Ok(hosts) if !hosts.trim().is_empty() => hosts 396 + .split(',') 397 + .filter_map(|s| { 398 + let s = s.trim(); 399 + (!s.is_empty()) 400 + .then(|| { 401 + Url::parse(s) 402 + .inspect_err(|e| tracing::warn!("invalid relay host URL: {e}")) 403 + .ok() 404 + }) 405 + .flatten() 406 + }) 407 + .collect(), 408 + // HYDRANT_RELAY_HOSTS explicitly set to "" 409 + Ok(_) => vec![], 410 + // not set at all, fall back to RELAY_HOST 411 + Err(_) => vec![cfg!("RELAY_HOST", defaults.relays[0].clone())], 412 + }; 410 413 411 414 let plc_urls: Vec<Url> = std::env::var("HYDRANT_PLC_URL") 412 415 .ok()
+229 -46
src/control.rs
··· 26 26 use crate::backfill::BackfillWorker; 27 27 use crate::config::{Config, SignatureVerification}; 28 28 use crate::db::types::DbRkey; 29 - use crate::db::{self, filter as db_filter, keys, ser_repo_state}; 29 + use crate::db::{ 30 + self, filter as db_filter, keys, load_persisted_crawler_sources, 31 + load_persisted_firehose_sources, ser_repo_state, 32 + }; 30 33 use crate::filter::{FilterMode, SetUpdate}; 31 34 use crate::ingest::{firehose::FirehoseIngestor, worker::FirehoseWorker}; 32 35 use crate::state::AppState; ··· 150 153 tasks: Arc::new(scc::HashMap::new()), 151 154 persisted: Arc::new(scc::HashSet::new()), 152 155 }, 153 - firehose: FirehoseHandle(state.clone()), 156 + firehose: FirehoseHandle { 157 + state: state.clone(), 158 + shared: Arc::new(std::sync::OnceLock::new()), 159 + tasks: Arc::new(scc::HashMap::new()), 160 + persisted: Arc::new(scc::HashSet::new()), 161 + }, 154 162 backfill: BackfillHandle(state.clone()), 155 163 filter: FilterControl(state.clone()), 156 164 repos: ReposControl(state.clone()), ··· 181 189 let state = self.state.clone(); 182 190 let config = self.config.clone(); 183 191 let crawler = self.crawler.clone(); 192 + let firehose = self.firehose.clone(); 184 193 185 194 if self.started.swap(true, Ordering::SeqCst) { 186 195 miette::bail!("Hydrant::run() called more than once"); ··· 241 250 move || loop { 242 251 std::thread::sleep(persist_interval); 243 252 244 - for (relay, cursor) in &state.relay_cursors { 253 + state.relay_cursors.iter_sync(|relay, cursor| { 245 254 let seq = cursor.load(Ordering::SeqCst); 246 255 if seq > 0 { 247 256 if let Err(e) = db::set_firehose_cursor(&state.db, relay, seq) { ··· 249 258 db::check_poisoned_report(&e); 250 259 } 251 260 } 252 - } 261 + true 262 + }); 253 263 254 264 if let Err(e) = db::persist_counts(&state.db) { 255 265 error!(err = %e, "failed to persist counts"); ··· 306 316 "starting ingestion" 307 317 ); 308 318 309 - // 10. 
spawn one firehose ingestor per relay (fatal tasks) 319 + // 10. set shared and spawn firehose ingestors 320 + firehose 321 + .shared 322 + .set(FirehoseShared { 323 + buffer_tx: buffer_tx.clone(), 324 + verify_signatures: matches!( 325 + config.verify_signatures, 326 + SignatureVerification::Full 327 + ), 328 + }) 329 + .ok() 330 + .expect("firehose shared already set"); 331 + let fire_shared = firehose.shared.get().unwrap(); 332 + 310 333 let relay_hosts = config.relays.clone(); 311 334 if !relay_hosts.is_empty() { 312 335 info!( ··· 319 342 "starting firehose ingestor(s)" 320 343 ); 321 344 for relay_url in &relay_hosts { 322 - let ingestor = FirehoseIngestor::new( 323 - state.clone(), 324 - buffer_tx.clone(), 325 - relay_url.clone(), 326 - state.filter.clone(), 327 - state.firehose_enabled.subscribe(), 328 - matches!(config.verify_signatures, SignatureVerification::Full), 329 - ); 330 - let tx = Arc::clone(&fatal_tx); 331 - tokio::spawn(async move { 332 - let result = ingestor.run().await; 333 - let _ = tx.send(Some(result.map_err(|e| e.to_string()))); 334 - }); 345 + let enabled_rx = state.firehose_enabled.subscribe(); 346 + let handle = 347 + spawn_firehose_ingestor(relay_url, &state, fire_shared, enabled_rx).await?; 348 + let _ = firehose.tasks.insert_async(relay_url.clone(), handle).await; 349 + } 350 + } 351 + 352 + let persisted_relay_urls = tokio::task::spawn_blocking({ 353 + let state = state.clone(); 354 + move || load_persisted_firehose_sources(&state.db) 355 + }) 356 + .await 357 + .into_diagnostic()??; 358 + 359 + for relay_url in &persisted_relay_urls { 360 + let _ = firehose.persisted.insert_async(relay_url.clone()).await; 361 + if firehose.tasks.contains_async(relay_url).await { 362 + continue; 335 363 } 364 + let enabled_rx = state.firehose_enabled.subscribe(); 365 + let handle = 366 + spawn_firehose_ingestor(relay_url, &state, fire_shared, enabled_rx).await?; 367 + let _ = firehose.tasks.insert_async(relay_url.clone(), handle).await; 336 368 } 
337 369 338 370 // 11. spawn crawler infrastructure (always, to support dynamic source management) ··· 434 466 let _ = crawler.tasks.insert_async(source.url.clone(), handle).await; 435 467 } 436 468 437 - // load and spawn any sources persisted in the database 438 - let db = state.db.clone(); 439 - let persisted_sources = 440 - tokio::task::spawn_blocking(move || load_persisted_crawler_sources(&db)) 441 - .await 442 - .into_diagnostic()??; 469 + let persisted_sources = tokio::task::spawn_blocking({ 470 + let state = state.clone(); 471 + move || load_persisted_crawler_sources(&state.db) 472 + }) 473 + .await 474 + .into_diagnostic()??; 443 475 444 476 for source in &persisted_sources { 445 477 let _ = crawler.persisted.insert_async(source.url.clone()).await; ··· 754 786 } 755 787 } 756 788 757 - /// load all crawler sources persisted in the database. 758 - fn load_persisted_crawler_sources(db: &crate::db::Db) -> Result<Vec<crate::config::CrawlerSource>> { 759 - use crate::db::keys::CRAWLER_SOURCE_PREFIX; 789 + async fn spawn_firehose_ingestor( 790 + relay_url: &Url, 791 + state: &Arc<AppState>, 792 + shared: &FirehoseShared, 793 + enabled: watch::Receiver<bool>, 794 + ) -> Result<FirehoseIngestorHandle> { 795 + use std::sync::atomic::AtomicI64; 796 + 797 + let start = db::get_firehose_cursor(&state.db, relay_url).await?; 798 + // insert into relay_cursors if not already present; existing in-memory cursor takes precedence 799 + let _ = state 800 + .relay_cursors 801 + .insert_async(relay_url.clone(), AtomicI64::new(start.unwrap_or(0))) 802 + .await; 803 + 804 + info!(relay = %relay_url, cursor = ?start, "starting firehose ingestor"); 805 + 806 + let ingestor = FirehoseIngestor::new( 807 + state.clone(), 808 + shared.buffer_tx.clone(), 809 + relay_url.clone(), 810 + state.filter.clone(), 811 + enabled, 812 + shared.verify_signatures, 813 + ); 814 + 815 + let relay_for_log = relay_url.clone(); 816 + let abort = tokio::spawn(async move { 817 + if let Err(e) = 
ingestor.run().await { 818 + error!(relay = %relay_for_log, err = %e, "firehose ingestor exited with error"); 819 + } 820 + }) 821 + .abort_handle(); 760 822 761 - let mut sources = Vec::new(); 762 - for entry in db.crawler.prefix(CRAWLER_SOURCE_PREFIX) { 763 - let (key, val) = entry.into_inner().into_diagnostic()?; 764 - let url_bytes = &key[CRAWLER_SOURCE_PREFIX.len()..]; 765 - let url_str = std::str::from_utf8(url_bytes).into_diagnostic()?; 766 - let url = Url::parse(url_str).into_diagnostic()?; 767 - let mode: crate::config::CrawlerMode = rmp_serde::from_slice(&val).into_diagnostic()?; 768 - sources.push(crate::config::CrawlerSource { url, mode }); 769 - } 770 - Ok(sources) 823 + Ok(FirehoseIngestorHandle { abort }) 771 824 } 772 825 773 826 /// runtime control over the crawler component. ··· 918 971 } 919 972 } 920 973 974 + struct FirehoseIngestorHandle { 975 + abort: tokio::task::AbortHandle, 976 + } 977 + 978 + impl Drop for FirehoseIngestorHandle { 979 + fn drop(&mut self) { 980 + self.abort.abort(); 981 + } 982 + } 983 + 984 + struct FirehoseShared { 985 + buffer_tx: crate::ingest::BufferTx, 986 + verify_signatures: bool, 987 + } 988 + 989 + /// a snapshot of a single firehose relay's runtime state. 990 + #[derive(Debug, Clone, serde::Serialize)] 991 + pub struct FirehoseSourceInfo { 992 + pub url: Url, 993 + /// true if added via the API and persisted to the database; false for `RELAY_HOSTS` sources. 994 + pub persisted: bool, 995 + } 996 + 921 997 /// runtime control over the firehose ingestor component. 922 998 #[derive(Clone)] 923 - pub struct FirehoseHandle(Arc<AppState>); 999 + pub struct FirehoseHandle { 1000 + state: Arc<AppState>, 1001 + /// set once by [`Hydrant::run`]; `None` means run() has not been called yet. 1002 + shared: Arc<std::sync::OnceLock<FirehoseShared>>, 1003 + /// per-relay running tasks, keyed by url. 
1004 + tasks: Arc<scc::HashMap<Url, FirehoseIngestorHandle>>, 1005 + /// set of urls persisted in the database (dynamically added sources). 1006 + persisted: Arc<scc::HashSet<Url>>, 1007 + } 924 1008 925 1009 impl FirehoseHandle { 926 - /// enable the firehose, no-op if already enabled. 1010 + /// enable the firehose. no-op if already enabled. 927 1011 pub fn enable(&self) { 928 - self.0.firehose_enabled.send_replace(true); 1012 + self.state.firehose_enabled.send_replace(true); 929 1013 } 930 - /// disable the firehose, the current message finishes processing before the connection closes. 1014 + /// disable the firehose. the current message finishes processing before the connection closes. 931 1015 pub fn disable(&self) { 932 - self.0.firehose_enabled.send_replace(false); 1016 + self.state.firehose_enabled.send_replace(false); 933 1017 } 934 1018 /// returns the current enabled state of the firehose. 935 1019 pub fn is_enabled(&self) -> bool { 936 - *self.0.firehose_enabled.borrow() 1020 + *self.state.firehose_enabled.borrow() 1021 + } 1022 + 1023 + /// reset the stored cursor for the given relay URL. 1024 + /// 1025 + /// clears the `firehose_cursor|{url}` entry from the cursors keyspace and zeroes the 1026 + /// in-memory cursor. the next connection will tail live events from the current head. 1027 + pub async fn reset_cursor(&self, url: &str) -> Result<()> { 1028 + let db = self.state.db.clone(); 1029 + let key = keys::firehose_cursor_key(url); 1030 + tokio::task::spawn_blocking(move || db.cursors.remove(key).into_diagnostic()) 1031 + .await 1032 + .into_diagnostic()??; 1033 + 1034 + if let Ok(relay_url) = Url::parse(url) { 1035 + self.state.relay_cursors.peek_with(&relay_url, |_, c| { 1036 + c.store(0, std::sync::atomic::Ordering::SeqCst); 1037 + }); 1038 + } 1039 + Ok(()) 1040 + } 1041 + 1042 + /// return info on all currently active firehose sources. 
1043 + pub async fn list_sources(&self) -> Vec<FirehoseSourceInfo> { 1044 + let mut sources = Vec::new(); 1045 + self.tasks 1046 + .iter_async(|url, _| { 1047 + sources.push(FirehoseSourceInfo { 1048 + url: url.clone(), 1049 + persisted: self.persisted.contains_sync(url), 1050 + }); 1051 + true 1052 + }) 1053 + .await; 1054 + sources 1055 + } 1056 + 1057 + /// add a new firehose relay at runtime. 1058 + /// 1059 + /// the URL is persisted to the database and will be re-spawned on restart. if a relay with 1060 + /// the same URL already exists it is replaced: the running task is stopped and a new one 1061 + /// is started. any cursor state for that URL is preserved. 1062 + /// 1063 + /// returns an error if called before [`Hydrant::run`]. 1064 + pub async fn add_source(&self, url: Url) -> Result<()> { 1065 + let Some(shared) = self.shared.get() else { 1066 + miette::bail!("firehose not yet started: call Hydrant::run() first"); 1067 + }; 1068 + 1069 + let db = self.state.db.clone(); 1070 + let key = keys::firehose_source_key(url.as_str()); 1071 + tokio::task::spawn_blocking(move || db.crawler.insert(key, b"").into_diagnostic()) 1072 + .await 1073 + .into_diagnostic()??; 1074 + 1075 + let enabled_rx = self.state.firehose_enabled.subscribe(); 1076 + let handle = spawn_firehose_ingestor(&url, &self.state, shared, enabled_rx).await?; 1077 + 1078 + let _ = self.persisted.insert_async(url.clone()).await; 1079 + match self.tasks.entry_async(url).await { 1080 + scc::hash_map::Entry::Vacant(e) => { 1081 + e.insert_entry(handle); 1082 + } 1083 + scc::hash_map::Entry::Occupied(mut e) => { 1084 + *e.get_mut() = handle; 1085 + } 1086 + } 1087 + Ok(()) 1088 + } 1089 + 1090 + /// remove a firehose relay at runtime by URL. 1091 + /// 1092 + /// aborts the running ingestor task. if the source was added via the API it is removed from 1093 + /// the database and will not reappear on restart. 
`RELAY_HOSTS` sources are only stopped for 1094 + /// the current session; they reappear on the next restart. 1095 + /// 1096 + /// returns `true` if the relay was found and removed, `false` if it was not running. 1097 + /// returns an error if called before [`Hydrant::run`]. 1098 + pub async fn remove_source(&self, url: &Url) -> Result<bool> { 1099 + if self.shared.get().is_none() { 1100 + miette::bail!("firehose not yet started: call Hydrant::run() first"); 1101 + } 1102 + 1103 + if self.tasks.remove_async(url).await.is_none() { 1104 + return Ok(false); 1105 + } 1106 + 1107 + // remove from relay_cursors (persist thread will stop tracking it) 1108 + self.state.relay_cursors.remove_async(url).await; 1109 + 1110 + if self.persisted.remove_async(url).await.is_some() { 1111 + let db = self.state.db.clone(); 1112 + let key = keys::firehose_source_key(url.as_str()); 1113 + tokio::task::spawn_blocking(move || db.crawler.remove(key).into_diagnostic()) 1114 + .await 1115 + .into_diagnostic()??; 1116 + } 1117 + 1118 + Ok(true) 937 1119 } 938 1120 } 939 1121 ··· 1577 1759 .into_static(); 1578 1760 let cid = Cid::new(&cid_bytes) 1579 1761 .into_diagnostic() 1580 - .wrap_err("cant parse block cid")? 1581 - .into_static(); 1762 + .wrap_err("cant parse block cid")?; 1763 + let cid = Cid::Str(cid.to_cowstr().into_static()); 1582 1764 1583 1765 Ok(Some(Record { did, cid, value })) 1584 1766 }) ··· 1659 1841 { 1660 1842 let value: Data = 1661 1843 serde_ipld_dagcbor::from_slice(&block_bytes).unwrap_or(Data::Null); 1662 - let cid = Cid::new(&cid_bytes).into_diagnostic()?.into_static(); 1844 + let cid = Cid::new(&cid_bytes).into_diagnostic()?; 1845 + let cid = Cid::Str(cid.to_cowstr().into_static()); 1663 1846 results.push(ListedRecord { 1664 1847 rkey: Rkey::new_cow(CowStr::Owned(rkey.to_smolstr())) 1665 1848 .expect("that rkey is validated"),
+9
src/db/keys.rs
··· 193 193 key 194 194 } 195 195 196 + pub const FIREHOSE_SOURCE_PREFIX: &[u8] = b"firehose|"; 197 + 198 + pub fn firehose_source_key(url: &str) -> Vec<u8> { 199 + let mut key = Vec::with_capacity(FIREHOSE_SOURCE_PREFIX.len() + url.len()); 200 + key.extend_from_slice(FIREHOSE_SOURCE_PREFIX); 201 + key.extend_from_slice(url.as_bytes()); 202 + key 203 + } 204 + 196 205 // key format: {collection}|{cid_bytes} 197 206 pub fn block_key(collection: &str, cid: &[u8]) -> Vec<u8> { 198 207 let mut key = Vec::with_capacity(collection.len() + 1 + cid.len());
+31
src/db/mod.rs
··· 854 854 .transpose()?; 855 855 Ok(count.unwrap_or(0)) 856 856 } 857 + 858 + pub fn load_persisted_firehose_sources(db: &crate::db::Db) -> Result<Vec<Url>> { 859 + use crate::db::keys::FIREHOSE_SOURCE_PREFIX; 860 + 861 + let mut urls = Vec::new(); 862 + for entry in db.crawler.prefix(FIREHOSE_SOURCE_PREFIX) { 863 + let (key, _) = entry.into_inner().into_diagnostic()?; 864 + let url_bytes = &key[FIREHOSE_SOURCE_PREFIX.len()..]; 865 + let url_str = std::str::from_utf8(url_bytes).into_diagnostic()?; 866 + let url = Url::parse(url_str).into_diagnostic()?; 867 + urls.push(url); 868 + } 869 + Ok(urls) 870 + } 871 + 872 + pub fn load_persisted_crawler_sources( 873 + db: &crate::db::Db, 874 + ) -> Result<Vec<crate::config::CrawlerSource>> { 875 + use crate::db::keys::CRAWLER_SOURCE_PREFIX; 876 + 877 + let mut sources = Vec::new(); 878 + for entry in db.crawler.prefix(CRAWLER_SOURCE_PREFIX) { 879 + let (key, val) = entry.into_inner().into_diagnostic()?; 880 + let url_bytes = &key[CRAWLER_SOURCE_PREFIX.len()..]; 881 + let url_str = std::str::from_utf8(url_bytes).into_diagnostic()?; 882 + let url = Url::parse(url_str).into_diagnostic()?; 883 + let mode: crate::config::CrawlerMode = rmp_serde::from_slice(&val).into_diagnostic()?; 884 + sources.push(crate::config::CrawlerSource { url, mode }); 885 + } 886 + Ok(sources) 887 + }
+11 -3
src/ingest/firehose.rs
··· 1 - use crate::db::{self, deser_repo_state}; 1 + use crate::db::deser_repo_state; 2 2 use crate::filter::{FilterHandle, FilterMode}; 3 3 use crate::ingest::stream::{FirehoseStream, SubscribeReposMessage, decode_frame}; 4 4 use crate::ingest::{BufferTx, IngestMessage}; ··· 8 8 use jacquard_common::types::did::Did; 9 9 use miette::{IntoDiagnostic, Result}; 10 10 use std::sync::Arc; 11 + use std::sync::atomic::Ordering; 11 12 use std::time::Duration; 12 13 use tokio::sync::watch; 13 14 use tracing::{Span, debug, error, info, trace}; ··· 46 47 loop { 47 48 self.enabled.wait_enabled("firehose").await; 48 49 49 - let start_cursor = db::get_firehose_cursor(&self.state.db, &self.relay_host).await?; 50 + let start_cursor = self 51 + .state 52 + .relay_cursors 53 + .peek_with(&self.relay_host, |_, c| { 54 + let val = c.load(Ordering::SeqCst); 55 + (val > 0).then_some(val) 56 + }) 57 + .flatten(); 50 58 51 59 match start_cursor { 52 60 Some(c) => info!(cursor = %c, "resuming from cursor"), ··· 160 168 } 161 169 162 170 if !filter.signals.is_empty() { 163 - trace!(did = %did, "unknown — passing to worker for signal check"); 171 + trace!(did = %did, "unknown, passing to worker for signal check"); 164 172 Ok(true) 165 173 } else { 166 174 trace!(did = %did, "unknown and no signals configured, skipping");
+1 -1
src/ingest/stream.rs
··· 102 102 chrono::DateTime::parse_from_rfc3339(&s) 103 103 .map(Self) 104 104 .or_else(|_| { 105 - // no timezone — warn and assume UTC 105 + // no timezone, warn and assume UTC 106 106 tracing::warn!( 107 107 value = %s, 108 108 "datetime missing timezone suffix, assuming UTC"
+11 -19
src/ingest/worker.rs
··· 20 20 use std::collections::hash_map::DefaultHasher; 21 21 use std::hash::{Hash, Hasher}; 22 22 use std::sync::Arc; 23 + use std::sync::atomic::Ordering::SeqCst; 23 24 use thiserror::Error; 24 25 use tokio::sync::mpsc; 25 26 use tracing::{debug, error, info, trace, warn}; ··· 253 254 db::check_poisoned_report(r); 254 255 } 255 256 error!(did = %did, err = %e, "error in check_repo_state"); 256 - if let Some(cursor) = state.relay_cursors.get(&relay) { 257 - cursor.store(seq, std::sync::atomic::Ordering::SeqCst); 258 - } 257 + state 258 + .relay_cursors 259 + .peek_with(&relay, |_, c| c.store(seq, SeqCst)); 259 260 continue; 260 261 } 261 262 }; ··· 307 308 did = %did, err = %e, 308 309 "failed to transition inactive repo to synced" 309 310 ); 310 - if let Some(cursor) = 311 - state.relay_cursors.get(&relay) 312 - { 313 - cursor.store( 314 - seq, 315 - std::sync::atomic::Ordering::SeqCst, 316 - ); 317 - } 311 + state 312 + .relay_cursors 313 + .peek_with(&relay, |_, c| c.store(seq, SeqCst)); 318 314 continue; 319 315 } 320 316 } ··· 359 355 } 360 356 } 361 357 362 - if let Some(cursor) = state.relay_cursors.get(&relay) { 363 - cursor.store(seq, std::sync::atomic::Ordering::SeqCst); 364 - } 358 + state 359 + .relay_cursors 360 + .peek_with(&relay, |_, c| c.store(seq, SeqCst)); 365 361 } 366 362 } 367 363 ··· 489 485 *ctx.added_blocks += res.blocks_count; 490 486 *ctx.records_delta += res.records_delta; 491 487 ctx.broadcast_events.push(BroadcastEvent::Persisted( 492 - ctx.state 493 - .db 494 - .next_event_id 495 - .load(std::sync::atomic::Ordering::SeqCst) 496 - - 1, 488 + ctx.state.db.next_event_id.load(SeqCst) - 1, 497 489 )); 498 490 499 491 Ok(RepoProcessResult::Ok(repo_state))
+8 -7
src/state.rs
··· 1 1 use std::sync::atomic::AtomicI64; 2 - use std::{collections::HashMap, time::Duration}; 2 + use std::time::Duration; 3 3 4 4 use miette::Result; 5 5 use tokio::sync::{Notify, watch}; ··· 16 16 pub db: Db, 17 17 pub resolver: Resolver, 18 18 pub filter: FilterHandle, 19 - pub relay_cursors: HashMap<Url, AtomicI64>, 19 + /// per-relay firehose cursors. values use interior mutability so they can be 20 + /// updated through the lock-free `peek_with` reads in the ingest worker. 21 + pub relay_cursors: scc::HashIndex<Url, AtomicI64>, 20 22 pub backfill_notify: Notify, 21 23 pub crawler_enabled: watch::Sender<bool>, 22 24 pub firehose_enabled: watch::Sender<bool>, ··· 41 43 42 44 let filter = new_handle(filter_config); 43 45 44 - let relay_cursors = config 45 - .relays 46 - .iter() 47 - .map(|url| (url.clone(), AtomicI64::new(0))) 48 - .collect(); 46 + let relay_cursors = scc::HashIndex::new(); 47 + for url in &config.relays { 48 + let _ = relay_cursors.insert_sync(url.clone(), AtomicI64::new(0)); 49 + } 49 50 50 51 let (crawler_enabled, _) = watch::channel(crawler_default); 51 52 let (firehose_enabled, _) = watch::channel(config.enable_firehose);
+77 -2
tests/api_test.nu
··· 45 45 if not $s.persisted { 46 46 fail "expected persisted=true for dynamically added source" $pid 47 47 } 48 - print $" ok: 1 source — url=($s.url), mode=($s.mode), persisted=($s.persisted)" 48 + print $" ok: 1 source, url=($s.url), mode=($s.mode), persisted=($s.persisted)" 49 49 50 50 # posting the same URL with a different mode replaces the existing entry 51 51 print " POST /crawler/sources (should override)..." ··· 212 212 print "config source persistence test passed!" 213 213 } 214 214 215 + def test-firehose-sources [url: string, pid: int] { 216 + print "=== test: firehose sources ===" 217 + 218 + # initial state: no sources (we start with HYDRANT_RELAY_HOSTS="") 219 + print " GET /firehose/sources (expect empty)..." 220 + let initial = (http get $"($url)/firehose/sources") 221 + if ($initial | length) != 0 { 222 + fail $"expected empty list, got ($initial | length) entries" $pid 223 + } 224 + print " ok: starts empty" 225 + 226 + # add a relay source 227 + print " POST /firehose/sources..." 228 + let resp_add = (http post -f -e -t application/json $"($url)/firehose/sources" { 229 + url: "wss://test.bsky.network" 230 + }) 231 + if $resp_add.status != 201 { 232 + fail $"expected 201, got ($resp_add.status)" $pid 233 + } 234 + print " ok: 201 Created" 235 + 236 + # verify it appears 237 + print " GET /firehose/sources (expect 1 entry)..." 238 + let sources = (http get $"($url)/firehose/sources") 239 + if ($sources | length) != 1 { 240 + fail $"expected 1 source, got ($sources | length)" $pid 241 + } 242 + let s = ($sources | first) 243 + if not $s.persisted { 244 + fail "expected persisted=true for dynamically added source" $pid 245 + } 246 + print $" ok: 1 source, url=($s.url), persisted=($s.persisted)" 247 + 248 + # posting the same URL replaces the existing entry 249 + print " POST /firehose/sources (should override)..." 
250 + let resp_replace = (http post -f -e -t application/json $"($url)/firehose/sources" { 251 + url: "wss://test.bsky.network" 252 + }) 253 + if $resp_replace.status != 201 { 254 + fail $"expected 201, got ($resp_replace.status)" $pid 255 + } 256 + let after_replace = (http get $"($url)/firehose/sources") 257 + if ($after_replace | length) != 1 { 258 + fail $"expected 1 source after override, got ($after_replace | length)" $pid 259 + } 260 + print " ok: duplicate add replaced existing entry" 261 + 262 + # remove the source 263 + print " DELETE /firehose/sources..." 264 + let resp_del = (http delete -f -e -t application/json $"($url)/firehose/sources" --data { 265 + url: "wss://test.bsky.network" 266 + }) 267 + if $resp_del.status != 200 { 268 + fail $"expected 200, got ($resp_del.status)" $pid 269 + } 270 + let after_del = (http get $"($url)/firehose/sources") 271 + if ($after_del | length) != 0 { 272 + fail "expected empty list after delete" $pid 273 + } 274 + print " ok: source removed" 275 + 276 + # deleting a non-existent source returns 404 277 + print " DELETE /firehose/sources (should be 404)..." 278 + let resp_del_missing = (http delete -f -e -t application/json $"($url)/firehose/sources" --data { 279 + url: "wss://test.bsky.network" 280 + }) 281 + if $resp_del_missing.status != 404 { 282 + fail $"expected 404, got ($resp_del_missing.status)" $pid 283 + } 284 + print " ok: 404 for non-existent source" 285 + 286 + print "firehose source tests passed!" 
287 + } 288 + 215 289 def main [] { 216 290 let port = 3007 217 291 let url = $"http://localhost:($port)" ··· 221 295 let db = (mktemp -d -t hydrant_api_test.XXXXXX) 222 296 print $"db: ($db)" 223 297 224 - let instance = (with-env { HYDRANT_CRAWLER_URLS: "" } { 298 + let instance = (with-env { HYDRANT_CRAWLER_URLS: "", HYDRANT_RELAY_HOSTS: "" } { 225 299 start-hydrant $binary $db $port 226 300 }) 227 301 if not (wait-for-api $url) { ··· 229 303 } 230 304 231 305 test-crawler-sources $url $instance.pid 306 + test-firehose-sources $url $instance.pid 232 307 233 308 kill $instance.pid 234 309 sleep 2sec
+1
tests/common.nu
··· 75 75 HYDRANT_API_PORT: ($port | into string), 76 76 HYDRANT_ENABLE_DEBUG: "true", 77 77 HYDRANT_DEBUG_PORT: ($port + 1 | into string), 78 + HYDRANT_PLC_URL: "https://plc.gaze.systems", 78 79 RUST_LOG: "debug,hyper=error,tokio=error,h2=error,tower=error,rustls=error" 79 80 } | merge $hydrant_vars 80 81