Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client.
appview atproto bluesky rust appserver
66
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(consumer): automatic resume for labels and firehose.

see #25.

Mia ddd37400 a5e8de21

+100 -26
+1
Cargo.lock
··· 694 694 "serde_bytes", 695 695 "serde_ipld_dagcbor", 696 696 "serde_json", 697 + "sled", 697 698 "tokio", 698 699 "tokio-postgres", 699 700 "tokio-stream",
+1
consumer/Cargo.toml
··· 27 27 serde_bytes = "0.11" 28 28 serde_ipld_dagcbor = "0.6.1" 29 29 serde_json = "1.0.134" 30 + sled = "0.34.7" 30 31 tokio = { version = "1.42.0", features = ["full"] } 31 32 tokio-postgres = { version = "0.7.12", features = ["with-chrono-0_4"] } 32 33 tokio-stream = "0.1.17"
+2
consumer/src/config.rs
··· 23 23 /// DIDs of label services to force subscription to. 24 24 #[serde(default)] 25 25 pub initial_label_services: Vec<String>, 26 + /// Where to store the resume information for labels and indexer only. 27 + pub resume_path: Option<String>, 26 28 27 29 /// Configuration items specific to indexer 28 30 pub indexer: Option<IndexerConfig>,
+4 -1
consumer/src/indexer/mod.rs
··· 43 43 state: RelayIndexerState, 44 44 firehose: FirehoseConsumer, 45 45 hasher: RandomState, 46 + resume: sled::Db, 46 47 } 47 48 48 49 impl RelayIndexer { ··· 51 52 idxc_tx: Sender<parakeet_index::AggregateDeltaReq>, 52 53 resolver: Arc<Resolver>, 53 54 firehose: FirehoseConsumer, 55 + resume: sled::Db, 54 56 opts: RelayIndexerOpts, 55 57 ) -> eyre::Result<Self> { 56 58 let indexer = RelayIndexer { ··· 63 65 idxc_tx, 64 66 }, 65 67 hasher: RandomState::default(), 68 + resume, 66 69 }; 67 70 68 71 if opts.skip_handle_validation { ··· 115 118 tokio::select! { 116 119 _ = timer.tick() => { 117 120 let seq = self.firehose.current_seq(); 118 - tracing::info!("Current firehose seq is: {seq}"); 121 + self.resume.insert("firehose", &seq.to_le_bytes())?; 119 122 counter!("firehose_seq").absolute(seq); 120 123 }, 121 124 out = self.firehose.drive() => {
+55 -24
consumer/src/label_indexer/mod.rs
··· 7 7 use std::time::Duration; 8 8 use tokio::sync::mpsc::{channel, Receiver, Sender}; 9 9 use tokio::task::JoinHandle; 10 + use tokio::time::Instant; 10 11 use tokio_postgres::binary_copy::BinaryCopyInWriter; 11 12 use tokio_postgres::types::Type; 12 13 use tokio_postgres::NoTls; 13 14 use tracing::instrument; 15 + use tokio::sync::watch::Receiver as WatchReceiver; 14 16 15 17 const LABELER_SERVICE_ID: &str = "#atproto_labeler"; 16 18 ··· 20 22 resolver: Arc<Resolver>, 21 23 services: HashMap<String, JoinHandle<()>>, 22 24 user_agent: String, 25 + resume: sled::Db, 23 26 } 24 27 25 28 impl LabelServiceManager { 26 29 pub async fn new( 27 30 pg_url: &str, 28 31 resolver: Arc<Resolver>, 32 + resume: sled::Db, 29 33 user_agent: String, 30 34 ) -> eyre::Result<(Self, Sender<String>)> { 31 35 let (client, connection) = tokio_postgres::connect(pg_url, NoTls).await?; ··· 42 46 client, 43 47 rx, 44 48 resolver, 49 + resume, 45 50 services: HashMap::new(), 46 51 user_agent, 47 52 }; ··· 52 57 pub async fn run(mut self, inital_services: Vec<String>) -> eyre::Result<()> { 53 58 let (db_tx, mut db_rx) = channel(8192); 54 59 60 + let (watch_tx, watch_rx) = tokio::sync::watch::channel(Instant::now()); 61 + 55 62 for service in inital_services { 56 63 let service_endpoint = resolve_service_endpoint(&self.resolver, &service).await?; 57 64 let service_endpoint = service_endpoint.replace("https://", "wss://"); 58 65 59 66 let handle = tokio::spawn(label_consumer( 67 + self.resume.clone(), 68 + watch_rx.clone(), 60 69 service.clone(), 61 70 service_endpoint, 62 71 self.user_agent.clone(), ··· 66 75 } 67 76 68 77 let mut timer = tokio::time::interval(Duration::from_millis(250)); 78 + let mut seq_timer = tokio::time::interval(Duration::from_secs(10)); 69 79 70 80 let mut buf = Vec::with_capacity(8192); 71 81 ··· 80 90 let service_endpoint = service_endpoint.replace("https://", "wss://"); 81 91 82 92 let handle = tokio::spawn(label_consumer( 93 + self.resume.clone(), 94 + 
watch_rx.clone(), 83 95 service.clone(), 84 96 service_endpoint, 85 97 self.user_agent.clone(), ··· 88 100 89 101 self.services.insert(service, handle); 90 102 103 + continue; 104 + } 105 + inst = seq_timer.tick() => { 106 + watch_tx.send(inst)?; 91 107 continue; 92 108 } 93 109 _ = timer.tick() => { ··· 161 177 Ok(count) 162 178 } 163 179 164 - #[instrument(skip(service_did, user_agent, db_tx))] 180 + #[instrument(skip(resume, trigger, service_did, user_agent, db_tx))] 165 181 async fn label_consumer( 182 + resume: sled::Db, 183 + mut trigger: WatchReceiver<Instant>, 166 184 service_did: String, 167 185 service: String, 168 186 user_agent: String, 169 187 db_tx: Sender<AtpLabel>, 170 188 ) { 171 - let mut consumer = match FirehoseConsumer::new_labeler(&service, None, &user_agent).await { 189 + let start_seq = resume.get(&service_did).ok().flatten().and_then(crate::utils::u64_from_ivec); 190 + 191 + if let Some(start_seq) = start_seq { 192 + tracing::info!("starting {service_did} label consumer from {start_seq}"); 193 + } 194 + 195 + let mut consumer = match FirehoseConsumer::new_labeler(&service, start_seq, &user_agent).await { 172 196 Ok(consumer) => consumer, 173 197 Err(err) => { 174 198 tracing::error!("Failed to connect to labeler: {err}"); ··· 176 200 } 177 201 }; 178 202 179 - while let Ok(output) = consumer.drive().await { 180 - match output { 181 - FirehoseOutput::Close => break, 182 - FirehoseOutput::Continue => continue, 183 - FirehoseOutput::Error(err) => { 184 - tracing::error!("Firehose sent an error, exiting: {err:?}"); 185 - break; 203 + loop { 204 + tokio::select! 
{ 205 + _ = trigger.changed() => { 206 + resume.insert(&service_did, &consumer.current_seq().to_le_bytes()).unwrap(); 186 207 } 187 - FirehoseOutput::Event(event) => { 188 - if let FirehoseEvent::Label(label_event) = *event { 189 - let count = label_event.labels.len() as u64; 208 + Ok(output) = consumer.drive() => { 209 + match output { 210 + FirehoseOutput::Close => break, 211 + FirehoseOutput::Continue => continue, 212 + FirehoseOutput::Error(err) => { 213 + tracing::error!("Firehose sent an error, exiting: {err:?}"); 214 + break; 215 + } 216 + FirehoseOutput::Event(event) => { 217 + if let FirehoseEvent::Label(label_event) = *event { 218 + let count = label_event.labels.len() as u64; 190 219 191 - for label in label_event.labels { 192 - if label.src != service_did { 193 - tracing::warn!("labeler sent incorrect src"); 194 - continue; 195 - } 220 + for label in label_event.labels { 221 + if label.src != service_did { 222 + tracing::warn!("labeler sent incorrect src"); 223 + continue; 224 + } 225 + 226 + if let Err(e) = db_tx.send(label).await { 227 + tracing::error!("Failed to send label event: {e:?}"); 228 + return; 229 + } 230 + } 196 231 197 - if let Err(e) = db_tx.send(label).await { 198 - tracing::error!("Failed to send label event: {e:?}"); 199 - return; 232 + counter!("seen_labels", "service" => service_did.clone()).increment(count); 233 + } else { 234 + tracing::warn!("got non #label event from a labeler"); 200 235 } 201 236 } 202 - 203 - counter!("seen_labels", "service" => service_did.clone()).increment(count); 204 - } else { 205 - tracing::warn!("got non #label event from a labeler"); 206 237 } 207 238 } 208 239 }
+28 -1
consumer/src/main.rs
··· 37 37 38 38 let mut join_set = tokio::task::JoinSet::new(); 39 39 40 + let resume = (cli.labels || cli.indexer) 41 + .then::<Result<_, eyre::Report>, _>(|| { 42 + let resume_path = conf.resume_path.ok_or_eyre( 43 + "Config item resume_path must be specified when using --indexer or --labels", 44 + )?; 45 + 46 + let db = sled::open(resume_path)?; 47 + 48 + Ok(db) 49 + }) 50 + .transpose()?; 51 + 40 52 if cli.labels { 53 + let resume = resume.clone().unwrap(); 54 + 41 55 let (label_mgr, _label_svc_tx) = label_indexer::LabelServiceManager::new( 42 56 &conf.database_url, 43 57 resolver.clone(), 58 + resume, 44 59 user_agent.clone(), 45 60 ) 46 61 .await?; ··· 57 72 } 58 73 59 74 if cli.indexer { 75 + let resume = resume.clone().unwrap(); 76 + 60 77 let indexer_cfg = conf 61 78 .indexer 62 79 .ok_or_eyre("Config item [indexer] must be specified when using --indexer")?; 63 80 64 81 let (idxc_tx, idxc_rx) = tokio::sync::mpsc::channel(128); 65 82 83 + let start_seq = resume 84 + .get("firehose")? 85 + .and_then(utils::u64_from_ivec) 86 + .or(indexer_cfg.start_commit_seq); 87 + 88 + if let Some(start_seq) = start_seq { 89 + tracing::info!("starting firehose consumer from {start_seq}"); 90 + } 91 + 66 92 let relay_firehose = firehose::FirehoseConsumer::new_relay( 67 93 &indexer_cfg.relay_source, 68 - indexer_cfg.start_commit_seq, 94 + start_seq, 69 95 &user_agent, 70 96 ) 71 97 .await?; ··· 80 106 idxc_tx, 81 107 resolver.clone(), 82 108 relay_firehose, 109 + resume, 83 110 indexer_opts, 84 111 ) 85 112 .await?;
+9
consumer/src/utils.rs
··· 48 48 false => Some(input), 49 49 } 50 50 } 51 + 52 + pub fn u64_from_ivec(val: sled::IVec) -> Option<u64> { 53 + if val.len() == 8 { 54 + let bytes = val[0..8].try_into().ok()?; 55 + Some(u64::from_le_bytes(bytes)) 56 + } else { 57 + None 58 + } 59 + }