Parakeet is a Rust-based Bluesky AppServer that aims to implement most of the functionality required to support the Bluesky client.
appview atproto bluesky rust appserver
66
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(consumer): move label consumption to the blacksky/rsky mod relay

Mia 80baef1e e0347740

+21 -90
+1 -3
consumer/src/config.rs
··· 19 19 pub plc_directory: Option<String>, 20 20 /// Adds contact details (email / bluesky handle / website) to the UA header. 21 21 pub ua_contact: Option<String>, 22 - /// DIDs of label services to force subscription to. 23 - #[serde(default)] 24 - pub initial_label_services: Vec<String>, 22 + pub label_source: Option<String>, 25 23 /// Where to store the resume information for labels and indexer only. 26 24 pub resume_path: Option<String>, 27 25
+16 -84
consumer/src/label_indexer/mod.rs
··· 1 1 use crate::firehose::{AtpLabel, FirehoseConsumer, FirehoseEvent, FirehoseOutput}; 2 - use did_resolver::Resolver; 3 2 use futures::pin_mut; 4 3 use metrics::counter; 5 - use std::collections::HashMap; 6 - use std::sync::Arc; 7 4 use std::time::Duration; 8 - use tokio::sync::mpsc::{channel, Receiver, Sender}; 5 + use tokio::sync::mpsc::{channel, Sender}; 9 6 use tokio::sync::watch::Receiver as WatchReceiver; 10 - use tokio::task::JoinHandle; 11 7 use tokio::time::Instant; 12 8 use tokio_postgres::binary_copy::BinaryCopyInWriter; 13 9 use tokio_postgres::types::Type; 14 10 use tracing::instrument; 15 11 16 - const LABELER_SERVICE_ID: &str = "#atproto_labeler"; 17 - 18 12 pub struct LabelServiceManager { 19 13 conn: deadpool_postgres::Object, 20 - rx: Receiver<String>, 21 - resolver: Arc<Resolver>, 22 - services: HashMap<String, JoinHandle<()>>, 23 14 user_agent: String, 24 15 resume: sled::Db, 25 16 } ··· 27 18 impl LabelServiceManager { 28 19 pub async fn new( 29 20 pool: deadpool_postgres::Pool, 30 - resolver: Arc<Resolver>, 31 21 resume: sled::Db, 32 22 user_agent: String, 33 - ) -> eyre::Result<(Self, Sender<String>)> { 23 + ) -> eyre::Result<Self> { 34 24 let conn = pool.get().await?; 35 - let (tx, rx) = channel(8); 36 25 37 26 let lsm = LabelServiceManager { 38 27 conn, 39 - rx, 40 - resolver, 41 28 resume, 42 - services: HashMap::new(), 43 29 user_agent, 44 30 }; 45 31 46 - Ok((lsm, tx)) 32 + Ok(lsm) 47 33 } 48 34 49 - pub async fn run(mut self, inital_services: Vec<String>) -> eyre::Result<()> { 35 + pub async fn run(mut self, label_relay: String) -> eyre::Result<()> { 50 36 let (db_tx, mut db_rx) = channel(8192); 51 - 52 37 let (watch_tx, watch_rx) = tokio::sync::watch::channel(Instant::now()); 53 38 54 - for service in inital_services { 55 - let service_endpoint = resolve_service_endpoint(&self.resolver, &service).await?; 56 - let service_endpoint = service_endpoint.replace("https://", "wss://"); 57 - 58 - let handle = tokio::spawn(label_consumer( 
59 - self.resume.clone(), 60 - watch_rx.clone(), 61 - service.clone(), 62 - service_endpoint, 63 - self.user_agent.clone(), 64 - db_tx.clone(), 65 - )); 66 - self.services.insert(service, handle); 67 - } 39 + let handle = tokio::spawn(label_consumer( 40 + self.resume, 41 + watch_rx, 42 + label_relay, 43 + self.user_agent.clone(), 44 + db_tx, 45 + )); 68 46 69 47 let mut timer = tokio::time::interval(Duration::from_millis(250)); 70 48 let mut seq_timer = tokio::time::interval(Duration::from_secs(10)); ··· 73 51 74 52 loop { 75 53 let res = tokio::select! { 76 - Some(service) = self.rx.recv() => { 77 - if self.services.contains_key(&service) { 78 - continue; 79 - } 80 - 81 - let service_endpoint = resolve_service_endpoint(&self.resolver, &service).await?; 82 - let service_endpoint = service_endpoint.replace("https://", "wss://"); 83 - 84 - let handle = tokio::spawn(label_consumer( 85 - self.resume.clone(), 86 - watch_rx.clone(), 87 - service.clone(), 88 - service_endpoint, 89 - self.user_agent.clone(), 90 - db_tx.clone(), 91 - )); 92 - 93 - self.services.insert(service, handle); 94 - 95 - continue; 96 - } 97 54 inst = seq_timer.tick() => { 98 55 watch_tx.send(inst)?; 99 56 continue; ··· 169 126 Ok(count) 170 127 } 171 128 172 - #[instrument(skip(resume, trigger, service_did, user_agent, db_tx))] 129 + #[instrument(skip(resume, trigger, user_agent, db_tx))] 173 130 async fn label_consumer( 174 131 resume: sled::Db, 175 132 mut trigger: WatchReceiver<Instant>, 176 - service_did: String, 177 133 service: String, 178 134 user_agent: String, 179 135 db_tx: Sender<AtpLabel>, 180 136 ) { 181 137 let start_seq = resume 182 - .get(&service_did) 138 + .get(&service) 183 139 .ok() 184 140 .flatten() 185 141 .and_then(crate::utils::u64_from_ivec); 186 142 187 143 if let Some(start_seq) = start_seq { 188 - tracing::info!("starting {service_did} label consumer from {start_seq}"); 144 + tracing::info!("starting {service} label consumer from {start_seq}"); 189 145 } 190 146 191 147 
let mut consumer = match FirehoseConsumer::new_labeler(&service, start_seq, &user_agent).await { ··· 199 155 loop { 200 156 tokio::select! { 201 157 _ = trigger.changed() => { 202 - resume.insert(&service_did, &consumer.current_seq().to_le_bytes()).unwrap(); 158 + resume.insert(&service, &consumer.current_seq().to_le_bytes()).unwrap(); 203 159 } 204 160 Ok(output) = consumer.drive() => { 205 161 match output { ··· 211 167 } 212 168 FirehoseOutput::Event(event) => { 213 169 if let FirehoseEvent::Label(label_event) = *event { 214 - let count = label_event.labels.len() as u64; 215 - 216 170 for label in label_event.labels { 217 - if label.src != service_did { 218 - tracing::warn!("labeler sent incorrect src"); 219 - continue; 220 - } 171 + counter!("seen_labels", "service" => label.src.clone()).increment(1); 221 172 222 173 if let Err(e) = db_tx.send(label).await { 223 174 tracing::error!("Failed to send label event: {e:?}"); 224 175 return; 225 176 } 226 177 } 227 - 228 - counter!("seen_labels", "service" => service_did.clone()).increment(count); 229 178 } else { 230 179 tracing::warn!("got non #label event from a labeler"); 231 180 } ··· 235 184 } 236 185 } 237 186 } 238 - 239 - async fn resolve_service_endpoint(resolver: &Resolver, did: &str) -> eyre::Result<String> { 240 - // resolve the did to a PDS (also validates the handle) 241 - let Some(did_doc) = resolver.resolve_did(did).await? else { 242 - eyre::bail!("missing did doc"); 243 - }; 244 - 245 - let Some(service) = did_doc.service.and_then(|services| { 246 - services 247 - .into_iter() 248 - .find(|svc| svc.id == LABELER_SERVICE_ID) 249 - }) else { 250 - eyre::bail!("DID doc contained no service endpoint"); 251 - }; 252 - 253 - Ok(service.service_endpoint) 254 - }
+4 -3
consumer/src/main.rs
··· 59 59 if cli.labels { 60 60 let resume = resume.clone().unwrap(); 61 61 62 - let (label_mgr, _label_svc_tx) = label_indexer::LabelServiceManager::new( 62 + let label_mgr = label_indexer::LabelServiceManager::new( 63 63 pool.clone(), 64 - resolver.clone(), 65 64 resume, 66 65 user_agent.clone(), 67 66 ) 68 67 .await?; 69 68 70 - join_set.spawn(label_mgr.run(conf.initial_label_services)); 69 + if let Some(label_source) = conf.label_source { 70 + join_set.spawn(label_mgr.run(label_source)); 71 + } 71 72 } 72 73 73 74 if cli.backfill {