Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client.
appview atproto bluesky rust appserver
66
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge branch 'fix-consumer-shutdown' into 'main'

Proper Firehose Consumer Shutdown

See merge request parakeet-social/parakeet!9

Mia 69e8f708 2a756bea

+73 -19
+10 -9
Cargo.lock
··· 695 695 "serde_ipld_dagcbor", 696 696 "serde_json", 697 697 "sled", 698 + "thiserror 2.0.12", 698 699 "tokio", 699 700 "tokio-postgres", 700 701 "tokio-stream", ··· 941 942 "reqwest", 942 943 "serde", 943 944 "serde_json", 944 - "thiserror 2.0.11", 945 + "thiserror 2.0.12", 945 946 "tokio", 946 947 ] 947 948 ··· 2594 2595 "eyre", 2595 2596 "serde", 2596 2597 "serde_json", 2597 - "thiserror 2.0.11", 2598 + "thiserror 2.0.12", 2598 2599 "walkdir", 2599 2600 ] 2600 2601 ··· 3536 3537 dependencies = [ 3537 3538 "num-bigint", 3538 3539 "num-traits", 3539 - "thiserror 2.0.11", 3540 + "thiserror 2.0.12", 3540 3541 "time", 3541 3542 ] 3542 3543 ··· 3718 3719 3719 3720 [[package]] 3720 3721 name = "thiserror" 3721 - version = "2.0.11" 3722 + version = "2.0.12" 3722 3723 source = "registry+https://github.com/rust-lang/crates.io-index" 3723 - checksum = "d452f284b73e6d76dd36758a0c8684b1d5be31f92b89d07fd5822175732206fc" 3724 + checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" 3724 3725 dependencies = [ 3725 - "thiserror-impl 2.0.11", 3726 + "thiserror-impl 2.0.12", 3726 3727 ] 3727 3728 3728 3729 [[package]] ··· 3738 3739 3739 3740 [[package]] 3740 3741 name = "thiserror-impl" 3741 - version = "2.0.11" 3742 + version = "2.0.12" 3742 3743 source = "registry+https://github.com/rust-lang/crates.io-index" 3743 - checksum = "26afc1baea8a989337eeb52b6e72a039780ce45c3edfcc9c5b9d112feeb173c2" 3744 + checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" 3744 3745 dependencies = [ 3745 3746 "proc-macro2", 3746 3747 "quote", ··· 4142 4143 "native-tls", 4143 4144 "rand", 4144 4145 "sha1", 4145 - "thiserror 2.0.11", 4146 + "thiserror 2.0.12", 4146 4147 "utf-8", 4147 4148 ] 4148 4149
+1
consumer/Cargo.toml
··· 28 28 serde_ipld_dagcbor = "0.6.1" 29 29 serde_json = "1.0.134" 30 30 sled = "0.34.7" 31 + thiserror = "2" 31 32 tokio = { version = "1.42.0", features = ["full"] } 32 33 tokio-postgres = { version = "0.7.12", features = ["with-chrono-0_4"] } 33 34 tokio-stream = "0.1.17"
+12
consumer/src/firehose/error.rs
··· 1 + use thiserror::Error; 2 + use std::io::Error as IoError; 3 + 4 + #[derive(Debug, Error)] 5 + pub enum FirehoseError { 6 + #[error("{0}")] 7 + Cbor(#[from] ciborium::de::Error<IoError>), 8 + #[error("{0}")] 9 + IpldCbor(#[from] serde_ipld_dagcbor::error::DecodeError<IoError>), 10 + #[error("{0}")] 11 + Websocket(#[from] tokio_tungstenite::tungstenite::error::Error), 12 + }
+19 -3
consumer/src/firehose/mod.rs
··· 4 4 use std::io::Cursor; 5 5 use tokio::net::TcpStream; 6 6 use tokio_tungstenite::tungstenite::client::IntoClientRequest; 7 + use tokio_tungstenite::tungstenite::error::{Error as WsError, ProtocolError}; 7 8 use tokio_tungstenite::tungstenite::http::header::USER_AGENT; 8 9 use tokio_tungstenite::tungstenite::Message; 9 10 use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; 10 11 pub use types::*; 11 12 13 + pub mod error; 12 14 pub mod types; 13 15 14 16 pub struct FirehoseConsumer { ··· 51 53 self.seq 52 54 } 53 55 54 - pub async fn drive(&mut self) -> eyre::Result<FirehoseOutput> { 55 - let Some(message) = self.stream.next().await.transpose()? else { 56 - return Ok(FirehoseOutput::Close); 56 + pub async fn drive(&mut self) -> Result<FirehoseOutput, error::FirehoseError> { 57 + let message = match self.stream.next().await { 58 + Some(Ok(data)) => data, 59 + Some(Err(err)) if is_ws_reset(&err) => return Ok(FirehoseOutput::Close), 60 + Some(Err(err)) => return Err(err.into()), 61 + None => return Ok(FirehoseOutput::Close), 57 62 }; 58 63 59 64 match message { ··· 128 133 Ok(FirehoseOutput::Continue) 129 134 } 130 135 } 136 + 137 + fn is_ws_reset(err: &WsError) -> bool { 138 + use std::io::ErrorKind; 139 + 140 + match err { 141 + WsError::Protocol(ProtocolError::ResetWithoutClosingHandshake) 142 + | WsError::ConnectionClosed => true, 143 + WsError::Io(ioerr) => matches!(ioerr.kind(), ErrorKind::BrokenPipe | ErrorKind::ConnectionReset), 144 + _ => false, 145 + } 146 + }
+31 -7
consumer/src/indexer/mod.rs
··· 76 76 } 77 77 78 78 pub async fn run(mut self, threads: u8) -> eyre::Result<()> { 79 - let (submit, _handles) = (0..threads) 79 + let (submit, handles) = (0..threads) 80 80 .map(|idx| { 81 81 let pool = self.pool.clone(); 82 82 let mut state = self.state.clone(); ··· 110 110 }) 111 111 .unzip::<_, _, Vec<_>, Vec<_>>(); 112 112 113 + let mut join_set = tokio::task::JoinSet::from_iter(handles); 114 + 113 115 let threads = threads as u64; 114 116 // timer to log the current seq every 10s. 115 117 let mut timer = tokio::time::interval(tokio::time::Duration::from_secs(10)); ··· 118 120 tokio::select! { 119 121 _ = timer.tick() => { 120 122 let seq = self.firehose.current_seq(); 121 - self.resume.insert("firehose", &seq.to_le_bytes())?; 123 + if let Err(e) = self.resume.insert("firehose", &seq.to_le_bytes()) { 124 + tracing::error!("Failed to update seq ({seq}) in resume: {e}"); 125 + } 122 126 counter!("firehose_seq").absolute(seq); 123 127 }, 124 128 out = self.firehose.drive() => { 125 - match out? { 126 - FirehoseOutput::Close => break, 127 - FirehoseOutput::Continue => continue, 128 - FirehoseOutput::Error(err) => { 129 + match out { 130 + Ok(FirehoseOutput::Close) => break, 131 + Ok(FirehoseOutput::Continue) => continue, 132 + Ok(FirehoseOutput::Error(err)) => { 129 133 tracing::error!("Firehose sent an error, exiting: {err:?}"); 130 134 break; 131 135 } 132 - FirehoseOutput::Event(event) => { 136 + Ok(FirehoseOutput::Event(event)) => { 133 137 self.consume_firehose_event(*event, threads, &submit).await; 134 138 } 139 + Err(err) => { 140 + tracing::error!("Firehose drive error: {err:?}"); 141 + break; 142 + } 135 143 } 136 144 } 137 145 } 138 146 } 147 + 148 + tracing::info!("Firehose closed - draining then exiting"); 149 + 150 + // when we get here, explicitly drop the senders - this should cause the channels to drain 151 + // and then return None and stop. 
After returning from this function, idxc_tx also drops out 152 + // of scope and the whole program should then exit (unless backfill or labels are running) 153 + drop(submit); 154 + 155 + while let Some(res) = join_set.join_next().await { 156 + match res { 157 + Ok(Ok(_)) => {} 158 + Ok(Err(err)) | Err(err) => return Err(err.into()), 159 + } 160 + } 161 + 162 + tracing::info!("indexer exiting."); 139 163 140 164 Ok(()) 141 165 }