very fast AT Protocol indexer with flexible filtering, xrpc queries, a cursor-backed event stream, and more, built on fjall
rust fjall at-protocol atproto indexer


[lib,api] chunk streaming on replay, add a live buffer, chunk ephemeral gc deletes

dawn 904ecb28 82e340fd

+979 -163
+1 -1
docs/api/README.md
··· 6 6 7 7 ## public 8 8 9 - - `GET /stream`: subscribe to the event stream. query params: `cursor` (optional, start from a specific event ID). 9 + - `GET /stream`: subscribe to the event stream. query params: `cursor` (optional, start from a specific event ID). slow consumers may receive a `{"type":"error","error":"ConsumerTooSlow",...}` frame before the connection closes. 10 10 - `GET /stats`: get stats about the database (counts of repos, records, events; sizes of keyspaces on disk). 11 11 - `GET /health` / `GET /_health`: health check. 12 12
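
for illustration, a minimal consumer of `GET /stream` might look like the sketch below. this is not from the repo: it assumes the indexer listens on `localhost:3000` and pulls in the `tokio-tungstenite`, `futures`, and `serde_json` crates; the JSON error shape matches the `ConsumerTooSlow` frame documented above.

```rust
use futures::StreamExt;
use tokio_tungstenite::{connect_async, tungstenite::Message};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // pass `cursor` to replay from a specific event id; omit it to tail live events only.
    // host/port are assumptions for the sketch.
    let (mut ws, _resp) = connect_async("ws://localhost:3000/stream?cursor=0").await?;

    while let Some(msg) = ws.next().await {
        if let Message::Text(text) = msg? {
            let value: serde_json::Value = serde_json::from_str(&text)?;
            if value.get("type").and_then(|t| t.as_str()) == Some("error") {
                // e.g. {"type":"error","error":"ConsumerTooSlow","message":"..."}
                // the server closes the connection after sending this frame
                eprintln!("stream closed by server: {value}");
                break;
            }
            println!("event: {value}");
        }
    }
    Ok(())
}
```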
+1 -1
docs/xrpc/atproto.md
··· 15 15 - `com.atproto.sync.listRepos` 16 16 - `com.atproto.sync.getLatestCommit` 17 17 - `com.atproto.sync.requestCrawl` (adds the host to firehose sources in relay mode) 18 - - `com.atproto.sync.subscribeRepos` (WebSocket firehose stream, requires `relay` feature) 18 + - `com.atproto.sync.subscribeRepos` (WebSocket firehose stream, requires `relay` feature; slow consumers may receive a `ConsumerTooSlow` error frame before the connection closes)
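
a similarly hedged sketch for the relay firehose: frames are binary (a dag-cbor header followed by a dag-cbor body), and per the change above a slow consumer may receive an error frame (header `op: -1`, error `ConsumerTooSlow`) before the server closes the socket. host, port, and crates are assumptions, as before.

```rust
use futures::StreamExt;
use tokio_tungstenite::{connect_async, tungstenite::Message};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // resume from seq 0; omit `cursor` to tail live only
    let url = "ws://localhost:3000/xrpc/com.atproto.sync.subscribeRepos?cursor=0";
    let (mut ws, _resp) = connect_async(url).await?;

    while let Some(msg) = ws.next().await {
        match msg? {
            // decoding the dag-cbor header/body is left out here; an error frame
            // (op = -1, e.g. ConsumerTooSlow) is the last thing sent before close.
            Message::Binary(frame) => println!("frame: {} bytes", frame.len()),
            Message::Close(_) => break,
            _ => {}
        }
    }
    Ok(())
}
```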
+8 -1
examples/statusphere.rs
··· 113 113 .map(|h| h.to_string()) 114 114 .unwrap_or_else(|| did.to_string()) 115 115 }; 116 - while let Some(event) = stream.next().await { 116 + while let Some(item) = stream.next().await { 117 + let event = match item { 118 + Ok(event) => event, 119 + Err(err) => { 120 + tracing::warn!(err = %err, "hydrant stream closed"); 121 + break; 122 + } 123 + }; 117 124 if let Some(rec) = event.record { 118 125 let did = rec.did.as_str().to_owned(); 119 126 match rec.action.as_str() {
+43 -3
src/api/stream.rs
··· 28 28 } 29 29 30 30 async fn handle_socket(mut socket: WebSocket, hydrant: Hydrant, query: StreamQuery) { 31 + let send_timeout = hydrant.stream_send_timeout(); 31 32 let mut stream = hydrant.subscribe(query.cursor); 32 33 33 - while let Some(evt) = stream.next().await { 34 + while let Some(item) = stream.next().await { 35 + let evt = match item { 36 + Ok(evt) => evt, 37 + Err(err) => { 38 + let json = serde_json::json!({ 39 + "type": "error", 40 + "error": err.code(), 41 + "message": err.to_string(), 42 + }); 43 + let _ = tokio::time::timeout( 44 + send_timeout, 45 + socket.send(Message::text(json.to_string())), 46 + ) 47 + .await; 48 + let _ = 49 + tokio::time::timeout(std::time::Duration::from_secs(1), socket.close()).await; 50 + break; 51 + } 52 + }; 53 + 34 54 match serde_json::to_string(&evt) { 35 55 Ok(json) => { 36 - if socket.send(Message::text(json)).await.is_err() { 37 - break; 56 + match tokio::time::timeout(send_timeout, socket.send(Message::text(json))).await { 57 + Ok(Ok(())) => {} 58 + Ok(Err(_)) => break, 59 + Err(_) => { 60 + let err = serde_json::json!({ 61 + "type": "error", 62 + "error": "ConsumerTooSlow", 63 + "message": format!( 64 + "stream socket send blocked for at least {} seconds", 65 + send_timeout.as_secs() 66 + ), 67 + }); 68 + let _ = tokio::time::timeout( 69 + std::time::Duration::from_secs(1), 70 + socket.send(Message::text(err.to_string())), 71 + ) 72 + .await; 73 + let _ = 74 + tokio::time::timeout(std::time::Duration::from_secs(1), socket.close()) 75 + .await; 76 + break; 77 + } 38 78 } 39 79 } 40 80 Err(e) => {
+42 -4
src/api/xrpc/subscribe_repos.rs
··· 5 5 use axum_tws::{Message, WebSocket, WebSocketUpgrade}; 6 6 use futures::StreamExt; 7 7 use serde::Deserialize; 8 + use tracing::error; 8 9 9 - use crate::control::Hydrant; 10 + use crate::control::{Hydrant, RelayStreamError}; 11 + use crate::ingest::stream::encode_error_frame; 10 12 11 13 #[derive(Deserialize)] 12 14 pub struct SubscribeReposQuery { ··· 22 24 } 23 25 24 26 async fn handle_socket(mut socket: WebSocket, hydrant: Hydrant, query: SubscribeReposQuery) { 27 + let send_timeout = hydrant.stream_send_timeout(); 25 28 let mut stream = hydrant.subscribe_repos(query.cursor); 26 29 27 - while let Some(frame) = stream.next().await { 28 - if socket.send(Message::binary(frame)).await.is_err() { 29 - break; 30 + while let Some(item) = stream.next().await { 31 + let frame = match item { 32 + Ok(frame) => frame, 33 + Err(err) => { 34 + send_error_frame(&mut socket, send_timeout, &err).await; 35 + break; 36 + } 37 + }; 38 + 39 + match tokio::time::timeout(send_timeout, socket.send(Message::binary(frame))).await { 40 + Ok(Ok(())) => {} 41 + Ok(Err(_)) => break, 42 + Err(_) => { 43 + let err = RelayStreamError::ConsumerTooSlow { 44 + reason: format!( 45 + "relay stream socket send blocked for at least {} seconds", 46 + send_timeout.as_secs() 47 + ), 48 + }; 49 + send_error_frame(&mut socket, std::time::Duration::from_secs(1), &err).await; 50 + break; 51 + } 30 52 } 31 53 } 32 54 } 55 + 56 + async fn send_error_frame( 57 + socket: &mut WebSocket, 58 + timeout: std::time::Duration, 59 + err: &RelayStreamError, 60 + ) { 61 + match encode_error_frame(err.code(), Some(&err.to_string())) { 62 + Ok(frame) => { 63 + let _ = tokio::time::timeout(timeout, socket.send(Message::binary(frame))).await; 64 + } 65 + Err(e) => { 66 + error!(err = %e, "failed to encode relay stream error frame"); 67 + } 68 + } 69 + let _ = tokio::time::timeout(std::time::Duration::from_secs(1), socket.close()).await; 70 + }
+47
src/config.rs
··· 453 453 /// in-memory write buffer (memtable) size for the records keyspace in MB. 454 454 /// set via `HYDRANT_DB_RECORDS_MEMTABLE_SIZE_MB`. 455 455 pub db_records_memtable_size_mb: u64, 456 + 457 + /// maximum number of persisted events read from the database per replay batch. 458 + /// set via `HYDRANT_STREAM_REPLAY_CHUNK_SIZE`. 459 + pub stream_replay_chunk_size: usize, 460 + /// pause between replay batches, giving database maintenance work a chance to run. 461 + /// set via `HYDRANT_STREAM_REPLAY_CHUNK_PAUSE` (humantime duration, e.g. `2ms`). 462 + pub stream_replay_chunk_pause: Duration, 463 + /// maximum number of live in-memory stream events buffered per subscriber while it catches up. 464 + /// set via `HYDRANT_STREAM_PENDING_EVENT_LIMIT`. 465 + pub stream_pending_event_limit: usize, 466 + /// maximum time a subscriber may block stream delivery before being disconnected. 467 + /// set via `HYDRANT_STREAM_SEND_TIMEOUT` (humantime duration, e.g. `30sec`). 468 + pub stream_send_timeout: Duration, 456 469 } 457 470 458 471 impl Default for Config { ··· 526 539 db_repos_memtable_size_mb: BASE_MEMTABLE_MB / 2, 527 540 db_events_memtable_size_mb: BASE_MEMTABLE_MB, 528 541 db_records_memtable_size_mb: BASE_MEMTABLE_MB / 3 * 2, 542 + stream_replay_chunk_size: 64, 543 + stream_replay_chunk_pause: Duration::from_millis(2), 544 + stream_pending_event_limit: 4096, 545 + stream_send_timeout: Duration::from_secs(30), 529 546 } 530 547 } 531 548 } ··· 650 667 "DB_REPOS_MEMTABLE_SIZE_MB", 651 668 defaults.db_repos_memtable_size_mb 652 669 ); 670 + let stream_replay_chunk_size = cfg!( 671 + "STREAM_REPLAY_CHUNK_SIZE", 672 + defaults.stream_replay_chunk_size 673 + ); 674 + let stream_replay_chunk_pause = cfg!( 675 + "STREAM_REPLAY_CHUNK_PAUSE", 676 + defaults.stream_replay_chunk_pause, 677 + sec 678 + ); 679 + let stream_pending_event_limit = cfg!( 680 + "STREAM_PENDING_EVENT_LIMIT", 681 + defaults.stream_pending_event_limit 682 + ); 683 + let stream_send_timeout = cfg!("STREAM_SEND_TIMEOUT", defaults.stream_send_timeout, sec); 653 684 654 685 let crawler_max_pending_repos = cfg!( 655 686 "CRAWLER_MAX_PENDING_REPOS", ··· 825 856 db_repos_memtable_size_mb, 826 857 db_events_memtable_size_mb, 827 858 db_records_memtable_size_mb, 859 + stream_replay_chunk_size, 860 + stream_replay_chunk_pause, 861 + stream_pending_event_limit, 862 + stream_send_timeout, 828 863 }) 829 864 } 830 865 } ··· 901 936 f, 902 937 "db records memtable", 903 938 format_args!("{} mb", self.db_records_memtable_size_mb) 939 + )?; 940 + config_line!(f, "stream replay chunk", self.stream_replay_chunk_size)?; 941 + config_line!( 942 + f, 943 + "stream replay pause", 944 + format_args!("{}ms", self.stream_replay_chunk_pause.as_millis()) 945 + )?; 946 + config_line!(f, "stream pending limit", self.stream_pending_event_limit)?; 947 + config_line!( 948 + f, 949 + "stream send timeout", 950 + format_args!("{}sec", self.stream_send_timeout.as_secs()) 904 951 )?; 905 952 config_line!(f, "crawler max pending", self.crawler_max_pending_repos)?; 906 953 config_line!(
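
the same knobs can also be set in code; a rough sketch, assuming the crate is named `hydrant` and exposes `Config` with the fields added above (values shown are the shipped defaults):

```rust
use std::time::Duration;

// hypothetical helper; field names, env var names, and defaults come from the config diff above
fn tuned_config() -> hydrant::Config {
    hydrant::Config {
        stream_replay_chunk_size: 64,                         // HYDRANT_STREAM_REPLAY_CHUNK_SIZE
        stream_replay_chunk_pause: Duration::from_millis(2),  // HYDRANT_STREAM_REPLAY_CHUNK_PAUSE
        stream_pending_event_limit: 4096,                     // HYDRANT_STREAM_PENDING_EVENT_LIMIT
        stream_send_timeout: Duration::from_secs(30),         // HYDRANT_STREAM_SEND_TIMEOUT
        ..hydrant::Config::default()
    }
}
```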
+27 -5
src/control/indexer.rs
··· 4 4 /// a stream of [`Event`]s. returned by [`Hydrant::subscribe`]. 5 5 /// 6 6 /// implements [`futures::Stream`] and can be used with `StreamExt::next`, 7 - /// `while let Some(evt) = stream.next().await`, `forward`, etc. 7 + /// `while let Some(item) = stream.next().await`, `forward`, etc. 8 8 /// the stream terminates when the underlying channel closes (i.e. hydrant shuts down). 9 - pub struct EventStream(mpsc::Receiver<Event>); 9 + pub struct EventStream(mpsc::Receiver<Result<Event, StreamError>>); 10 + 11 + #[cfg(feature = "indexer_stream")] 12 + #[derive(Debug, Clone, thiserror::Error)] 13 + pub enum StreamError { 14 + #[error("stream consumer too slow: {reason}")] 15 + ConsumerTooSlow { reason: String }, 16 + } 17 + 18 + #[cfg(feature = "indexer_stream")] 19 + impl StreamError { 20 + pub fn code(&self) -> &'static str { 21 + match self { 22 + Self::ConsumerTooSlow { .. } => "ConsumerTooSlow", 23 + } 24 + } 25 + } 10 26 11 27 #[cfg(feature = "indexer_stream")] 12 28 impl Stream for EventStream { 13 - type Item = Event; 29 + type Item = Result<Event, StreamError>; 14 30 15 31 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 16 32 self.0.poll_recv(cx) ··· 59 75 /// a specific repository. 60 76 /// 61 77 /// multiple concurrent subscribers each receive a full independent copy of the stream. 62 - /// the stream ends when the `EventStream` is dropped. 78 + /// the stream ends when the `EventStream` is dropped. slow consumers receive 79 + /// [`StreamError::ConsumerTooSlow`] before the stream terminates when possible. 63 80 pub fn subscribe(&self, cursor: Option<u64>) -> EventStream { 64 81 let (tx, rx) = mpsc::channel(500); 65 82 let state = self.state.clone(); 66 83 let runtime = tokio::runtime::Handle::current(); 84 + let opts = stream::StreamOptions::from_config(&self.config); 67 85 68 86 std::thread::Builder::new() 69 87 .name("hydrant-stream".into()) 70 88 .spawn(move || { 71 89 let _g = runtime.enter(); 72 - event_stream_thread(state, tx, cursor); 90 + event_stream_thread(state, tx, cursor, opts); 73 91 }) 74 92 .expect("failed to spawn stream thread"); 75 93 76 94 EventStream(rx) 95 + } 96 + 97 + pub(crate) fn stream_send_timeout(&self) -> std::time::Duration { 98 + self.config.stream_send_timeout 77 99 } 78 100 }
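
library consumers now see `Result` items from `subscribe`; a minimal sketch of the new loop (the crate name `hydrant` and an already-constructed `Hydrant` are assumptions):

```rust
use futures::StreamExt;

// tail the indexer event stream, stopping cleanly on a ConsumerTooSlow error
async fn tail_events(hydrant: &hydrant::Hydrant) {
    // Some(cursor) replays persisted events from that id first; None tails live only
    let mut stream = hydrant.subscribe(None);
    while let Some(item) = stream.next().await {
        match item {
            Ok(event) => println!("event {}", event.id),
            Err(err) => {
                eprintln!("stream closed: {err}");
                break;
            }
        }
    }
}
```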
+25 -3
src/control/relay.rs
··· 1 1 use super::*; 2 2 3 3 /// the relay event stream produced by [`Hydrant::subscribe_repos`]. 4 - pub struct RelayEventStream(mpsc::Receiver<bytes::Bytes>); 4 + pub struct RelayEventStream(mpsc::Receiver<Result<bytes::Bytes, RelayStreamError>>); 5 + 6 + #[derive(Debug, Clone, thiserror::Error)] 7 + pub enum RelayStreamError { 8 + #[error("relay stream consumer too slow: {reason}")] 9 + ConsumerTooSlow { reason: String }, 10 + } 11 + 12 + impl RelayStreamError { 13 + pub fn code(&self) -> &'static str { 14 + match self { 15 + Self::ConsumerTooSlow { .. } => "ConsumerTooSlow", 16 + } 17 + } 18 + } 5 19 6 20 impl futures::Stream for RelayEventStream { 7 - type Item = bytes::Bytes; 21 + type Item = Result<bytes::Bytes, RelayStreamError>; 8 22 9 23 fn poll_next( 10 24 mut self: std::pin::Pin<&mut Self>, ··· 22 36 /// 23 37 /// - if `cursor` is `None`, streaming starts from the current head (live tail only). 24 38 /// - if `cursor` is `Some(seq)`, all persisted events from that seq onward are replayed first. 39 + /// 40 + /// slow consumers receive [`RelayStreamError::ConsumerTooSlow`] before the stream terminates 41 + /// when possible. 25 42 pub fn subscribe_repos(&self, cursor: Option<u64>) -> RelayEventStream { 26 43 let (tx, rx) = mpsc::channel(500); 27 44 let state = self.state.clone(); 28 45 let runtime = tokio::runtime::Handle::current(); 46 + let opts = stream::StreamOptions::from_config(&self.config); 29 47 30 48 std::thread::Builder::new() 31 49 .name("hydrant-relay-stream".into()) 32 50 .spawn(move || { 33 51 let _g = runtime.enter(); 34 - relay_stream_thread(state, tx, cursor); 52 + relay_stream_thread(state, tx, cursor, opts); 35 53 }) 36 54 .expect("failed to spawn relay stream thread"); 37 55 38 56 RelayEventStream(rx) 57 + } 58 + 59 + pub(crate) fn stream_send_timeout(&self) -> std::time::Duration { 60 + self.config.stream_send_timeout 39 61 } 40 62 }
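
and the equivalent for the relay stream of raw firehose frames (same assumptions, plus the `relay` feature):

```rust
use futures::StreamExt;

// tail the relay firehose frames in-process
async fn tail_relay(hydrant: &hydrant::Hydrant) {
    // Some(seq) replays persisted frames from that seq onward; None starts at the current head
    let mut frames = hydrant.subscribe_repos(None);
    while let Some(item) = frames.next().await {
        match item {
            Ok(frame) => println!("frame: {} bytes", frame.len()),
            Err(err) => {
                eprintln!("relay stream closed: {err}");
                break;
            }
        }
    }
}
```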
+698 -138
src/control/stream.rs
··· 1 + use std::collections::VecDeque; 2 + use std::fmt; 1 3 use std::sync::Arc; 4 + use std::time::{Duration, Instant}; 2 5 3 - use tokio::sync::mpsc; 4 - use tracing::error; 6 + use tokio::sync::mpsc::error::TrySendError; 7 + use tokio::sync::{broadcast, mpsc}; 8 + use tracing::{error, warn}; 5 9 10 + use crate::config::Config; 6 11 use crate::db::keys; 7 12 use crate::state::AppState; 8 13 use std::sync::atomic::Ordering; 9 14 10 15 #[cfg(feature = "indexer_stream")] 11 16 use { 12 - super::Event, 17 + super::{Event, StreamError}, 13 18 crate::db, 14 19 crate::types::{BroadcastEvent, MarshallableEvt, RecordEvt, StoredData, StoredEvent}, 15 20 jacquard_common::types::cid::{ATP_CID_HASH, IpldCid}, ··· 20 25 sha2::{Digest, Sha256}, 21 26 }; 22 27 28 + #[cfg(feature = "relay")] 29 + use super::RelayStreamError; 30 + 31 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 32 + const STREAM_SEND_RETRY_PAUSE: Duration = Duration::from_millis(10); 33 + 34 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 35 + #[derive(Debug, Clone, Copy)] 36 + pub(crate) struct StreamOptions { 37 + replay_chunk_size: usize, 38 + replay_chunk_pause: Duration, 39 + pending_event_limit: usize, 40 + send_timeout: Duration, 41 + } 42 + 43 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 44 + impl StreamOptions { 45 + pub(crate) fn from_config(config: &Config) -> Self { 46 + Self { 47 + replay_chunk_size: config.stream_replay_chunk_size.max(1), 48 + replay_chunk_pause: config.stream_replay_chunk_pause, 49 + pending_event_limit: config.stream_pending_event_limit.max(1), 50 + send_timeout: config.stream_send_timeout, 51 + } 52 + } 53 + } 54 + 55 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 56 + #[derive(Debug, Clone)] 57 + struct StreamTooSlow { 58 + reason: String, 59 + } 60 + 61 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 62 + impl StreamTooSlow { 63 + fn pending_limit(limit: usize) -> Self { 64 + Self { 65 + reason: format!("pending stream event buffer exceeded {limit} events"), 66 + } 67 + } 68 + 69 + fn lagged(skipped: u64) -> Self { 70 + Self { 71 + reason: format!("subscriber lagged past {skipped} broadcast events"), 72 + } 73 + } 74 + 75 + fn send_timeout(timeout: Duration) -> Self { 76 + Self { 77 + reason: format!( 78 + "stream delivery blocked for at least {} seconds", 79 + timeout.as_secs() 80 + ), 81 + } 82 + } 83 + } 84 + 85 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 86 + impl fmt::Display for StreamTooSlow { 87 + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 88 + write!(f, "stream consumer too slow: {}", self.reason) 89 + } 90 + } 91 + 23 92 #[cfg(feature = "indexer_stream")] 24 - pub(super) fn event_stream_thread( 25 - state: Arc<AppState>, 26 - tx: mpsc::Sender<Event>, 27 - cursor: Option<u64>, 28 - ) { 29 - let db = &state.db; 30 - let mut event_rx = db.event_tx.subscribe(); 31 - let ks = db.events.clone(); 32 - let mut current_id = match cursor { 33 - Some(c) => c.checked_sub(1), 34 - None => db.next_event_id.load(Ordering::SeqCst).checked_sub(1), 35 - }; 36 - let mut needs_catch_up = cursor.is_some(); 93 + impl From<StreamTooSlow> for StreamError { 94 + fn from(err: StreamTooSlow) -> Self { 95 + Self::ConsumerTooSlow { reason: err.reason } 96 + } 97 + } 98 + 99 + #[cfg(feature = "relay")] 100 + impl From<StreamTooSlow> for RelayStreamError { 101 + fn from(err: StreamTooSlow) -> Self { 102 + Self::ConsumerTooSlow { reason: err.reason } 103 + } 104 + } 105 + 106 + #[cfg(any(feature = "indexer_stream", 
feature = "relay"))] 107 + trait StreamBroadcast { 108 + fn sequence(&self) -> u64; 109 + fn is_persisted_marker(&self) -> bool; 110 + } 111 + 112 + #[cfg(feature = "indexer_stream")] 113 + impl StreamBroadcast for BroadcastEvent { 114 + fn sequence(&self) -> u64 { 115 + match self { 116 + BroadcastEvent::Persisted(id) => *id, 117 + BroadcastEvent::LiveRecord(evt) => evt.id, 118 + BroadcastEvent::Ephemeral(evt) => evt.id, 119 + } 120 + } 121 + 122 + fn is_persisted_marker(&self) -> bool { 123 + matches!(self, BroadcastEvent::Persisted(_)) 124 + } 125 + } 126 + 127 + #[cfg(feature = "relay")] 128 + impl StreamBroadcast for crate::types::RelayBroadcast { 129 + fn sequence(&self) -> u64 { 130 + match self { 131 + Self::Persisted(seq) | Self::Ephemeral(seq, _) => *seq, 132 + } 133 + } 134 + 135 + fn is_persisted_marker(&self) -> bool { 136 + matches!(self, Self::Persisted(_)) 137 + } 138 + } 139 + 140 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 141 + struct PendingLiveEvents<T> { 142 + queue: VecDeque<T>, 143 + persisted_head: Option<u64>, 144 + limit: usize, 145 + } 146 + 147 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 148 + impl<T> PendingLiveEvents<T> 149 + where 150 + T: StreamBroadcast, 151 + { 152 + fn new(limit: usize) -> Self { 153 + Self { 154 + queue: VecDeque::new(), 155 + persisted_head: None, 156 + limit, 157 + } 158 + } 159 + 160 + fn push(&mut self, event: T) -> Result<(), StreamTooSlow> { 161 + if event.is_persisted_marker() { 162 + let seq = event.sequence(); 163 + self.persisted_head = Some(self.persisted_head.unwrap_or(0).max(seq)); 164 + return Ok(()); 165 + } 166 + 167 + if self.queue.len() >= self.limit { 168 + return Err(StreamTooSlow::pending_limit(self.limit)); 169 + } 170 + self.queue.push_back(event); 171 + Ok(()) 172 + } 173 + 174 + fn next_sequence(&self) -> Option<u64> { 175 + self.queue.front().map(StreamBroadcast::sequence) 176 + } 177 + 178 + fn pop_front(&mut self) -> Option<T> { 179 + self.queue.pop_front() 180 + } 181 + 182 + fn push_front(&mut self, event: T) { 183 + self.queue.push_front(event); 184 + } 185 + 186 + fn take_persisted_after(&mut self, current_id: Option<u64>) -> Option<u64> { 187 + let head = self.persisted_head.take()?; 188 + stream_seq_after(head, current_id).then_some(head) 189 + } 190 + } 191 + 192 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 193 + struct ReplayChunk<T> { 194 + events: Vec<T>, 195 + last_seen_seq: Option<u64>, 196 + exhausted: bool, 197 + } 198 + 199 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 200 + #[derive(Debug, Clone, Copy, PartialEq, Eq)] 201 + enum SendOutcome { 202 + Sent, 203 + ReceiverDropped, 204 + } 205 + 206 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 207 + fn run_ordered_stream<B, O, E>( 208 + tx: mpsc::Sender<Result<O, E>>, 209 + mut event_rx: broadcast::Receiver<B>, 210 + mut current_seq: Option<u64>, 211 + mut catch_up_target: Option<u64>, 212 + opts: StreamOptions, 213 + mut read_replay_chunk: impl FnMut(Option<u64>, u64, usize) -> ReplayChunk<O>, 214 + mut live_to_output: impl FnMut(B) -> Option<O>, 215 + ) where 216 + B: StreamBroadcast + Clone, 217 + E: From<StreamTooSlow> + fmt::Display, 218 + { 219 + let mut replay_gap_target = None; 220 + let mut pending = PendingLiveEvents::new(opts.pending_event_limit); 37 221 38 222 loop { 39 - if needs_catch_up { 40 - // catch up from db (record events only; ids are sparse due to ephemeral events) 41 - let start = current_id.map(|id| id.saturating_add(1)).unwrap_or(0); 42 - for 
item in ks.range(keys::event_key(start)..) { 43 - let (k, v) = match item.into_inner() { 44 - Ok(kv) => kv, 45 - Err(e) => { 46 - error!(err = %e, "failed to read event from db"); 47 - break; 223 + if let Err(err) = drain_pending_broadcasts(&mut event_rx, &mut pending) { 224 + send_stream_error(&tx, err.into()); 225 + return; 226 + } 227 + 228 + drop_delivered_pending(&mut pending, current_seq); 229 + 230 + if let Some(target) = replay_gap_target { 231 + advance_replay_gap(&mut current_seq, target, &pending); 232 + if current_seq.is_some_and(|seq| seq >= target) { 233 + replay_gap_target = None; 234 + } 235 + } 236 + 237 + if catch_up_target.is_none() { 238 + catch_up_target = pending.take_persisted_after(current_seq); 239 + } 240 + 241 + let pending_next_seq = pending.next_sequence(); 242 + let pending_is_ready = 243 + pending_next_seq.is_some_and(|seq| seq == next_expected_seq(current_seq)); 244 + 245 + if let Some(target) = catch_up_target.filter(|_| !pending_is_ready) { 246 + let effective_target = pending_next_seq 247 + .and_then(|seq| seq.checked_sub(1)) 248 + .map(|before_pending| before_pending.min(target)) 249 + .unwrap_or(target); 250 + let chunk = read_replay_chunk(current_seq, effective_target, opts.replay_chunk_size); 251 + current_seq = chunk.last_seen_seq.or(current_seq); 252 + 253 + for event in chunk.events { 254 + match send_stream_event(&tx, event, &mut event_rx, &mut pending, opts) { 255 + Ok(SendOutcome::Sent) => {} 256 + Ok(SendOutcome::ReceiverDropped) => return, 257 + Err(err) => { 258 + send_stream_error(&tx, err.into()); 259 + return; 48 260 } 49 - }; 261 + } 262 + } 50 263 51 - let id = match k.as_ref().try_into().map(u64::from_be_bytes) { 52 - Ok(id) => id, 53 - Err(_) => { 54 - error!("failed to parse event id"); 55 - continue; 56 - } 57 - }; 58 - current_id = Some(id); 264 + if chunk.exhausted || current_seq.is_some_and(|seq| seq >= effective_target) { 265 + if effective_target == target { 266 + catch_up_target = None; 267 + } 268 + replay_gap_target = Some(effective_target); 269 + } else if !opts.replay_chunk_pause.is_zero() { 270 + std::thread::sleep(opts.replay_chunk_pause); 271 + } 59 272 60 - let stored: StoredEvent = match rmp_serde::from_slice(&v) { 61 - Ok(e) => e, 62 - Err(e) => { 63 - error!(err = %e, "failed to deserialize stored event"); 64 - continue; 65 - } 66 - }; 273 + continue; 274 + } 67 275 68 - let Some(out_evt) = stored_to_event(&state, id, stored, None) else { 69 - continue; 70 - }; 276 + if let Some(event) = pending.pop_front() { 277 + let seq = event.sequence(); 278 + if !stream_seq_after(seq, current_seq) { 279 + continue; 280 + } 71 281 72 - if tx.blocking_send(out_evt).is_err() { 73 - return; // receiver dropped 282 + let expected = next_expected_seq(current_seq); 283 + if seq != expected { 284 + catch_up_target = seq.checked_sub(1); 285 + pending.push_front(event); 286 + continue; 287 + } 288 + 289 + let Some(out_event) = live_to_output(event) else { 290 + catch_up_target = Some(seq); 291 + continue; 292 + }; 293 + 294 + match send_stream_event(&tx, out_event, &mut event_rx, &mut pending, opts) { 295 + Ok(SendOutcome::Sent) => { 296 + current_seq = Some(seq); 297 + } 298 + Ok(SendOutcome::ReceiverDropped) => return, 299 + Err(err) => { 300 + send_stream_error(&tx, err.into()); 301 + return; 74 302 } 75 303 } 76 - needs_catch_up = false; 304 + continue; 77 305 } 78 306 79 - // wait for live events 80 307 match event_rx.blocking_recv() { 81 - Ok(BroadcastEvent::Persisted(_)) => needs_catch_up = true, 82 - 
Ok(BroadcastEvent::LiveRecord(evt)) => { 83 - let expected = current_id.map(|id| id.saturating_add(1)).unwrap_or(0); 84 - if needs_catch_up || evt.id != expected { 85 - needs_catch_up = true; 308 + Ok(event) => { 309 + if event.is_persisted_marker() { 310 + let seq = event.sequence(); 311 + if stream_seq_after(seq, current_seq) { 312 + catch_up_target = Some(seq); 313 + } 86 314 continue; 87 315 } 88 316 89 - let stored = evt.stored.clone(); 90 - let Some(out_evt) = 91 - stored_to_event(&state, evt.id, stored, evt.inline_block.clone()) 92 - else { 93 - needs_catch_up = true; 94 - continue; 95 - }; 96 - let out_id = out_evt.id; 97 - if tx.blocking_send(out_evt).is_err() { 317 + if let Err(err) = pending.push(event) { 318 + send_stream_error(&tx, err.into()); 98 319 return; 99 320 } 100 - current_id = Some(out_id); 101 321 } 102 - Ok(BroadcastEvent::Ephemeral(evt)) => { 103 - let evt_id = evt.id; 104 - if tx.blocking_send(*evt).is_err() { 105 - return; 106 - } 107 - current_id = Some(current_id.unwrap_or(0).max(evt_id)); 322 + Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => { 323 + let err = StreamTooSlow::lagged(skipped); 324 + warn!(%err, "closing slow stream subscriber"); 325 + send_stream_error(&tx, err.into()); 326 + return; 108 327 } 109 - Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => needs_catch_up = true, 110 328 Err(tokio::sync::broadcast::error::RecvError::Closed) => break, 111 329 } 112 330 } 113 331 } 114 332 115 - #[cfg(feature = "relay")] 116 - pub(super) fn relay_stream_thread( 333 + #[cfg(feature = "indexer_stream")] 334 + pub(super) fn event_stream_thread( 117 335 state: Arc<AppState>, 118 - tx: mpsc::Sender<bytes::Bytes>, 336 + tx: mpsc::Sender<Result<Event, StreamError>>, 119 337 cursor: Option<u64>, 338 + opts: StreamOptions, 120 339 ) { 121 - use crate::types::RelayBroadcast; 122 - use std::sync::atomic::Ordering; 123 - 124 - let mut relay_rx = state.db.relay_broadcast_tx.subscribe(); 125 - let ks = state.db.relay_events.clone(); 126 - let mut current_seq = match cursor { 127 - Some(c) => c.saturating_sub(1), 128 - None => ks 129 - .iter() 130 - .next_back() 131 - .and_then(|guard| { 132 - guard 133 - .key() 134 - .ok() 135 - .and_then(|k| k.as_ref().try_into().ok()) 136 - .map(u64::from_be_bytes) 137 - }) 138 - .unwrap_or(0), 340 + let db = &state.db; 341 + let event_rx = db.event_tx.subscribe(); 342 + let ks = db.events.clone(); 343 + let current_id = match cursor { 344 + Some(c) => c.checked_sub(1), 345 + None => db.next_event_id.load(Ordering::SeqCst).checked_sub(1), 139 346 }; 140 - let mut head_seq = current_seq; 141 - let mut needs_catch_up = true; 347 + let catch_up_target = cursor 348 + .and_then(|_| db.next_event_id.load(Ordering::SeqCst).checked_sub(1)) 349 + .filter(|target| stream_seq_after(*target, current_id)); 350 + let replay_state = state.clone(); 351 + 352 + run_ordered_stream( 353 + tx, 354 + event_rx, 355 + current_id, 356 + catch_up_target, 357 + opts, 358 + move |current_id, target, chunk_size| { 359 + read_event_replay_chunk(&replay_state, &ks, current_id, target, chunk_size) 360 + }, 361 + move |event| broadcast_to_event(&state, event), 362 + ); 363 + } 364 + 365 + #[cfg(feature = "indexer_stream")] 366 + fn read_event_replay_chunk( 367 + state: &AppState, 368 + ks: &fjall::Keyspace, 369 + current_id: Option<u64>, 370 + target: u64, 371 + chunk_size: usize, 372 + ) -> ReplayChunk<Event> { 373 + let start = current_id.map(|id| id.saturating_add(1)).unwrap_or(0); 374 + if start > target { 375 + return ReplayChunk { 
376 + events: Vec::new(), 377 + last_seen_seq: current_id, 378 + exhausted: true, 379 + }; 380 + } 381 + 382 + let mut events = Vec::with_capacity(chunk_size); 383 + let mut last_seen_seq = current_id; 384 + let mut exhausted = false; 385 + let max_scanned = chunk_size.saturating_mul(4).max(chunk_size); 386 + let mut scanned = 0usize; 387 + let mut iter = ks.range(keys::event_key(start)..=keys::event_key(target)); 388 + 389 + while events.len() < chunk_size && scanned < max_scanned { 390 + let Some(item) = iter.next() else { 391 + exhausted = true; 392 + break; 393 + }; 394 + scanned += 1; 395 + 396 + let (k, v) = match item.into_inner() { 397 + Ok(kv) => kv, 398 + Err(e) => { 399 + error!(err = %e, "failed to read event from db"); 400 + exhausted = true; 401 + break; 402 + } 403 + }; 404 + 405 + let id = match k.as_ref().try_into().map(u64::from_be_bytes) { 406 + Ok(id) => id, 407 + Err(_) => { 408 + error!("failed to parse event id"); 409 + continue; 410 + } 411 + }; 412 + last_seen_seq = Some(id); 413 + 414 + let stored: StoredEvent = match rmp_serde::from_slice(&v) { 415 + Ok(e) => e, 416 + Err(e) => { 417 + error!(err = %e, "failed to deserialize stored event"); 418 + continue; 419 + } 420 + }; 421 + 422 + let Some(out_evt) = stored_to_event(state, id, stored, None) else { 423 + continue; 424 + }; 425 + 426 + events.push(out_evt); 427 + } 428 + 429 + ReplayChunk { 430 + events, 431 + last_seen_seq, 432 + exhausted, 433 + } 434 + } 435 + 436 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 437 + fn send_stream_event<O, E, B>( 438 + tx: &mpsc::Sender<Result<O, E>>, 439 + event: O, 440 + event_rx: &mut broadcast::Receiver<B>, 441 + pending: &mut PendingLiveEvents<B>, 442 + opts: StreamOptions, 443 + ) -> Result<SendOutcome, StreamTooSlow> 444 + where 445 + B: StreamBroadcast + Clone, 446 + { 447 + let mut item = Ok(event); 448 + let started = Instant::now(); 142 449 143 450 loop { 144 - if needs_catch_up { 145 - // catch up from db: send all stored frames from current_seq+1 onward 146 - for item in ks.range(crate::db::keys::relay_event_key(current_seq + 1)..) 
{ 147 - let (k, v) = match item.into_inner() { 148 - Ok(kv) => kv, 149 - Err(e) => { 150 - error!(err = %e, "relay stream: failed to read relay_events"); 151 - break; 152 - } 153 - }; 154 - let seq = match k.as_ref().try_into().map(u64::from_be_bytes) { 155 - Ok(s) => s, 156 - Err(_) => { 157 - error!("relay stream: failed to parse relay event seq"); 158 - continue; 159 - } 160 - }; 161 - if seq != current_seq + 1 { 162 - break; 451 + match tx.try_send(item) { 452 + Ok(()) => return Ok(SendOutcome::Sent), 453 + Err(TrySendError::Closed(_)) => return Ok(SendOutcome::ReceiverDropped), 454 + Err(TrySendError::Full(returned)) => { 455 + item = returned; 456 + drain_pending_broadcasts(event_rx, pending)?; 457 + if started.elapsed() >= opts.send_timeout { 458 + return Err(StreamTooSlow::send_timeout(opts.send_timeout)); 163 459 } 164 - if tx.blocking_send(bytes::Bytes::copy_from_slice(&v)).is_err() { 165 - return; // subscriber dropped 166 - } 167 - current_seq = seq; 168 - if current_seq >= head_seq { 169 - break; 170 - } 460 + std::thread::sleep(STREAM_SEND_RETRY_PAUSE); 171 461 } 172 - needs_catch_up = false; 173 462 } 463 + } 464 + } 174 465 175 - // wait for live events 176 - match relay_rx.blocking_recv() { 177 - Ok(RelayBroadcast::Persisted(seq)) => { 178 - head_seq = head_seq.max(seq); 179 - needs_catch_up = current_seq < head_seq; 466 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 467 + fn send_stream_error<O, E>(tx: &mpsc::Sender<Result<O, E>>, err: E) 468 + where 469 + E: fmt::Display, 470 + { 471 + warn!(%err, "closing stream subscriber"); 472 + let _ = tx.try_send(Err(err)); 473 + } 474 + 475 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 476 + fn drain_pending_broadcasts<B>( 477 + event_rx: &mut broadcast::Receiver<B>, 478 + pending: &mut PendingLiveEvents<B>, 479 + ) -> Result<(), StreamTooSlow> 480 + where 481 + B: StreamBroadcast + Clone, 482 + { 483 + loop { 484 + match event_rx.try_recv() { 485 + Ok(event) => pending.push(event)?, 486 + Err(broadcast::error::TryRecvError::Empty) => return Ok(()), 487 + Err(broadcast::error::TryRecvError::Closed) => return Ok(()), 488 + Err(broadcast::error::TryRecvError::Lagged(skipped)) => { 489 + return Err(StreamTooSlow::lagged(skipped)); 180 490 } 181 - Ok(RelayBroadcast::Ephemeral(seq, frame)) => { 182 - head_seq = head_seq.max(seq); 183 - if seq != current_seq + 1 { 184 - // out-of-order or gap: fall back to db catch-up to preserve ordering. 
185 - needs_catch_up = true; 186 - continue; 187 - } 188 - if tx.blocking_send(frame).is_err() { 189 - return; 190 - } 191 - current_seq = seq; 491 + } 492 + } 493 + } 494 + 495 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 496 + fn drop_delivered_pending<B>(pending: &mut PendingLiveEvents<B>, current_id: Option<u64>) 497 + where 498 + B: StreamBroadcast, 499 + { 500 + while pending 501 + .next_sequence() 502 + .is_some_and(|id| !stream_seq_after(id, current_id)) 503 + { 504 + pending.pop_front(); 505 + } 506 + } 507 + 508 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 509 + fn advance_replay_gap<B>(current_id: &mut Option<u64>, target: u64, pending: &PendingLiveEvents<B>) 510 + where 511 + B: StreamBroadcast, 512 + { 513 + let next_pending_id = pending.next_sequence().filter(|id| *id <= target); 514 + let advance_to = next_pending_id 515 + .and_then(|id| id.checked_sub(1)) 516 + .unwrap_or(target); 517 + 518 + if stream_seq_after(advance_to, *current_id) { 519 + *current_id = Some(advance_to); 520 + } 521 + } 522 + 523 + #[cfg(feature = "indexer_stream")] 524 + fn broadcast_to_event(state: &AppState, event: BroadcastEvent) -> Option<Event> { 525 + match event { 526 + BroadcastEvent::Persisted(_) => None, 527 + BroadcastEvent::LiveRecord(evt) => { 528 + let stored = evt.stored.clone(); 529 + stored_to_event(state, evt.id, stored, evt.inline_block.clone()) 530 + } 531 + BroadcastEvent::Ephemeral(evt) => Some(*evt), 532 + } 533 + } 534 + 535 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 536 + fn stream_seq_after(id: u64, current_id: Option<u64>) -> bool { 537 + current_id.is_none_or(|current| id > current) 538 + } 539 + 540 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 541 + fn next_expected_seq(current_id: Option<u64>) -> u64 { 542 + current_id.map(|id| id.saturating_add(1)).unwrap_or(0) 543 + } 544 + 545 + #[cfg(feature = "relay")] 546 + pub(super) fn relay_stream_thread( 547 + state: Arc<AppState>, 548 + tx: mpsc::Sender<Result<bytes::Bytes, RelayStreamError>>, 549 + cursor: Option<u64>, 550 + opts: StreamOptions, 551 + ) { 552 + let relay_rx = state.db.relay_broadcast_tx.subscribe(); 553 + let ks = state.db.relay_events.clone(); 554 + let current_seq = match cursor { 555 + Some(c) => Some(c.saturating_sub(1)), 556 + None => Some( 557 + state 558 + .db 559 + .next_relay_seq 560 + .load(Ordering::SeqCst) 561 + .saturating_sub(1), 562 + ), 563 + }; 564 + let catch_up_target = cursor 565 + .and_then(|_| { 566 + state 567 + .db 568 + .next_relay_seq 569 + .load(Ordering::SeqCst) 570 + .checked_sub(1) 571 + }) 572 + .filter(|target| stream_seq_after(*target, current_seq)); 573 + 574 + run_ordered_stream( 575 + tx, 576 + relay_rx, 577 + current_seq, 578 + catch_up_target, 579 + opts, 580 + move |current_seq, target, chunk_size| { 581 + read_relay_replay_chunk(&ks, current_seq, target, chunk_size) 582 + }, 583 + relay_broadcast_to_frame, 584 + ); 585 + } 586 + 587 + #[cfg(feature = "relay")] 588 + fn read_relay_replay_chunk( 589 + ks: &fjall::Keyspace, 590 + current_seq: Option<u64>, 591 + target: u64, 592 + chunk_size: usize, 593 + ) -> ReplayChunk<bytes::Bytes> { 594 + let start = current_seq.map(|seq| seq.saturating_add(1)).unwrap_or(0); 595 + if start > target { 596 + return ReplayChunk { 597 + events: Vec::new(), 598 + last_seen_seq: current_seq, 599 + exhausted: true, 600 + }; 601 + } 602 + 603 + let mut events = Vec::with_capacity(chunk_size); 604 + let mut last_seen_seq = current_seq; 605 + let mut exhausted = false; 606 + let 
max_scanned = chunk_size.saturating_mul(4).max(chunk_size); 607 + let mut scanned = 0usize; 608 + let mut iter = ks.range(keys::relay_event_key(start)..=keys::relay_event_key(target)); 609 + 610 + while events.len() < chunk_size && scanned < max_scanned { 611 + let Some(item) = iter.next() else { 612 + exhausted = true; 613 + break; 614 + }; 615 + scanned += 1; 616 + 617 + let (k, v) = match item.into_inner() { 618 + Ok(kv) => kv, 619 + Err(e) => { 620 + error!(err = %e, "relay stream: failed to read relay_events"); 621 + exhausted = true; 622 + break; 192 623 } 193 - Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => needs_catch_up = true, 194 - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, 624 + }; 625 + let seq = match k.as_ref().try_into().map(u64::from_be_bytes) { 626 + Ok(seq) => seq, 627 + Err(_) => { 628 + error!("relay stream: failed to parse relay event seq"); 629 + continue; 630 + } 631 + }; 632 + last_seen_seq = Some(seq); 633 + events.push(bytes::Bytes::copy_from_slice(&v)); 634 + } 635 + 636 + ReplayChunk { 637 + events, 638 + last_seen_seq, 639 + exhausted, 640 + } 641 + } 642 + 643 + #[cfg(feature = "relay")] 644 + fn relay_broadcast_to_frame(event: crate::types::RelayBroadcast) -> Option<bytes::Bytes> { 645 + match event { 646 + crate::types::RelayBroadcast::Persisted(_) => None, 647 + crate::types::RelayBroadcast::Ephemeral(_, frame) => Some(frame), 648 + } 649 + } 650 + 651 + #[cfg(all(test, any(feature = "indexer_stream", feature = "relay")))] 652 + mod tests { 653 + use super::*; 654 + 655 + #[derive(Clone, Debug)] 656 + enum TestBroadcast { 657 + Persisted(u64), 658 + Live(u64), 659 + } 660 + 661 + impl StreamBroadcast for TestBroadcast { 662 + fn sequence(&self) -> u64 { 663 + match self { 664 + Self::Persisted(seq) | Self::Live(seq) => *seq, 665 + } 195 666 } 667 + 668 + fn is_persisted_marker(&self) -> bool { 669 + matches!(self, Self::Persisted(_)) 670 + } 671 + } 672 + 673 + #[test] 674 + fn ordered_stream_replays_chunks_then_live_tail() { 675 + let opts = StreamOptions { 676 + replay_chunk_size: 2, 677 + replay_chunk_pause: Duration::ZERO, 678 + pending_event_limit: 4, 679 + send_timeout: Duration::from_secs(1), 680 + }; 681 + let (out_tx, mut out_rx) = mpsc::channel(16); 682 + let (broadcast_tx, broadcast_rx) = broadcast::channel(16); 683 + 684 + let handle = std::thread::spawn(move || { 685 + run_ordered_stream::<TestBroadcast, u64, StreamTooSlow>( 686 + out_tx, 687 + broadcast_rx, 688 + Some(0), 689 + Some(5), 690 + opts, 691 + |current, target, chunk_size| { 692 + let start = current.map(|seq| seq.saturating_add(1)).unwrap_or(0); 693 + if start > target { 694 + return ReplayChunk { 695 + events: Vec::new(), 696 + last_seen_seq: current, 697 + exhausted: true, 698 + }; 699 + } 700 + 701 + let end = target.min(start.saturating_add(chunk_size as u64).saturating_sub(1)); 702 + let events = (start..=end).collect::<Vec<_>>(); 703 + ReplayChunk { 704 + last_seen_seq: events.last().copied().or(current), 705 + exhausted: end >= target, 706 + events, 707 + } 708 + }, 709 + |event| match event { 710 + TestBroadcast::Persisted(_) => None, 711 + TestBroadcast::Live(seq) => Some(seq), 712 + }, 713 + ); 714 + }); 715 + 716 + broadcast_tx.send(TestBroadcast::Live(6)).unwrap(); 717 + broadcast_tx.send(TestBroadcast::Persisted(6)).unwrap(); 718 + drop(broadcast_tx); 719 + 720 + let mut out = Vec::new(); 721 + while let Some(item) = out_rx.blocking_recv() { 722 + out.push(item.unwrap()); 723 + } 724 + handle.join().unwrap(); 725 + 726 + 
assert_eq!(out, vec![1, 2, 3, 4, 5, 6]); 727 + } 728 + 729 + #[test] 730 + fn ordered_stream_closes_when_output_queue_is_full() { 731 + let opts = StreamOptions { 732 + replay_chunk_size: 2, 733 + replay_chunk_pause: Duration::ZERO, 734 + pending_event_limit: 4, 735 + send_timeout: Duration::ZERO, 736 + }; 737 + let (out_tx, _out_rx) = mpsc::channel(1); 738 + let (_broadcast_tx, broadcast_rx) = broadcast::channel(16); 739 + 740 + run_ordered_stream::<TestBroadcast, u64, StreamTooSlow>( 741 + out_tx, 742 + broadcast_rx, 743 + Some(0), 744 + Some(2), 745 + opts, 746 + |_, _, _| ReplayChunk { 747 + events: vec![1, 2], 748 + last_seen_seq: Some(2), 749 + exhausted: true, 750 + }, 751 + |event| match event { 752 + TestBroadcast::Persisted(_) => None, 753 + TestBroadcast::Live(seq) => Some(seq), 754 + }, 755 + ); 196 756 } 197 757 } 198 758
+48 -7
src/db/ephemeral.rs
··· 11 11 12 12 #[cfg(any(feature = "indexer_stream", feature = "relay"))] 13 13 const AUTO_COMPACT_PRUNED_SEQ_INTERVAL: u64 = 250_000; 14 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 15 + const TTL_PRUNE_BATCH_SIZE: usize = 10_000; 16 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 17 + const TTL_PRUNE_BATCH_PAUSE: Duration = Duration::from_millis(10); 14 18 #[cfg(feature = "indexer_stream")] 15 19 static LAST_EVENTS_COMPACTED_SEQ: AtomicU64 = AtomicU64::new(0); 16 20 #[cfg(feature = "relay")] ··· 128 132 .map(u64::from_be_bytes) 129 133 .unwrap_or(0); 130 134 131 - let start_key_events = keys::event_key(last_pruned_seq); 132 - let cutoff_key_events = keys::event_key(cutoff_seq); 133 - let mut batch = db.inner.batch(); 134 135 let mut pruned = 0usize; 136 + let mut next_prune_seq = last_pruned_seq; 135 137 136 - for guard in events_ks.range(start_key_events..cutoff_key_events) { 137 - let k = guard.key().into_diagnostic()?; 138 - batch.remove(events_ks, k); 139 - pruned += 1; 138 + loop { 139 + let start_key_events = keys::event_key(next_prune_seq); 140 + let cutoff_key_events = keys::event_key(cutoff_seq); 141 + let mut keys_to_remove = Vec::with_capacity(TTL_PRUNE_BATCH_SIZE); 142 + let mut last_removed_seq = None; 143 + 144 + for guard in events_ks.range(start_key_events..cutoff_key_events) { 145 + let k = guard.key().into_diagnostic()?; 146 + last_removed_seq = Some(read_event_seq(&k)?); 147 + keys_to_remove.push(k); 148 + 149 + if keys_to_remove.len() >= TTL_PRUNE_BATCH_SIZE { 150 + break; 151 + } 152 + } 153 + 154 + let Some(last_removed_seq) = last_removed_seq else { 155 + break; 156 + }; 157 + 158 + let mut batch = db.inner.batch(); 159 + for key in keys_to_remove { 160 + batch.remove(events_ks, key); 161 + pruned += 1; 162 + } 163 + batch.insert( 164 + &db.cursors, 165 + pruned_key.clone(), 166 + last_removed_seq.to_be_bytes(), 167 + ); 168 + batch.commit().into_diagnostic()?; 169 + 170 + next_prune_seq = last_removed_seq.saturating_add(1); 171 + std::thread::sleep(TTL_PRUNE_BATCH_PAUSE); 140 172 } 141 173 174 + let mut batch = db.inner.batch(); 142 175 batch.insert(&db.cursors, pruned_key, cutoff_seq.to_be_bytes()); 143 176 144 177 // clean up consumed watermark entries (everything up to and including cutoff_ts) ··· 186 219 187 220 Ok(()) 188 221 } 222 + 223 + #[cfg(any(feature = "indexer_stream", feature = "relay"))] 224 + fn read_event_seq(key: &[u8]) -> miette::Result<u64> { 225 + key.try_into() 226 + .into_diagnostic() 227 + .wrap_err("event key must be 8 bytes") 228 + .map(u64::from_be_bytes) 229 + }
+39
src/ingest/stream.rs
··· 635 635 } 636 636 637 637 #[cfg(feature = "relay")] 638 + #[derive(Serialize)] 639 + struct EncodeErrorHeader { 640 + op: i64, 641 + } 642 + 643 + #[cfg(feature = "relay")] 644 + #[derive(Serialize)] 645 + struct EncodeErrorFrame<'a> { 646 + error: &'a str, 647 + #[serde(skip_serializing_if = "Option::is_none")] 648 + message: Option<&'a str>, 649 + } 650 + 651 + #[cfg(feature = "relay")] 638 652 pub fn encode_frame<T: serde::Serialize>(t: &str, msg: &T) -> miette::Result<bytes::Bytes> { 639 653 let mut buf = serde_ipld_dagcbor::to_vec(&EncodeHeader { op: 1, t }) 640 654 .map_err(|e| miette::miette!("encode_frame header: {e}"))?; ··· 644 658 Ok(bytes::Bytes::from(buf)) 645 659 } 646 660 661 + #[cfg(feature = "relay")] 662 + pub fn encode_error_frame(error: &str, message: Option<&str>) -> miette::Result<bytes::Bytes> { 663 + let mut buf = serde_ipld_dagcbor::to_vec(&EncodeErrorHeader { op: -1 }) 664 + .map_err(|e| miette::miette!("encode_error_frame header: {e}"))?; 665 + buf.extend_from_slice( 666 + &serde_ipld_dagcbor::to_vec(&EncodeErrorFrame { error, message }) 667 + .map_err(|e| miette::miette!("encode_error_frame body: {e}"))?, 668 + ); 669 + Ok(bytes::Bytes::from(buf)) 670 + } 671 + 647 672 #[cfg(test)] 648 673 mod test { 674 + #[cfg(feature = "relay")] 675 + use super::FirehoseError; 649 676 use super::{SubscribeReposMessage, decode_frame}; 650 677 651 678 #[test] ··· 668 695 panic!("expected Commit"); 669 696 }; 670 697 assert!(c.since.is_none(), "since should be None for empty string"); 698 + } 699 + 700 + #[cfg(feature = "relay")] 701 + #[test] 702 + fn test_decode_encoded_error_frame() { 703 + let bytes = super::encode_error_frame("ConsumerTooSlow", Some("blocked")).unwrap(); 704 + let Err(FirehoseError::RelayError { error, message }) = decode_frame(&bytes) else { 705 + panic!("expected relay error"); 706 + }; 707 + 708 + assert_eq!(error, "ConsumerTooSlow"); 709 + assert_eq!(message.as_deref(), Some("blocked")); 671 710 } 672 711 }